4 files changed, 37 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
index ee91a368b7..c746e138b6 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
@@ -19,7 +19,7 @@ package org.apache.spark.shuffle
 
 import scala.collection.mutable
 
-import org.apache.spark.{Logging, SparkException, SparkConf}
+import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
 
 /**
  * Allocates a pool of memory to task threads for use in shuffle operations. Each disk-spilling
@@ -111,7 +111,7 @@ private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging {
   }
 }
 
-private object ShuffleMemoryManager {
+private[spark] object ShuffleMemoryManager {
   /**
   * Figure out the shuffle memory limit from a SparkConf. We currently have both a fraction
   * of the memory pool and a safety factor since collections can sometimes grow bigger than
@@ -122,4 +122,7 @@ private object ShuffleMemoryManager {
     val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
     (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
   }
+
+  // Initial threshold for the size of a collection before we start tracking its memory usage
+  val DEFAULT_INITIAL_MEMORY_THRESHOLD: Long = 5 * 1024 * 1024
 }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 96697d2e60..5619b30d0d 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -28,10 +28,11 @@ import com.google.common.io.ByteStreams
 
 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.serializer.{DeserializationStream, Serializer}
+import org.apache.spark.shuffle.ShuffleMemoryManager
 import org.apache.spark.storage.{BlockId, BlockManager}
 import org.apache.spark.util.collection.ExternalAppendOnlyMap.HashComparator
-import org.apache.spark.executor.ShuffleWriteMetrics
 
 /**
  * :: DeveloperApi ::
@@ -81,8 +82,14 @@ class ExternalAppendOnlyMap[K, V, C](
   // Number of in-memory pairs inserted before tracking the map's shuffle memory usage
   private val trackMemoryThreshold = 1000
 
-  // How much of the shared memory pool this collection has claimed
-  private var myMemoryThreshold = 0L
+  // Initial threshold for the size of a collection before we start tracking its memory usage
+  private val initialMemoryThreshold =
+    SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold",
+      ShuffleMemoryManager.DEFAULT_INITIAL_MEMORY_THRESHOLD)
+
+  // Threshold for the collection's size in bytes before we start tracking its memory usage
+  // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
+  private var myMemoryThreshold = initialMemoryThreshold
 
   /**
   * Size of object batches when reading/writing from serializers.
@@ -236,8 +243,11 @@ class ExternalAppendOnlyMap[K, V, C](
     spilledMaps.append(new DiskMapIterator(file, blockId, batchSizes))
 
     // Release our memory back to the shuffle pool so that other threads can grab it
-    shuffleMemoryManager.release(myMemoryThreshold)
-    myMemoryThreshold = 0L
+    // The amount we requested does not include the initial memory tracking threshold
+    shuffleMemoryManager.release(myMemoryThreshold - initialMemoryThreshold)
+
+    // Reset this to the initial threshold to avoid spilling many small files
+    myMemoryThreshold = initialMemoryThreshold
 
     elementsRead = 0
     _memoryBytesSpilled += mapSize
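The hunks above are the heart of the patch: each spilling collection now starts with a private allowance (5 MB by default) that is never drawn from the shared shuffle pool, and only growth beyond that allowance goes through ShuffleMemoryManager. What follows is a minimal, self-contained sketch of that accounting; the Pool class and the doubling growth policy are illustrative stand-ins, not Spark's actual classes.

// A self-contained sketch (not Spark's real code) of the new accounting.
// Pool stands in for ShuffleMemoryManager; the growth policy is illustrative.
object ThresholdSketch {
  val initialMemoryThreshold: Long = 5 * 1024 * 1024 // the new 5 MB default

  class Pool(var free: Long) {
    def tryToAcquire(n: Long): Long = { val g = math.min(n, free); free -= g; g }
    def release(n: Long): Unit = { free += n }
  }

  def main(args: Array[String]): Unit = {
    val pool = new Pool(free = 100L * 1024 * 1024)
    var myMemoryThreshold = initialMemoryThreshold // starts at 5 MB, not 0

    // The collection's estimated size crosses the threshold: try to double
    // our claim by requesting the difference from the shared pool.
    val estimatedSize = 8L * 1024 * 1024
    if (estimatedSize >= myMemoryThreshold) {
      val amountToRequest = 2 * estimatedSize - myMemoryThreshold
      myMemoryThreshold += pool.tryToAcquire(amountToRequest)
    }

    // On spill, return only what the pool actually granted: the first
    // initialMemoryThreshold bytes were never taken from the pool.
    pool.release(myMemoryThreshold - initialMemoryThreshold)
    myMemoryThreshold = initialMemoryThreshold
  }
}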
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index d414ce39e9..a049746bd8 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -28,6 +28,7 @@ import com.google.common.io.ByteStreams
 import org.apache.spark._
 import org.apache.spark.serializer.{DeserializationStream, Serializer}
 import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.shuffle.ShuffleMemoryManager
 import org.apache.spark.storage.{BlockObjectWriter, BlockId}
 
 /**
@@ -134,8 +135,14 @@ private[spark] class ExternalSorter[K, V, C](
   // Write metrics for current spill
   private var curWriteMetrics: ShuffleWriteMetrics = _
 
-  // How much of the shared memory pool this collection has claimed
-  private var myMemoryThreshold = 0L
+  // Initial threshold for the size of a collection before we start tracking its memory usage
+  private val initialMemoryThreshold =
+    SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold",
+      ShuffleMemoryManager.DEFAULT_INITIAL_MEMORY_THRESHOLD)
+
+  // Threshold for the collection's size in bytes before we start tracking its memory usage
+  // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
+  private var myMemoryThreshold = initialMemoryThreshold
 
   // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't need
   // local aggregation and sorting, write numPartitions files directly and just concatenate them
@@ -285,8 +292,11 @@ private[spark] class ExternalSorter[K, V, C](
     }
 
     // Release our memory back to the shuffle pool so that other threads can grab it
-    shuffleMemoryManager.release(myMemoryThreshold)
-    myMemoryThreshold = 0
+    // The amount we requested does not include the initial memory tracking threshold
+    shuffleMemoryManager.release(myMemoryThreshold - initialMemoryThreshold)
+
+    // Reset this to the initial threshold to avoid spilling many small files
+    myMemoryThreshold = initialMemoryThreshold
 
     _memoryBytesSpilled += memorySize
     elementsRead = 0
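ExternalSorter receives the same two changes as ExternalAppendOnlyMap, and the subtraction in release() is what keeps the books balanced: the pool only ever granted myMemoryThreshold - initialMemoryThreshold bytes, so releasing the full threshold would credit the pool with 5 MB it never handed out, once per spill. A quick check of the arithmetic under the default, with a made-up 48 MB claim:

// Release arithmetic under the 5 MB default; the 48 MB claim is illustrative.
val initialMemoryThreshold = 5L * 1024 * 1024
var myMemoryThreshold = 48L * 1024 * 1024            // claim after several grows
val returnedToPool = myMemoryThreshold - initialMemoryThreshold
assert(returnedToPool == 43L * 1024 * 1024)          // only pool-granted bytes
myMemoryThreshold = initialMemoryThreshold           // reset to 5 MB, not 0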
"org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) @@ -152,6 +153,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe test("empty partitions with spilling, bypass merge-sort") { val conf = createSparkConf(false) conf.set("spark.shuffle.memoryFraction", "0.001") + conf.set("spark.shuffle.spill.initialMemoryThreshold", "512") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) @@ -761,5 +763,5 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe } sorter2.stop() - } + } } |