author    Reynold Xin <rxin@apache.org>  2013-10-31 21:38:10 -0700
committer Reynold Xin <rxin@apache.org>  2013-10-31 21:38:10 -0700
commit    99bfcc91e010ba29852ec7dd0b4270805b7b2377 (patch)
tree      a2e16697989d12d931a288ee74954fc83a5ff5ba
parent    fcaaf8680313fae08fc8a644b1be84ece650dc2a (diff)
parent    db89ac4bc8cbabaa13b36b75f4be2d96c29cb83a (diff)
Merge pull request #46 from jegonzal/VertexSetWithHashSet
Switched VertexSetRDD and GraphImpl to use OpenHashSet
-rw-r--r--  core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala             |  49
-rw-r--r--  core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala             |   9
-rw-r--r--  core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala |  47
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala       |   6
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala               | 233
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala             | 217
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/package.scala                    |  10
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala                 |  11
8 files changed, 305 insertions(+), 277 deletions(-)
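
The heart of the patch: a partition's vertex index is now an OpenHashSet, per-vertex values live in a plain array addressed by the set's slot positions, and a BitSet records which slots are populated. A minimal sketch of that layout, assuming code compiled inside Spark's own source tree (the hash utilities are private[spark]) and made-up ids and values:

    import org.apache.spark.util.hash.{BitSet, OpenHashSet}

    // The vertex index for one partition: an open-addressed set of vertex ids.
    val index = new OpenHashSet[Long](16)
    Seq(10L, 20L, 30L).foreach(index.add)

    // Values are stored in an array whose slots are the set's positions,
    // with a BitSet marking which slots actually hold a value.
    val values = new Array[Double](index.capacity)
    val mask = new BitSet(index.capacity)
    for (id <- Seq(10L, 30L)) {
      val slot = index.getPos(id) & OpenHashSet.POSITION_MASK
      values(slot) = id * 0.5
      mask.set(slot)
    }

Because several value arrays can be addressed through the same index, the hash table is built once per partition rather than once per derived data set.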
diff --git a/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala
index a376d1015a..e53551ced6 100644
--- a/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala
@@ -27,14 +27,21 @@ package org.apache.spark.util.hash
*/
private[spark]
class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: ClassManifest](
- initialCapacity: Int)
+ val keySet: OpenHashSet[K], var _values: Array[V])
extends Iterable[(K, V)]
with Serializable {
- def this() = this(64)
+ /**
+ * Allocate an OpenHashMap with a fixed initial capacity
+ */
+ def this(initialCapacity: Int = 64) =
+ this(new OpenHashSet[K](initialCapacity), new Array[V](initialCapacity))
+
+ /**
+ * Allocate an OpenHashMap backed by an existing OpenHashSet key set
+ */
+ def this(keySet: OpenHashSet[K]) = this(keySet, new Array[V](keySet.capacity))
- protected var _keySet = new OpenHashSet[K](initialCapacity)
- private var _values = new Array[V](_keySet.capacity)
@transient private var _oldValues: Array[V] = null
@@ -42,14 +49,14 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V:
private var haveNullValue = false
private var nullValue: V = null.asInstanceOf[V]
- override def size: Int = if (haveNullValue) _keySet.size + 1 else _keySet.size
+ override def size: Int = if (haveNullValue) keySet.size + 1 else keySet.size
/** Get the value for a given key */
def apply(k: K): V = {
if (k == null) {
nullValue
} else {
- val pos = _keySet.getPos(k)
+ val pos = keySet.getPos(k)
if (pos < 0) {
null.asInstanceOf[V]
} else {
@@ -64,9 +71,26 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V:
haveNullValue = true
nullValue = v
} else {
- val pos = _keySet.fastAdd(k) & OpenHashSet.POSITION_MASK
+ val pos = keySet.fastAdd(k) & OpenHashSet.POSITION_MASK
_values(pos) = v
- _keySet.rehashIfNeeded(k, grow, move)
+ keySet.rehashIfNeeded(k, grow, move)
+ _oldValues = null
+ }
+ }
+
+ /** Set the value for a key, merging the new value into any existing value with mergeF */
+ def setMerge(k: K, v: V, mergeF: (V,V) => V) {
+ if (k == null) {
+ if(haveNullValue) {
+ nullValue = mergeF(nullValue, v)
+ } else {
+ haveNullValue = true
+ nullValue = v
+ }
+ } else {
+ val pos = keySet.fastAdd(k) & OpenHashSet.POSITION_MASK
+ _values(pos) = mergeF(_values(pos), v)
+ keySet.rehashIfNeeded(k, grow, move)
_oldValues = null
}
}
@@ -87,11 +111,11 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V:
}
nullValue
} else {
- val pos = _keySet.fastAdd(k)
+ val pos = keySet.fastAdd(k)
if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) {
val newValue = defaultValue
_values(pos & OpenHashSet.POSITION_MASK) = newValue
- _keySet.rehashIfNeeded(k, grow, move)
+ keySet.rehashIfNeeded(k, grow, move)
newValue
} else {
_values(pos) = mergeValue(_values(pos))
@@ -113,9 +137,9 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V:
}
pos += 1
}
- pos = _keySet.nextPos(pos)
+ pos = keySet.nextPos(pos)
if (pos >= 0) {
- val ret = (_keySet.getValue(pos), _values(pos))
+ val ret = (keySet.getValue(pos), _values(pos))
pos += 1
ret
} else {
@@ -146,3 +170,4 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V:
_values(newPos) = _oldValues(oldPos)
}
}
+
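
The new setMerge above folds a value into the slot for its key in a single probe; VertexSetRDD's pre-aggregation below relies on exactly this. A small illustration under the same private[spark] caveat (note that for a key seen for the first time the merge function is applied to the value type's default, so it suits monoid-like merges such as summing):

    import org.apache.spark.util.hash.OpenHashMap

    // Word-count style merging: later occurrences fold into the stored count.
    val counts = new OpenHashMap[String, Int](64)
    for (w <- Seq("a", "b", "a", "a")) {
      counts.setMerge(w, 1, _ + _)
    }
    // counts("a") == 3, counts("b") == 1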
diff --git a/core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala
index 7aa3f6220c..d083ab26ac 100644
--- a/core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala
@@ -81,6 +81,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
protected var _data = classManifest[T].newArray(_capacity)
protected var _bitset = new BitSet(_capacity)
+ def getBitSet = _bitset
+
/** Number of elements in the set. */
def size: Int = _size
@@ -147,6 +149,13 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
/** Return the value at the specified position. */
def getValue(pos: Int): T = _data(pos)
+ /** Return the value at the specified position, asserting that the position holds an element. */
+ def getValueSafe(pos: Int): T = {
+ assert(_bitset.get(pos))
+ _data(pos)
+ }
+
+
/**
* Return the next position with an element stored, starting from the given position inclusively.
*/
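
getBitSet and getValueSafe expose the set's occupied slots directly; this is how the rewritten VertexSetRDD.compute below walks an index: iterate the set bits and read the key stored at each one. Roughly, under the same private[spark] assumption:

    import org.apache.spark.util.hash.OpenHashSet

    val ids = new OpenHashSet[Long](16)
    Seq(3L, 7L).foreach(ids.add)

    // Visit only occupied slots; getValueSafe asserts the slot is in use.
    val members = ids.getBitSet.iterator.map(pos => ids.getValueSafe(pos)).toList
    // members contains 3L and 7L, in slot order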
diff --git a/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala
index 14c1367207..08fc74e5da 100644
--- a/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala
@@ -28,35 +28,56 @@ package org.apache.spark.util.hash
private[spark]
class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest,
@specialized(Long, Int, Double) V: ClassManifest](
- initialCapacity: Int)
+ val keySet: OpenHashSet[K], var _values: Array[V])
extends Iterable[(K, V)]
with Serializable {
- def this() = this(64)
+ /**
+ * Allocate a PrimitiveKeyOpenHashMap with a fixed initial capacity
+ */
+ def this(initialCapacity: Int = 64) =
+ this(new OpenHashSet[K](initialCapacity), new Array[V](initialCapacity))
- require(classManifest[K] == classManifest[Long] || classManifest[K] == classManifest[Int])
+ /**
+ * Allocate a PrimitiveKeyOpenHashMap backed by an existing OpenHashSet key set
+ */
+ def this(keySet: OpenHashSet[K]) = this(keySet, new Array[V](keySet.capacity))
- protected var _keySet = new OpenHashSet[K](initialCapacity)
- private var _values = new Array[V](_keySet.capacity)
+ require(classManifest[K] == classManifest[Long] || classManifest[K] == classManifest[Int])
private var _oldValues: Array[V] = null
- override def size = _keySet.size
+ override def size = keySet.size
/** Get the value for a given key */
def apply(k: K): V = {
- val pos = _keySet.getPos(k)
+ val pos = keySet.getPos(k)
_values(pos)
}
/** Set the value for a key */
def update(k: K, v: V) {
- val pos = _keySet.fastAdd(k) & OpenHashSet.POSITION_MASK
+ val pos = keySet.fastAdd(k) & OpenHashSet.POSITION_MASK
_values(pos) = v
- _keySet.rehashIfNeeded(k, grow, move)
+ keySet.rehashIfNeeded(k, grow, move)
+ _oldValues = null
+ }
+
+
+ /** Set the value for a key, merging the new value into any existing value with mergeF */
+ def setMerge(k: K, v: V, mergeF: (V,V) => V) {
+ val pos = keySet.fastAdd(k)
+ val ind = pos & OpenHashSet.POSITION_MASK
+ if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) { // if first add
+ _values(ind) = v
+ } else {
+ _values(ind) = mergeF(_values(ind), v)
+ }
+ keySet.rehashIfNeeded(k, grow, move)
_oldValues = null
}
+
/**
* If the key doesn't exist yet in the hash map, set its value to defaultValue; otherwise,
* set its value to mergeValue(oldValue).
@@ -64,11 +85,11 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest,
* @return the newly updated value.
*/
def changeValue(k: K, defaultValue: => V, mergeValue: (V) => V): V = {
- val pos = _keySet.fastAdd(k)
+ val pos = keySet.fastAdd(k)
if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) {
val newValue = defaultValue
_values(pos & OpenHashSet.POSITION_MASK) = newValue
- _keySet.rehashIfNeeded(k, grow, move)
+ keySet.rehashIfNeeded(k, grow, move)
newValue
} else {
_values(pos) = mergeValue(_values(pos))
@@ -82,9 +103,9 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest,
/** Get the next value we should return from next(), or null if we're finished iterating */
def computeNextPair(): (K, V) = {
- pos = _keySet.nextPos(pos)
+ pos = keySet.nextPos(pos)
if (pos >= 0) {
- val ret = (_keySet.getValue(pos), _values(pos))
+ val ret = (keySet.getValue(pos), _values(pos))
pos += 1
ret
} else {
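
The keySet-sharing constructors are what make the rest of the patch work: several per-vertex tables can be backed by one OpenHashSet, so their slot positions line up and the index is built only once. A hedged sketch with made-up vertex ids (as long as no new key forces a rehash of the shared set, the value arrays stay aligned):

    import org.apache.spark.util.hash.{OpenHashSet, PrimitiveKeyOpenHashMap}

    val vids = new OpenHashSet[Long](16)
    Seq(1L, 2L, 3L).foreach(vids.add)

    // Two value tables over the same key set: one index, two arrays.
    val outDegree = new PrimitiveKeyOpenHashMap[Long, Int](vids)
    val rank = new PrimitiveKeyOpenHashMap[Long, Double](vids)
    outDegree(2L) = 4
    rank(2L) = 0.15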
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
index 821063e1f8..62f445127c 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
@@ -1,14 +1,11 @@
package org.apache.spark.graph
-import org.apache.spark.util.hash.BitSet
-
-
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.graph.impl.MessageToPartition
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.graph.impl._
-import scala.collection.mutable.BitSet
+import org.apache.spark.util.hash.BitSet
class GraphKryoRegistrator extends KryoRegistrator {
@@ -20,7 +17,6 @@ class GraphKryoRegistrator extends KryoRegistrator {
kryo.register(classOf[EdgePartition[Object]])
kryo.register(classOf[BitSet])
kryo.register(classOf[VertexIdToIndexMap])
-
// This avoids a large number of hash table lookups.
kryo.setReferences(false)
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
index 8acc89a95b..f26e286003 100644
--- a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
@@ -31,6 +31,8 @@ import org.apache.spark.Partitioner._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.hash.BitSet
+import org.apache.spark.util.hash.OpenHashSet
+import org.apache.spark.util.hash.PrimitiveKeyOpenHashMap
@@ -60,7 +62,6 @@ class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) {
} // end of VertexSetIndex
-
/**
* An VertexSetRDD[V] extends the RDD[(Vid,V)] by ensuring that there
* is only one entry for each vertex and by pre-indexing the entries
@@ -97,7 +98,7 @@ class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) {
*/
class VertexSetRDD[@specialized V: ClassManifest](
@transient val index: VertexSetIndex,
- @transient val valuesRDD: RDD[ (IndexedSeq[V], BitSet) ])
+ @transient val valuesRDD: RDD[ ( (Int => V), BitSet) ])
extends RDD[(Vid, V)](index.rdd.context,
List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) {
@@ -160,15 +161,8 @@ class VertexSetRDD[@specialized V: ClassManifest](
* Provide the RDD[(K,V)] equivalent output.
*/
override def compute(part: Partition, context: TaskContext): Iterator[(Vid, V)] = {
- tuples.compute(part, context).flatMap { case (indexMap, (values, bs) ) =>
- // Walk the index to construct the key, value pairs
- indexMap.iterator
- // Extract rows with key value pairs and indicators
- .map{ case (k, ind) => (bs.get(ind), k, ind) }
- // Remove tuples that aren't actually present in the array
- .filter( _._1 )
- // Extract the pair (removing the indicator from the tuple)
- .map( x => (x._2, values(x._3) ) )
+ tuples.compute(part, context).flatMap { case (indexMap, (values, bs) ) =>
+ bs.iterator.map(ind => (indexMap.getValueSafe(ind), values(ind)))
}
} // end of compute
@@ -188,18 +182,22 @@ class VertexSetRDD[@specialized V: ClassManifest](
val cleanPred = index.rdd.context.clean(pred)
val newValues = index.rdd.zipPartitions(valuesRDD){
(keysIter: Iterator[VertexIdToIndexMap],
- valuesIter: Iterator[(IndexedSeq[V], BitSet)]) =>
+ valuesIter: Iterator[(Int => V, BitSet)]) =>
val index = keysIter.next()
assert(keysIter.hasNext() == false)
val (oldValues, bs) = valuesIter.next()
assert(valuesIter.hasNext() == false)
// Allocate the array to store the results into
- val newBS = new BitSet(oldValues.size)
- // Populate the new Values
- for( (k,i) <- index ) {
- if( bs.get(i) && cleanPred( (k, oldValues(i)) ) ) {
- newBS.set(i)
+ val newBS = new BitSet(index.capacity)
+ // Iterate over the active bits in the old bitset and
+ // evaluate the predicate
+ var ind = bs.nextSetBit(0)
+ while(ind >= 0) {
+ val k = index.getValueSafe(ind)
+ if( cleanPred( (k, oldValues(ind)) ) ) {
+ newBS.set(ind)
}
+ ind = bs.nextSetBit(ind+1)
}
Array((oldValues, newBS)).iterator
}
@@ -219,32 +217,13 @@ class VertexSetRDD[@specialized V: ClassManifest](
* VertexSetRDD retains the same index.
*/
def mapValues[U: ClassManifest](f: V => U): VertexSetRDD[U] = {
- val cleanF = index.rdd.context.clean(f)
- val newValuesRDD: RDD[ (IndexedSeq[U], BitSet) ] =
+ val newValuesRDD: RDD[ (Int => U, BitSet) ] =
valuesRDD.mapPartitions(iter => iter.map{
case (values, bs: BitSet) =>
-
- /**
- * @todo Consider using a view rather than creating a new
- * array. This is already being done for join operations.
- * It could reduce memory overhead but require additional
- * recomputation.
- */
- val newValues = new Array[U](values.size)
- var ind = bs.nextSetBit(0)
- while(ind >= 0) {
- // if(ind >= newValues.size) {
- // println(ind)
- // println(newValues.size)
- // bs.iterator.foreach(print(_))
- // }
- // assert(ind < newValues.size)
- // assert(ind < values.size)
- newValues(ind) = cleanF(values(ind))
- ind = bs.nextSetBit(ind+1)
- }
- (newValues.toIndexedSeq, bs)
- }, preservesPartitioning = true)
+ val newValues: (Int => U) =
+ (ind: Int) => if (bs.get(ind)) f(values(ind)) else null.asInstanceOf[U]
+ (newValues, bs)
+ }, preservesPartitioning = true)
new VertexSetRDD[U](index, newValuesRDD)
} // end of mapValues
@@ -262,27 +241,20 @@ class VertexSetRDD[@specialized V: ClassManifest](
* VertexSetRDD retains the same index.
*/
def mapValuesWithKeys[U: ClassManifest](f: (Vid, V) => U): VertexSetRDD[U] = {
- val cleanF = index.rdd.context.clean(f)
- val newValues: RDD[ (IndexedSeq[U], BitSet) ] =
+ val newValues: RDD[ (Int => U, BitSet) ] =
index.rdd.zipPartitions(valuesRDD){
(keysIter: Iterator[VertexIdToIndexMap],
- valuesIter: Iterator[(IndexedSeq[V], BitSet)]) =>
+ valuesIter: Iterator[(Int => V, BitSet)]) =>
val index = keysIter.next()
assert(keysIter.hasNext() == false)
val (oldValues, bs: BitSet) = valuesIter.next()
assert(valuesIter.hasNext() == false)
- /**
- * @todo Consider using a view rather than creating a new array.
- * This is already being done for join operations. It could reduce
- * memory overhead but require additional recomputation.
- */
- // Allocate the array to store the results into
- val newValues: Array[U] = new Array[U](oldValues.size)
- // Populate the new Values
- for( (k,i) <- index ) {
- if (bs.get(i)) { newValues(i) = cleanF(k, oldValues(i)) }
+ // Construct a view of the map transformation
+ val newValues: (Int => U) = (ind: Int) => {
+ if (bs.get(ind)) { f(index.getValueSafe(ind), oldValues(ind)) }
+ else { null.asInstanceOf[U] }
}
- Array((newValues.toIndexedSeq, bs)).iterator
+ Iterator((newValues, bs))
}
new VertexSetRDD[U](index, newValues)
} // end of mapValuesWithKeys
@@ -305,17 +277,17 @@ class VertexSetRDD[@specialized V: ClassManifest](
if(index != other.index) {
throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
}
- val newValuesRDD: RDD[ (IndexedSeq[(V,W)], BitSet) ] =
+ val newValuesRDD: RDD[ (Int => (V,W), BitSet) ] =
valuesRDD.zipPartitions(other.valuesRDD){
- (thisIter: Iterator[(IndexedSeq[V], BitSet)],
- otherIter: Iterator[(IndexedSeq[W], BitSet)]) =>
+ (thisIter: Iterator[(Int => V, BitSet)],
+ otherIter: Iterator[(Int => W, BitSet)]) =>
val (thisValues, thisBS: BitSet) = thisIter.next()
assert(!thisIter.hasNext)
val (otherValues, otherBS: BitSet) = otherIter.next()
assert(!otherIter.hasNext)
val newBS: BitSet = thisBS & otherBS
- val newValues = thisValues.view.zip(otherValues)
- Iterator((newValues.toIndexedSeq, newBS))
+ val newValues: Int => (V,W) = (ind: Int) => (thisValues(ind), otherValues(ind))
+ Iterator((newValues, newBS))
}
new VertexSetRDD(index, newValuesRDD)
}
@@ -340,18 +312,17 @@ class VertexSetRDD[@specialized V: ClassManifest](
if(index != other.index) {
throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
}
- val newValuesRDD: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] =
+ val newValuesRDD: RDD[ (Int => (V,Option[W]), BitSet) ] =
valuesRDD.zipPartitions(other.valuesRDD){
- (thisIter: Iterator[(IndexedSeq[V], BitSet)],
- otherIter: Iterator[(IndexedSeq[W], BitSet)]) =>
+ (thisIter: Iterator[(Int => V, BitSet)],
+ otherIter: Iterator[(Int => W, BitSet)]) =>
val (thisValues, thisBS: BitSet) = thisIter.next()
assert(!thisIter.hasNext)
val (otherValues, otherBS: BitSet) = otherIter.next()
assert(!otherIter.hasNext)
- val otherOption = otherValues.view.zipWithIndex
- .map{ (x: (W, Int)) => if(otherBS.get(x._2)) Option(x._1) else None }
- val newValues = thisValues.view.zip(otherOption)
- Iterator((newValues.toIndexedSeq, thisBS))
+ val newValues: Int => (V, Option[W]) = (ind: Int) =>
+ (thisValues(ind), if (otherBS.get(ind)) Option(otherValues(ind)) else None)
+ Iterator((newValues, thisBS))
}
new VertexSetRDD(index, newValuesRDD)
} // end of leftZipJoin
@@ -378,7 +349,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
def leftJoin[W: ClassManifest](
other: RDD[(Vid,W)], merge: (W,W) => W = (a:W, b:W) => a):
VertexSetRDD[(V, Option[W]) ] = {
- val cleanMerge = index.rdd.context.clean(merge)
// Test if the other vertex is a VertexSetRDD to choose the optimal
// join strategy
other match {
@@ -396,10 +366,10 @@ class VertexSetRDD[@specialized V: ClassManifest](
if (other.partitioner == partitioner) other
else other.partitionBy(partitioner.get)
// Compute the new values RDD
- val newValues: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] =
+ val newValuesRDD: RDD[ (Int => (V,Option[W]), BitSet) ] =
index.rdd.zipPartitions(valuesRDD, otherShuffled) {
(thisIndexIter: Iterator[VertexIdToIndexMap],
- thisIter: Iterator[(IndexedSeq[V], BitSet)],
+ thisIter: Iterator[(Int => V, BitSet)],
tuplesIter: Iterator[(Vid,W)]) =>
// Get the Index and values for this RDD
val index = thisIndexIter.next()
@@ -407,33 +377,32 @@ class VertexSetRDD[@specialized V: ClassManifest](
val (thisValues, thisBS) = thisIter.next()
assert(!thisIter.hasNext)
// Create a new array to store the values in the resulting VertexSet
- val newW = new Array[W](thisValues.size)
+ val otherValues = new Array[W](index.capacity)
// track which values are matched with values in other
- val wBS = new BitSet(thisValues.size)
- // Loop over all the tuples that have vertices in this VertexSet.
- for( (k, w) <- tuplesIter if index.contains(k) ) {
- val ind = index.get(k)
- // Not all the vertex ids in the index are in this VertexSet.
- // If there is a vertex in this set then record the other value
- if(thisBS.get(ind)) {
- if(wBS.get(ind)) {
- newW(ind) = cleanMerge(newW(ind), w)
- } else {
- newW(ind) = w
- wBS.set(ind)
+ val otherBS = new BitSet(index.capacity)
+ for ((k,w) <- tuplesIter) {
+ // Get the location of the key in the index
+ val pos = index.getPos(k)
+ // Only if the key is already in the index
+ if ((pos & OpenHashSet.EXISTENCE_MASK) == 0) {
+ // Get the actual index
+ val ind = pos & OpenHashSet.POSITION_MASK
+ // If this value has already been seen then merge
+ if (otherBS.get(ind)) {
+ otherValues(ind) = merge(otherValues(ind), w)
+ } else { // otherwise just store the new value
+ otherBS.set(ind)
+ otherValues(ind) = w
}
}
- } // end of for loop over tuples
+ }
// Some vertices in this vertex set may not have a corresponding
// tuple in the join and so a None value should be returned.
- val otherOption = newW.view.zipWithIndex
- .map{ (x: (W, Int)) => if(wBS.get(x._2)) Option(x._1) else None }
- // the final values is the zip of the values in this RDD along with
- // the values in the other
- val newValues = thisValues.view.zip(otherOption)
- Iterator((newValues.toIndexedSeq, thisBS))
+ val newValues: Int => (V, Option[W]) = (ind: Int) =>
+ (thisValues(ind), if (otherBS.get(ind)) Option(otherValues(ind)) else None)
+ Iterator((newValues, thisBS))
} // end of newValues
- new VertexSetRDD(index, newValues)
+ new VertexSetRDD(index, newValuesRDD)
}
}
} // end of leftJoin
@@ -443,6 +412,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
* For each key k in `this` or `other`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this` as well as `other`.
*/
+ /*
def cogroup[W: ClassManifest](other: RDD[(Vid, W)], partitioner: Partitioner):
VertexSetRDD[(Seq[V], Seq[W])] = {
//RDD[(K, (Seq[V], Seq[W]))] = {
@@ -489,16 +459,17 @@ class VertexSetRDD[@specialized V: ClassManifest](
assert(!thisIter.hasNext)
val otherIndex = otherIter.next()
assert(!otherIter.hasNext)
- val newIndex = new VertexIdToIndexMap()
- // @todo Merge only the keys that correspond to non-null values
// Merge the keys
- newIndex.putAll(thisIndex)
- newIndex.putAll(otherIndex)
- // We need to rekey the index
- var ctr = 0
- for (e <- newIndex.entrySet) {
- e.setValue(ctr)
- ctr += 1
+ val newIndex = new VertexIdToIndexMap(thisIndex.capacity + otherIndex.capacity)
+ var ind = thisIndex.nextPos(0)
+ while(ind >= 0) {
+ newIndex.fastAdd(thisIndex.getValue(ind))
+ ind = thisIndex.nextPos(ind+1)
+ }
+ var ind = otherIndex.nextPos(0)
+ while(ind >= 0) {
+ newIndex.fastAdd(otherIndex.getValue(ind))
+ ind = otherIndex.nextPos(ind+1)
}
List(newIndex).iterator
}).cache()
@@ -604,7 +575,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
}
}
} // end of cogroup
-
+ */
} // End of VertexSetRDD
@@ -649,25 +620,18 @@ object VertexSetRDD {
}
val groups = preAgg.mapPartitions( iter => {
- val indexMap = new VertexIdToIndexMap()
- val values = new ArrayBuffer[V]
+ val hashMap = new PrimitiveKeyOpenHashMap[Vid, V]
for ((k,v) <- iter) {
- if(!indexMap.contains(k)) {
- val ind = indexMap.size
- indexMap.put(k, ind)
- values.append(v)
- } else {
- val ind = indexMap.get(k)
- values(ind) = reduceFunc(values(ind), v)
- }
+ hashMap.setMerge(k, v, reduceFunc)
}
- val bs = new BitSet(indexMap.size)
- bs.setUntil(indexMap.size)
- Iterator( (indexMap, (values.toIndexedSeq, bs)) )
+ val index = hashMap.keySet
+ val values: Int => V = (ind: Int) => hashMap._values(ind)
+ val bs = index.getBitSet
+ Iterator( (index, (values, bs)) )
}, true).cache
// extract the index and the values
val index = groups.mapPartitions(_.map{ case (kMap, vAr) => kMap }, true)
- val values: RDD[(IndexedSeq[V], BitSet)] =
+ val values: RDD[(Int => V, BitSet)] =
groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
new VertexSetRDD[V](new VertexSetIndex(index), values)
} // end of apply
@@ -743,27 +707,31 @@ object VertexSetRDD {
}
// Use the index to build the new values table
- val values: RDD[ (IndexedSeq[C], BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
+ val values: RDD[ (Int => C, BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
// There is only one map
val index = indexIter.next()
assert(!indexIter.hasNext())
- val values = new Array[C](index.size)
- val bs = new BitSet(index.size)
+ val values = new Array[C](index.capacity)
+ val bs = new BitSet(index.capacity)
for ((k,c) <- tblIter) {
- // @todo this extra check may be costing us a lot!
- if (!index.contains(k)) {
+ // Get the location of the key in the index
+ val pos = index.getPos(k)
+ if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) {
throw new SparkException("Error: Trying to bind an external index " +
"to an RDD which contains keys that are not in the index.")
- }
- val ind = index(k)
- if (bs.get(ind)) {
- values(ind) = mergeCombiners(values(ind), c)
} else {
- values(ind) = c
- bs.set(ind)
+ // Get the actual index
+ val ind = pos & OpenHashSet.POSITION_MASK
+ // If this value has already been seen then merge
+ if (bs.get(ind)) {
+ values(ind) = mergeCombiners(values(ind), c)
+ } else { // otherwise just store the new value
+ bs.set(ind)
+ values(ind) = c
+ }
}
}
- Iterator((values, bs))
+ Iterator(((ind: Int) => values(ind), bs))
})
new VertexSetRDD(index, values)
} // end of apply
@@ -792,14 +760,9 @@ object VertexSetRDD {
}
val index = shuffledTbl.mapPartitions( iter => {
- val indexMap = new VertexIdToIndexMap()
- for ( (k,_) <- iter ){
- if(!indexMap.contains(k)){
- val ind = indexMap.size
- indexMap.put(k, ind)
- }
- }
- Iterator(indexMap)
+ val index = new VertexIdToIndexMap
+ for ( (k,_) <- iter ){ index.add(k) }
+ Iterator(index)
}, true).cache
new VertexSetIndex(index)
}
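
With this change the values half of a VertexSetRDD partition is a pair of a lookup function Int => V and a BitSet of live slots rather than a materialized IndexedSeq, so mapValues and the joins compose functions instead of copying arrays. The idea in isolation, in plain Scala with a Set standing in for the BitSet:

    // A partition's values: slot -> value, plus the mask of live slots.
    val base: Int => Double = slot => slot * 1.0
    val live = Set(0, 2, 5)

    // mapValues just wraps the old function ...
    val mapped: Int => Double = slot => if (live(slot)) base(slot) * 2 else 0.0

    // ... and nothing is materialized until the pairs are finally consumed.
    val pairs = live.toList.sorted.map(slot => (slot, mapped(slot)))

The trade-off is the one the removed @todo comments already hinted at: the view avoids allocating new arrays but recomputes the mapped value on every access.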
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index 016811db36..b80713dbf4 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -5,7 +5,6 @@ import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ArrayBuilder
-import scala.collection.mutable.BitSet
import org.apache.spark.SparkContext._
@@ -21,6 +20,12 @@ import org.apache.spark.graph._
import org.apache.spark.graph.impl.GraphImpl._
import org.apache.spark.graph.impl.MessageToPartitionRDDFunctions._
+import org.apache.spark.util.hash.BitSet
+import org.apache.spark.util.hash.OpenHashSet
+import org.apache.spark.util.hash.PrimitiveKeyOpenHashMap
+
+
+
/**
* The Iterator type returned when constructing edge triplets
*/
@@ -31,15 +36,16 @@ class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
private var pos = 0
private val et = new EdgeTriplet[VD, ED]
+ private val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray)
override def hasNext: Boolean = pos < edgePartition.size
override def next() = {
et.srcId = edgePartition.srcIds(pos)
// assert(vmap.containsKey(e.src.id))
- et.srcAttr = vertexArray(vidToIndex(et.srcId))
+ et.srcAttr = vmap(et.srcId)
et.dstId = edgePartition.dstIds(pos)
// assert(vmap.containsKey(e.dst.id))
- et.dstAttr = vertexArray(vidToIndex(et.dstId))
+ et.dstAttr = vmap(et.dstId)
et.attr = edgePartition.data(pos)
pos += 1
et
@@ -51,10 +57,10 @@ class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
for (i <- (0 until edgePartition.size)) {
currentEdge.srcId = edgePartition.srcIds(i)
// assert(vmap.containsKey(e.src.id))
- currentEdge.srcAttr = vertexArray(vidToIndex(currentEdge.srcId))
+ currentEdge.srcAttr = vmap(currentEdge.srcId)
currentEdge.dstId = edgePartition.dstIds(i)
// assert(vmap.containsKey(e.dst.id))
- currentEdge.dstAttr = vertexArray(vidToIndex(currentEdge.dstId))
+ currentEdge.dstAttr = vmap(currentEdge.dstId)
currentEdge.attr = edgePartition.data(i)
lb += currentEdge
}
@@ -63,23 +69,6 @@ class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
} // end of Edge Triplet Iterator
-
-object EdgeTripletBuilder {
- def makeTriplets[VD: ClassManifest, ED: ClassManifest](
- localVidMap: RDD[(Pid, VertexIdToIndexMap)],
- vTableReplicatedValues: RDD[(Pid, Array[VD]) ],
- eTable: RDD[(Pid, EdgePartition[ED])]): RDD[EdgeTriplet[VD, ED]] = {
- localVidMap.zipPartitions(vTableReplicatedValues, eTable) {
- (vidMapIter, replicatedValuesIter, eTableIter) =>
- val (_, vidToIndex) = vidMapIter.next()
- val (_, vertexArray) = replicatedValuesIter.next()
- val (_, edgePartition) = eTableIter.next()
- new EdgeTripletIterator(vidToIndex, vertexArray, edgePartition)
- }
- }
-}
-
-
/**
* A Graph RDD that supports computation on graphs.
*/
@@ -90,6 +79,10 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
@transient val eTable: RDD[(Pid, EdgePartition[ED])] )
extends Graph[VD, ED] {
+ def this() = this(null, null, null, null)
+
+
+
/**
* (localVidMap: VertexSetRDD[Pid, VertexIdToIndexMap]) is a version of the
* vertex data after it is replicated. Within each partition, it holds a map
@@ -115,7 +108,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
/** Return a RDD that brings edges with its source and destination vertices together. */
@transient override val triplets: RDD[EdgeTriplet[VD, ED]] =
- EdgeTripletBuilder.makeTriplets(localVidMap, vTableReplicatedValues, eTable)
+ makeTriplets(localVidMap, vTableReplicatedValues, eTable)
override def cache(): Graph[VD, ED] = {
@@ -219,24 +212,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
}
- override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2):
- Graph[VD, ED2] = {
- val newETable = eTable.zipPartitions(localVidMap, vTableReplicatedValues){
- (edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
- val (pid, edgePartition) = edgePartitionIter.next()
- val (_, vidToIndex) = vidToIndexIter.next()
- val (_, vertexArray) = vertexArrayIter.next()
- val et = new EdgeTriplet[VD, ED]
- val newEdgePartition = edgePartition.map{e =>
- et.set(e)
- et.srcAttr = vertexArray(vidToIndex(e.srcId))
- et.dstAttr = vertexArray(vidToIndex(e.dstId))
- f(et)
- }
- Iterator((pid, newEdgePartition))
- }
- new GraphImpl(vTable, vid2pid, localVidMap, newETable)
- }
+ override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] =
+ GraphImpl.mapTriplets(this, f)
override def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
@@ -330,57 +307,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
override def mapReduceTriplets[A: ClassManifest](
mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)],
reduceFunc: (A, A) => A)
- : VertexSetRDD[A] = {
-
- ClosureCleaner.clean(mapFunc)
- ClosureCleaner.clean(reduceFunc)
-
- // Map and preaggregate
- val preAgg = eTable.zipPartitions(localVidMap, vTableReplicatedValues){
- (edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
- val (pid, edgePartition) = edgePartitionIter.next()
- val (_, vidToIndex) = vidToIndexIter.next()
- val (_, vertexArray) = vertexArrayIter.next()
- // We can reuse the vidToIndex map for aggregation here as well.
- /** @todo Since this has the downside of not allowing "messages" to arbitrary
- * vertices we should consider just using a fresh map.
- */
- val msgArray = new Array[A](vertexArray.size)
- val msgBS = new BitSet(vertexArray.size)
- // Iterate over the partition
- val et = new EdgeTriplet[VD, ED]
- edgePartition.foreach{e =>
- et.set(e)
- et.srcAttr = vertexArray(vidToIndex(e.srcId))
- et.dstAttr = vertexArray(vidToIndex(e.dstId))
- mapFunc(et).foreach{ case (vid, msg) =>
- // verify that the vid is valid
- assert(vid == et.srcId || vid == et.dstId)
- val ind = vidToIndex(vid)
- // Populate the aggregator map
- if(msgBS(ind)) {
- msgArray(ind) = reduceFunc(msgArray(ind), msg)
- } else {
- msgArray(ind) = msg
- msgBS(ind) = true
- }
- }
- }
- // Return the aggregate map
- vidToIndex.long2IntEntrySet().fastIterator()
- // Remove the entries that did not receive a message
- .filter{ entry => msgBS(entry.getValue()) }
- // Construct the actual pairs
- .map{ entry =>
- val vid = entry.getLongKey()
- val ind = entry.getValue()
- val msg = msgArray(ind)
- (vid, msg)
- }
- }.partitionBy(vTable.index.rdd.partitioner.get)
- // do the final reduction reusing the index map
- VertexSetRDD(preAgg, vTable.index, reduceFunc)
- }
+ : VertexSetRDD[A] =
+ GraphImpl.mapReduceTriplets(this, mapFunc, reduceFunc)
override def outerJoinVertices[U: ClassManifest, VD2: ClassManifest]
@@ -436,7 +364,6 @@ object GraphImpl {
}
-
/**
* Create the edge table RDD, which is much more efficient for Java heap storage than the
* normal edges data structure (RDD[(Vid, Vid, ED)]).
@@ -494,16 +421,9 @@ object GraphImpl {
RDD[(Pid, VertexIdToIndexMap)] = {
eTable.mapPartitions( _.map{ case (pid, epart) =>
val vidToIndex = new VertexIdToIndexMap
- var i = 0
epart.foreach{ e =>
- if(!vidToIndex.contains(e.srcId)) {
- vidToIndex.put(e.srcId, i)
- i += 1
- }
- if(!vidToIndex.contains(e.dstId)) {
- vidToIndex.put(e.dstId, i)
- i += 1
- }
+ vidToIndex.add(e.srcId)
+ vidToIndex.add(e.dstId)
}
(pid, vidToIndex)
}, preservesPartitioning = true).cache()
@@ -528,9 +448,9 @@ object GraphImpl {
val (pid, vidToIndex) = mapIter.next()
assert(!mapIter.hasNext)
// Populate the vertex array using the vidToIndex map
- val vertexArray = new Array[VD](vidToIndex.size)
+ val vertexArray = new Array[VD](vidToIndex.capacity)
for (msg <- msgsIter) {
- val ind = vidToIndex(msg.data._1)
+ val ind = vidToIndex.getPos(msg.data._1) & OpenHashSet.POSITION_MASK
vertexArray(ind) = msg.data._2
}
Iterator((pid, vertexArray))
@@ -540,6 +460,95 @@ object GraphImpl {
}
+ def makeTriplets[VD: ClassManifest, ED: ClassManifest](
+ localVidMap: RDD[(Pid, VertexIdToIndexMap)],
+ vTableReplicatedValues: RDD[(Pid, Array[VD]) ],
+ eTable: RDD[(Pid, EdgePartition[ED])]): RDD[EdgeTriplet[VD, ED]] = {
+ localVidMap.zipPartitions(vTableReplicatedValues, eTable) {
+ (vidMapIter, replicatedValuesIter, eTableIter) =>
+ val (_, vidToIndex) = vidMapIter.next()
+ val (_, vertexArray) = replicatedValuesIter.next()
+ val (_, edgePartition) = eTableIter.next()
+ new EdgeTripletIterator(vidToIndex, vertexArray, edgePartition)
+ }
+ }
+
+
+ def mapTriplets[VD: ClassManifest, ED: ClassManifest, ED2: ClassManifest](
+ g: GraphImpl[VD, ED],
+ f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
+ val newETable = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues){
+ (edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
+ val (pid, edgePartition) = edgePartitionIter.next()
+ val (_, vidToIndex) = vidToIndexIter.next()
+ val (_, vertexArray) = vertexArrayIter.next()
+ val et = new EdgeTriplet[VD, ED]
+ val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray)
+ val newEdgePartition = edgePartition.map{e =>
+ et.set(e)
+ et.srcAttr = vmap(e.srcId)
+ et.dstAttr = vmap(e.dstId)
+ f(et)
+ }
+ Iterator((pid, newEdgePartition))
+ }
+ new GraphImpl(g.vTable, g.vid2pid, g.localVidMap, newETable)
+ }
+
+
+ def mapReduceTriplets[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
+ g: GraphImpl[VD, ED],
+ mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)],
+ reduceFunc: (A, A) => A): VertexSetRDD[A] = {
+
+ ClosureCleaner.clean(mapFunc)
+ ClosureCleaner.clean(reduceFunc)
+
+ // Map and preaggregate
+ val preAgg = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues){
+ (edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
+ val (pid, edgePartition) = edgePartitionIter.next()
+ val (_, vidToIndex) = vidToIndexIter.next()
+ val (_, vertexArray) = vertexArrayIter.next()
+ assert(!edgePartitionIter.hasNext)
+ assert(!vidToIndexIter.hasNext)
+ assert(!vertexArrayIter.hasNext)
+ assert(vidToIndex.capacity == vertexArray.size)
+ val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray)
+ // We can reuse the vidToIndex map for aggregation here as well.
+ /** @todo Since this has the downside of not allowing "messages" to arbitrary
+ * vertices we should consider just using a fresh map.
+ */
+ val msgArray = new Array[A](vertexArray.size)
+ val msgBS = new BitSet(vertexArray.size)
+ // Iterate over the partition
+ val et = new EdgeTriplet[VD, ED]
+ edgePartition.foreach{e =>
+ et.set(e)
+ et.srcAttr = vmap(e.srcId)
+ et.dstAttr = vmap(e.dstId)
+ mapFunc(et).foreach{ case (vid, msg) =>
+ // verify that the vid is valid
+ assert(vid == et.srcId || vid == et.dstId)
+ // Get the index of the key
+ val ind = vidToIndex.getPos(vid) & OpenHashSet.POSITION_MASK
+ // Populate the aggregator map
+ if(msgBS.get(ind)) {
+ msgArray(ind) = reduceFunc(msgArray(ind), msg)
+ } else {
+ msgArray(ind) = msg
+ msgBS.set(ind)
+ }
+ }
+ }
+ // construct an iterator of tuples Iterator[(Vid, A)]
+ msgBS.iterator.map( ind => (vidToIndex.getValue(ind), msgArray(ind)) )
+ }.partitionBy(g.vTable.index.rdd.partitioner.get)
+ // do the final reduction reusing the index map
+ VertexSetRDD(preAgg, g.vTable.index, reduceFunc)
+ }
+
+
protected def edgePartitionFunction1D(src: Vid, dst: Vid, numParts: Pid): Pid = {
val mixingPrime: Vid = 1125899906842597L
(math.abs(src) * mixingPrime).toInt % numParts
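
GraphImpl now wraps each partition's (vidToIndex, vertexArray) pair in a PrimitiveKeyOpenHashMap view, so EdgeTripletIterator, mapTriplets and mapReduceTriplets look vertex attributes up by id instead of going through the old Long2IntOpenHashMap. The pattern reduced to its essentials (private[spark] caveat as before; ids and attributes are invented):

    import org.apache.spark.util.hash.{OpenHashSet, PrimitiveKeyOpenHashMap}

    // Replicated vertex data for one edge partition: an id set plus an
    // attribute array indexed by the set's slot positions.
    val vidToIndex = new OpenHashSet[Long](16)
    Seq(100L, 200L).foreach(vidToIndex.add)
    val vertexArray = new Array[String](vidToIndex.capacity)
    vertexArray(vidToIndex.getPos(100L) & OpenHashSet.POSITION_MASK) = "src-attr"
    vertexArray(vidToIndex.getPos(200L) & OpenHashSet.POSITION_MASK) = "dst-attr"

    // The view used when constructing triplets: attribute lookup by vertex id.
    val vmap = new PrimitiveKeyOpenHashMap[Long, String](vidToIndex, vertexArray)
    val srcAttr = vmap(100L) // "src-attr"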
diff --git a/graph/src/main/scala/org/apache/spark/graph/package.scala b/graph/src/main/scala/org/apache/spark/graph/package.scala
index 4627c3566c..37a4fb4a5e 100644
--- a/graph/src/main/scala/org/apache/spark/graph/package.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/package.scala
@@ -1,5 +1,10 @@
package org.apache.spark
+import org.apache.spark.util.hash.BitSet
+import org.apache.spark.util.hash.OpenHashSet
+import org.apache.spark.util.hash.PrimitiveKeyOpenHashMap
+
+
package object graph {
type Vid = Long
@@ -8,8 +13,9 @@ package object graph {
type VertexHashMap[T] = it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap[T]
type VertexSet = it.unimi.dsi.fastutil.longs.LongOpenHashSet
type VertexArrayList = it.unimi.dsi.fastutil.longs.LongArrayList
- // @todo replace with rxin's fast hashmap
- type VertexIdToIndexMap = it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap
+
+ // type VertexIdToIndexMap = it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap
+ type VertexIdToIndexMap = OpenHashSet[Vid]
/**
* Return the default null-like value for a data type T.
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
index f2b3d5bdfe..2067b1613e 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
@@ -77,16 +77,15 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark(new SparkContext("local", "test")) { sc =>
val a = sc.parallelize((0 to 100).map(x => (x.toLong, x.toLong)), 5)
val b = VertexSetRDD(a).mapValues(x => -x)
- assert(b.leftJoin(a)
- .mapValues(x => x._1 + x._2.get).map(x=> x._2).reduce(_+_) === 0)
+ assert(b.count === 101)
+ assert(b.leftJoin(a).mapValues(x => x._1 + x._2.get).map(x=> x._2).reduce(_+_) === 0)
val c = VertexSetRDD(a, b.index)
- assert(b.leftJoin(c)
- .mapValues(x => x._1 + x._2.get).map(x=> x._2).reduce(_+_) === 0)
+ assert(b.leftJoin(c).mapValues(x => x._1 + x._2.get).map(x=> x._2).reduce(_+_) === 0)
val d = c.filter(q => ((q._2 % 2) == 0))
val e = a.filter(q => ((q._2 % 2) == 0))
assert(d.count === e.count)
- assert(b.zipJoin(c).mapValues(x => x._1 + x._2)
- .map(x => x._2).reduce(_+_) === 0)
+ assert(b.zipJoin(c).mapValues(x => x._1 + x._2).map(x => x._2).reduce(_+_) === 0)
+
}
}