aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorAndrew Or <andrewor14@gmail.com>2014-01-04 00:00:57 -0800
committerAndrew Or <andrewor14@gmail.com>2014-01-04 00:00:57 -0800
commit4296d96c82881cde5832bd8f8a3b48eb9817a218 (patch)
treeb2445be45520fac4d69210a9f93f2a2999d6ee28 /core/src
parent333d58df8676b30adc86e479579e2659e24d01a3 (diff)
downloadspark-4296d96c82881cde5832bd8f8a3b48eb9817a218.tar.gz
spark-4296d96c82881cde5832bd8f8a3b48eb9817a218.tar.bz2
spark-4296d96c82881cde5832bd8f8a3b48eb9817a218.zip
Assign spill threshold as a fraction of maximum memory
Further, divide this threshold by the number of tasks running concurrently. Note that this does not guard against the following scenario: a new task quickly fills up its share of the memory before old tasks finish spilling their contents, in which case the total memory used by such maps may exceed what was specified. Currently, spark.shuffle.safetyFraction mitigates the effect of this.
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala16
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala74
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala17
5 files changed, 81 insertions, 33 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 634a94f0a7..224b5c1744 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -59,6 +59,9 @@ class SparkEnv private[spark] (
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
+ // Number of tasks currently running across all threads
+ @volatile private var _numRunningTasks = 0
+
// A general, soft-reference map for metadata needed during HadoopRDD split computation
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
@@ -86,6 +89,19 @@ class SparkEnv private[spark] (
pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
}
}
+
+ /**
+ * Return the number of tasks currently running across all threads
+ */
+ def numRunningTasks: Int = _numRunningTasks
+
+ def incrementNumRunningTasks() = synchronized {
+ _numRunningTasks += 1
+ }
+
+ def decrementNumRunningTasks() = synchronized {
+ _numRunningTasks -= 1
+ }
}
object SparkEnv extends Logging {
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index e51d274d33..bd202affa2 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -186,6 +186,7 @@ private[spark] class Executor(
var taskStart: Long = 0
def gcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
val startGCTime = gcTime
+ env.incrementNumRunningTasks()
try {
SparkEnv.set(env)
@@ -279,6 +280,7 @@ private[spark] class Executor(
//System.exit(1)
}
} finally {
+ env.decrementNumRunningTasks()
runningTasks.remove(taskId)
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
index d2a9574a71..d8fa7ed9af 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@ -17,8 +17,7 @@
package org.apache.spark.util.collection
-import java.util
-import java.util.Comparator
+import java.util.{Arrays, Comparator}
/**
* A simple open hash table optimized for the append-only use case, where keys
@@ -270,7 +269,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
cmp.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)])
}
}
- util.Arrays.sort(data, 0, newIndex, rawOrdering)
+ Arrays.sort(data, 0, newIndex, rawOrdering)
new Iterator[(K, V)] {
var i = 0
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 68a23192c0..c348168a8b 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -32,17 +32,28 @@ import org.apache.spark.storage.{DiskBlockManager, DiskBlockObjectWriter}
* An append-only map that spills sorted content to disk when the memory threshold is exceeded.
*
* This map takes two passes over the data:
- * (1) Values are merged into combiners, which are sorted and spilled to disk in as necessary.
+ *
+ * (1) Values are merged into combiners, which are sorted and spilled to disk as necessary
* (2) Combiners are read from disk and merged together
*
- * Two parameters control the memory threshold: `spark.shuffle.buffer.mb` specifies the maximum
- * size of the in-memory map before a spill, and `spark.shuffle.buffer.fraction` specifies an
- * additional margin of safety. The second parameter is important for the following reason:
+ * The setting of the spill threshold faces the following trade-off: If the spill threshold is
+ * too high, the in-memory map may occupy more memory than is available, resulting in OOM.
+ * However, if the spill threshold is too low, we spill frequently and incur unnecessary disk
+ * writes. This may lead to a performance regression compared to the normal case of using the
+ * non-spilling AppendOnlyMap.
+ *
+ * A few parameters control the memory threshold:
+ *
+ * `spark.shuffle.memoryFraction` specifies the collective amount of memory used for storing
+ * these maps as a fraction of the executor's total memory. Since each concurrently running
+ * task maintains one map, the actual threshold for each map is this quantity divided by the
+ * number of running tasks.
*
- * If the spill threshold is set too high, the in-memory map may occupy more memory than is
- * available, resulting in OOM. However, if the spill threshold is set too low, we spill
- * frequently and incur unnecessary disk writes. This may lead to a performance regression
- * compared to the normal case of using the non-spilling AppendOnlyMap.
+ * `spark.shuffle.safetyFraction` specifies an additional margin of safety as a fraction of
+ * this threshold, in case map size estimation is not sufficiently accurate.
+ *
+ * `spark.shuffle.updateThresholdInterval` controls how frequently each thread checks on
+ * shared executor state to update its local memory threshold.
*/
private[spark] class ExternalAppendOnlyMap[K, V, C](
@@ -56,35 +67,54 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
import ExternalAppendOnlyMap._
private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
- private val spilledMaps = new ArrayBuffer[DiskIterator]
-
+ private val spilledMaps = new ArrayBuffer[DiskMapIterator]
private val sparkConf = new SparkConf()
- private val memoryThresholdMB = {
- // TODO: Turn this into a fraction of memory per reducer
- val bufferSize = sparkConf.getLong("spark.shuffle.buffer.mb", 1024)
- val bufferPercent = sparkConf.getDouble("spark.shuffle.buffer.fraction", 0.8)
- bufferSize * bufferPercent
+
+ // Collective memory threshold shared across all running tasks
+ private val maxMemoryThreshold = {
+ val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.75)
+ val safetyFraction = sparkConf.getDouble("spark.shuffle.safetyFraction", 0.8)
+ (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
}
+
+ // Maximum size for this map before a spill is triggered
+ private var spillThreshold = maxMemoryThreshold
+
+ // How often to update spillThreshold
+ private val updateThresholdInterval =
+ sparkConf.getInt("spark.shuffle.updateThresholdInterval", 100)
+
private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
private val syncWrites = sparkConf.get("spark.shuffle.sync", "false").toBoolean
private val comparator = new KCComparator[K, C]
private val ser = serializer.newInstance()
+ private var insertCount = 0
private var spillCount = 0
- def insert(key: K, value: V): Unit = {
+ def insert(key: K, value: V) {
+ insertCount += 1
val update: (Boolean, C) => C = (hadVal, oldVal) => {
if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
}
currentMap.changeValue(key, update)
- if (currentMap.estimateSize() > memoryThresholdMB * 1024 * 1024) {
+ if (insertCount % updateThresholdInterval == 1) {
+ updateSpillThreshold()
+ }
+ if (currentMap.estimateSize() > spillThreshold) {
spill()
}
}
- private def spill(): Unit = {
+ // TODO: differentiate ShuffleMapTask's from ResultTask's
+ private def updateSpillThreshold() {
+ val numRunningTasks = math.max(SparkEnv.get.numRunningTasks, 1)
+ spillThreshold = maxMemoryThreshold / numRunningTasks
+ }
+
+ private def spill() {
spillCount += 1
- logWarning(s"In-memory KV map exceeded threshold of $memoryThresholdMB MB!")
- logWarning(s"Spilling to disk ($spillCount time"+(if (spillCount > 1) "s" else "")+" so far)")
+ logWarning("In-memory map exceeded %s MB! Spilling to disk (%d time%s so far)"
+ .format(spillThreshold / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
val (blockId, file) = diskBlockManager.createTempBlock()
val writer =
new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize, identity, syncWrites)
@@ -100,7 +130,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
writer.close()
}
currentMap = new SizeTrackingAppendOnlyMap[K, C]
- spilledMaps.append(new DiskIterator(file))
+ spilledMaps.append(new DiskMapIterator(file))
}
override def iterator: Iterator[(K, C)] = {
@@ -228,7 +258,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
}
// Iterate through (K, C) pairs in sorted order from an on-disk map
- private class DiskIterator(file: File) extends Iterator[(K, C)] {
+ private class DiskMapIterator(file: File) extends Iterator[(K, C)] {
val fileStream = new FileInputStream(file)
val bufferedStream = new FastBufferedInputStream(fileStream)
val deserializeStream = ser.deserializeStream(bufferedStream)
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 6c93b1f5a0..ef957bb0e5 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -5,15 +5,13 @@ import scala.collection.mutable.ArrayBuffer
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark._
-import org.apache.spark.SparkContext.rddToPairRDDFunctions
+import org.apache.spark.SparkContext._
class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
override def beforeEach() {
val conf = new SparkConf(false)
conf.set("spark.shuffle.externalSorting", "true")
- conf.set("spark.shuffle.buffer.mb", "1024")
- conf.set("spark.shuffle.buffer.fraction", "0.8")
sc = new SparkContext("local", "test", conf)
}
@@ -27,14 +25,14 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
}
test("simple insert") {
- var map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
+ val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
mergeValue, mergeCombiners)
// Single insert
map.insert(1, 10)
var it = map.iterator
assert(it.hasNext)
- var kv = it.next()
+ val kv = it.next()
assert(kv._1 == 1 && kv._2 == ArrayBuffer[Int](10))
assert(!it.hasNext)
@@ -59,7 +57,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
map.insert(1, 100)
map.insert(2, 200)
map.insert(1, 1000)
- var it = map.iterator
+ val it = map.iterator
assert(it.hasNext)
val result = it.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet))
assert(result == Set[(Int, Set[Int])](
@@ -177,8 +175,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
}
test("spilling") {
- System.setProperty("spark.shuffle.buffer.mb", "1")
- System.setProperty("spark.shuffle.buffer.fraction", "0.05")
+ // TODO: Figure out correct memory parameters to actually induce spilling
+ // System.setProperty("spark.shuffle.buffer.mb", "1")
+ // System.setProperty("spark.shuffle.buffer.fraction", "0.05")
// reduceByKey - should spill exactly 6 times
val rddA = sc.parallelize(0 until 10000).map(i => (i/2, i))
@@ -226,4 +225,6 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
}
}
}
+
+ // TODO: Test memory allocation for multiple concurrently running tasks
}