path: root/sql/core
author     Andrew Or <andrew@databricks.com>      2015-10-13 13:49:59 -0700
committer  Josh Rosen <joshrosen@databricks.com>  2015-10-13 13:49:59 -0700
commit     b3ffac5178795f2d8e7908b3e77e8e89f50b5f6f (patch)
tree       058d6885c0fffa8cfb496f5c4ed675f6a5345f75 /sql/core
parent     2b574f52d7bf51b1fe2a73086a3735b633e9083f (diff)
download   spark-b3ffac5178795f2d8e7908b3e77e8e89f50b5f6f.tar.gz
           spark-b3ffac5178795f2d8e7908b3e77e8e89f50b5f6f.tar.bz2
           spark-b3ffac5178795f2d8e7908b3e77e8e89f50b5f6f.zip
[SPARK-10983] Unified memory manager
This patch unifies the memory management of the storage and execution regions so that either side can borrow memory from the other. When memory pressure arises, storage is evicted in favor of execution. To avoid regressions in cases where storage is crucial, we dynamically allocate a fraction of space for storage that execution cannot evict. Several configurations are introduced:

- **spark.memory.fraction (default 0.75)**: fraction of the heap space used for execution and storage. The lower this is, the more frequently spills and cached data eviction occur. The purpose of this config is to set aside memory for internal metadata, user data structures, and imprecise size estimation in the case of sparse, unusually large records.
- **spark.memory.storageFraction (default 0.5)**: size of the storage region within the space set aside by `spark.memory.fraction`. Cached data may only be evicted if total storage exceeds this region.
- **spark.memory.useLegacyMode (default false)**: whether to use the memory management that existed in Spark 1.5 and before. This is mainly for backward compatibility.

For a detailed description of the design, see [SPARK-10000](https://issues.apache.org/jira/browse/SPARK-10000). This patch builds on top of the `MemoryManager` interface introduced in #9000.

Author: Andrew Or <andrew@databricks.com>

Closes #9084 from andrewor14/unified-memory-manager.
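As a quick illustration (not part of this patch), the sketch below shows how an application could set the new options through `SparkConf`; the values are simply the defaults quoted above, and the app name and master are arbitrary examples.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch, assuming the defaults described in the commit message above.
val conf = new SparkConf()
  .setAppName("unified-memory-example")   // hypothetical app name
  .setMaster("local[*]")
  // Fraction of the heap shared by execution and storage.
  .set("spark.memory.fraction", "0.75")
  // Storage region within that space; cached data here is not evicted by execution.
  .set("spark.memory.storageFraction", "0.5")
  // Set to "true" to fall back to the memory management used in Spark 1.5 and before.
  .set("spark.memory.useLegacyMode", "false")

val sc = new SparkContext(conf)
```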
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala  10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala    2
2 files changed, 4 insertions, 8 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
index ff65d7bdf8..835f52fa56 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
@@ -57,7 +57,9 @@ class TestShuffleMemoryManager
 }
 
 private class GrantEverythingMemoryManager extends MemoryManager {
-  override def acquireExecutionMemory(numBytes: Long): Long = numBytes
+  override def acquireExecutionMemory(
+      numBytes: Long,
+      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = numBytes
   override def acquireStorageMemory(
       blockId: BlockId,
       numBytes: Long,
@@ -66,12 +68,6 @@ private class GrantEverythingMemoryManager extends MemoryManager {
       blockId: BlockId,
       numBytes: Long,
       evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true
-  override def releaseExecutionMemory(numBytes: Long): Unit = { }
-  override def releaseStorageMemory(numBytes: Long): Unit = { }
-  override def releaseStorageMemory(): Unit = { }
-  override def releaseUnrollMemory(numBytes: Long): Unit = { }
   override def maxExecutionMemory: Long = Long.MaxValue
   override def maxStorageMemory: Long = Long.MaxValue
-  override def executionMemoryUsed: Long = Long.MaxValue
-  override def storageMemoryUsed: Long = Long.MaxValue
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index f7d48bc53e..75d1fced59 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -103,7 +103,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
     val conf = new SparkConf()
       .set("spark.shuffle.spill.initialMemoryThreshold", "1024")
       .set("spark.shuffle.sort.bypassMergeThreshold", "0")
-      .set("spark.shuffle.memoryFraction", "0.0001")
+      .set("spark.testing.memory", "80000")
     sc = new SparkContext("local", "test", conf)
     outputFile = File.createTempFile("test-unsafe-row-serializer-spill", "")