aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2014-11-19 18:07:27 -0800
committerAndrew Or <andrew@databricks.com>2014-11-19 18:07:39 -0800
commit4a5c3d21b4df8fa506fe0365a0718c94bbc1cd1b (patch)
treec28205d6794b622e727265dc31c08317b3d2ab80 /core
parentf21e550e35e77363d2804fe22ad3f879a66498f1 (diff)
downloadspark-4a5c3d21b4df8fa506fe0365a0718c94bbc1cd1b.tar.gz
spark-4a5c3d21b4df8fa506fe0365a0718c94bbc1cd1b.tar.bz2
spark-4a5c3d21b4df8fa506fe0365a0718c94bbc1cd1b.zip
[SPARK-4480] Avoid many small spills in external data structures
**Summary.** Currently, we may spill many small files in `ExternalAppendOnlyMap` and `ExternalSorter`. The underlying root cause of this is summarized in [SPARK-4452](https://issues.apache.org/jira/browse/SPARK-4452). This PR does not address this root cause, but simply provides the guarantee that we never spill the in-memory data structure if its size is less than a configurable threshold of 5MB. This config is not documented because we don't want users to set it themselves, and it is not hard-coded because we need to change it in tests. **Symptom.** Each spill is orders of magnitude smaller than 1MB, and there are many spills. In environments where the ulimit is set, this frequently causes "too many open file" exceptions observed in [SPARK-3633](https://issues.apache.org/jira/browse/SPARK-3633). ``` 14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory batch of 4792 B to disk (292769 spills so far) 14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory batch of 4760 B to disk (292770 spills so far) 14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory batch of 4520 B to disk (292771 spills so far) 14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory batch of 4560 B to disk (292772 spills so far) 14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory batch of 4792 B to disk (292773 spills so far) 14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory batch of 4784 B to disk (292774 spills so far) ``` **Reproduction.** I ran the following on a small 4-node cluster with 512MB executors. Note that the back-to-back shuffle here is necessary for reasons described in [SPARK-4522](https://issues.apache.org/jira/browse/SPARK-4452). The second shuffle is a `reduceByKey` because it performs a map-side combine. ``` sc.parallelize(1 to 100000000, 100) .map { i => (i, i) } .groupByKey() .reduceByKey(_ ++ _) .count() ``` Before the change, I notice that each thread may spill up to 1000 times, and the size of each spill is on the order of 10KB. After the change, each thread spills only up to 20 times in the worst case, and the size of each spill is on the order of 1MB. Author: Andrew Or <andrew@databricks.com> Closes #3353 from andrewor14/avoid-small-spills and squashes the following commits: 49f380f [Andrew Or] Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/spark into avoid-small-spills 27d6966 [Andrew Or] Merge branch 'master' of github.com:apache/spark into avoid-small-spills f4736e3 [Andrew Or] Fix tests a919776 [Andrew Or] Avoid many small spills (cherry picked from commit 0eb4a7fb0fa1fa56677488cbd74eb39e65317621) Signed-off-by: Andrew Or <andrew@databricks.com>
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/Spillable.scala28
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala2
2 files changed, 18 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
index cb73b377fc..9f54312074 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
@@ -24,10 +24,7 @@ import org.apache.spark.SparkEnv
* Spills contents of an in-memory collection to disk when the memory threshold
* has been exceeded.
*/
-private[spark] trait Spillable[C] {
-
- this: Logging =>
-
+private[spark] trait Spillable[C] extends Logging {
/**
* Spills the current in-memory collection to disk, and releases the memory.
*
@@ -45,15 +42,21 @@ private[spark] trait Spillable[C] {
// Memory manager that can be used to acquire/release memory
private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
- // What threshold of elementsRead we start estimating collection size at
+ // Threshold for `elementsRead` before we start tracking this collection's memory usage
private[this] val trackMemoryThreshold = 1000
+ // Initial threshold for the size of a collection before we start tracking its memory usage
+ // Exposed for testing
+ private[this] val initialMemoryThreshold: Long =
+ SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold", 5 * 1024 * 1024)
+
+ // Threshold for this collection's size in bytes before we start tracking its memory usage
+ // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
+ private[this] var myMemoryThreshold = initialMemoryThreshold
+
// Number of elements read from input since last spill
private[this] var _elementsRead = 0L
- // How much of the shared memory pool this collection has claimed
- private[this] var myMemoryThreshold = 0L
-
// Number of bytes spilled in total
private[this] var _memoryBytesSpilled = 0L
@@ -102,8 +105,9 @@ private[spark] trait Spillable[C] {
* Release our memory back to the shuffle pool so that other threads can grab it.
*/
private def releaseMemoryForThisThread(): Unit = {
- shuffleMemoryManager.release(myMemoryThreshold)
- myMemoryThreshold = 0L
+ // The amount we requested does not include the initial memory tracking threshold
+ shuffleMemoryManager.release(myMemoryThreshold - initialMemoryThreshold)
+ myMemoryThreshold = initialMemoryThreshold
}
/**
@@ -114,7 +118,7 @@ private[spark] trait Spillable[C] {
@inline private def logSpillage(size: Long) {
val threadId = Thread.currentThread().getId
logInfo("Thread %d spilling in-memory map of %s to disk (%d time%s so far)"
- .format(threadId, org.apache.spark.util.Utils.bytesToString(size),
- _spillCount, if (_spillCount > 1) "s" else ""))
+ .format(threadId, org.apache.spark.util.Utils.bytesToString(size),
+ _spillCount, if (_spillCount > 1) "s" else ""))
}
}
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index f26e40fbd4..3cb42d416d 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -127,6 +127,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
test("empty partitions with spilling") {
val conf = createSparkConf(false)
conf.set("spark.shuffle.memoryFraction", "0.001")
+ conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -152,6 +153,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
test("empty partitions with spilling, bypass merge-sort") {
val conf = createSparkConf(false)
conf.set("spark.shuffle.memoryFraction", "0.001")
+ conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)