aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala12
-rw-r--r--core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala20
2 files changed, 30 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index a3321e3f17..6c57c98ea5 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -183,7 +183,17 @@ object UnifiedMemoryManager {
val minSystemMemory = reservedMemory * 1.5
if (systemMemory < minSystemMemory) {
throw new IllegalArgumentException(s"System memory $systemMemory must " +
- s"be at least $minSystemMemory. Please use a larger heap size.")
+ s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
+ s"option or spark.driver.memory in Spark configuration.")
+ }
+ // SPARK-12759 Check executor memory to fail fast if memory is insufficient
+ if (conf.contains("spark.executor.memory")) {
+ val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
+ if (executorMemory < minSystemMemory) {
+ throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
+ s"$minSystemMemory. Please increase executor memory using the " +
+ s"--executor-memory option or spark.executor.memory in Spark configuration.")
+ }
}
val usableMemory = systemMemory - reservedMemory
val memoryFraction = conf.getDouble("spark.memory.fraction", 0.75)
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 0c4359c3c2..9686c6621b 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -227,7 +227,25 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
val exception = intercept[IllegalArgumentException] {
UnifiedMemoryManager(conf2, numCores = 1)
}
- assert(exception.getMessage.contains("larger heap size"))
+ assert(exception.getMessage.contains("increase heap size"))
+ }
+
+ test("insufficient executor memory") {
+ val systemMemory = 1024 * 1024
+ val reservedMemory = 300 * 1024
+ val memoryFraction = 0.8
+ val conf = new SparkConf()
+ .set("spark.memory.fraction", memoryFraction.toString)
+ .set("spark.testing.memory", systemMemory.toString)
+ .set("spark.testing.reservedMemory", reservedMemory.toString)
+ val mm = UnifiedMemoryManager(conf, numCores = 1)
+
+ // Try using an executor memory that's too small
+ val conf2 = conf.clone().set("spark.executor.memory", (reservedMemory / 2).toString)
+ val exception = intercept[IllegalArgumentException] {
+ UnifiedMemoryManager(conf2, numCores = 1)
+ }
+ assert(exception.getMessage.contains("increase executor memory"))
}
test("execution can evict cached blocks when there are multiple active tasks (SPARK-12155)") {