about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2015-12-01 19:51:12 -0800
committerAndrew Or <andrew@databricks.com>2015-12-01 19:51:12 -0800
commitd96f8c997b9bb5c3d61f513d2c71d67ccf8e85d6 (patch)
tree18ce4722a49b29f9f22bf5d30d77d345c13f041c /core
parent1ce4adf55b535518c2e63917a827fac1f2df4e8e (diff)
downloadspark-d96f8c997b9bb5c3d61f513d2c71d67ccf8e85d6.tar.gz
spark-d96f8c997b9bb5c3d61f513d2c71d67ccf8e85d6.tar.bz2
spark-d96f8c997b9bb5c3d61f513d2c71d67ccf8e85d6.zip
[SPARK-12081] Make unified memory manager work with small heaps
The existing `spark.memory.fraction` (default 0.75) gives the system 25% of the space to work with. For small heaps, this is not enough: e.g. default 1GB leaves only 250MB system memory. This is especially a problem in local mode, where the driver and executor are crammed in the same JVM. Members of the community have reported driver OOM's in such cases. **New proposal.** We now reserve 300MB before taking the 75%. For 1GB JVMs, this leaves `(1024 - 300) * 0.75 = 543MB` for execution and storage. This is proposal (1) listed in the [JIRA](https://issues.apache.org/jira/browse/SPARK-12081). Author: Andrew Or <andrew@databricks.com> Closes #10081 from andrewor14/unified-memory-small-heaps.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala | 22
-rw-r--r-- core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala | 20
2 files changed, 38 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 8be5b05419..48b4e23433 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -26,7 +26,7 @@ import org.apache.spark.storage.{BlockStatus, BlockId}
* A [[MemoryManager]] that enforces a soft boundary between execution and storage such that
* either side can borrow memory from the other.
*
- * The region shared between execution and storage is a fraction of the total heap space
+ * The region shared between execution and storage is a fraction of (the total heap space - 300MB)
* configurable through `spark.memory.fraction` (default 0.75). The position of the boundary
* within this space is further determined by `spark.memory.storageFraction` (default 0.5).
* This means the size of the storage region is 0.75 * 0.5 = 0.375 of the heap space by default.
@@ -48,7 +48,7 @@ import org.apache.spark.storage.{BlockStatus, BlockId}
*/
private[spark] class UnifiedMemoryManager private[memory] (
conf: SparkConf,
- maxMemory: Long,
+ val maxMemory: Long,
private val storageRegionSize: Long,
numCores: Int)
extends MemoryManager(
@@ -130,6 +130,12 @@ private[spark] class UnifiedMemoryManager private[memory] (
object UnifiedMemoryManager {
+ // Set aside a fixed amount of memory for non-storage, non-execution purposes.
+ // This serves a function similar to `spark.memory.fraction`, but guarantees that we reserve
+ // sufficient memory for the system even for small heaps. E.g. if we have a 1GB JVM, then
+ // the memory used for execution and storage will be (1024 - 300) * 0.75 = 543MB by default.
+ private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024
+
def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
val maxMemory = getMaxMemory(conf)
new UnifiedMemoryManager(
@@ -144,8 +150,16 @@ object UnifiedMemoryManager {
* Return the total amount of memory shared between execution and storage, in bytes.
*/
private def getMaxMemory(conf: SparkConf): Long = {
- val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
+ val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
+ val reservedMemory = conf.getLong("spark.testing.reservedMemory",
+ if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
+ val minSystemMemory = reservedMemory * 1.5
+ if (systemMemory < minSystemMemory) {
+ throw new IllegalArgumentException(s"System memory $systemMemory must " +
+ s"be at least $minSystemMemory. Please use a larger heap size.")
+ }
+ val usableMemory = systemMemory - reservedMemory
val memoryFraction = conf.getDouble("spark.memory.fraction", 0.75)
- (systemMaxMemory * memoryFraction).toLong
+ (usableMemory * memoryFraction).toLong
}
}
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 8cebe81c3b..e97c898a44 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -182,4 +182,24 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
assertEnsureFreeSpaceCalled(ms, 850L)
}
+ test("small heap") {
+ val systemMemory = 1024 * 1024
+ val reservedMemory = 300 * 1024
+ val memoryFraction = 0.8
+ val conf = new SparkConf()
+ .set("spark.memory.fraction", memoryFraction.toString)
+ .set("spark.testing.memory", systemMemory.toString)
+ .set("spark.testing.reservedMemory", reservedMemory.toString)
+ val mm = UnifiedMemoryManager(conf, numCores = 1)
+ val expectedMaxMemory = ((systemMemory - reservedMemory) * memoryFraction).toLong
+ assert(mm.maxMemory === expectedMaxMemory)
+
+ // Try using a system memory that's too small
+ val conf2 = conf.clone().set("spark.testing.memory", (reservedMemory / 2).toString)
+ val exception = intercept[IllegalArgumentException] {
+ UnifiedMemoryManager(conf2, numCores = 1)
+ }
+ assert(exception.getMessage.contains("larger heap size"))
+ }
+
}