 core/src/main/scala/org/apache/spark/SparkEnv.scala                              | 18
 core/src/main/scala/org/apache/spark/executor/Executor.scala                     |  7
 core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala             |  4
 core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala         | 12
 core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 86
 5 files changed, 80 insertions(+), 47 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 584261df04..08b592df71 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -17,8 +17,6 @@
package org.apache.spark
-import java.util.concurrent.atomic.AtomicInteger
-
import scala.collection.mutable
import scala.concurrent.Await
@@ -56,12 +54,13 @@ class SparkEnv private[spark] (
val httpFileServer: HttpFileServer,
val sparkFilesDir: String,
val metricsSystem: MetricsSystem,
- val conf: SparkConf) {
+ val conf: SparkConf) extends Logging {
- private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
+ // A mapping of thread ID to amount of memory used for shuffle in bytes
+ // All accesses should be manually synchronized
+ val shuffleMemoryMap = mutable.HashMap[Long, Long]()
- // Number of tasks currently running across all threads
- private val _numRunningTasks = new AtomicInteger(0)
+ private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
// A general, soft-reference map for metadata needed during HadoopRDD split computation
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
@@ -90,13 +89,6 @@ class SparkEnv private[spark] (
pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
}
}
-
- /**
- * Return the number of tasks currently running across all threads
- */
- def numRunningTasks: Int = _numRunningTasks.intValue()
- def incrementNumRunningTasks(): Int = _numRunningTasks.incrementAndGet()
- def decrementNumRunningTasks(): Int = _numRunningTasks.decrementAndGet()
}
object SparkEnv extends Logging {
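
The shuffleMemoryMap introduced above replaces the atomic running-task counter with a plain HashMap from thread ID to bytes of shuffle memory, guarded only by manual synchronization. A minimal standalone sketch of the intended access pattern (ShuffleMemorySketch and trackUsage are hypothetical names, not part of the patch):

    import scala.collection.mutable

    // Hypothetical sketch of the manually synchronized access pattern.
    // Every read and write takes the map's own monitor; no access is lock-free.
    object ShuffleMemorySketch {
      val shuffleMemoryMap = mutable.HashMap[Long, Long]()

      // Record that the current thread now claims `bytes` of shuffle memory
      def trackUsage(bytes: Long): Unit = {
        val threadId = Thread.currentThread().getId
        shuffleMemoryMap.synchronized {
          shuffleMemoryMap(threadId) = bytes
        }
      }

      // Total shuffle memory currently claimed across all threads
      def totalUsage: Long = shuffleMemoryMap.synchronized {
        shuffleMemoryMap.values.sum
      }
    }
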
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index bd202affa2..a7b2328a02 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -186,7 +186,6 @@ private[spark] class Executor(
var taskStart: Long = 0
def gcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
val startGCTime = gcTime
- env.incrementNumRunningTasks()
try {
SparkEnv.set(env)
@@ -280,7 +279,11 @@ private[spark] class Executor(
//System.exit(1)
}
} finally {
- env.decrementNumRunningTasks()
+ // TODO: Unregister shuffle memory only for ShuffleMapTask
+ val shuffleMemoryMap = env.shuffleMemoryMap
+ shuffleMemoryMap.synchronized {
+ shuffleMemoryMap.remove(Thread.currentThread().getId)
+ }
runningTasks.remove(taskId)
}
}
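
Because pool accounting is keyed by thread ID and executor threads are reused, a finished task must drop its entry; otherwise its last claim would be charged against whatever task runs on that thread next. A sketch of the try/finally cleanup, with TaskCleanupSketch and runTask as stand-ins for the Executor.TaskRunner internals:

    import scala.collection.mutable

    // Sketch: release the current thread's shuffle memory claim even when
    // the task body throws.
    object TaskCleanupSketch {
      def runAndCleanUp(shuffleMemoryMap: mutable.HashMap[Long, Long])(runTask: => Unit): Unit = {
        try {
          runTask
        } finally {
          shuffleMemoryMap.synchronized {
            shuffleMemoryMap.remove(Thread.currentThread().getId)
          }
        }
      }
    }
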
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 61e63c60d5..369a277232 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -181,4 +181,8 @@ class DiskBlockObjectWriter(
// Only valid if called after close()
override def timeWriting() = _timeWriting
+
+ def bytesWritten: Long = {
+ lastValidPosition - initialPosition
+ }
}
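
bytesWritten is position arithmetic over the writer's committed range: bytes past lastValidPosition (written but not yet committed, or reverted) are excluded. A worksheet-style example with made-up offsets:

    // Illustrative offsets only. A writer opened at file offset 512 that has
    // committed up to offset 2048 reports 1536 bytes written; anything past
    // lastValidPosition is not counted, so reverts do not inflate the figure.
    val initialPosition = 512L
    val lastValidPosition = 2048L
    val bytesWritten = lastValidPosition - initialPosition  // 1536
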
diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
index 6faaa3197f..d98c7aa3d7 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@ -30,14 +30,15 @@ import java.util.{Arrays, Comparator}
* TODO: Cache the hash values of each key? java.util.HashMap does that.
*/
private[spark]
-class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] with Serializable {
+class AppendOnlyMap[K, V](initialCapacity: Int = 64)
+  extends Iterable[(K, V)] with Serializable {
require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
require(initialCapacity >= 1, "Invalid initial capacity")
private var capacity = nextPowerOf2(initialCapacity)
private var mask = capacity - 1
private var curSize = 0
- private var growThreshold = LOAD_FACTOR * capacity
+ private var growThreshold = (LOAD_FACTOR * capacity).toInt
// Holds keys and values in the same array for memory locality; specifically, the order of
// elements is key0, value0, key1, value1, key2, value2, etc.
@@ -239,7 +240,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] with Serializable {
data = newData
capacity = newCapacity
mask = newMask
- growThreshold = LOAD_FACTOR * newCapacity
+ growThreshold = (LOAD_FACTOR * newCapacity).toInt
}
private def nextPowerOf2(n: Int): Int = {
@@ -288,4 +289,9 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] with Serializable {
}
}
}
+
+ /**
+ * Return whether the next insert will cause the map to grow
+ */
+ def atGrowThreshold: Boolean = curSize == growThreshold
}
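
Truncating growThreshold to an Int makes the equality test in atGrowThreshold exact rather than a Double comparison. A worksheet-style example, assuming the map's load factor of 0.7:

    // Assumes LOAD_FACTOR = 0.7, as defined in AppendOnlyMap.
    val LOAD_FACTOR = 0.7
    val capacity = 64
    val growThreshold = (LOAD_FACTOR * capacity).toInt  // (0.7 * 64).toInt = 44

    // A caller can now ask whether the *next* insert would double the table,
    // and choose to spill instead of letting it grow:
    def atGrowThreshold(curSize: Int): Boolean = curSize == growThreshold

This is exactly the hook ExternalAppendOnlyMap uses below to decide between growing and spilling.
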
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index a5897e8066..50f05351eb 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -22,14 +22,16 @@ import java.util.Comparator
import it.unimi.dsi.fastutil.io.FastBufferedInputStream
-import scala.collection.mutable.{ArrayBuffer, PriorityQueue}
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.{DiskBlockManager, DiskBlockObjectWriter}
/**
- * An append-only map that spills sorted content to disk when the memory threshold is exceeded.
+ * An append-only map that spills sorted content to disk when there is insufficient space for it
+ * to grow.
*
* This map takes two passes over the data:
*
@@ -42,7 +44,7 @@ import org.apache.spark.storage.{DiskBlockManager, DiskBlockObjectWriter}
* writes. This may lead to a performance regression compared to the normal case of using the
* non-spilling AppendOnlyMap.
*
- * A few parameters control the memory threshold:
+ * Two parameters control the memory threshold:
*
* `spark.shuffle.memoryFraction` specifies the collective amount of memory used for storing
* these maps as a fraction of the executor's total memory. Since each concurrently running
@@ -51,9 +53,6 @@ import org.apache.spark.storage.{DiskBlockManager, DiskBlockObjectWriter}
*
* `spark.shuffle.safetyFraction` specifies an additional margin of safety as a fraction of
* this threshold, in case map size estimation is not sufficiently accurate.
- *
- * `spark.shuffle.updateThresholdInterval` controls how frequently each thread checks on
- * shared executor state to update its local memory threshold.
*/
private[spark] class ExternalAppendOnlyMap[K, V, C](
@@ -77,12 +76,9 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
}
- // Maximum size for this map before a spill is triggered
- private var spillThreshold = maxMemoryThreshold
-
- // How often to update spillThreshold
- private val updateThresholdInterval =
- sparkConf.getInt("spark.shuffle.updateThresholdInterval", 100)
+ // How many inserts into this map before tracking its shuffle memory usage
+ private val initialInsertThreshold =
+ sparkConf.getLong("spark.shuffle.initialInsertThreshold", 1000)
private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
private val syncWrites = sparkConf.get("spark.shuffle.sync", "false").toBoolean
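
The size of the global pool follows directly from the two fractions. A worksheet-style calculation with illustrative values (an 8 GiB heap, memoryFraction 0.3, safetyFraction 0.8; the actual defaults are read from SparkConf and are not shown in this diff):

    // Illustrative values only; the real ones come from SparkConf.
    val maxMemory = 8L * 1024 * 1024 * 1024    // Runtime.getRuntime.maxMemory
    val memoryFraction = 0.3                   // spark.shuffle.memoryFraction
    val safetyFraction = 0.8                   // spark.shuffle.safetyFraction
    val maxMemoryThreshold = (maxMemory * memoryFraction * safetyFraction).toLong
    // about 1.92 GiB, shared by all concurrently running tasks on the executor
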
@@ -91,30 +87,54 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
private var insertCount = 0
private var spillCount = 0
+ /**
+ * Insert the given key and value into the map.
+ *
+ * If the underlying map is about to grow, check if the global pool of shuffle memory has
+ * enough room for this to happen. If so, allocate the memory required to grow the map;
+ * otherwise, spill the in-memory map to disk.
+ *
+ * The shuffle memory usage of the first initialInsertThreshold entries is not tracked.
+ */
def insert(key: K, value: V) {
insertCount += 1
val update: (Boolean, C) => C = (hadVal, oldVal) => {
if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
}
- currentMap.changeValue(key, update)
- if (insertCount % updateThresholdInterval == 1) {
- updateSpillThreshold()
- }
- if (currentMap.estimateSize() > spillThreshold) {
- spill()
+ if (insertCount > initialInsertThreshold && currentMap.atGrowThreshold) {
+ val mapSize = currentMap.estimateSize()
+ var shouldSpill = false
+ val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
+
+ // Atomically check whether there is sufficient memory in the global pool for
+ // this map to grow and, if possible, allocate the required amount
+ shuffleMemoryMap.synchronized {
+ val threadId = Thread.currentThread().getId
+ val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
+ val availableMemory = maxMemoryThreshold -
+ (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
+
+ // Assume map grow factor is 2x
+ shouldSpill = availableMemory < mapSize * 2
+ if (!shouldSpill) {
+ shuffleMemoryMap(threadId) = mapSize * 2
+ }
+ }
+ // Spill outside the synchronized block so other threads are not blocked during disk I/O
+ if (shouldSpill) {
+ spill(mapSize)
+ }
}
+ currentMap.changeValue(key, update)
}
- // TODO: differentiate ShuffleMapTask's from ResultTask's
- private def updateSpillThreshold() {
- val numRunningTasks = math.max(SparkEnv.get.numRunningTasks, 1)
- spillThreshold = maxMemoryThreshold / numRunningTasks
- }
-
- private def spill() {
+ /**
+ * Sort the existing contents of the in-memory map and spill them to a temporary file on disk
+ */
+ private def spill(mapSize: Long) {
spillCount += 1
- logWarning("In-memory map exceeded %s MB! Spilling to disk (%d time%s so far)"
- .format(spillThreshold / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
+ logWarning("* Spilling in-memory map of %d MB to disk (%d time%s so far)"
+ .format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
val (blockId, file) = diskBlockManager.createTempBlock()
val writer =
new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize, identity, syncWrites)
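
The heart of the change is the check-and-allocate step above: under the map's lock, a thread computes how much of the pool everyone else holds, claims twice its current estimated size if that fits, and otherwise elects to spill; the spill itself deliberately runs outside the lock. A self-contained sketch of the same pattern (GrowOrSpillSketch, estimatedSize, and doSpill are stand-ins for the real members):

    import scala.collection.mutable

    // Standalone sketch of the check-and-allocate-or-spill step in insert().
    object GrowOrSpillSketch {
      def maybeGrowOrSpill(
          shuffleMemoryMap: mutable.HashMap[Long, Long],
          estimatedSize: Long,
          maxMemoryThreshold: Long)(doSpill: Long => Unit): Unit = {
        var shouldSpill = false
        shuffleMemoryMap.synchronized {
          val threadId = Thread.currentThread().getId
          // Memory claimed by all *other* threads: subtract our own prior claim
          val claimedByOthers =
            shuffleMemoryMap.values.sum - shuffleMemoryMap.getOrElse(threadId, 0L)
          val availableMemory = maxMemoryThreshold - claimedByOthers
          // The hash table roughly doubles when it grows, so reserve 2x the estimate
          shouldSpill = availableMemory < estimatedSize * 2
          if (!shouldSpill) {
            shuffleMemoryMap(threadId) = estimatedSize * 2
          }
        }
        // Spill outside the lock so other threads are not blocked on disk I/O
        if (shouldSpill) {
          doSpill(estimatedSize)
        }
      }
    }

Claiming 2x before growth means the pool accounts for the post-growth footprint up front, at the cost of assuming the underlying table exactly doubles.
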
@@ -131,6 +151,13 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
}
currentMap = new SizeTrackingAppendOnlyMap[K, C]
spilledMaps.append(new DiskMapIterator(file))
+
+ // Reset the amount of shuffle memory used by this map in the global pool
+ val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
+ shuffleMemoryMap.synchronized {
+ shuffleMemoryMap(Thread.currentThread().getId) = 0
+ }
+ insertCount = 0
}
override def iterator: Iterator[(K, C)] = {
@@ -145,11 +172,12 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
private class ExternalIterator extends Iterator[(K, C)] {
// A fixed-size queue that maintains a buffer for each stream we are currently merging
- val mergeHeap = new PriorityQueue[StreamBuffer]
+ val mergeHeap = new mutable.PriorityQueue[StreamBuffer]
// Input streams are derived both from the in-memory map and spilled maps on disk
// The in-memory map is sorted in place, while the spilled maps are already in sorted order
- val inputStreams = Seq(currentMap.destructiveSortedIterator(comparator)) ++ spilledMaps
+ val sortedMap = currentMap.destructiveSortedIterator(comparator)
+ val inputStreams = Seq(sortedMap) ++ spilledMaps
inputStreams.foreach{ it =>
val kcPairs = getMorePairs(it)
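
For reference, the merge that ExternalIterator performs is a standard k-way merge over sorted streams driven by a priority queue. A simplified sketch over sorted Int iterators (MergeSketch is hypothetical; the real StreamBuffers hold key-value pairs ordered by key hash and must also merge combiners for equal keys across streams):

    import scala.collection.mutable

    // Minimal k-way merge: keep one buffered head per sorted input stream in a
    // priority queue ordered so the smallest head is dequeued first.
    object MergeSketch {
      case class StreamHead(value: Int, rest: Iterator[Int])

      // mutable.PriorityQueue is a max-heap, so reverse the ordering to pop minima
      implicit val ord: Ordering[StreamHead] =
        Ordering.by((s: StreamHead) => s.value).reverse

      def mergeSorted(streams: Seq[Iterator[Int]]): Iterator[Int] = new Iterator[Int] {
        private val heap = new mutable.PriorityQueue[StreamHead]
        streams.foreach { it =>
          if (it.hasNext) heap.enqueue(StreamHead(it.next(), it))
        }

        def hasNext: Boolean = heap.nonEmpty
        def next(): Int = {
          val head = heap.dequeue()
          if (head.rest.hasNext) heap.enqueue(StreamHead(head.rest.next(), head.rest))
          head.value
        }
      }
    }

For example, mergeSorted(Seq(Iterator(1, 3, 5), Iterator(2, 4))) yields 1, 2, 3, 4, 5.
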