From d74ad4ebc910e62f2598b7a7323fdc678fa179ca Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Thu, 31 Oct 2013 18:01:34 -0700 Subject: Adding ability to access local BitSet and to safely get a value at a given position --- core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala index 7aa3f6220c..d083ab26ac 100644 --- a/core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala +++ b/core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala @@ -81,6 +81,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( protected var _data = classManifest[T].newArray(_capacity) protected var _bitset = new BitSet(_capacity) + def getBitSet = _bitset + /** Number of elements in the set. */ def size: Int = _size @@ -147,6 +149,13 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( /** Return the value at the specified position. */ def getValue(pos: Int): T = _data(pos) + /** Return the value at the specified position. */ + def getValueSafe(pos: Int): T = { + assert(_bitset.get(pos)) + _data(pos) + } + + /** * Return the next position with an element stored, starting from the given position inclusively. */ -- cgit v1.2.3 From 4ad58e2b9a6e21b5b23ae2b0c62633e085ef3a61 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Thu, 31 Oct 2013 18:09:42 -0700 Subject: This commit makes three changes to the (PrimitiveKey)OpenHashMap 1) _keySet --renamed--> keySet 2) keySet and _values are made externally accessible 3) added an update function which merges duplicate values --- .../org/apache/spark/util/hash/OpenHashMap.scala | 49 ++++++++++++++++------ .../spark/util/hash/PrimitiveKeyOpenHashMap.scala | 47 +++++++++++++++------ 2 files changed, 71 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala index a376d1015a..af282d5651 100644 --- a/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala @@ -27,14 +27,21 @@ package org.apache.spark.util.hash */ private[spark] class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: ClassManifest]( - initialCapacity: Int) + var keySet: OpenHashSet[K], var _values: Array[V]) extends Iterable[(K, V)] with Serializable { - def this() = this(64) + /** + * Allocate an OpenHashMap with a fixed initial capacity + */ + def this(initialCapacity: Int = 64) = + this(new OpenHashSet[K](initialCapacity), new Array[V](initialCapacity)) + + /** + * Allocate an OpenHashMap with a fixed initial capacity + */ + def this(keySet: OpenHashSet[K]) = this(keySet, new Array[V](keySet.capacity)) - protected var _keySet = new OpenHashSet[K](initialCapacity) - private var _values = new Array[V](_keySet.capacity) @transient private var _oldValues: Array[V] = null @@ -42,14 +49,14 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: private var haveNullValue = false private var nullValue: V = null.asInstanceOf[V] - override def size: Int = if (haveNullValue) _keySet.size + 1 else _keySet.size + override def size: Int = if (haveNullValue) keySet.size + 1 else keySet.size /** Get the value for a given key */ def apply(k: K): V = { if (k == null) { nullValue } else { - val pos = _keySet.getPos(k) + val pos = 
keySet.getPos(k) if (pos < 0) { null.asInstanceOf[V] } else { @@ -64,9 +71,26 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: haveNullValue = true nullValue = v } else { - val pos = _keySet.fastAdd(k) & OpenHashSet.POSITION_MASK + val pos = keySet.fastAdd(k) & OpenHashSet.POSITION_MASK _values(pos) = v - _keySet.rehashIfNeeded(k, grow, move) + keySet.rehashIfNeeded(k, grow, move) + _oldValues = null + } + } + + /** Set the value for a key */ + def update(k: K, v: V, mergeF: (V,V) => V) { + if (k == null) { + if(haveNullValue) { + nullValue = mergeF(nullValue, v) + } else { + haveNullValue = true + nullValue = v + } + } else { + val pos = keySet.fastAdd(k) & OpenHashSet.POSITION_MASK + _values(pos) = mergeF(_values(pos), v) + keySet.rehashIfNeeded(k, grow, move) _oldValues = null } } @@ -87,11 +111,11 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: } nullValue } else { - val pos = _keySet.fastAdd(k) + val pos = keySet.fastAdd(k) if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) { val newValue = defaultValue _values(pos & OpenHashSet.POSITION_MASK) = newValue - _keySet.rehashIfNeeded(k, grow, move) + keySet.rehashIfNeeded(k, grow, move) newValue } else { _values(pos) = mergeValue(_values(pos)) @@ -113,9 +137,9 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: } pos += 1 } - pos = _keySet.nextPos(pos) + pos = keySet.nextPos(pos) if (pos >= 0) { - val ret = (_keySet.getValue(pos), _values(pos)) + val ret = (keySet.getValue(pos), _values(pos)) pos += 1 ret } else { @@ -146,3 +170,4 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: _values(newPos) = _oldValues(oldPos) } } + diff --git a/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala index 14c1367207..cbfb2361b4 100644 --- a/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala @@ -28,35 +28,56 @@ package org.apache.spark.util.hash private[spark] class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest, @specialized(Long, Int, Double) V: ClassManifest]( - initialCapacity: Int) + var keySet: OpenHashSet[K], var _values: Array[V]) extends Iterable[(K, V)] with Serializable { - def this() = this(64) + /** + * Allocate an OpenHashMap with a fixed initial capacity + */ + def this(initialCapacity: Int = 64) = + this(new OpenHashSet[K](initialCapacity), new Array[V](initialCapacity)) - require(classManifest[K] == classManifest[Long] || classManifest[K] == classManifest[Int]) + /** + * Allocate an OpenHashMap with a fixed initial capacity + */ + def this(keySet: OpenHashSet[K]) = this(keySet, new Array[V](keySet.capacity)) - protected var _keySet = new OpenHashSet[K](initialCapacity) - private var _values = new Array[V](_keySet.capacity) + require(classManifest[K] == classManifest[Long] || classManifest[K] == classManifest[Int]) private var _oldValues: Array[V] = null - override def size = _keySet.size + override def size = keySet.size /** Get the value for a given key */ def apply(k: K): V = { - val pos = _keySet.getPos(k) + val pos = keySet.getPos(k) _values(pos) } /** Set the value for a key */ def update(k: K, v: V) { - val pos = _keySet.fastAdd(k) & OpenHashSet.POSITION_MASK + val pos = keySet.fastAdd(k) & OpenHashSet.POSITION_MASK _values(pos) = v - _keySet.rehashIfNeeded(k, grow, move) + 
keySet.rehashIfNeeded(k, grow, move) + _oldValues = null + } + + + /** Set the value for a key */ + def update(k: K, v: V, mergeF: (V,V) => V) { + val pos = keySet.fastAdd(k) + val ind = pos & OpenHashSet.POSITION_MASK + if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) { // if first add + _values(ind) = v + } else { + _values(ind) = mergeF(_values(ind), v) + } + keySet.rehashIfNeeded(k, grow, move) _oldValues = null } + /** * If the key doesn't exist yet in the hash map, set its value to defaultValue; otherwise, * set its value to mergeValue(oldValue). @@ -64,11 +85,11 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest, * @return the newly updated value. */ def changeValue(k: K, defaultValue: => V, mergeValue: (V) => V): V = { - val pos = _keySet.fastAdd(k) + val pos = keySet.fastAdd(k) if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) { val newValue = defaultValue _values(pos & OpenHashSet.POSITION_MASK) = newValue - _keySet.rehashIfNeeded(k, grow, move) + keySet.rehashIfNeeded(k, grow, move) newValue } else { _values(pos) = mergeValue(_values(pos)) @@ -82,9 +103,9 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest, /** Get the next value we should return from next(), or null if we're finished iterating */ def computeNextPair(): (K, V) = { - pos = _keySet.nextPos(pos) + pos = keySet.nextPos(pos) if (pos >= 0) { - val ret = (_keySet.getValue(pos), _values(pos)) + val ret = (keySet.getValue(pos), _values(pos)) pos += 1 ret } else { -- cgit v1.2.3 From 8381aeffb34fecba53b943763ef65f35ef52289a Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Thu, 31 Oct 2013 18:13:02 -0700 Subject: This commit introduces the OpenHashSet and OpenHashMap as indexing primitives. Large parts of the VertexSetRDD were restructured to take advantage of: 1) the OpenHashSet as an index map 2) view based lazy mapValues and mapValuesWithVertices 3) the cogroup code is currently disabled (since it is not used in any of the tests) The GraphImpl was updated to also use the OpenHashSet and PrimitiveOpenHashMap wherever possible: 1) the LocalVidMaps (used to track replicated vertices) are now implemented using the OpenHashSet 2) an OpenHashMap is temporarily constructed to combine the local OpenHashSet with the local (replicated) vertex attribute arrays 3) because the OpenHashSet constructor grabs a class manifest all operations that construct OpenHashSets have been moved to the GraphImpl Singleton to prevent implicit variable capture within closures. 
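As a rough illustration of the indexing pattern described above, the following sketch builds one OpenHashSet index and shares it between a raw attribute array and a PrimitiveKeyOpenHashMap view. It uses only members that appear in the diffs in this series (add, getPos, capacity, POSITION_MASK, and the (keySet, values) constructor); the object name, vertex ids, and rank values are made up for illustration, and this is a sketch rather than code compiled against a full Spark build.

import org.apache.spark.util.hash.{OpenHashSet, PrimitiveKeyOpenHashMap}

object SharedIndexSketch {
  def main(args: Array[String]) {
    // Build the index (vertex id -> slot) once; it can then be shared.
    val index = new OpenHashSet[Long](64)
    Seq(1L, 5L, 9L).foreach(index.add)

    // Any number of per-vertex attribute arrays can be addressed through the shared index.
    val ranks = new Array[Double](index.capacity)
    val pos = index.getPos(5L) & OpenHashSet.POSITION_MASK
    ranks(pos) = 0.15

    // Or wrap the same index and array in a map view without copying the keys.
    val asMap = new PrimitiveKeyOpenHashMap[Long, Double](index, ranks)
    println(asMap(5L))   // prints 0.15
  }
}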
--- .../apache/spark/graph/GraphKryoRegistrator.scala | 6 +- .../org/apache/spark/graph/VertexSetRDD.scala | 214 ++++++++++---------- .../org/apache/spark/graph/impl/GraphImpl.scala | 217 +++++++++++---------- .../scala/org/apache/spark/graph/package.scala | 10 +- .../scala/org/apache/spark/graph/GraphSuite.scala | 11 +- 5 files changed, 225 insertions(+), 233 deletions(-) diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala index 821063e1f8..62f445127c 100644 --- a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala +++ b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala @@ -1,14 +1,11 @@ package org.apache.spark.graph -import org.apache.spark.util.hash.BitSet - - import com.esotericsoftware.kryo.Kryo import org.apache.spark.graph.impl.MessageToPartition import org.apache.spark.serializer.KryoRegistrator import org.apache.spark.graph.impl._ -import scala.collection.mutable.BitSet +import org.apache.spark.util.hash.BitSet class GraphKryoRegistrator extends KryoRegistrator { @@ -20,7 +17,6 @@ class GraphKryoRegistrator extends KryoRegistrator { kryo.register(classOf[EdgePartition[Object]]) kryo.register(classOf[BitSet]) kryo.register(classOf[VertexIdToIndexMap]) - // This avoids a large number of hash table lookups. kryo.setReferences(false) } diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala index 8acc89a95b..b3647c083e 100644 --- a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala +++ b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala @@ -31,6 +31,8 @@ import org.apache.spark.Partitioner._ import org.apache.spark.storage.StorageLevel import org.apache.spark.util.hash.BitSet +import org.apache.spark.util.hash.OpenHashSet +import org.apache.spark.util.hash.PrimitiveKeyOpenHashMap @@ -160,15 +162,8 @@ class VertexSetRDD[@specialized V: ClassManifest]( * Provide the RDD[(K,V)] equivalent output. 
*/ override def compute(part: Partition, context: TaskContext): Iterator[(Vid, V)] = { - tuples.compute(part, context).flatMap { case (indexMap, (values, bs) ) => - // Walk the index to construct the key, value pairs - indexMap.iterator - // Extract rows with key value pairs and indicators - .map{ case (k, ind) => (bs.get(ind), k, ind) } - // Remove tuples that aren't actually present in the array - .filter( _._1 ) - // Extract the pair (removing the indicator from the tuple) - .map( x => (x._2, values(x._3) ) ) + tuples.compute(part, context).flatMap { case (indexMap, (values, bs) ) => + bs.iterator.map(ind => (indexMap.getValueSafe(ind), values(ind))) } } // end of compute @@ -195,11 +190,15 @@ class VertexSetRDD[@specialized V: ClassManifest]( assert(valuesIter.hasNext() == false) // Allocate the array to store the results into val newBS = new BitSet(oldValues.size) - // Populate the new Values - for( (k,i) <- index ) { - if( bs.get(i) && cleanPred( (k, oldValues(i)) ) ) { - newBS.set(i) + // Iterate over the active bits in the old bitset and + // evaluate the predicate + var ind = bs.nextSetBit(0) + while(ind >= 0) { + val k = index.getValueSafe(ind) + if( cleanPred( (k, oldValues(ind)) ) ) { + newBS.set(ind) } + ind = bs.nextSetBit(ind+1) } Array((oldValues, newBS)).iterator } @@ -223,27 +222,10 @@ class VertexSetRDD[@specialized V: ClassManifest]( val newValuesRDD: RDD[ (IndexedSeq[U], BitSet) ] = valuesRDD.mapPartitions(iter => iter.map{ case (values, bs: BitSet) => - - /** - * @todo Consider using a view rather than creating a new - * array. This is already being done for join operations. - * It could reduce memory overhead but require additional - * recomputation. - */ - val newValues = new Array[U](values.size) - var ind = bs.nextSetBit(0) - while(ind >= 0) { - // if(ind >= newValues.size) { - // println(ind) - // println(newValues.size) - // bs.iterator.foreach(print(_)) - // } - // assert(ind < newValues.size) - // assert(ind < values.size) - newValues(ind) = cleanF(values(ind)) - ind = bs.nextSetBit(ind+1) - } - (newValues.toIndexedSeq, bs) + val newValues: IndexedSeq[U] = values.view.zipWithIndex.map{ + (x: (V, Int)) => if(bs.get(x._2)) cleanF(x._1) else null.asInstanceOf[U] + }.toIndexedSeq // @todo check the toIndexedSeq is free + (newValues, bs) }, preservesPartitioning = true) new VertexSetRDD[U](index, newValuesRDD) } // end of mapValues @@ -271,18 +253,14 @@ class VertexSetRDD[@specialized V: ClassManifest]( assert(keysIter.hasNext() == false) val (oldValues, bs: BitSet) = valuesIter.next() assert(valuesIter.hasNext() == false) - /** - * @todo Consider using a view rather than creating a new array. - * This is already being done for join operations. It could reduce - * memory overhead but require additional recomputation. 
- */ - // Allocate the array to store the results into - val newValues: Array[U] = new Array[U](oldValues.size) - // Populate the new Values - for( (k,i) <- index ) { - if (bs.get(i)) { newValues(i) = cleanF(k, oldValues(i)) } - } - Array((newValues.toIndexedSeq, bs)).iterator + // Cosntruct a view of the map transformation + val newValues: IndexedSeq[U] = oldValues.view.zipWithIndex.map{ + (x: (V, Int)) => + if(bs.get(x._2)) { + cleanF(index.getValueSafe(x._2), x._1) + } else null.asInstanceOf[U] + }.toIndexedSeq // @todo check the toIndexedSeq is free + Iterator((newValues, bs)) } new VertexSetRDD[U](index, newValues) } // end of mapValuesWithKeys @@ -314,8 +292,10 @@ class VertexSetRDD[@specialized V: ClassManifest]( val (otherValues, otherBS: BitSet) = otherIter.next() assert(!otherIter.hasNext) val newBS: BitSet = thisBS & otherBS - val newValues = thisValues.view.zip(otherValues) - Iterator((newValues.toIndexedSeq, newBS)) + val newValues: IndexedSeq[(V,W)] = + thisValues.view.zip(otherValues).toIndexedSeq // @todo check the toIndexedSeq is free + // Iterator((newValues.toIndexedSeq, newBS)) + Iterator((newValues, newBS)) } new VertexSetRDD(index, newValuesRDD) } @@ -348,10 +328,15 @@ class VertexSetRDD[@specialized V: ClassManifest]( assert(!thisIter.hasNext) val (otherValues, otherBS: BitSet) = otherIter.next() assert(!otherIter.hasNext) - val otherOption = otherValues.view.zipWithIndex - .map{ (x: (W, Int)) => if(otherBS.get(x._2)) Option(x._1) else None } - val newValues = thisValues.view.zip(otherOption) - Iterator((newValues.toIndexedSeq, thisBS)) + val newValues: IndexedSeq[(V, Option[W])] = thisValues.view.zip(otherValues) + .zipWithIndex.map { + // @todo not sure about the efficiency of this case statement + // though it is assumed that the return value is short lived + case ((v, w), ind) => (v, if (otherBS.get(ind)) Option(w) else None) + } + .toIndexedSeq // @todo check the toIndexedSeq is free + Iterator((newValues, thisBS)) + // Iterator((newValues.toIndexedSeq, thisBS)) } new VertexSetRDD(index, newValuesRDD) } // end of leftZipJoin @@ -378,7 +363,6 @@ class VertexSetRDD[@specialized V: ClassManifest]( def leftJoin[W: ClassManifest]( other: RDD[(Vid,W)], merge: (W,W) => W = (a:W, b:W) => a): VertexSetRDD[(V, Option[W]) ] = { - val cleanMerge = index.rdd.context.clean(merge) // Test if the other vertex is a VertexSetRDD to choose the optimal // join strategy other match { @@ -396,7 +380,7 @@ class VertexSetRDD[@specialized V: ClassManifest]( if (other.partitioner == partitioner) other else other.partitionBy(partitioner.get) // Compute the new values RDD - val newValues: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] = + val newValuesRDD: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] = index.rdd.zipPartitions(valuesRDD, otherShuffled) { (thisIndexIter: Iterator[VertexIdToIndexMap], thisIter: Iterator[(IndexedSeq[V], BitSet)], @@ -407,33 +391,37 @@ class VertexSetRDD[@specialized V: ClassManifest]( val (thisValues, thisBS) = thisIter.next() assert(!thisIter.hasNext) // Create a new array to store the values in the resulting VertexSet - val newW = new Array[W](thisValues.size) + val otherValues = new Array[W](thisValues.size) // track which values are matched with values in other - val wBS = new BitSet(thisValues.size) - // Loop over all the tuples that have vertices in this VertexSet. - for( (k, w) <- tuplesIter if index.contains(k) ) { - val ind = index.get(k) - // Not all the vertex ids in the index are in this VertexSet. 
- // If there is a vertex in this set then record the other value - if(thisBS.get(ind)) { - if(wBS.get(ind)) { - newW(ind) = cleanMerge(newW(ind), w) - } else { - newW(ind) = w - wBS.set(ind) + val otherBS = new BitSet(thisValues.size) + for ((k,w) <- tuplesIter) { + // Get the location of the key in the index + val pos = index.getPos(k) + // Only if the key is already in the index + if ((pos & OpenHashSet.EXISTENCE_MASK) == 0) { + // Get the actual index + val ind = pos & OpenHashSet.POSITION_MASK + // If this value has already been seen then merge + if (otherBS.get(ind)) { + otherValues(ind) = merge(otherValues(ind), w) + } else { // otherwise just store the new value + otherBS.set(ind) + otherValues(ind) = w } } - } // end of for loop over tuples + } // Some vertices in this vertex set may not have a corresponding // tuple in the join and so a None value should be returned. - val otherOption = newW.view.zipWithIndex - .map{ (x: (W, Int)) => if(wBS.get(x._2)) Option(x._1) else None } - // the final values is the zip of the values in this RDD along with - // the values in the other - val newValues = thisValues.view.zip(otherOption) - Iterator((newValues.toIndexedSeq, thisBS)) + val newValues: IndexedSeq[(V, Option[W])] = thisValues.view.zip(otherValues) + .zipWithIndex.map { + // @todo not sure about the efficiency of this case statement + // though it is assumed that the return value is short lived + case ((v, w), ind) => (v, if (otherBS.get(ind)) Option(w) else None) + } + .toIndexedSeq // @todo check the toIndexedSeq is free + Iterator((newValues, thisBS)) } // end of newValues - new VertexSetRDD(index, newValues) + new VertexSetRDD(index, newValuesRDD) } } } // end of leftJoin @@ -443,6 +431,7 @@ class VertexSetRDD[@specialized V: ClassManifest]( * For each key k in `this` or `other`, return a resulting RDD that contains a * tuple with the list of values for that key in `this` as well as `other`. 
*/ + /* def cogroup[W: ClassManifest](other: RDD[(Vid, W)], partitioner: Partitioner): VertexSetRDD[(Seq[V], Seq[W])] = { //RDD[(K, (Seq[V], Seq[W]))] = { @@ -489,16 +478,17 @@ class VertexSetRDD[@specialized V: ClassManifest]( assert(!thisIter.hasNext) val otherIndex = otherIter.next() assert(!otherIter.hasNext) - val newIndex = new VertexIdToIndexMap() - // @todo Merge only the keys that correspond to non-null values // Merge the keys - newIndex.putAll(thisIndex) - newIndex.putAll(otherIndex) - // We need to rekey the index - var ctr = 0 - for (e <- newIndex.entrySet) { - e.setValue(ctr) - ctr += 1 + val newIndex = new VertexIdToIndexMap(thisIndex.capacity + otherIndex.capacity) + var ind = thisIndex.nextPos(0) + while(ind >= 0) { + newIndex.fastAdd(thisIndex.getValue(ind)) + ind = thisIndex.nextPos(ind+1) + } + var ind = otherIndex.nextPos(0) + while(ind >= 0) { + newIndex.fastAdd(otherIndex.getValue(ind)) + ind = otherIndex.nextPos(ind+1) } List(newIndex).iterator }).cache() @@ -604,7 +594,7 @@ class VertexSetRDD[@specialized V: ClassManifest]( } } } // end of cogroup - + */ } // End of VertexSetRDD @@ -649,21 +639,14 @@ object VertexSetRDD { } val groups = preAgg.mapPartitions( iter => { - val indexMap = new VertexIdToIndexMap() - val values = new ArrayBuffer[V] + val hashMap = new PrimitiveKeyOpenHashMap[Vid, V] for ((k,v) <- iter) { - if(!indexMap.contains(k)) { - val ind = indexMap.size - indexMap.put(k, ind) - values.append(v) - } else { - val ind = indexMap.get(k) - values(ind) = reduceFunc(values(ind), v) - } + hashMap.update(k, v, reduceFunc) } - val bs = new BitSet(indexMap.size) - bs.setUntil(indexMap.size) - Iterator( (indexMap, (values.toIndexedSeq, bs)) ) + val index = hashMap.keySet + val values: IndexedSeq[V] = hashMap._values + val bs = index.getBitSet + Iterator( (index, (values, bs)) ) }, true).cache // extract the index and the values val index = groups.mapPartitions(_.map{ case (kMap, vAr) => kMap }, true) @@ -747,20 +730,24 @@ object VertexSetRDD { // There is only one map val index = indexIter.next() assert(!indexIter.hasNext()) - val values = new Array[C](index.size) - val bs = new BitSet(index.size) + val values = new Array[C](index.capacity) + val bs = new BitSet(index.capacity) for ((k,c) <- tblIter) { - // @todo this extra check may be costing us a lot! 
- if (!index.contains(k)) { + // Get the location of the key in the index + val pos = index.getPos(k) + if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) { throw new SparkException("Error: Trying to bind an external index " + "to an RDD which contains keys that are not in the index.") - } - val ind = index(k) - if (bs.get(ind)) { - values(ind) = mergeCombiners(values(ind), c) } else { - values(ind) = c - bs.set(ind) + // Get the actual index + val ind = pos & OpenHashSet.POSITION_MASK + // If this value has already been seen then merge + if (bs.get(ind)) { + values(ind) = mergeCombiners(values(ind), c) + } else { // otherwise just store the new value + bs.set(ind) + values(ind) = c + } } } Iterator((values, bs)) @@ -792,14 +779,9 @@ object VertexSetRDD { } val index = shuffledTbl.mapPartitions( iter => { - val indexMap = new VertexIdToIndexMap() - for ( (k,_) <- iter ){ - if(!indexMap.contains(k)){ - val ind = indexMap.size - indexMap.put(k, ind) - } - } - Iterator(indexMap) + val index = new VertexIdToIndexMap + for ( (k,_) <- iter ){ index.add(k) } + Iterator(index) }, true).cache new VertexSetIndex(index) } diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index 016811db36..b80713dbf4 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -5,7 +5,6 @@ import scala.collection.JavaConversions._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuilder -import scala.collection.mutable.BitSet import org.apache.spark.SparkContext._ @@ -21,6 +20,12 @@ import org.apache.spark.graph._ import org.apache.spark.graph.impl.GraphImpl._ import org.apache.spark.graph.impl.MessageToPartitionRDDFunctions._ +import org.apache.spark.util.hash.BitSet +import org.apache.spark.util.hash.OpenHashSet +import org.apache.spark.util.hash.PrimitiveKeyOpenHashMap + + + /** * The Iterator type returned when constructing edge triplets */ @@ -31,15 +36,16 @@ class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest]( private var pos = 0 private val et = new EdgeTriplet[VD, ED] + private val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray) override def hasNext: Boolean = pos < edgePartition.size override def next() = { et.srcId = edgePartition.srcIds(pos) // assert(vmap.containsKey(e.src.id)) - et.srcAttr = vertexArray(vidToIndex(et.srcId)) + et.srcAttr = vmap(et.srcId) et.dstId = edgePartition.dstIds(pos) // assert(vmap.containsKey(e.dst.id)) - et.dstAttr = vertexArray(vidToIndex(et.dstId)) + et.dstAttr = vmap(et.dstId) et.attr = edgePartition.data(pos) pos += 1 et @@ -51,10 +57,10 @@ class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest]( for (i <- (0 until edgePartition.size)) { currentEdge.srcId = edgePartition.srcIds(i) // assert(vmap.containsKey(e.src.id)) - currentEdge.srcAttr = vertexArray(vidToIndex(currentEdge.srcId)) + currentEdge.srcAttr = vmap(currentEdge.srcId) currentEdge.dstId = edgePartition.dstIds(i) // assert(vmap.containsKey(e.dst.id)) - currentEdge.dstAttr = vertexArray(vidToIndex(currentEdge.dstId)) + currentEdge.dstAttr = vmap(currentEdge.dstId) currentEdge.attr = edgePartition.data(i) lb += currentEdge } @@ -63,23 +69,6 @@ class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest]( } // end of Edge Triplet Iterator - -object EdgeTripletBuilder { - def makeTriplets[VD: ClassManifest, ED: ClassManifest]( - 
localVidMap: RDD[(Pid, VertexIdToIndexMap)], - vTableReplicatedValues: RDD[(Pid, Array[VD]) ], - eTable: RDD[(Pid, EdgePartition[ED])]): RDD[EdgeTriplet[VD, ED]] = { - localVidMap.zipPartitions(vTableReplicatedValues, eTable) { - (vidMapIter, replicatedValuesIter, eTableIter) => - val (_, vidToIndex) = vidMapIter.next() - val (_, vertexArray) = replicatedValuesIter.next() - val (_, edgePartition) = eTableIter.next() - new EdgeTripletIterator(vidToIndex, vertexArray, edgePartition) - } - } -} - - /** * A Graph RDD that supports computation on graphs. */ @@ -90,6 +79,10 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( @transient val eTable: RDD[(Pid, EdgePartition[ED])] ) extends Graph[VD, ED] { + def this() = this(null, null, null, null) + + + /** * (localVidMap: VertexSetRDD[Pid, VertexIdToIndexMap]) is a version of the * vertex data after it is replicated. Within each partition, it holds a map @@ -115,7 +108,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( /** Return a RDD that brings edges with its source and destination vertices together. */ @transient override val triplets: RDD[EdgeTriplet[VD, ED]] = - EdgeTripletBuilder.makeTriplets(localVidMap, vTableReplicatedValues, eTable) + makeTriplets(localVidMap, vTableReplicatedValues, eTable) override def cache(): Graph[VD, ED] = { @@ -219,24 +212,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( } - override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2): - Graph[VD, ED2] = { - val newETable = eTable.zipPartitions(localVidMap, vTableReplicatedValues){ - (edgePartitionIter, vidToIndexIter, vertexArrayIter) => - val (pid, edgePartition) = edgePartitionIter.next() - val (_, vidToIndex) = vidToIndexIter.next() - val (_, vertexArray) = vertexArrayIter.next() - val et = new EdgeTriplet[VD, ED] - val newEdgePartition = edgePartition.map{e => - et.set(e) - et.srcAttr = vertexArray(vidToIndex(e.srcId)) - et.dstAttr = vertexArray(vidToIndex(e.dstId)) - f(et) - } - Iterator((pid, newEdgePartition)) - } - new GraphImpl(vTable, vid2pid, localVidMap, newETable) - } + override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = + GraphImpl.mapTriplets(this, f) override def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true), @@ -330,57 +307,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( override def mapReduceTriplets[A: ClassManifest]( mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)], reduceFunc: (A, A) => A) - : VertexSetRDD[A] = { - - ClosureCleaner.clean(mapFunc) - ClosureCleaner.clean(reduceFunc) - - // Map and preaggregate - val preAgg = eTable.zipPartitions(localVidMap, vTableReplicatedValues){ - (edgePartitionIter, vidToIndexIter, vertexArrayIter) => - val (pid, edgePartition) = edgePartitionIter.next() - val (_, vidToIndex) = vidToIndexIter.next() - val (_, vertexArray) = vertexArrayIter.next() - // We can reuse the vidToIndex map for aggregation here as well. - /** @todo Since this has the downside of not allowing "messages" to arbitrary - * vertices we should consider just using a fresh map. 
- */ - val msgArray = new Array[A](vertexArray.size) - val msgBS = new BitSet(vertexArray.size) - // Iterate over the partition - val et = new EdgeTriplet[VD, ED] - edgePartition.foreach{e => - et.set(e) - et.srcAttr = vertexArray(vidToIndex(e.srcId)) - et.dstAttr = vertexArray(vidToIndex(e.dstId)) - mapFunc(et).foreach{ case (vid, msg) => - // verify that the vid is valid - assert(vid == et.srcId || vid == et.dstId) - val ind = vidToIndex(vid) - // Populate the aggregator map - if(msgBS(ind)) { - msgArray(ind) = reduceFunc(msgArray(ind), msg) - } else { - msgArray(ind) = msg - msgBS(ind) = true - } - } - } - // Return the aggregate map - vidToIndex.long2IntEntrySet().fastIterator() - // Remove the entries that did not receive a message - .filter{ entry => msgBS(entry.getValue()) } - // Construct the actual pairs - .map{ entry => - val vid = entry.getLongKey() - val ind = entry.getValue() - val msg = msgArray(ind) - (vid, msg) - } - }.partitionBy(vTable.index.rdd.partitioner.get) - // do the final reduction reusing the index map - VertexSetRDD(preAgg, vTable.index, reduceFunc) - } + : VertexSetRDD[A] = + GraphImpl.mapReduceTriplets(this, mapFunc, reduceFunc) override def outerJoinVertices[U: ClassManifest, VD2: ClassManifest] @@ -436,7 +364,6 @@ object GraphImpl { } - /** * Create the edge table RDD, which is much more efficient for Java heap storage than the * normal edges data structure (RDD[(Vid, Vid, ED)]). @@ -494,16 +421,9 @@ object GraphImpl { RDD[(Pid, VertexIdToIndexMap)] = { eTable.mapPartitions( _.map{ case (pid, epart) => val vidToIndex = new VertexIdToIndexMap - var i = 0 epart.foreach{ e => - if(!vidToIndex.contains(e.srcId)) { - vidToIndex.put(e.srcId, i) - i += 1 - } - if(!vidToIndex.contains(e.dstId)) { - vidToIndex.put(e.dstId, i) - i += 1 - } + vidToIndex.add(e.srcId) + vidToIndex.add(e.dstId) } (pid, vidToIndex) }, preservesPartitioning = true).cache() @@ -528,9 +448,9 @@ object GraphImpl { val (pid, vidToIndex) = mapIter.next() assert(!mapIter.hasNext) // Populate the vertex array using the vidToIndex map - val vertexArray = new Array[VD](vidToIndex.size) + val vertexArray = new Array[VD](vidToIndex.capacity) for (msg <- msgsIter) { - val ind = vidToIndex(msg.data._1) + val ind = vidToIndex.getPos(msg.data._1) & OpenHashSet.POSITION_MASK vertexArray(ind) = msg.data._2 } Iterator((pid, vertexArray)) @@ -540,6 +460,95 @@ object GraphImpl { } + def makeTriplets[VD: ClassManifest, ED: ClassManifest]( + localVidMap: RDD[(Pid, VertexIdToIndexMap)], + vTableReplicatedValues: RDD[(Pid, Array[VD]) ], + eTable: RDD[(Pid, EdgePartition[ED])]): RDD[EdgeTriplet[VD, ED]] = { + localVidMap.zipPartitions(vTableReplicatedValues, eTable) { + (vidMapIter, replicatedValuesIter, eTableIter) => + val (_, vidToIndex) = vidMapIter.next() + val (_, vertexArray) = replicatedValuesIter.next() + val (_, edgePartition) = eTableIter.next() + new EdgeTripletIterator(vidToIndex, vertexArray, edgePartition) + } + } + + + def mapTriplets[VD: ClassManifest, ED: ClassManifest, ED2: ClassManifest]( + g: GraphImpl[VD, ED], + f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = { + val newETable = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues){ + (edgePartitionIter, vidToIndexIter, vertexArrayIter) => + val (pid, edgePartition) = edgePartitionIter.next() + val (_, vidToIndex) = vidToIndexIter.next() + val (_, vertexArray) = vertexArrayIter.next() + val et = new EdgeTriplet[VD, ED] + val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray) + val newEdgePartition = 
edgePartition.map{e => + et.set(e) + et.srcAttr = vmap(e.srcId) + et.dstAttr = vmap(e.dstId) + f(et) + } + Iterator((pid, newEdgePartition)) + } + new GraphImpl(g.vTable, g.vid2pid, g.localVidMap, newETable) + } + + + def mapReduceTriplets[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]( + g: GraphImpl[VD, ED], + mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)], + reduceFunc: (A, A) => A): VertexSetRDD[A] = { + + ClosureCleaner.clean(mapFunc) + ClosureCleaner.clean(reduceFunc) + + // Map and preaggregate + val preAgg = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues){ + (edgePartitionIter, vidToIndexIter, vertexArrayIter) => + val (pid, edgePartition) = edgePartitionIter.next() + val (_, vidToIndex) = vidToIndexIter.next() + val (_, vertexArray) = vertexArrayIter.next() + assert(!edgePartitionIter.hasNext) + assert(!vidToIndexIter.hasNext) + assert(!vertexArrayIter.hasNext) + assert(vidToIndex.capacity == vertexArray.size) + val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray) + // We can reuse the vidToIndex map for aggregation here as well. + /** @todo Since this has the downside of not allowing "messages" to arbitrary + * vertices we should consider just using a fresh map. + */ + val msgArray = new Array[A](vertexArray.size) + val msgBS = new BitSet(vertexArray.size) + // Iterate over the partition + val et = new EdgeTriplet[VD, ED] + edgePartition.foreach{e => + et.set(e) + et.srcAttr = vmap(e.srcId) + et.dstAttr = vmap(e.dstId) + mapFunc(et).foreach{ case (vid, msg) => + // verify that the vid is valid + assert(vid == et.srcId || vid == et.dstId) + // Get the index of the key + val ind = vidToIndex.getPos(vid) & OpenHashSet.POSITION_MASK + // Populate the aggregator map + if(msgBS.get(ind)) { + msgArray(ind) = reduceFunc(msgArray(ind), msg) + } else { + msgArray(ind) = msg + msgBS.set(ind) + } + } + } + // construct an iterator of tuples Iterator[(Vid, A)] + msgBS.iterator.map( ind => (vidToIndex.getValue(ind), msgArray(ind)) ) + }.partitionBy(g.vTable.index.rdd.partitioner.get) + // do the final reduction reusing the index map + VertexSetRDD(preAgg, g.vTable.index, reduceFunc) + } + + protected def edgePartitionFunction1D(src: Vid, dst: Vid, numParts: Pid): Pid = { val mixingPrime: Vid = 1125899906842597L (math.abs(src) * mixingPrime).toInt % numParts diff --git a/graph/src/main/scala/org/apache/spark/graph/package.scala b/graph/src/main/scala/org/apache/spark/graph/package.scala index 4627c3566c..37a4fb4a5e 100644 --- a/graph/src/main/scala/org/apache/spark/graph/package.scala +++ b/graph/src/main/scala/org/apache/spark/graph/package.scala @@ -1,5 +1,10 @@ package org.apache.spark +import org.apache.spark.util.hash.BitSet +import org.apache.spark.util.hash.OpenHashSet +import org.apache.spark.util.hash.PrimitiveKeyOpenHashMap + + package object graph { type Vid = Long @@ -8,8 +13,9 @@ package object graph { type VertexHashMap[T] = it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap[T] type VertexSet = it.unimi.dsi.fastutil.longs.LongOpenHashSet type VertexArrayList = it.unimi.dsi.fastutil.longs.LongArrayList - // @todo replace with rxin's fast hashmap - type VertexIdToIndexMap = it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap + + // type VertexIdToIndexMap = it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap + type VertexIdToIndexMap = OpenHashSet[Vid] /** * Return the default null-like value for a data type T. 
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala index f2b3d5bdfe..2067b1613e 100644 --- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala +++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala @@ -77,16 +77,15 @@ class GraphSuite extends FunSuite with LocalSparkContext { withSpark(new SparkContext("local", "test")) { sc => val a = sc.parallelize((0 to 100).map(x => (x.toLong, x.toLong)), 5) val b = VertexSetRDD(a).mapValues(x => -x) - assert(b.leftJoin(a) - .mapValues(x => x._1 + x._2.get).map(x=> x._2).reduce(_+_) === 0) + assert(b.count === 101) + assert(b.leftJoin(a).mapValues(x => x._1 + x._2.get).map(x=> x._2).reduce(_+_) === 0) val c = VertexSetRDD(a, b.index) - assert(b.leftJoin(c) - .mapValues(x => x._1 + x._2.get).map(x=> x._2).reduce(_+_) === 0) + assert(b.leftJoin(c).mapValues(x => x._1 + x._2.get).map(x=> x._2).reduce(_+_) === 0) val d = c.filter(q => ((q._2 % 2) == 0)) val e = a.filter(q => ((q._2 % 2) == 0)) assert(d.count === e.count) - assert(b.zipJoin(c).mapValues(x => x._1 + x._2) - .map(x => x._2).reduce(_+_) === 0) + assert(b.zipJoin(c).mapValues(x => x._1 + x._2).map(x => x._2).reduce(_+_) === 0) + } } -- cgit v1.2.3 From 63311d9c729d40c1846aeeef808e3b6b5c4479ec Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Thu, 31 Oct 2013 20:12:30 -0700 Subject: renamed update to setMerge --- core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala | 2 +- .../main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala | 2 +- graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala index af282d5651..4eb52158b0 100644 --- a/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala @@ -79,7 +79,7 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: } /** Set the value for a key */ - def update(k: K, v: V, mergeF: (V,V) => V) { + def setMerge(k: K, v: V, mergeF: (V,V) => V) { if (k == null) { if(haveNullValue) { nullValue = mergeF(nullValue, v) diff --git a/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala index cbfb2361b4..1bf2554fb7 100644 --- a/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala @@ -65,7 +65,7 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest, /** Set the value for a key */ - def update(k: K, v: V, mergeF: (V,V) => V) { + def setMerge(k: K, v: V, mergeF: (V,V) => V) { val pos = keySet.fastAdd(k) val ind = pos & OpenHashSet.POSITION_MASK if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) { // if first add diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala index b3647c083e..7211ff3705 100644 --- a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala +++ b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala @@ -641,7 +641,7 @@ object VertexSetRDD { val groups = preAgg.mapPartitions( iter => { val hashMap = new PrimitiveKeyOpenHashMap[Vid, V] for ((k,v) <- iter) { - hashMap.update(k, v, 
reduceFunc) + hashMap.setMerge(k, v, reduceFunc) } val index = hashMap.keySet val values: IndexedSeq[V] = hashMap._values -- cgit v1.2.3 From e7d37472b83b3bc8e232e790b2df230e35c0a5af Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Thu, 31 Oct 2013 21:09:39 -0700 Subject: After some testing I realized that the IndexedSeq is still instantiating the array (not maintaining a view) so I have replaced all IndexedSeq[V] with (Int => V) --- .../org/apache/spark/graph/VertexSetRDD.scala | 83 +++++++++------------- 1 file changed, 32 insertions(+), 51 deletions(-) diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala index 7211ff3705..f26e286003 100644 --- a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala +++ b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala @@ -62,7 +62,6 @@ class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) { } // end of VertexSetIndex - /** * An VertexSetRDD[V] extends the RDD[(Vid,V)] by ensuring that there * is only one entry for each vertex and by pre-indexing the entries @@ -99,7 +98,7 @@ class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) { */ class VertexSetRDD[@specialized V: ClassManifest]( @transient val index: VertexSetIndex, - @transient val valuesRDD: RDD[ (IndexedSeq[V], BitSet) ]) + @transient val valuesRDD: RDD[ ( (Int => V), BitSet) ]) extends RDD[(Vid, V)](index.rdd.context, List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) { @@ -183,13 +182,13 @@ class VertexSetRDD[@specialized V: ClassManifest]( val cleanPred = index.rdd.context.clean(pred) val newValues = index.rdd.zipPartitions(valuesRDD){ (keysIter: Iterator[VertexIdToIndexMap], - valuesIter: Iterator[(IndexedSeq[V], BitSet)]) => + valuesIter: Iterator[(Int => V, BitSet)]) => val index = keysIter.next() assert(keysIter.hasNext() == false) val (oldValues, bs) = valuesIter.next() assert(valuesIter.hasNext() == false) // Allocate the array to store the results into - val newBS = new BitSet(oldValues.size) + val newBS = new BitSet(index.capacity) // Iterate over the active bits in the old bitset and // evaluate the predicate var ind = bs.nextSetBit(0) @@ -218,15 +217,13 @@ class VertexSetRDD[@specialized V: ClassManifest]( * VertexSetRDD retains the same index. */ def mapValues[U: ClassManifest](f: V => U): VertexSetRDD[U] = { - val cleanF = index.rdd.context.clean(f) - val newValuesRDD: RDD[ (IndexedSeq[U], BitSet) ] = + val newValuesRDD: RDD[ (Int => U, BitSet) ] = valuesRDD.mapPartitions(iter => iter.map{ case (values, bs: BitSet) => - val newValues: IndexedSeq[U] = values.view.zipWithIndex.map{ - (x: (V, Int)) => if(bs.get(x._2)) cleanF(x._1) else null.asInstanceOf[U] - }.toIndexedSeq // @todo check the toIndexedSeq is free + val newValues: (Int => U) = + (ind: Int) => if (bs.get(ind)) f(values(ind)) else null.asInstanceOf[U] (newValues, bs) - }, preservesPartitioning = true) + }, preservesPartitioning = true) new VertexSetRDD[U](index, newValuesRDD) } // end of mapValues @@ -244,22 +241,19 @@ class VertexSetRDD[@specialized V: ClassManifest]( * VertexSetRDD retains the same index. 
*/ def mapValuesWithKeys[U: ClassManifest](f: (Vid, V) => U): VertexSetRDD[U] = { - val cleanF = index.rdd.context.clean(f) - val newValues: RDD[ (IndexedSeq[U], BitSet) ] = + val newValues: RDD[ (Int => U, BitSet) ] = index.rdd.zipPartitions(valuesRDD){ (keysIter: Iterator[VertexIdToIndexMap], - valuesIter: Iterator[(IndexedSeq[V], BitSet)]) => + valuesIter: Iterator[(Int => V, BitSet)]) => val index = keysIter.next() assert(keysIter.hasNext() == false) val (oldValues, bs: BitSet) = valuesIter.next() assert(valuesIter.hasNext() == false) // Cosntruct a view of the map transformation - val newValues: IndexedSeq[U] = oldValues.view.zipWithIndex.map{ - (x: (V, Int)) => - if(bs.get(x._2)) { - cleanF(index.getValueSafe(x._2), x._1) - } else null.asInstanceOf[U] - }.toIndexedSeq // @todo check the toIndexedSeq is free + val newValues: (Int => U) = (ind: Int) => { + if (bs.get(ind)) { f(index.getValueSafe(ind), oldValues(ind)) } + else { null.asInstanceOf[U] } + } Iterator((newValues, bs)) } new VertexSetRDD[U](index, newValues) @@ -283,18 +277,16 @@ class VertexSetRDD[@specialized V: ClassManifest]( if(index != other.index) { throw new SparkException("A zipJoin can only be applied to RDDs with the same index!") } - val newValuesRDD: RDD[ (IndexedSeq[(V,W)], BitSet) ] = + val newValuesRDD: RDD[ (Int => (V,W), BitSet) ] = valuesRDD.zipPartitions(other.valuesRDD){ - (thisIter: Iterator[(IndexedSeq[V], BitSet)], - otherIter: Iterator[(IndexedSeq[W], BitSet)]) => + (thisIter: Iterator[(Int => V, BitSet)], + otherIter: Iterator[(Int => W, BitSet)]) => val (thisValues, thisBS: BitSet) = thisIter.next() assert(!thisIter.hasNext) val (otherValues, otherBS: BitSet) = otherIter.next() assert(!otherIter.hasNext) val newBS: BitSet = thisBS & otherBS - val newValues: IndexedSeq[(V,W)] = - thisValues.view.zip(otherValues).toIndexedSeq // @todo check the toIndexedSeq is free - // Iterator((newValues.toIndexedSeq, newBS)) + val newValues: Int => (V,W) = (ind: Int) => (thisValues(ind), otherValues(ind)) Iterator((newValues, newBS)) } new VertexSetRDD(index, newValuesRDD) @@ -320,23 +312,17 @@ class VertexSetRDD[@specialized V: ClassManifest]( if(index != other.index) { throw new SparkException("A zipJoin can only be applied to RDDs with the same index!") } - val newValuesRDD: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] = + val newValuesRDD: RDD[ (Int => (V,Option[W]), BitSet) ] = valuesRDD.zipPartitions(other.valuesRDD){ - (thisIter: Iterator[(IndexedSeq[V], BitSet)], - otherIter: Iterator[(IndexedSeq[W], BitSet)]) => + (thisIter: Iterator[(Int => V, BitSet)], + otherIter: Iterator[(Int => W, BitSet)]) => val (thisValues, thisBS: BitSet) = thisIter.next() assert(!thisIter.hasNext) val (otherValues, otherBS: BitSet) = otherIter.next() assert(!otherIter.hasNext) - val newValues: IndexedSeq[(V, Option[W])] = thisValues.view.zip(otherValues) - .zipWithIndex.map { - // @todo not sure about the efficiency of this case statement - // though it is assumed that the return value is short lived - case ((v, w), ind) => (v, if (otherBS.get(ind)) Option(w) else None) - } - .toIndexedSeq // @todo check the toIndexedSeq is free + val newValues: Int => (V, Option[W]) = (ind: Int) => + (thisValues(ind), if (otherBS.get(ind)) Option(otherValues(ind)) else None) Iterator((newValues, thisBS)) - // Iterator((newValues.toIndexedSeq, thisBS)) } new VertexSetRDD(index, newValuesRDD) } // end of leftZipJoin @@ -380,10 +366,10 @@ class VertexSetRDD[@specialized V: ClassManifest]( if (other.partitioner == partitioner) other else 
other.partitionBy(partitioner.get) // Compute the new values RDD - val newValuesRDD: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] = + val newValuesRDD: RDD[ (Int => (V,Option[W]), BitSet) ] = index.rdd.zipPartitions(valuesRDD, otherShuffled) { (thisIndexIter: Iterator[VertexIdToIndexMap], - thisIter: Iterator[(IndexedSeq[V], BitSet)], + thisIter: Iterator[(Int => V, BitSet)], tuplesIter: Iterator[(Vid,W)]) => // Get the Index and values for this RDD val index = thisIndexIter.next() @@ -391,9 +377,9 @@ class VertexSetRDD[@specialized V: ClassManifest]( val (thisValues, thisBS) = thisIter.next() assert(!thisIter.hasNext) // Create a new array to store the values in the resulting VertexSet - val otherValues = new Array[W](thisValues.size) + val otherValues = new Array[W](index.capacity) // track which values are matched with values in other - val otherBS = new BitSet(thisValues.size) + val otherBS = new BitSet(index.capacity) for ((k,w) <- tuplesIter) { // Get the location of the key in the index val pos = index.getPos(k) @@ -412,13 +398,8 @@ class VertexSetRDD[@specialized V: ClassManifest]( } // Some vertices in this vertex set may not have a corresponding // tuple in the join and so a None value should be returned. - val newValues: IndexedSeq[(V, Option[W])] = thisValues.view.zip(otherValues) - .zipWithIndex.map { - // @todo not sure about the efficiency of this case statement - // though it is assumed that the return value is short lived - case ((v, w), ind) => (v, if (otherBS.get(ind)) Option(w) else None) - } - .toIndexedSeq // @todo check the toIndexedSeq is free + val newValues: Int => (V, Option[W]) = (ind: Int) => + (thisValues(ind), if (otherBS.get(ind)) Option(otherValues(ind)) else None) Iterator((newValues, thisBS)) } // end of newValues new VertexSetRDD(index, newValuesRDD) @@ -644,13 +625,13 @@ object VertexSetRDD { hashMap.setMerge(k, v, reduceFunc) } val index = hashMap.keySet - val values: IndexedSeq[V] = hashMap._values + val values: Int => V = (ind: Int) => hashMap._values(ind) val bs = index.getBitSet Iterator( (index, (values, bs)) ) }, true).cache // extract the index and the values val index = groups.mapPartitions(_.map{ case (kMap, vAr) => kMap }, true) - val values: RDD[(IndexedSeq[V], BitSet)] = + val values: RDD[(Int => V, BitSet)] = groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true) new VertexSetRDD[V](new VertexSetIndex(index), values) } // end of apply @@ -726,7 +707,7 @@ object VertexSetRDD { } // Use the index to build the new values table - val values: RDD[ (IndexedSeq[C], BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => { + val values: RDD[ (Int => C, BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => { // There is only one map val index = indexIter.next() assert(!indexIter.hasNext()) @@ -750,7 +731,7 @@ object VertexSetRDD { } } } - Iterator((values, bs)) + Iterator(((ind: Int) => values(ind), bs)) }) new VertexSetRDD(index, values) } // end of apply -- cgit v1.2.3 From db89ac4bc8cbabaa13b36b75f4be2d96c29cb83a Mon Sep 17 00:00:00 2001 From: "Joseph E. 
Gonzalez" Date: Thu, 31 Oct 2013 21:19:26 -0700 Subject: Changing var to val for keySet in OpenHashMaps --- core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala | 2 +- .../main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala index 4eb52158b0..e53551ced6 100644 --- a/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala @@ -27,7 +27,7 @@ package org.apache.spark.util.hash */ private[spark] class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: ClassManifest]( - var keySet: OpenHashSet[K], var _values: Array[V]) + val keySet: OpenHashSet[K], var _values: Array[V]) extends Iterable[(K, V)] with Serializable { diff --git a/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala index 1bf2554fb7..08fc74e5da 100644 --- a/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala @@ -28,7 +28,7 @@ package org.apache.spark.util.hash private[spark] class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest, @specialized(Long, Int, Double) V: ClassManifest]( - var keySet: OpenHashSet[K], var _values: Array[V]) + val keySet: OpenHashSet[K], var _values: Array[V]) extends Iterable[(K, V)] with Serializable { -- cgit v1.2.3