-rw-r--r-- core/src/main/scala/org/apache/spark/Aggregator.scala | 2
-rw-r--r-- core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala | 3
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2
-rw-r--r-- core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 89
-rw-r--r-- docs/configuration.md | 24
5 files changed, 73 insertions, 47 deletions
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 08a96b0c34..8b30cd4bfe 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -32,7 +32,7 @@ case class Aggregator[K, V, C] (
mergeCombiners: (C, C) => C) {
private val sparkConf = SparkEnv.get.conf
- private val externalSorting = sparkConf.get("spark.shuffle.externalSorting", "false").toBoolean
+ private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
if (!externalSorting) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index b7c7773e58..a73714abca 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -106,8 +106,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
override val partitioner = Some(part)
override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
-
- val externalSorting = sparkConf.get("spark.shuffle.externalSorting", "false").toBoolean
+ val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
val split = s.asInstanceOf[CoGroupPartition]
val numRdds = split.deps.size
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index c56e2ca2df..56cae6f6b9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -864,7 +864,7 @@ private[spark] object BlockManager extends Logging {
val ID_GENERATOR = new IdGenerator
def getMaxMemory(conf: SparkConf): Long = {
- val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.66)
+ val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
(Runtime.getRuntime.maxMemory * memoryFraction).toLong
}
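
Note: for a concrete sense of what the retuned defaults mean, the sketch below works through the formula above together with the shuffle threshold introduced in the ExternalAppendOnlyMap change that follows. The 4 GB heap is an assumed figure for illustration, and the object itself is not part of the patch; the arithmetic mirrors getMaxMemory and maxMemoryThreshold in the diff.

// Back-of-the-envelope sketch of the heap split under the new defaults.
// The 4 GB heap and this object are illustrative assumptions only.
object MemoryBudgetSketch {
  def main(args: Array[String]): Unit = {
    val heapBytes = 4L * 1024 * 1024 * 1024   // assume a 4 GB executor heap
    val gib = 1024.0 * 1024 * 1024

    val storageFraction = 0.6   // spark.storage.memoryFraction (was 0.66)
    val shuffleFraction = 0.3   // spark.shuffle.memoryFraction (was 0.75)
    val shuffleSafety   = 0.8   // spark.shuffle.safetyFraction

    val storageBytes = (heapBytes * storageFraction).toLong                  // ~2.4 GiB for cached blocks
    val shuffleBytes = (heapBytes * shuffleFraction * shuffleSafety).toLong  // ~0.96 GiB for shuffle maps

    println(f"storage: ${storageBytes / gib}%.2f GiB, shuffle: ${shuffleBytes / gib}%.2f GiB")
    // Roughly 0.6 + 0.24 = 0.84 of the heap is spoken for, leaving headroom for
    // task objects and JVM overhead, versus the 0.66 claimed by storage alone before.
  }
}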
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 50f05351eb..e3bcd895aa 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -71,21 +71,24 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
// Collective memory threshold shared across all running tasks
private val maxMemoryThreshold = {
- val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.75)
+ val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.3)
val safetyFraction = sparkConf.getDouble("spark.shuffle.safetyFraction", 0.8)
(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
}
- // How many inserts into this map before tracking its shuffle memory usage
- private val initialInsertThreshold =
- sparkConf.getLong("spark.shuffle.initialInsertThreshold", 1000)
+ // Number of pairs in the in-memory map
+ private var numPairsInMemory = 0
+
+ // Number of in-memory pairs inserted before tracking the map's shuffle memory usage
+ private val trackMemoryThreshold = 1000
+
+ // How many times we have spilled so far
+ private var spillCount = 0
private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
- private val syncWrites = sparkConf.get("spark.shuffle.sync", "false").toBoolean
+ private val syncWrites = sparkConf.getBoolean("spark.shuffle.sync", false)
private val comparator = new KCComparator[K, C]
private val ser = serializer.newInstance()
- private var insertCount = 0
- private var spillCount = 0
/**
* Insert the given key and value into the map.
@@ -94,14 +97,13 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
* enough room for this to happen. If so, allocate the memory required to grow the map;
* otherwise, spill the in-memory map to disk.
*
- * The shuffle memory usage of the first initialInsertThreshold entries is not tracked.
+ * The shuffle memory usage of the first trackMemoryThreshold entries is not tracked.
*/
def insert(key: K, value: V) {
- insertCount += 1
val update: (Boolean, C) => C = (hadVal, oldVal) => {
if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
}
- if (insertCount > initialInsertThreshold && currentMap.atGrowThreshold) {
+ if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) {
val mapSize = currentMap.estimateSize()
var shouldSpill = false
val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
@@ -114,7 +116,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
val availableMemory = maxMemoryThreshold -
(shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
- // Assume map grow factor is 2x
+ // Assume map growth factor is 2x
shouldSpill = availableMemory < mapSize * 2
if (!shouldSpill) {
shuffleMemoryMap(threadId) = mapSize * 2
@@ -126,6 +128,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
}
}
currentMap.changeValue(key, update)
+ numPairsInMemory += 1
}
/**
@@ -133,7 +136,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
*/
private def spill(mapSize: Long) {
spillCount += 1
- logWarning("* Spilling in-memory map of %d MB to disk (%d time%s so far)"
+ logWarning("Spilling in-memory map of %d MB to disk (%d time%s so far)"
.format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
val (blockId, file) = diskBlockManager.createTempBlock()
val writer =
@@ -157,9 +160,13 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
shuffleMemoryMap.synchronized {
shuffleMemoryMap(Thread.currentThread().getId) = 0
}
- insertCount = 0
+ numPairsInMemory = 0
}
+ /**
+ * Return an iterator that merges the in-memory map with the spilled maps.
+ * If no spill has occurred, simply return the in-memory map's iterator.
+ */
override def iterator: Iterator[(K, C)] = {
if (spilledMaps.isEmpty) {
currentMap.iterator
@@ -168,7 +175,9 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
}
}
- /** An iterator that sort-merges (K, C) pairs from the in-memory and on-disk maps */
+ /**
+ * An iterator that sort-merges (K, C) pairs from the in-memory map and the spilled maps
+ */
private class ExternalIterator extends Iterator[(K, C)] {
// A fixed-size queue that maintains a buffer for each stream we are currently merging
@@ -179,7 +188,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
val sortedMap = currentMap.destructiveSortedIterator(comparator)
val inputStreams = Seq(sortedMap) ++ spilledMaps
- inputStreams.foreach{ it =>
+ inputStreams.foreach { it =>
val kcPairs = getMorePairs(it)
mergeHeap.enqueue(StreamBuffer(it, kcPairs))
}
@@ -187,6 +196,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
/**
* Fetch from the given iterator until a key of different hash is retrieved. In the
* event of key hash collisions, this ensures no pairs are hidden from being merged.
+ * Assume the given iterator is in sorted order.
*/
def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
val kcPairs = new ArrayBuffer[(K, C)]
@@ -219,17 +229,16 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
baseCombiner
}
- override def hasNext: Boolean = {
- mergeHeap.foreach{ buffer =>
- if (!buffer.pairs.isEmpty) {
- return true
- }
- }
- false
- }
+ /**
+ * Return true if there exists an input stream that still has unvisited pairs
+ */
+ override def hasNext: Boolean = mergeHeap.exists(!_.pairs.isEmpty)
+ /**
+ * Select a key with the minimum hash, then combine all values with the same key from all input streams.
+ */
override def next(): (K, C) = {
- // Select a return key from the StreamBuffer that holds the lowest key hash
+ // Select a key from the StreamBuffer that holds the lowest key hash
val minBuffer = mergeHeap.dequeue()
val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash)
if (minPairs.length == 0) {
@@ -285,45 +294,43 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
}
}
- // Iterate through (K, C) pairs in sorted order from an on-disk map
+ /**
+ * An iterator that returns (K, C) pairs in sorted order from an on-disk map
+ */
private class DiskMapIterator(file: File) extends Iterator[(K, C)] {
val fileStream = new FileInputStream(file)
val bufferedStream = new FastBufferedInputStream(fileStream)
val deserializeStream = ser.deserializeStream(bufferedStream)
- var nextItem: Option[(K, C)] = None
+ var nextItem: (K, C) = null
var eof = false
- def readNextItem(): Option[(K, C)] = {
+ def readNextItem(): (K, C) = {
if (!eof) {
try {
- return Some(deserializeStream.readObject().asInstanceOf[(K, C)])
+ return deserializeStream.readObject().asInstanceOf[(K, C)]
} catch {
case e: EOFException =>
eof = true
cleanup()
}
}
- None
+ null
}
override def hasNext: Boolean = {
- nextItem match {
- case Some(item) => true
- case None =>
- nextItem = readNextItem()
- nextItem.isDefined
+ if (nextItem == null) {
+ nextItem = readNextItem()
}
+ nextItem != null
}
override def next(): (K, C) = {
- nextItem match {
- case Some(item) =>
- nextItem = None
- item
- case None =>
- val item = readNextItem()
- item.getOrElse(throw new NoSuchElementException)
+ val item = if (nextItem == null) readNextItem() else nextItem
+ if (item == null) {
+ throw new NoSuchElementException
}
+ nextItem = null
+ item
}
// TODO: Ensure this gets called even if the iterator isn't drained.
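
Note: the ExternalIterator above merges one hash-sorted stream per spill file plus the in-memory map. The standalone sketch below illustrates that merge idea; the name HashMergeSketch is made up here, and the hash-collision bookkeeping of the real class is condensed into a per-key groupBy over each hash bucket.

// Standalone sketch (not the ExternalIterator above) of a k-way merge over
// streams that are each sorted by key hash, combining values for the same key
// across all of them without loading any stream fully into memory.
import scala.collection.mutable

class HashMergeSketch[K, C](streams: Seq[Iterator[(K, C)]], combine: (C, C) => C)
  extends Iterator[(K, C)] {

  // One buffered stream per input; its head tells us the next key hash it holds.
  private case class StreamBuffer(it: BufferedIterator[(K, C)]) {
    def headHash: Int = it.head._1.hashCode()
  }

  // Min-heap over the streams, ordered by the hash of their next key.
  private val heap = mutable.PriorityQueue.empty[StreamBuffer](
    Ordering.by[StreamBuffer, Int](_.headHash).reverse)
  streams.map(_.buffered).filter(_.hasNext).foreach(it => heap.enqueue(StreamBuffer(it)))

  // Pairs already merged for the most recently processed hash, not yet returned.
  private val pending = mutable.Queue[(K, C)]()

  override def hasNext: Boolean = pending.nonEmpty || heap.nonEmpty

  override def next(): (K, C) = {
    if (pending.isEmpty) {
      val minHash = heap.head.headHash
      val pairs = mutable.ArrayBuffer[(K, C)]()
      val drained = mutable.ArrayBuffer[StreamBuffer]()
      // Drain every pair with the minimum hash from every stream that has one,
      // so a key spread across streams (or hidden by a hash collision) is not missed.
      while (heap.nonEmpty && heap.head.headHash == minHash) {
        val buf = heap.dequeue()
        while (buf.it.hasNext && buf.it.head._1.hashCode() == minHash) {
          pairs += buf.it.next()
        }
        drained += buf
      }
      drained.filter(_.it.hasNext).foreach(b => heap.enqueue(b))
      // A hash bucket may contain several distinct keys; merge values per key.
      pairs.groupBy(_._1).foreach { case (k, kcs) =>
        pending.enqueue((k, kcs.map(_._2).reduce(combine)))
      }
    }
    pending.dequeue()
  }
}

Given inputs that are each sorted by key hash, as destructiveSortedIterator and the spill files are, the sketch yields each key exactly once with all of its values combined.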
diff --git a/docs/configuration.md b/docs/configuration.md
index 6717757781..c1158491f0 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -104,14 +104,25 @@ Apart from these, the following properties are also available, and may be useful
</tr>
<tr>
<td>spark.storage.memoryFraction</td>
- <td>0.66</td>
+ <td>0.6</td>
<td>
Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old"
- generation of objects in the JVM, which by default is given 2/3 of the heap, but you can increase
+ generation of objects in the JVM, which by default is given 0.6 of the heap, but you can increase
it if you configure your own old generation size.
</td>
</tr>
<tr>
+ <td>spark.shuffle.memoryFraction</td>
+ <td>0.3</td>
+ <td>
+ Fraction of Java heap to use for aggregation and cogroups during shuffles, if
+ <code>spark.shuffle.externalSorting</code> is enabled. At any given time, the collective size of
+ all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will
+ begin to spill to disk. If spills happen often, consider increasing this value at the expense of
+ <code>spark.storage.memoryFraction</code>.
+ </td>
+</tr>
+<tr>
<td>spark.mesos.coarse</td>
<td>false</td>
<td>
@@ -377,6 +388,15 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td>spark.shuffle.externalSorting</td>
+ <td>true</td>
+ <td>
+ If set to "true", spills in-memory maps used during shuffles to disk when a memory threshold is reached.
+ This threshold is specified by <code>spark.shuffle.memoryFraction</code>. Enabling this is especially
+ useful for memory-intensive applications.
+ </td>
+</tr>
+<tr>
<td>spark.speculation</td>
<td>false</td>
<td>
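
Note: to close, a minimal example of setting the options documented above from application code; the chosen numeric values and the object name are arbitrary examples, not recommended settings.

// Illustrative SparkConf usage for the settings touched by this patch.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // needed on older Spark versions for reduceByKey

object ShuffleSpillExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("shuffle-spill-example")
      .set("spark.shuffle.externalSorting", "true")  // new default; "false" restores the old behavior
      .set("spark.shuffle.memoryFraction", "0.4")    // raise this if spills happen often...
      .set("spark.storage.memoryFraction", "0.5")    // ...at the expense of the block cache
    val sc = new SparkContext(conf)

    // A shuffle-heavy aggregation that exercises Aggregator and, with external
    // sorting enabled, ExternalAppendOnlyMap.
    val counts = sc.parallelize(1 to 1000000).map(i => (i % 1000, 1L)).reduceByKey(_ + _)
    println(counts.count())
    sc.stop()
  }
}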