author     Andrew Or <andrewor14@gmail.com>  2013-12-31 20:02:05 -0800
committer  Andrew Or <andrewor14@gmail.com>  2013-12-31 20:02:05 -0800
commit     83dfa1666487a4772c95fea21fde0d47471e063d (patch)
tree       52658027926547bb5bc0504f8903120289f42a95 /core/src
parent     8bbe08b21ee6e48b5ba1e2c2a8b1c7eacde9603a (diff)
Address Patrick's and Reynold's comments
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 120
1 file changed, 71 insertions(+), 49 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 492b4fc7c6..311405f0cf 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -20,6 +20,8 @@ package org.apache.spark.util.collection
import java.io._
import java.util.Comparator
+import it.unimi.dsi.fastutil.io.FastBufferedInputStream
+
import scala.collection.mutable.{ArrayBuffer, PriorityQueue}
import scala.reflect.ClassTag
@@ -53,7 +55,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C: ClassTag](
private val map: SpillableAppendOnlyMap[K, V, _, C] = {
if (mergeBeforeSpill) {
new SpillableAppendOnlyMap[K, V, C, C] (createCombiner, mergeValue, mergeCombiners,
- Predef.identity, serializer, diskBlockManager)
+ identity, serializer, diskBlockManager)
} else {
// Use ArrayBuffer[V] as the intermediate combiner
val createGroup: (V => ArrayBuffer[V]) = value => ArrayBuffer[V](value)
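When combiners cannot be merged before spilling, the else branch above uses ArrayBuffer[V] as the intermediate group type G. Only createGroup is visible in this hunk; the companion merge functions lie outside the diff context, so the sketch below reconstructs plausible counterparts (mergeValueIntoGroup and mergeGroupsTogether are assumed names, not the commit's):

    import scala.collection.mutable.ArrayBuffer

    // Hypothetical companions to createGroup above: a value joins an existing
    // group by appending, and two groups merge by concatenation.
    def createGroup[V]: V => ArrayBuffer[V] = value => ArrayBuffer[V](value)
    def mergeValueIntoGroup[V](group: ArrayBuffer[V], value: V): ArrayBuffer[V] = group += value
    def mergeGroupsTogether[V](g1: ArrayBuffer[V], g2: ArrayBuffer[V]): ArrayBuffer[V] = g1 ++= g2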
@@ -111,9 +113,10 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
import SpillableAppendOnlyMap._
private var currentMap = new SizeTrackingAppendOnlyMap[K, G]
- private val oldMaps = new ArrayBuffer[DiskKGIterator]
+ private val spilledMaps = new ArrayBuffer[DiskIterator]
private val memoryThresholdMB = {
+ // TODO: Turn this into a fraction of memory per reducer
val bufferSize = System.getProperty("spark.shuffle.buffer.mb", "1024").toLong
val bufferPercent = System.getProperty("spark.shuffle.buffer.fraction", "0.8").toFloat
bufferSize * bufferPercent
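With the defaults shown in this hunk, the map spills once its estimated size passes 1024 MB * 0.8 = 819.2 MB. A minimal standalone sketch of the same arithmetic:

    // Reproduces the threshold computation above with the diff's defaults:
    // 1024 MB buffer * 0.8 fraction = 819.2, i.e. spill past ~819 MB.
    val bufferSize = System.getProperty("spark.shuffle.buffer.mb", "1024").toLong
    val bufferPercent = System.getProperty("spark.shuffle.buffer.fraction", "0.8").toFloat
    val memoryThresholdMB = bufferSize * bufferPercent  // 819.2 with defaults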
@@ -152,31 +155,37 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
writer.close()
}
currentMap = new SizeTrackingAppendOnlyMap[K, G]
- oldMaps.append(new DiskKGIterator(file))
+ spilledMaps.append(new DiskIterator(file))
}
override def iterator: Iterator[(K, C)] = {
- if (oldMaps.isEmpty && implicitly[ClassTag[G]] == implicitly[ClassTag[C]]) {
+ if (spilledMaps.isEmpty && implicitly[ClassTag[G]] == implicitly[ClassTag[C]]) {
currentMap.iterator.asInstanceOf[Iterator[(K, C)]]
} else {
new ExternalIterator()
}
}
- // An iterator that sort-merges (K, G) pairs from memory and disk into (K, C) pairs
+ /** An iterator that sort-merges (K, G) pairs from memory and disk into (K, C) pairs. */
private class ExternalIterator extends Iterator[(K, C)] {
- val mergeHeap = new PriorityQueue[KGITuple]
- val inputStreams = oldMaps ++ Seq(currentMap.destructiveSortedIterator(comparator))
- // Invariant: size of mergeHeap == number of input streams
+ // A fixed-size queue that maintains a buffer for each stream we are currently merging
+ val mergeHeap = new PriorityQueue[StreamBuffer]
+
+ // Input streams are derived both from the in-memory map and spilled maps on disk
+ // The in-memory map is sorted in place, while the spilled maps are already in sorted order
+ val inputStreams = Seq(currentMap.destructiveSortedIterator(comparator)) ++ spilledMaps
+
inputStreams.foreach{ it =>
- val kgPairs = readFromIterator(it)
- mergeHeap.enqueue(KGITuple(it, kgPairs))
+ val kgPairs = getMorePairs(it)
+ mergeHeap.enqueue(StreamBuffer(it, kgPairs))
}
- // Read from the given iterator until a key of different hash is retrieved.
- // The resulting ArrayBuffer includes this key, and is ordered by key hash.
- def readFromIterator(it: Iterator[(K, G)]): ArrayBuffer[(K, G)] = {
+ /**
+ * Fetch from the given iterator until a key of different hash is retrieved. In the
+ * event of key hash collisions, this ensures no pairs are hidden from being merged.
+ */
+ def getMorePairs(it: Iterator[(K, G)]): ArrayBuffer[(K, G)] = {
val kgPairs = new ArrayBuffer[(K, G)]
if (it.hasNext) {
var kg = it.next()
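getMorePairs has to read past the first key because distinct keys can share a hash code; stopping at the first key would hide colliding pairs from the merge. A self-contained sketch of the same contract, using a BufferedIterator for lookahead instead of the hunk's read-ahead variable:

    import scala.collection.mutable.ArrayBuffer

    // Drain one run of equal key hashes from an iterator sorted by key hash.
    // Every key colliding on the run's hash lands in the same buffer, so no
    // pair can be skipped during the merge.
    def getMorePairs[K, G](it: BufferedIterator[(K, G)]): ArrayBuffer[(K, G)] = {
      val kgPairs = new ArrayBuffer[(K, G)]
      if (it.hasNext) {
        val runHash = it.head._1.hashCode()
        while (it.hasNext && it.head._1.hashCode() == runHash) {
          kgPairs += it.next()
        }
      }
      kgPairs
    }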
@@ -190,20 +199,26 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
kgPairs
}
- // From the given KGITuple, remove the (K, G) pair with K = key and merge it into baseGroup
- def mergeIntoGroup(key: K, baseGroup: G, kgi: KGITuple): G = {
- kgi.pairs.zipWithIndex.foreach { case ((k, g), i) =>
+ /**
+ * If the given buffer contains a value for the given key, merge that value into
+ * baseGroup and remove the corresponding (K, G) pair from the buffer
+ */
+ def mergeIfKeyExists(key: K, baseGroup: G, buffer: StreamBuffer): G = {
+ var i = 0
+ while (i < buffer.pairs.size) {
+ val (k, g) = buffer.pairs(i)
if (k == key) {
- kgi.pairs.remove(i)
+ buffer.pairs.remove(i)
return mergeGroups(baseGroup, g)
}
+ i += 1
}
baseGroup
}
override def hasNext: Boolean = {
- mergeHeap.foreach{ kgi =>
- if (!kgi.pairs.isEmpty) {
+ mergeHeap.foreach{ buffer =>
+ if (!buffer.pairs.isEmpty) {
return true
}
}
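The while loop with an explicit index (replacing the removed zipWithIndex version) lets the method both remove the matched pair and return early. The same logic, lifted out with the merge function as a parameter; mergeGroups here stands in for the map's combiner:

    import scala.collection.mutable.ArrayBuffer

    // Standalone version of mergeIfKeyExists: scan the buffer for `key`; if
    // found, remove that pair and fold its group into baseGroup, otherwise
    // return baseGroup unchanged.
    def mergeIfKeyExists[K, G](key: K, baseGroup: G, pairs: ArrayBuffer[(K, G)])
                              (mergeGroups: (G, G) => G): G = {
      var i = 0
      while (i < pairs.size) {
        val (k, g) = pairs(i)
        if (k == key) {
          pairs.remove(i)
          return mergeGroups(baseGroup, g)
        }
        i += 1
      }
      baseGroup
    }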
@@ -211,66 +226,74 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
}
override def next(): (K, C) = {
- val minKGI = mergeHeap.dequeue()
- val (minPairs, minHash) = (minKGI.pairs, minKGI.minHash)
+ // Select a return key from the StreamBuffer that holds the lowest key hash
+ val minBuffer = mergeHeap.dequeue()
+ val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash)
if (minPairs.length == 0) {
- // Should only happen when hasNext is false
+ // Should only happen when no other stream buffers have any pairs left
throw new NoSuchElementException
}
-
- // Select a return key with the minimum hash
var (minKey, minGroup) = minPairs.remove(0)
assert(minKey.hashCode() == minHash)
- // Merge all other KGITuple's with the same minHash
- val dequeuedKGI = ArrayBuffer[KGITuple](minKGI)
- while (!mergeHeap.isEmpty && mergeHeap.head.minHash == minHash) {
- val newKGI = mergeHeap.dequeue()
- minGroup = mergeIntoGroup(minKey, minGroup, newKGI)
- dequeuedKGI += newKGI
+ // For all other streams that may have this key (i.e. have the same minimum key hash),
+ // merge in the corresponding value (if any) from that stream
+ val mergedBuffers = ArrayBuffer[StreamBuffer](minBuffer)
+ while (!mergeHeap.isEmpty && mergeHeap.head.minKeyHash == minHash) {
+ val newBuffer = mergeHeap.dequeue()
+ minGroup = mergeIfKeyExists(minKey, minGroup, newBuffer)
+ mergedBuffers += newBuffer
}
- // Repopulate and add back all dequeued KGI to mergeHeap
- dequeuedKGI.foreach { kgi =>
- if (kgi.pairs.length == 0) {
- kgi.pairs ++= readFromIterator(kgi.iterator)
+ // Repopulate each visited stream buffer and add it back to the merge heap
+ mergedBuffers.foreach { buffer =>
+ if (buffer.pairs.length == 0) {
+ buffer.pairs ++= getMorePairs(buffer.iterator)
}
- mergeHeap.enqueue(kgi)
+ mergeHeap.enqueue(buffer)
}
(minKey, createCombiner(minGroup))
}
- case class KGITuple(iterator: Iterator[(K, G)], pairs: ArrayBuffer[(K, G)])
- extends Comparable[KGITuple] {
-
- // Invariant: pairs are ordered by key hash
- def minHash: Int = {
+ /**
+ * A buffer for streaming from a map iterator (in-memory or on-disk) sorted by key hash.
+ * Each buffer maintains the lowest-ordered keys in the corresponding iterator. Due to
+ * hash collisions, it is possible for multiple keys to be "tied" for being the lowest.
+ *
+ * StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
+ */
+ case class StreamBuffer(iterator: Iterator[(K, G)], pairs: ArrayBuffer[(K, G)])
+ extends Comparable[StreamBuffer] {
+
+ def minKeyHash: Int = {
if (pairs.length > 0){
+ // pairs are already sorted by key hash
pairs(0)._1.hashCode()
} else {
Int.MaxValue
}
}
- override def compareTo(other: KGITuple): Int = {
- // mutable.PriorityQueue dequeues the max, not the min
- -minHash.compareTo(other.minHash)
+ override def compareTo(other: StreamBuffer): Int = {
+ // minus sign because mutable.PriorityQueue dequeues the max, not the min
+ -minKeyHash.compareTo(other.minKeyHash)
}
}
}
// Iterate through (K, G) pairs in sorted order from an on-disk map
- private class DiskKGIterator(file: File) extends Iterator[(K, G)] {
- val fstream = new FileInputStream(file)
- val dstream = ser.deserializeStream(fstream)
+ private class DiskIterator(file: File) extends Iterator[(K, G)] {
+ val fileStream = new FileInputStream(file)
+ val bufferedStream = new FastBufferedInputStream(fileStream)
+ val deserializeStream = ser.deserializeStream(bufferedStream)
var nextItem: Option[(K, G)] = None
var eof = false
def readNextItem(): Option[(K, G)] = {
if (!eof) {
try {
- return Some(dstream.readObject().asInstanceOf[(K, G)])
+ return Some(deserializeStream.readObject().asInstanceOf[(K, G)])
} catch {
case e: EOFException =>
eof = true
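Since scala.collection.mutable.PriorityQueue is a max-heap, negating compareTo is what turns it into the min-heap the merge needs. A compilable illustration of the trick, with Buf as a stand-in for StreamBuffer:

    import scala.collection.mutable.PriorityQueue

    // PriorityQueue dequeues the maximum element, so inverting the comparison
    // makes the buffer with the lowest minimum key hash come out first.
    case class Buf(minKeyHash: Int) extends Comparable[Buf] {
      override def compareTo(other: Buf): Int = -minKeyHash.compareTo(other.minKeyHash)
    }

    val heap = new PriorityQueue[Buf]()
    heap.enqueue(Buf(42), Buf(7), Buf(99))
    assert(heap.dequeue() == Buf(7))  // lowest hash surfaces first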
@@ -302,8 +325,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
// TODO: Ensure this gets called even if the iterator isn't drained.
def cleanup() {
- fstream.close()
- dstream.close()
+ deserializeStream.close()
file.delete()
}
}
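The DiskIterator above follows a lookahead pattern: readNextItem deserializes one pair at a time and treats EOFException as the end of the spill file. Its hasNext/next bodies fall outside this diff's context, so the sketch below reconstructs the pattern as a self-contained iterator from the fields shown, substituting plain Java serialization for Spark's SerializerInstance and java.io.BufferedInputStream for fastutil's FastBufferedInputStream:

    import java.io._

    // Read serialized objects from a file until EOFException, buffering one
    // item of lookahead in an Option, and delete the file once drained.
    class ObjectFileIterator[T](file: File) extends Iterator[T] {
      private val in =
        new ObjectInputStream(new BufferedInputStream(new FileInputStream(file)))
      private var nextItem: Option[T] = None
      private var eof = false

      private def readNextItem(): Option[T] = {
        if (!eof) {
          try {
            return Some(in.readObject().asInstanceOf[T])
          } catch {
            case _: EOFException =>
              eof = true
              cleanup()
          }
        }
        None
      }

      override def hasNext: Boolean = {
        nextItem = nextItem.orElse(readNextItem())
        nextItem.isDefined
      }

      override def next(): T = {
        val item = nextItem.orElse(readNextItem())
          .getOrElse(throw new NoSuchElementException)
        nextItem = None
        item
      }

      def cleanup() {
        in.close()
        file.delete()
      }
    }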