5 files changed, 80 insertions, 47 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 584261df04..08b592df71 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark
 
-import java.util.concurrent.atomic.AtomicInteger
-
 import scala.collection.mutable
 import scala.concurrent.Await
 
@@ -56,12 +54,13 @@ class SparkEnv private[spark] (
     val httpFileServer: HttpFileServer,
     val sparkFilesDir: String,
     val metricsSystem: MetricsSystem,
-    val conf: SparkConf) {
+    val conf: SparkConf) extends Logging {
 
-  private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
+  // A mapping of thread ID to amount of memory used for shuffle in bytes
+  // All accesses should be manually synchronized
+  val shuffleMemoryMap = mutable.HashMap[Long, Long]()
 
-  // Number of tasks currently running across all threads
-  private val _numRunningTasks = new AtomicInteger(0)
+  private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
 
   // A general, soft-reference map for metadata needed during HadoopRDD split computation
   // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
@@ -90,13 +89,6 @@ class SparkEnv private[spark] (
       pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
     }
   }
-
-  /**
-   * Return the number of tasks currently running across all threads
-   */
-  def numRunningTasks: Int = _numRunningTasks.intValue()
-  def incrementNumRunningTasks(): Int = _numRunningTasks.incrementAndGet()
-  def decrementNumRunningTasks(): Int = _numRunningTasks.decrementAndGet()
 }
 
 object SparkEnv extends Logging {
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index bd202affa2..a7b2328a02 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -186,7 +186,6 @@ private[spark] class Executor(
       var taskStart: Long = 0
       def gcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
       val startGCTime = gcTime
-      env.incrementNumRunningTasks()
 
       try {
         SparkEnv.set(env)
@@ -280,7 +279,11 @@ private[spark] class Executor(
           //System.exit(1)
         }
       } finally {
-        env.decrementNumRunningTasks()
+        // TODO: Unregister shuffle memory only for ShuffleMapTask
+        val shuffleMemoryMap = env.shuffleMemoryMap
+        shuffleMemoryMap.synchronized {
+          shuffleMemoryMap.remove(Thread.currentThread().getId)
+        }
         runningTasks.remove(taskId)
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 61e63c60d5..369a277232 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -181,4 +181,8 @@ class DiskBlockObjectWriter(
 
   // Only valid if called after close()
   override def timeWriting() = _timeWriting
+
+  def bytesWritten: Long = {
+    lastValidPosition - initialPosition
+  }
 }
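Note: the new shuffleMemoryMap replaces the AtomicInteger task counter with a plain HashMap guarded by manual synchronization, written by task threads and cleared in the Executor's finally block above. A minimal standalone sketch of that access pattern (the object and method names here are illustrative, not part of the patch):

    import scala.collection.mutable

    object ShuffleMemoryMapSketch {
      // Thread ID -> bytes of shuffle memory held, as in the new SparkEnv field
      val shuffleMemoryMap = mutable.HashMap[Long, Long]()

      // A task thread records how much memory its in-memory map occupies
      def record(bytes: Long): Unit = shuffleMemoryMap.synchronized {
        shuffleMemoryMap(Thread.currentThread().getId) = bytes
      }

      // Mirrors the Executor's finally block: a finishing task drops its entry,
      // returning its share of the pool to the other threads
      def release(): Unit = shuffleMemoryMap.synchronized {
        shuffleMemoryMap.remove(Thread.currentThread().getId)
      }
    }

Since the map itself is not thread-safe, every access (including read-only sums over shuffleMemoryMap.values) must take the same lock.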
diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
index 6faaa3197f..d98c7aa3d7 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@ -30,14 +30,15 @@ import java.util.{Arrays, Comparator}
  * TODO: Cache the hash values of each key? java.util.HashMap does that.
  */
 private[spark]
-class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] with Serializable {
+class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K,
+  V)] with Serializable {
   require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
   require(initialCapacity >= 1, "Invalid initial capacity")
 
   private var capacity = nextPowerOf2(initialCapacity)
   private var mask = capacity - 1
   private var curSize = 0
-  private var growThreshold = LOAD_FACTOR * capacity
+  private var growThreshold = (LOAD_FACTOR * capacity).toInt
 
   // Holds keys and values in the same array for memory locality; specifically, the order of
   // elements is key0, value0, key1, value1, key2, value2, etc.
@@ -239,7 +240,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
     data = newData
     capacity = newCapacity
     mask = newMask
-    growThreshold = LOAD_FACTOR * newCapacity
+    growThreshold = (LOAD_FACTOR * newCapacity).toInt
   }
 
   private def nextPowerOf2(n: Int): Int = {
@@ -288,4 +289,9 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
       }
     }
   }
+
+  /**
+   * Return whether the next insert will cause the map to grow
+   */
+  def atGrowThreshold: Boolean = curSize == growThreshold
 }
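Note: growThreshold is truncated to an Int above because the new atGrowThreshold tests curSize == growThreshold with an exact integer comparison. A small illustration of the difference, assuming AppendOnlyMap's load factor of 0.7: an Int threshold of 44 is reachable, whereas a Double threshold of 44.8 never equals an integer size, so the check would never fire.

    object GrowThresholdSketch {
      val LOAD_FACTOR = 0.7  // assumed to match AppendOnlyMap's constant

      def main(args: Array[String]): Unit = {
        val capacity = 64
        val growThreshold = (LOAD_FACTOR * capacity).toInt  // 44, not 44.8
        (43 to 45).foreach { curSize =>
          println(s"curSize=$curSize atGrowThreshold=${curSize == growThreshold}")
        }
      }
    }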
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index a5897e8066..50f05351eb 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -22,14 +22,16 @@ import java.util.Comparator
 
 import it.unimi.dsi.fastutil.io.FastBufferedInputStream
 
-import scala.collection.mutable.{ArrayBuffer, PriorityQueue}
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage.{DiskBlockManager, DiskBlockObjectWriter}
 
 /**
- * An append-only map that spills sorted content to disk when the memory threshold is exceeded.
+ * An append-only map that spills sorted content to disk when there is insufficient space for it
+ * to grow.
  *
  * This map takes two passes over the data:
  *
@@ -42,7 +44,7 @@ import org.apache.spark.storage.{DiskBlockManager, DiskBlockObjectWriter}
  * writes. This may lead to a performance regression compared to the normal case of using the
  * non-spilling AppendOnlyMap.
  *
- * A few parameters control the memory threshold:
+ * Two parameters control the memory threshold:
  *
  * `spark.shuffle.memoryFraction` specifies the collective amount of memory used for storing
  * these maps as a fraction of the executor's total memory. Since each concurrently running
@@ -51,9 +53,6 @@ import org.apache.spark.storage.{DiskBlockManager, DiskBlockObjectWriter}
  *
  * `spark.shuffle.safetyFraction` specifies an additional margin of safety as a fraction of
  * this threshold, in case map size estimation is not sufficiently accurate.
- *
- * `spark.shuffle.updateThresholdInterval` controls how frequently each thread checks on
- * shared executor state to update its local memory threshold.
  */
 private[spark]
 class ExternalAppendOnlyMap[K, V, C](
@@ -77,12 +76,9 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
   }
 
-  // Maximum size for this map before a spill is triggered
-  private var spillThreshold = maxMemoryThreshold
-
-  // How often to update spillThreshold
-  private val updateThresholdInterval =
-    sparkConf.getInt("spark.shuffle.updateThresholdInterval", 100)
+  // How many inserts into this map before tracking its shuffle memory usage
+  private val initialInsertThreshold =
+    sparkConf.getLong("spark.shuffle.initialInsertThreshold", 1000)
 
   private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
   private val syncWrites = sparkConf.get("spark.shuffle.sync", "false").toBoolean
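Note: the hunk below rewrites insert() so that a map about to grow must first claim room in the global pool. As a standalone sketch of that check-and-allocate step (class and method names are illustrative, not part of the patch): only the decision and the reservation happen under the lock, while the caller performs the actual spill outside it, matching the "Do not synchronize spills" comment.

    import scala.collection.mutable

    class ShuffleMemoryArbiterSketch(maxMemoryThreshold: Long) {
      val shuffleMemoryMap = mutable.HashMap[Long, Long]()

      /** Returns true if the calling thread should spill instead of growing. */
      def shouldSpillBeforeGrowing(mapSize: Long): Boolean =
        shuffleMemoryMap.synchronized {
          val threadId = Thread.currentThread().getId
          // Memory this thread already holds is not counted against it, since
          // the new reservation replaces its old one rather than adding to it
          val previouslyOccupied = shuffleMemoryMap.getOrElse(threadId, 0L)
          val availableMemory =
            maxMemoryThreshold - (shuffleMemoryMap.values.sum - previouslyOccupied)
          // Assume the map doubles in size when it grows, as the patch does
          val shouldSpill = availableMemory < mapSize * 2
          if (!shouldSpill) {
            shuffleMemoryMap(threadId) = mapSize * 2
          }
          shouldSpill
        }
    }

A thread that is denied the doubling spills and then resets its entry to 0, as spill() does further below.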
@@ -91,30 +87,54 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
   private var insertCount = 0
   private var spillCount = 0
 
+  /**
+   * Insert the given key and value into the map.
+   *
+   * If the underlying map is about to grow, check if the global pool of shuffle memory has
+   * enough room for this to happen. If so, allocate the memory required to grow the map;
+   * otherwise, spill the in-memory map to disk.
+   *
+   * The shuffle memory usage of the first initialInsertThreshold entries is not tracked.
+   */
   def insert(key: K, value: V) {
     insertCount += 1
     val update: (Boolean, C) => C = (hadVal, oldVal) => {
       if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
     }
-    currentMap.changeValue(key, update)
-    if (insertCount % updateThresholdInterval == 1) {
-      updateSpillThreshold()
-    }
-    if (currentMap.estimateSize() > spillThreshold) {
-      spill()
+    if (insertCount > initialInsertThreshold && currentMap.atGrowThreshold) {
+      val mapSize = currentMap.estimateSize()
+      var shouldSpill = false
+      val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
+
+      // Atomically check whether there is sufficient memory in the global pool for
+      // this map to grow and, if possible, allocate the required amount
+      shuffleMemoryMap.synchronized {
+        val threadId = Thread.currentThread().getId
+        val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
+        val availableMemory = maxMemoryThreshold -
+          (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
+
+        // Assume map grow factor is 2x
+        shouldSpill = availableMemory < mapSize * 2
+        if (!shouldSpill) {
+          shuffleMemoryMap(threadId) = mapSize * 2
+        }
+      }
+      // Do not synchronize spills
+      if (shouldSpill) {
+        spill(mapSize)
+      }
     }
+    currentMap.changeValue(key, update)
   }
 
-  // TODO: differentiate ShuffleMapTask's from ResultTask's
-  private def updateSpillThreshold() {
-    val numRunningTasks = math.max(SparkEnv.get.numRunningTasks, 1)
-    spillThreshold = maxMemoryThreshold / numRunningTasks
-  }
-
-  private def spill() {
+  /**
+   * Sort the existing contents of the in-memory map and spill them to a temporary file on disk
+   */
+  private def spill(mapSize: Long) {
     spillCount += 1
-    logWarning("In-memory map exceeded %s MB! Spilling to disk (%d time%s so far)"
-      .format(spillThreshold / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
+    logWarning("* Spilling in-memory map of %d MB to disk (%d time%s so far)"
+      .format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
     val (blockId, file) = diskBlockManager.createTempBlock()
     val writer =
       new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize, identity, syncWrites)
@@ -131,6 +151,13 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     }
     currentMap = new SizeTrackingAppendOnlyMap[K, C]
     spilledMaps.append(new DiskMapIterator(file))
+
+    // Reset the amount of shuffle memory used by this map in the global pool
+    val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
+    shuffleMemoryMap.synchronized {
+      shuffleMemoryMap(Thread.currentThread().getId) = 0
+    }
+    insertCount = 0
   }
 
   override def iterator: Iterator[(K, C)] = {
@@ -145,11 +172,12 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
   private class ExternalIterator extends Iterator[(K, C)] {
 
     // A fixed-size queue that maintains a buffer for each stream we are currently merging
-    val mergeHeap = new PriorityQueue[StreamBuffer]
+    val mergeHeap = new mutable.PriorityQueue[StreamBuffer]
 
     // Input streams are derived both from the in-memory map and spilled maps on disk
     // The in-memory map is sorted in place, while the spilled maps are already in sorted order
-    val inputStreams = Seq(currentMap.destructiveSortedIterator(comparator)) ++ spilledMaps
+    val sortedMap = currentMap.destructiveSortedIterator(comparator)
+    val inputStreams = Seq(sortedMap) ++ spilledMaps
 
     inputStreams.foreach{ it =>
       val kcPairs = getMorePairs(it)
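Note: mergeHeap now names scala.collection.mutable.PriorityQueue explicitly after the import change above. For reference, a self-contained sketch of the k-way merge pattern ExternalIterator builds on it (StreamBuffer here is a simplified stand-in for the real buffer type, ordered so the stream with the smallest head is dequeued first):

    import scala.collection.mutable

    object MergeSketch {
      case class StreamBuffer(head: Int, rest: Iterator[Int])
          extends Ordered[StreamBuffer] {
        // PriorityQueue is a max-heap, so invert the comparison to pop the
        // smallest head first
        def compare(that: StreamBuffer): Int = that.head.compare(this.head)
      }

      def merge(streams: Seq[Iterator[Int]]): Iterator[Int] = new Iterator[Int] {
        private val heap = new mutable.PriorityQueue[StreamBuffer]
        streams.foreach { s => if (s.hasNext) heap.enqueue(StreamBuffer(s.next(), s)) }

        def hasNext: Boolean = heap.nonEmpty
        def next(): Int = {
          // Pop the smallest head, then refill the buffer from its stream
          val buf = heap.dequeue()
          if (buf.rest.hasNext) heap.enqueue(StreamBuffer(buf.rest.next(), buf.rest))
          buf.head
        }
      }

      def main(args: Array[String]): Unit =
        println(merge(Seq(Iterator(1, 4, 7), Iterator(2, 5), Iterator(3, 6))).toList)  // List(1, 2, 3, 4, 5, 6, 7)
    }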