aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Or <andrewor14@gmail.com>2013-12-30 13:01:00 -0800
committerAndrew Or <andrewor14@gmail.com>2013-12-30 13:01:00 -0800
commitd6e7910d925039d9b57d82e7ca17e775c52fbee5 (patch)
treef50da89bea0e24abefe30e10f576febcac07df94
parent2b71ab97c4787e7f82d026e37b41f3a5767b4e89 (diff)
downloadspark-d6e7910d925039d9b57d82e7ca17e775c52fbee5.tar.gz
spark-d6e7910d925039d9b57d82e7ca17e775c52fbee5.tar.bz2
spark-d6e7910d925039d9b57d82e7ca17e775c52fbee5.zip
Simplify merge logic based on the invariant that all spills contain unique keys
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala59
1 files changed, 22 insertions, 37 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 4349e8d638..0e8f46cfc7 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -185,29 +185,15 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
kgPairs
}
- // Drop and return all (K, G) pairs with K = the given key from the given KGITuple
- def dropKey(kgi: KGITuple, key: K): ArrayBuffer[(K, G)] = {
- val dropped = new ArrayBuffer[(K, G)]
- var i = 0
- while (i < kgi.pairs.length) {
- if (kgi.pairs(i)._1 == key) {
- dropped += kgi.pairs.remove(i)
- } else {
- i += 1
- }
- }
- dropped
- }
-
- // Merge all (K, G) pairs with K = the given key into baseGroup
- def mergeIntoGroup(key: K, baseGroup: G, kgPairs: ArrayBuffer[(K, G)]): G = {
- var mergedGroup = baseGroup
- kgPairs.foreach { case (k, g) =>
- if (k == key){
- mergedGroup = mergeGroups(mergedGroup, g)
+ // From the given KGITuple, remove the (K, G) pair with K = key and merge it into baseGroup
+ def mergeIntoGroup(key: K, baseGroup: G, kgi: KGITuple): G = {
+ kgi.pairs.zipWithIndex.foreach { case ((k, g), i) =>
+ if (k == key) {
+ kgi.pairs.remove(i)
+ return mergeGroups(baseGroup, g)
}
}
- mergedGroup
+ baseGroup
}
override def hasNext: Boolean = {
@@ -226,28 +212,27 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
// Should only happen when hasNext is false
throw new NoSuchElementException
}
- var (minKey, minGroup) = minPairs(0)
- assert(minKey.hashCode() == minHash)
- // Merge the rest of minPairs into minGroup
- val minPairsWithKey = dropKey(minKGI, minKey).tail
- minGroup = mergeIntoGroup(minKey, minGroup, minPairsWithKey)
- if (minPairs.length == 0) {
- minPairs ++= readFromIterator(minKGI.iterator)
- }
+ // Select a return key with the minimum hash
+ var (minKey, minGroup) = minPairs.remove(0)
+ assert(minKey.hashCode() == minHash)
- // Do the same for all other KGITuples with the same minHash
- val tuplesToAddBack = ArrayBuffer[KGITuple](minKGI)
+ // Merge all other KGITuple's with the same minHash
+ val dequeuedKGI = ArrayBuffer[KGITuple](minKGI)
while (!mergeHeap.isEmpty && mergeHeap.head.minHash == minHash) {
val newKGI = mergeHeap.dequeue()
- val pairsWithKey = dropKey(newKGI, minKey)
- minGroup = mergeIntoGroup(minKey, minGroup, pairsWithKey)
- if (newKGI.pairs.length == 0) {
- newKGI.pairs ++= readFromIterator(newKGI.iterator)
+ minGroup = mergeIntoGroup(minKey, minGroup, newKGI)
+ dequeuedKGI += newKGI
+ }
+
+ // Repopulate and add back all dequeued KGI to mergeHeap
+ dequeuedKGI.foreach { kgi =>
+ if (kgi.pairs.length == 0) {
+ kgi.pairs ++= readFromIterator(kgi.iterator)
}
- tuplesToAddBack += newKGI
+ mergeHeap.enqueue(kgi)
}
- tuplesToAddBack.foreach(mergeHeap.enqueue(_))
+
(minKey, createCombiner(minGroup))
}