diff options
author | Andrew Or <andrewor14@gmail.com> | 2013-12-31 20:02:05 -0800 |
---|---|---|
committer | Andrew Or <andrewor14@gmail.com> | 2013-12-31 20:02:05 -0800 |
commit | 83dfa1666487a4772c95fea21fde0d47471e063d (patch) | |
tree | 52658027926547bb5bc0504f8903120289f42a95 | |
parent | 8bbe08b21ee6e48b5ba1e2c2a8b1c7eacde9603a (diff) | |
download | spark-83dfa1666487a4772c95fea21fde0d47471e063d.tar.gz spark-83dfa1666487a4772c95fea21fde0d47471e063d.tar.bz2 spark-83dfa1666487a4772c95fea21fde0d47471e063d.zip |
Address Patrick's and Reynold's comments
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 120 |
1 files changed, 71 insertions, 49 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 492b4fc7c6..311405f0cf 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -20,6 +20,8 @@ package org.apache.spark.util.collection import java.io._ import java.util.Comparator +import it.unimi.dsi.fastutil.io.FastBufferedInputStream + import scala.collection.mutable.{ArrayBuffer, PriorityQueue} import scala.reflect.ClassTag @@ -53,7 +55,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C: ClassTag]( private val map: SpillableAppendOnlyMap[K, V, _, C] = { if (mergeBeforeSpill) { new SpillableAppendOnlyMap[K, V, C, C] (createCombiner, mergeValue, mergeCombiners, - Predef.identity, serializer, diskBlockManager) + identity, serializer, diskBlockManager) } else { // Use ArrayBuffer[V] as the intermediate combiner val createGroup: (V => ArrayBuffer[V]) = value => ArrayBuffer[V](value) @@ -111,9 +113,10 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag]( import SpillableAppendOnlyMap._ private var currentMap = new SizeTrackingAppendOnlyMap[K, G] - private val oldMaps = new ArrayBuffer[DiskKGIterator] + private val spilledMaps = new ArrayBuffer[DiskIterator] private val memoryThresholdMB = { + // TODO: Turn this into a fraction of memory per reducer val bufferSize = System.getProperty("spark.shuffle.buffer.mb", "1024").toLong val bufferPercent = System.getProperty("spark.shuffle.buffer.fraction", "0.8").toFloat bufferSize * bufferPercent @@ -152,31 +155,37 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag]( writer.close() } currentMap = new SizeTrackingAppendOnlyMap[K, G] - oldMaps.append(new DiskKGIterator(file)) + spilledMaps.append(new DiskIterator(file)) } override def iterator: Iterator[(K, C)] = { - if (oldMaps.isEmpty && implicitly[ClassTag[G]] == implicitly[ClassTag[C]]) { + if (spilledMaps.isEmpty && implicitly[ClassTag[G]] == implicitly[ClassTag[C]]) { currentMap.iterator.asInstanceOf[Iterator[(K, C)]] } else { new ExternalIterator() } } - // An iterator that sort-merges (K, G) pairs from memory and disk into (K, C) pairs + /** An iterator that sort-merges (K, G) pairs from memory and disk into (K, C) pairs. */ private class ExternalIterator extends Iterator[(K, C)] { - val mergeHeap = new PriorityQueue[KGITuple] - val inputStreams = oldMaps ++ Seq(currentMap.destructiveSortedIterator(comparator)) - // Invariant: size of mergeHeap == number of input streams + // A fixed-size queue that maintains a buffer for each stream we are currently merging + val mergeHeap = new PriorityQueue[StreamBuffer] + + // Input streams are derived both from the in-memory map and spilled maps on disk + // The in-memory map is sorted in place, while the spilled maps are already in sorted order + val inputStreams = Seq(currentMap.destructiveSortedIterator(comparator)) ++ spilledMaps + inputStreams.foreach{ it => - val kgPairs = readFromIterator(it) - mergeHeap.enqueue(KGITuple(it, kgPairs)) + val kgPairs = getMorePairs(it) + mergeHeap.enqueue(StreamBuffer(it, kgPairs)) } - // Read from the given iterator until a key of different hash is retrieved. - // The resulting ArrayBuffer includes this key, and is ordered by key hash. - def readFromIterator(it: Iterator[(K, G)]): ArrayBuffer[(K, G)] = { + /** + * Fetch from the given iterator until a key of different hash is retrieved. In the + * event of key hash collisions, this ensures no pairs are hidden from being merged. + */ + def getMorePairs(it: Iterator[(K, G)]): ArrayBuffer[(K, G)] = { val kgPairs = new ArrayBuffer[(K, G)] if (it.hasNext) { var kg = it.next() @@ -190,20 +199,26 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag]( kgPairs } - // From the given KGITuple, remove the (K, G) pair with K = key and merge it into baseGroup - def mergeIntoGroup(key: K, baseGroup: G, kgi: KGITuple): G = { - kgi.pairs.zipWithIndex.foreach { case ((k, g), i) => + /** + * If the given buffer contains a value for the given key, merge that value into + * baseGroup and remove the corresponding (K, G) pair from the buffer + */ + def mergeIfKeyExists(key: K, baseGroup: G, buffer: StreamBuffer): G = { + var i = 0 + while (i < buffer.pairs.size) { + val (k, g) = buffer.pairs(i) if (k == key) { - kgi.pairs.remove(i) + buffer.pairs.remove(i) return mergeGroups(baseGroup, g) } + i += 1 } baseGroup } override def hasNext: Boolean = { - mergeHeap.foreach{ kgi => - if (!kgi.pairs.isEmpty) { + mergeHeap.foreach{ buffer => + if (!buffer.pairs.isEmpty) { return true } } @@ -211,66 +226,74 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag]( } override def next(): (K, C) = { - val minKGI = mergeHeap.dequeue() - val (minPairs, minHash) = (minKGI.pairs, minKGI.minHash) + // Select a return key from the StreamBuffer that holds the lowest key hash + val minBuffer = mergeHeap.dequeue() + val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash) if (minPairs.length == 0) { - // Should only happen when hasNext is false + // Should only happen when no other stream buffers have any pairs left throw new NoSuchElementException } - - // Select a return key with the minimum hash var (minKey, minGroup) = minPairs.remove(0) assert(minKey.hashCode() == minHash) - // Merge all other KGITuple's with the same minHash - val dequeuedKGI = ArrayBuffer[KGITuple](minKGI) - while (!mergeHeap.isEmpty && mergeHeap.head.minHash == minHash) { - val newKGI = mergeHeap.dequeue() - minGroup = mergeIntoGroup(minKey, minGroup, newKGI) - dequeuedKGI += newKGI + // For all other streams that may have this key (i.e. have the same minimum key hash), + // merge in the corresponding value (if any) from that stream + val mergedBuffers = ArrayBuffer[StreamBuffer](minBuffer) + while (!mergeHeap.isEmpty && mergeHeap.head.minKeyHash == minHash) { + val newBuffer = mergeHeap.dequeue() + minGroup = mergeIfKeyExists(minKey, minGroup, newBuffer) + mergedBuffers += newBuffer } - // Repopulate and add back all dequeued KGI to mergeHeap - dequeuedKGI.foreach { kgi => - if (kgi.pairs.length == 0) { - kgi.pairs ++= readFromIterator(kgi.iterator) + // Repopulate each visited stream buffer and add it back to the merge heap + mergedBuffers.foreach { buffer => + if (buffer.pairs.length == 0) { + buffer.pairs ++= getMorePairs(buffer.iterator) } - mergeHeap.enqueue(kgi) + mergeHeap.enqueue(buffer) } (minKey, createCombiner(minGroup)) } - case class KGITuple(iterator: Iterator[(K, G)], pairs: ArrayBuffer[(K, G)]) - extends Comparable[KGITuple] { - - // Invariant: pairs are ordered by key hash - def minHash: Int = { + /** + * A buffer for streaming from a map iterator (in-memory or on-disk) sorted by key hash. + * Each buffer maintains the lowest-ordered keys in the corresponding iterator. Due to + * hash collisions, it is possible for multiple keys to be "tied" for being the lowest. + * + * StreamBuffers are ordered by the minimum key hash found across all of their own pairs. + */ + case class StreamBuffer(iterator: Iterator[(K, G)], pairs: ArrayBuffer[(K, G)]) + extends Comparable[StreamBuffer] { + + def minKeyHash: Int = { if (pairs.length > 0){ + // pairs are already sorted by key hash pairs(0)._1.hashCode() } else { Int.MaxValue } } - override def compareTo(other: KGITuple): Int = { - // mutable.PriorityQueue dequeues the max, not the min - -minHash.compareTo(other.minHash) + override def compareTo(other: StreamBuffer): Int = { + // minus sign because mutable.PriorityQueue dequeues the max, not the min + -minKeyHash.compareTo(other.minKeyHash) } } } // Iterate through (K, G) pairs in sorted order from an on-disk map - private class DiskKGIterator(file: File) extends Iterator[(K, G)] { - val fstream = new FileInputStream(file) - val dstream = ser.deserializeStream(fstream) + private class DiskIterator(file: File) extends Iterator[(K, G)] { + val fileStream = new FileInputStream(file) + val bufferedStream = new FastBufferedInputStream(fileStream) + val deserializeStream = ser.deserializeStream(bufferedStream) var nextItem: Option[(K, G)] = None var eof = false def readNextItem(): Option[(K, G)] = { if (!eof) { try { - return Some(dstream.readObject().asInstanceOf[(K, G)]) + return Some(deserializeStream.readObject().asInstanceOf[(K, G)]) } catch { case e: EOFException => eof = true @@ -302,8 +325,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag]( // TODO: Ensure this gets called even if the iterator isn't drained. def cleanup() { - fstream.close() - dstream.close() + deserializeStream.close() file.delete() } } |