diff options
author | Andrew Or <andrewor14@gmail.com> | 2013-12-30 13:01:00 -0800 |
---|---|---|
committer | Andrew Or <andrewor14@gmail.com> | 2013-12-30 13:01:00 -0800 |
commit | d6e7910d925039d9b57d82e7ca17e775c52fbee5 (patch) | |
tree | f50da89bea0e24abefe30e10f576febcac07df94 | |
parent | 2b71ab97c4787e7f82d026e37b41f3a5767b4e89 (diff) | |
download | spark-d6e7910d925039d9b57d82e7ca17e775c52fbee5.tar.gz spark-d6e7910d925039d9b57d82e7ca17e775c52fbee5.tar.bz2 spark-d6e7910d925039d9b57d82e7ca17e775c52fbee5.zip |
Simplify merge logic based on the invariant that all spills contain unique keys
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 59 |
1 file changed, 22 insertions, 37 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 4349e8d638..0e8f46cfc7 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -185,29 +185,15 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
       kgPairs
     }
 
-    // Drop and return all (K, G) pairs with K = the given key from the given KGITuple
-    def dropKey(kgi: KGITuple, key: K): ArrayBuffer[(K, G)] = {
-      val dropped = new ArrayBuffer[(K, G)]
-      var i = 0
-      while (i < kgi.pairs.length) {
-        if (kgi.pairs(i)._1 == key) {
-          dropped += kgi.pairs.remove(i)
-        } else {
-          i += 1
-        }
-      }
-      dropped
-    }
-
-    // Merge all (K, G) pairs with K = the given key into baseGroup
-    def mergeIntoGroup(key: K, baseGroup: G, kgPairs: ArrayBuffer[(K, G)]): G = {
-      var mergedGroup = baseGroup
-      kgPairs.foreach { case (k, g) =>
-        if (k == key){
-          mergedGroup = mergeGroups(mergedGroup, g)
+    // From the given KGITuple, remove the (K, G) pair with K = key and merge it into baseGroup
+    def mergeIntoGroup(key: K, baseGroup: G, kgi: KGITuple): G = {
+      kgi.pairs.zipWithIndex.foreach { case ((k, g), i) =>
+        if (k == key) {
+          kgi.pairs.remove(i)
+          return mergeGroups(baseGroup, g)
         }
       }
-      mergedGroup
+      baseGroup
     }
 
     override def hasNext: Boolean = {
@@ -226,28 +212,27 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
         // Should only happen when hasNext is false
         throw new NoSuchElementException
       }
-      var (minKey, minGroup) = minPairs(0)
-      assert(minKey.hashCode() == minHash)
-      // Merge the rest of minPairs into minGroup
-      val minPairsWithKey = dropKey(minKGI, minKey).tail
-      minGroup = mergeIntoGroup(minKey, minGroup, minPairsWithKey)
-      if (minPairs.length == 0) {
-        minPairs ++= readFromIterator(minKGI.iterator)
-      }
+      // Select a return key with the minimum hash
+      var (minKey, minGroup) = minPairs.remove(0)
+      assert(minKey.hashCode() == minHash)
 
-      // Do the same for all other KGITuples with the same minHash
-      val tuplesToAddBack = ArrayBuffer[KGITuple](minKGI)
+      // Merge all other KGITuple's with the same minHash
+      val dequeuedKGI = ArrayBuffer[KGITuple](minKGI)
       while (!mergeHeap.isEmpty && mergeHeap.head.minHash == minHash) {
         val newKGI = mergeHeap.dequeue()
-        val pairsWithKey = dropKey(newKGI, minKey)
-        minGroup = mergeIntoGroup(minKey, minGroup, pairsWithKey)
-        if (newKGI.pairs.length == 0) {
-          newKGI.pairs ++= readFromIterator(newKGI.iterator)
+        minGroup = mergeIntoGroup(minKey, minGroup, newKGI)
+        dequeuedKGI += newKGI
+      }
+
+      // Repopulate and add back all dequeued KGI to mergeHeap
+      dequeuedKGI.foreach { kgi =>
+        if (kgi.pairs.length == 0) {
+          kgi.pairs ++= readFromIterator(kgi.iterator)
         }
-        tuplesToAddBack += newKGI
+        mergeHeap.enqueue(kgi)
       }
-      tuplesToAddBack.foreach(mergeHeap.enqueue(_))
+
       (minKey, createCombiner(minGroup))
     }