From 4296d96c82881cde5832bd8f8a3b48eb9817a218 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Sat, 4 Jan 2014 00:00:57 -0800 Subject: Assign spill threshold as a fraction of maximum memory Further, divide this threshold by the number of tasks running concurrently. Note that this does not guard against the following scenario: a new task quickly fills up its share of the memory before old tasks finish spilling their contents, in which case the total memory used by such maps may exceed what was specified. Currently, spark.shuffle.safetyFraction mitigates the effect of this. --- .../src/main/scala/org/apache/spark/SparkEnv.scala | 16 +++++ .../scala/org/apache/spark/executor/Executor.scala | 2 + .../spark/util/collection/AppendOnlyMap.scala | 5 +- .../util/collection/ExternalAppendOnlyMap.scala | 74 +++++++++++++++------- .../collection/ExternalAppendOnlyMapSuite.scala | 17 ++--- 5 files changed, 81 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 634a94f0a7..224b5c1744 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -59,6 +59,9 @@ class SparkEnv private[spark] ( private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]() + // Number of tasks currently running across all threads + @volatile private var _numRunningTasks = 0 + // A general, soft-reference map for metadata needed during HadoopRDD split computation // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats). private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]() @@ -86,6 +89,19 @@ class SparkEnv private[spark] ( pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create() } } + + /** + * Return the number of tasks currently running across all threads + */ + def numRunningTasks: Int = _numRunningTasks + + def incrementNumRunningTasks() = synchronized { + _numRunningTasks += 1 + } + + def decrementNumRunningTasks() = synchronized { + _numRunningTasks -= 1 + } } object SparkEnv extends Logging { diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index e51d274d33..bd202affa2 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -186,6 +186,7 @@ private[spark] class Executor( var taskStart: Long = 0 def gcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum val startGCTime = gcTime + env.incrementNumRunningTasks() try { SparkEnv.set(env) @@ -279,6 +280,7 @@ private[spark] class Executor( //System.exit(1) } } finally { + env.decrementNumRunningTasks() runningTasks.remove(taskId) } } diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala index d2a9574a71..d8fa7ed9af 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala @@ -17,8 +17,7 @@ package org.apache.spark.util.collection -import java.util -import java.util.Comparator +import java.util.{Arrays, Comparator} /** * A simple open hash table optimized for the append-only use case, where keys @@ -270,7 +269,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi cmp.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)]) } } - util.Arrays.sort(data, 0, newIndex, rawOrdering) + Arrays.sort(data, 0, newIndex, rawOrdering) new Iterator[(K, V)] { var i = 0 diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 68a23192c0..c348168a8b 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -32,17 +32,28 @@ import org.apache.spark.storage.{DiskBlockManager, DiskBlockObjectWriter} * An append-only map that spills sorted content to disk when the memory threshold is exceeded. * * This map takes two passes over the data: - * (1) Values are merged into combiners, which are sorted and spilled to disk in as necessary. + * + * (1) Values are merged into combiners, which are sorted and spilled to disk as necessary * (2) Combiners are read from disk and merged together * - * Two parameters control the memory threshold: `spark.shuffle.buffer.mb` specifies the maximum - * size of the in-memory map before a spill, and `spark.shuffle.buffer.fraction` specifies an - * additional margin of safety. The second parameter is important for the following reason: + * The setting of the spill threshold faces the following trade-off: If the spill threshold is + * too high, the in-memory map may occupy more memory than is available, resulting in OOM. + * However, if the spill threshold is too low, we spill frequently and incur unnecessary disk + * writes. This may lead to a performance regression compared to the normal case of using the + * non-spilling AppendOnlyMap. + * + * A few parameters control the memory threshold: + * + * `spark.shuffle.memoryFraction` specifies the collective amount of memory used for storing + * these maps as a fraction of the executor's total memory. Since each concurrently running + * task maintains one map, the actual threshold for each map is this quantity divided by the + * number of running tasks. * - * If the spill threshold is set too high, the in-memory map may occupy more memory than is - * available, resulting in OOM. However, if the spill threshold is set too low, we spill - * frequently and incur unnecessary disk writes. This may lead to a performance regression - * compared to the normal case of using the non-spilling AppendOnlyMap. + * `spark.shuffle.safetyFraction` specifies an additional margin of safety as a fraction of + * this threshold, in case map size estimation is not sufficiently accurate. + * + * `spark.shuffle.updateThresholdInterval` controls how frequently each thread checks on + * shared executor state to update its local memory threshold. */ private[spark] class ExternalAppendOnlyMap[K, V, C]( @@ -56,35 +67,54 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( import ExternalAppendOnlyMap._ private var currentMap = new SizeTrackingAppendOnlyMap[K, C] - private val spilledMaps = new ArrayBuffer[DiskIterator] - + private val spilledMaps = new ArrayBuffer[DiskMapIterator] private val sparkConf = new SparkConf() - private val memoryThresholdMB = { - // TODO: Turn this into a fraction of memory per reducer - val bufferSize = sparkConf.getLong("spark.shuffle.buffer.mb", 1024) - val bufferPercent = sparkConf.getDouble("spark.shuffle.buffer.fraction", 0.8) - bufferSize * bufferPercent + + // Collective memory threshold shared across all running tasks + private val maxMemoryThreshold = { + val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.75) + val safetyFraction = sparkConf.getDouble("spark.shuffle.safetyFraction", 0.8) + (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong } + + // Maximum size for this map before a spill is triggered + private var spillThreshold = maxMemoryThreshold + + // How often to update spillThreshold + private val updateThresholdInterval = + sparkConf.getInt("spark.shuffle.updateThresholdInterval", 100) + private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024 private val syncWrites = sparkConf.get("spark.shuffle.sync", "false").toBoolean private val comparator = new KCComparator[K, C] private val ser = serializer.newInstance() + private var insertCount = 0 private var spillCount = 0 - def insert(key: K, value: V): Unit = { + def insert(key: K, value: V) { + insertCount += 1 val update: (Boolean, C) => C = (hadVal, oldVal) => { if (hadVal) mergeValue(oldVal, value) else createCombiner(value) } currentMap.changeValue(key, update) - if (currentMap.estimateSize() > memoryThresholdMB * 1024 * 1024) { + if (insertCount % updateThresholdInterval == 1) { + updateSpillThreshold() + } + if (currentMap.estimateSize() > spillThreshold) { spill() } } - private def spill(): Unit = { + // TODO: differentiate ShuffleMapTask's from ResultTask's + private def updateSpillThreshold() { + val numRunningTasks = math.max(SparkEnv.get.numRunningTasks, 1) + spillThreshold = maxMemoryThreshold / numRunningTasks + } + + private def spill() { spillCount += 1 - logWarning(s"In-memory KV map exceeded threshold of $memoryThresholdMB MB!") - logWarning(s"Spilling to disk ($spillCount time"+(if (spillCount > 1) "s" else "")+" so far)") + logWarning("In-memory map exceeded %s MB! Spilling to disk (%d time%s so far)" + .format(spillThreshold / (1024 * 1024), spillCount, if (spillCount > 1) "s" else "")) val (blockId, file) = diskBlockManager.createTempBlock() val writer = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize, identity, syncWrites) @@ -100,7 +130,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( writer.close() } currentMap = new SizeTrackingAppendOnlyMap[K, C] - spilledMaps.append(new DiskIterator(file)) + spilledMaps.append(new DiskMapIterator(file)) } override def iterator: Iterator[(K, C)] = { @@ -228,7 +258,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( } // Iterate through (K, C) pairs in sorted order from an on-disk map - private class DiskIterator(file: File) extends Iterator[(K, C)] { + private class DiskMapIterator(file: File) extends Iterator[(K, C)] { val fileStream = new FileInputStream(file) val bufferedStream = new FastBufferedInputStream(fileStream) val deserializeStream = ser.deserializeStream(bufferedStream) diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index 6c93b1f5a0..ef957bb0e5 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -5,15 +5,13 @@ import scala.collection.mutable.ArrayBuffer import org.scalatest.{BeforeAndAfter, FunSuite} import org.apache.spark._ -import org.apache.spark.SparkContext.rddToPairRDDFunctions +import org.apache.spark.SparkContext._ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with LocalSparkContext { override def beforeEach() { val conf = new SparkConf(false) conf.set("spark.shuffle.externalSorting", "true") - conf.set("spark.shuffle.buffer.mb", "1024") - conf.set("spark.shuffle.buffer.fraction", "0.8") sc = new SparkContext("local", "test", conf) } @@ -27,14 +25,14 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local } test("simple insert") { - var map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner, + val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner, mergeValue, mergeCombiners) // Single insert map.insert(1, 10) var it = map.iterator assert(it.hasNext) - var kv = it.next() + val kv = it.next() assert(kv._1 == 1 && kv._2 == ArrayBuffer[Int](10)) assert(!it.hasNext) @@ -59,7 +57,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local map.insert(1, 100) map.insert(2, 200) map.insert(1, 1000) - var it = map.iterator + val it = map.iterator assert(it.hasNext) val result = it.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet)) assert(result == Set[(Int, Set[Int])]( @@ -177,8 +175,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local } test("spilling") { - System.setProperty("spark.shuffle.buffer.mb", "1") - System.setProperty("spark.shuffle.buffer.fraction", "0.05") + // TODO: Figure out correct memory parameters to actually induce spilling + // System.setProperty("spark.shuffle.buffer.mb", "1") + // System.setProperty("spark.shuffle.buffer.fraction", "0.05") // reduceByKey - should spill exactly 6 times val rddA = sc.parallelize(0 until 10000).map(i => (i/2, i)) @@ -226,4 +225,6 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local } } } + + // TODO: Test memory allocation for multiple concurrently running tasks } -- cgit v1.2.3