aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala20
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala18
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala4
4 files changed, 37 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
index ee91a368b7..c746e138b6 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
@@ -19,7 +19,7 @@ package org.apache.spark.shuffle
import scala.collection.mutable
-import org.apache.spark.{Logging, SparkException, SparkConf}
+import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
/**
* Allocates a pool of memory to task threads for use in shuffle operations. Each disk-spilling
@@ -111,7 +111,7 @@ private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging {
}
}
-private object ShuffleMemoryManager {
+private[spark] object ShuffleMemoryManager {
/**
* Figure out the shuffle memory limit from a SparkConf. We currently have both a fraction
* of the memory pool and a safety factor since collections can sometimes grow bigger than
@@ -122,4 +122,7 @@ private object ShuffleMemoryManager {
val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
}
+
+ // Initial threshold for the size of a collection before we start tracking its memory usage
+ val DEFAULT_INITIAL_MEMORY_THRESHOLD: Long = 5 * 1024 * 1024
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 96697d2e60..5619b30d0d 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -28,10 +28,11 @@ import com.google.common.io.ByteStreams
import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.serializer.{DeserializationStream, Serializer}
+import org.apache.spark.shuffle.ShuffleMemoryManager
import org.apache.spark.storage.{BlockId, BlockManager}
import org.apache.spark.util.collection.ExternalAppendOnlyMap.HashComparator
-import org.apache.spark.executor.ShuffleWriteMetrics
/**
* :: DeveloperApi ::
@@ -81,8 +82,14 @@ class ExternalAppendOnlyMap[K, V, C](
// Number of in-memory pairs inserted before tracking the map's shuffle memory usage
private val trackMemoryThreshold = 1000
- // How much of the shared memory pool this collection has claimed
- private var myMemoryThreshold = 0L
+ // Initial threshold for the size of a collection before we start tracking its memory usage
+ private val initialMemoryThreshold =
+ SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold",
+ ShuffleMemoryManager.DEFAULT_INITIAL_MEMORY_THRESHOLD)
+
+ // Threshold for the collection's size in bytes before we start tracking its memory usage
+ // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
+ private var myMemoryThreshold = initialMemoryThreshold
/**
* Size of object batches when reading/writing from serializers.
@@ -236,8 +243,11 @@ class ExternalAppendOnlyMap[K, V, C](
spilledMaps.append(new DiskMapIterator(file, blockId, batchSizes))
// Release our memory back to the shuffle pool so that other threads can grab it
- shuffleMemoryManager.release(myMemoryThreshold)
- myMemoryThreshold = 0L
+ // The amount we requested does not include the initial memory tracking threshold
+ shuffleMemoryManager.release(myMemoryThreshold - initialMemoryThreshold)
+
+ // Reset this to the initial threshold to avoid spilling many small files
+ myMemoryThreshold = initialMemoryThreshold
elementsRead = 0
_memoryBytesSpilled += mapSize
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index d414ce39e9..a049746bd8 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -28,6 +28,7 @@ import com.google.common.io.ByteStreams
import org.apache.spark._
import org.apache.spark.serializer.{DeserializationStream, Serializer}
import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.shuffle.ShuffleMemoryManager
import org.apache.spark.storage.{BlockObjectWriter, BlockId}
/**
@@ -134,8 +135,14 @@ private[spark] class ExternalSorter[K, V, C](
// Write metrics for current spill
private var curWriteMetrics: ShuffleWriteMetrics = _
- // How much of the shared memory pool this collection has claimed
- private var myMemoryThreshold = 0L
+ // Initial threshold for the size of a collection before we start tracking its memory usage
+ private val initialMemoryThreshold =
+ SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold",
+ ShuffleMemoryManager.DEFAULT_INITIAL_MEMORY_THRESHOLD)
+
+ // Threshold for the collection's size in bytes before we start tracking its memory usage
+ // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
+ private var myMemoryThreshold = initialMemoryThreshold
// If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't need
// local aggregation and sorting, write numPartitions files directly and just concatenate them
@@ -285,8 +292,11 @@ private[spark] class ExternalSorter[K, V, C](
}
// Release our memory back to the shuffle pool so that other threads can grab it
- shuffleMemoryManager.release(myMemoryThreshold)
- myMemoryThreshold = 0
+ // The amount we requested does not include the initial memory tracking threshold
+ shuffleMemoryManager.release(myMemoryThreshold - initialMemoryThreshold)
+
+ // Reset this to the initial threshold to avoid spilling many small files
+ myMemoryThreshold = initialMemoryThreshold
_memoryBytesSpilled += memorySize
elementsRead = 0
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index f26e40fbd4..f4db3ff431 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -127,6 +127,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
test("empty partitions with spilling") {
val conf = createSparkConf(false)
conf.set("spark.shuffle.memoryFraction", "0.001")
+ conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -152,6 +153,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
test("empty partitions with spilling, bypass merge-sort") {
val conf = createSparkConf(false)
conf.set("spark.shuffle.memoryFraction", "0.001")
+ conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -761,5 +763,5 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
}
sorter2.stop()
- }
+ }
}