Diffstat (limited to 'core')
4 files changed, 51 insertions, 45 deletions
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 08a96b0c34..8b30cd4bfe 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -32,7 +32,7 @@ case class Aggregator[K, V, C] (
     mergeCombiners: (C, C) => C) {
 
   private val sparkConf = SparkEnv.get.conf
-  private val externalSorting = sparkConf.get("spark.shuffle.externalSorting", "false").toBoolean
+  private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
 
   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
     if (!externalSorting) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index b7c7773e58..a73714abca 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -106,8 +106,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
   override val partitioner = Some(part)
 
   override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
-
-    val externalSorting = sparkConf.get("spark.shuffle.externalSorting", "false").toBoolean
+    val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
     val split = s.asInstanceOf[CoGroupPartition]
     val numRdds = split.deps.size
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index c56e2ca2df..56cae6f6b9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -864,7 +864,7 @@ private[spark] object BlockManager extends Logging {
   val ID_GENERATOR = new IdGenerator
 
   def getMaxMemory(conf: SparkConf): Long = {
-    val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.66)
+    val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
    (Runtime.getRuntime.maxMemory * memoryFraction).toLong
  }
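Taken together with the companion change below, which drops spark.shuffle.memoryFraction from 0.75 to 0.3, these defaults rebalance the executor heap: the old pair (0.66 + 0.75) could nominally claim more than the whole heap, while the new pair (0.6 + 0.3 * 0.8 = 0.84) leaves headroom for task execution. A minimal sketch of the byte budgets the new defaults imply; the MemoryBudgets object is hypothetical, and only the config keys and formulas come from the patch:

import org.apache.spark.SparkConf

// Hypothetical helper: compute the storage and shuffle budgets for this JVM.
object MemoryBudgets {
  def main(args: Array[String]) {
    val conf = new SparkConf(false)
    val maxMemory = Runtime.getRuntime.maxMemory

    // Mirrors BlockManager.getMaxMemory: a fraction of the heap for cached blocks
    val storageBytes =
      (maxMemory * conf.getDouble("spark.storage.memoryFraction", 0.6)).toLong

    // Mirrors ExternalAppendOnlyMap.maxMemoryThreshold: a fraction of the heap,
    // scaled by a safety fraction, shared across all tasks on this executor
    val shuffleBytes =
      (maxMemory * conf.getDouble("spark.shuffle.memoryFraction", 0.3) *
        conf.getDouble("spark.shuffle.safetyFraction", 0.8)).toLong

    println("storage: %d bytes, shuffle: %d bytes".format(storageBytes, shuffleBytes))
  }
}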
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 50f05351eb..e3bcd895aa 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -71,21 +71,24 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
 
   // Collective memory threshold shared across all running tasks
   private val maxMemoryThreshold = {
-    val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.75)
+    val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.3)
     val safetyFraction = sparkConf.getDouble("spark.shuffle.safetyFraction", 0.8)
     (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
   }
 
-  // How many inserts into this map before tracking its shuffle memory usage
-  private val initialInsertThreshold =
-    sparkConf.getLong("spark.shuffle.initialInsertThreshold", 1000)
+  // Number of pairs in the in-memory map
+  private var numPairsInMemory = 0
+
+  // Number of in-memory pairs inserted before tracking the map's shuffle memory usage
+  private val trackMemoryThreshold = 1000
+
+  // How many times we have spilled so far
+  private var spillCount = 0
 
   private val fileBufferSize =
     sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
-  private val syncWrites = sparkConf.get("spark.shuffle.sync", "false").toBoolean
+  private val syncWrites = sparkConf.getBoolean("spark.shuffle.sync", false)
   private val comparator = new KCComparator[K, C]
   private val ser = serializer.newInstance()
-  private var insertCount = 0
-  private var spillCount = 0
 
   /**
    * Insert the given key and value into the map.
@@ -94,14 +97,13 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
    * enough room for this to happen. If so, allocate the memory required to grow the map;
    * otherwise, spill the in-memory map to disk.
    *
-   * The shuffle memory usage of the first initialInsertThreshold entries is not tracked.
+   * The shuffle memory usage of the first trackMemoryThreshold entries is not tracked.
    */
   def insert(key: K, value: V) {
-    insertCount += 1
     val update: (Boolean, C) => C = (hadVal, oldVal) => {
       if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
     }
-    if (insertCount > initialInsertThreshold && currentMap.atGrowThreshold) {
+    if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) {
       val mapSize = currentMap.estimateSize()
       var shouldSpill = false
       val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
@@ -114,7 +116,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
           val availableMemory = maxMemoryThreshold -
             (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
 
-          // Assume map grow factor is 2x
+          // Assume map growth factor is 2x
           shouldSpill = availableMemory < mapSize * 2
           if (!shouldSpill) {
             shuffleMemoryMap(threadId) = mapSize * 2
@@ -126,6 +128,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
       }
     }
     currentMap.changeValue(key, update)
+    numPairsInMemory += 1
   }
 
   /**
@@ -133,7 +136,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
    */
   private def spill(mapSize: Long) {
     spillCount += 1
-    logWarning("* Spilling in-memory map of %d MB to disk (%d time%s so far)"
+    logWarning("Spilling in-memory map of %d MB to disk (%d time%s so far)"
       .format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
     val (blockId, file) = diskBlockManager.createTempBlock()
     val writer =
@@ -157,9 +160,13 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     shuffleMemoryMap.synchronized {
       shuffleMemoryMap(Thread.currentThread().getId) = 0
     }
-    insertCount = 0
+    numPairsInMemory = 0
   }
 
+  /**
+   * Return an iterator that merges the in-memory map with the spilled maps.
+   * If no spill has occurred, simply return the in-memory map's iterator.
+   */
   override def iterator: Iterator[(K, C)] = {
     if (spilledMaps.isEmpty) {
       currentMap.iterator
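The insert() hunks above implement the spill policy: only after the first trackMemoryThreshold pairs, and only when the map is about to grow, does the task check whether doubling the map would exceed the remaining collective budget, spilling if so. A condensed, self-contained sketch of that decision under the patch's 2x growth assumption; SpillTracker and shouldSpill are hypothetical names, and the real logic lives inline in insert() and resets state in spill():

import scala.collection.mutable

// Hypothetical stand-alone sketch of the spill decision made in insert().
class SpillTracker(maxMemoryThreshold: Long) {
  // Number of in-memory pairs inserted before tracking memory usage (as in the patch)
  private val trackMemoryThreshold = 1000
  // threadId -> bytes currently reserved for that thread's in-memory map
  private val shuffleMemoryMap = mutable.HashMap[Long, Long]()
  private var numPairsInMemory = 0

  /** Return true if the caller should spill to disk instead of growing its map. */
  def shouldSpill(estimatedMapSize: Long): Boolean = {
    numPairsInMemory += 1
    if (numPairsInMemory <= trackMemoryThreshold) {
      return false
    }
    val threadId = Thread.currentThread().getId
    shuffleMemoryMap.synchronized {
      val previouslyOccupied = shuffleMemoryMap.getOrElse(threadId, 0L)
      val availableMemory = maxMemoryThreshold -
        (shuffleMemoryMap.values.sum - previouslyOccupied)
      // Assume map growth factor is 2x, as the patch does
      if (availableMemory < estimatedMapSize * 2) {
        shuffleMemoryMap(threadId) = 0L // release our claim; caller spills to disk
        numPairsInMemory = 0            // reset the counter, as spill() does
        true
      } else {
        shuffleMemoryMap(threadId) = estimatedMapSize * 2 // reserve room to grow
        false
      }
    }
  }
}

Because the reservation map is shared across all tasks on the executor, one task's growing map shrinks the budget visible to the others, which is what makes the threshold "collective".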
@@ -168,7 +175,9 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     }
   }
 
-  /** An iterator that sort-merges (K, C) pairs from the in-memory and on-disk maps */
+  /**
+   * An iterator that sort-merges (K, C) pairs from the in-memory map and the spilled maps
+   */
   private class ExternalIterator extends Iterator[(K, C)] {
 
     // A fixed-size queue that maintains a buffer for each stream we are currently merging
@@ -179,7 +188,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     val sortedMap = currentMap.destructiveSortedIterator(comparator)
     val inputStreams = Seq(sortedMap) ++ spilledMaps
 
-    inputStreams.foreach{ it =>
+    inputStreams.foreach { it =>
       val kcPairs = getMorePairs(it)
       mergeHeap.enqueue(StreamBuffer(it, kcPairs))
     }
@@ -187,6 +196,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     /**
      * Fetch from the given iterator until a key of different hash is retrieved. In the
      * event of key hash collisions, this ensures no pairs are hidden from being merged.
+     * Assume the given iterator is in sorted order.
      */
     def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
       val kcPairs = new ArrayBuffer[(K, C)]
@@ -219,17 +229,16 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
       baseCombiner
     }
 
-    override def hasNext: Boolean = {
-      mergeHeap.foreach{ buffer =>
-        if (!buffer.pairs.isEmpty) {
-          return true
-        }
-      }
-      false
-    }
+    /**
+     * Return true if there exists an input stream that still has unvisited pairs
+     */
+    override def hasNext: Boolean = mergeHeap.exists(!_.pairs.isEmpty)
 
+    /**
+     * Select a key with the minimum hash, then combine all values with the same key from all input streams.
+     */
     override def next(): (K, C) = {
-      // Select a return key from the StreamBuffer that holds the lowest key hash
+      // Select a key from the StreamBuffer that holds the lowest key hash
       val minBuffer = mergeHeap.dequeue()
       val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash)
       if (minPairs.length == 0) {
@@ -285,45 +294,43 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     }
   }
 
-  // Iterate through (K, C) pairs in sorted order from an on-disk map
+  /**
+   * An iterator that returns (K, C) pairs in sorted order from an on-disk map
+   */
   private class DiskMapIterator(file: File) extends Iterator[(K, C)] {
     val fileStream = new FileInputStream(file)
     val bufferedStream = new FastBufferedInputStream(fileStream)
     val deserializeStream = ser.deserializeStream(bufferedStream)
-    var nextItem: Option[(K, C)] = None
+    var nextItem: (K, C) = null
     var eof = false
 
-    def readNextItem(): Option[(K, C)] = {
+    def readNextItem(): (K, C) = {
      if (!eof) {
        try {
-          return Some(deserializeStream.readObject().asInstanceOf[(K, C)])
+          return deserializeStream.readObject().asInstanceOf[(K, C)]
        } catch {
          case e: EOFException =>
            eof = true
            cleanup()
        }
      }
-      None
+      null
    }
 
     override def hasNext: Boolean = {
-      nextItem match {
-        case Some(item) => true
-        case None =>
-          nextItem = readNextItem()
-          nextItem.isDefined
+      if (nextItem == null) {
+        nextItem = readNextItem()
       }
+      nextItem != null
     }
 
     override def next(): (K, C) = {
-      nextItem match {
-        case Some(item) =>
-          nextItem = None
-          item
-        case None =>
-          val item = readNextItem()
-          item.getOrElse(throw new NoSuchElementException)
+      val item = if (nextItem == null) readNextItem() else nextItem
+      if (item == null) {
+        throw new NoSuchElementException
       }
+      nextItem = null
+      item
     }
 
     // TODO: Ensure this gets called even if the iterator isn't drained.
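The DiskMapIterator rewrite replaces the Option[(K, C)] read-ahead buffer with a null sentinel, avoiding a Some allocation per record on a hot deserialization path; null is safe here because Tuple2 is a reference type. The same pattern extracted as a generic sketch (ReadAheadIterator and the lines helper are illustrative, not from the patch):

// Generic sketch of the read-ahead pattern used by DiskMapIterator:
// `nextItem` buffers one element; null means "nothing buffered yet".
abstract class ReadAheadIterator[A >: Null] extends Iterator[A] {
  private var nextItem: A = null

  /** Read one element from the underlying source, or return null at end of stream. */
  protected def readNextItem(): A

  override def hasNext: Boolean = {
    if (nextItem == null) {
      nextItem = readNextItem()
    }
    nextItem != null
  }

  override def next(): A = {
    val item = if (nextItem == null) readNextItem() else nextItem
    if (item == null) {
      throw new NoSuchElementException
    }
    nextItem = null
    item
  }
}

// Usage: wrap a java.io.BufferedReader, whose readLine() returns null at EOF
def lines(reader: java.io.BufferedReader): Iterator[String] =
  new ReadAheadIterator[String] {
    protected def readNextItem(): String = reader.readLine()
  }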