aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala18
1 files changed, 14 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index d414ce39e9..a049746bd8 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -28,6 +28,7 @@ import com.google.common.io.ByteStreams
import org.apache.spark._
import org.apache.spark.serializer.{DeserializationStream, Serializer}
import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.shuffle.ShuffleMemoryManager
import org.apache.spark.storage.{BlockObjectWriter, BlockId}
/**
@@ -134,8 +135,14 @@ private[spark] class ExternalSorter[K, V, C](
// Write metrics for current spill
private var curWriteMetrics: ShuffleWriteMetrics = _
- // How much of the shared memory pool this collection has claimed
- private var myMemoryThreshold = 0L
+ // Initial threshold for the size of a collection before we start tracking its memory usage
+ private val initialMemoryThreshold =
+ SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold",
+ ShuffleMemoryManager.DEFAULT_INITIAL_MEMORY_THRESHOLD)
+
+ // Threshold for the collection's size in bytes before we start tracking its memory usage
+ // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
+ private var myMemoryThreshold = initialMemoryThreshold
// If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't need
// local aggregation and sorting, write numPartitions files directly and just concatenate them
@@ -285,8 +292,11 @@ private[spark] class ExternalSorter[K, V, C](
}
// Release our memory back to the shuffle pool so that other threads can grab it
- shuffleMemoryManager.release(myMemoryThreshold)
- myMemoryThreshold = 0
+ // The amount we requested does not include the initial memory tracking threshold
+ shuffleMemoryManager.release(myMemoryThreshold - initialMemoryThreshold)
+
+ // Reset this to the initial threshold to avoid spilling many small files
+ myMemoryThreshold = initialMemoryThreshold
_memoryBytesSpilled += memorySize
elementsRead = 0