diff options
author | Peter Ableda <peter.ableda@cloudera.com> | 2016-04-25 10:42:49 +0200 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-04-25 10:42:49 +0200 |
commit | cef77d1f68afab56c9de8690133241dc0563e55c (patch) | |
tree | 173445317e5aff045cd01fa76fb70ac231ccfba7 | |
parent | e6f954a5799d0996bf9f22e0fb67a2f0568b57a4 (diff) | |
download | spark-cef77d1f68afab56c9de8690133241dc0563e55c.tar.gz spark-cef77d1f68afab56c9de8690133241dc0563e55c.tar.bz2 spark-cef77d1f68afab56c9de8690133241dc0563e55c.zip |
[SPARK-14636] Add minimum memory checks for drivers and executors
## What changes were proposed in this pull request?
Implement the same memory size validations for the StaticMemoryManager (Legacy) as the UnifiedMemoryManager has.
## How was this patch tested?
Manual tests were done in CDH cluster.
Test with small executor memory:
`
spark-submit --class org.apache.spark.examples.SparkPi --deploy-mode client --master yarn --executor-memory 15m --conf spark.memory.useLegacyMode=true /opt/cloudera/parcels/CDH/lib/spark/examples/lib/spark-examples*.jar 10
`
Exception thrown:
```
ERROR spark.SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: Executor memory 15728640 must be at least 471859200. Please increase executor memory using the --executor-memory option or spark.executor.memory in Spark configuration.
at org.apache.spark.memory.StaticMemoryManager$.org$apache$spark$memory$StaticMemoryManager$$getMaxExecutionMemory(StaticMemoryManager.scala:127)
at org.apache.spark.memory.StaticMemoryManager.<init>(StaticMemoryManager.scala:46)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:352)
at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:289)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:462)
at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:29)
at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```
Author: Peter Ableda <peter.ableda@cloudera.com>
Closes #12395 from peterableda/SPARK-14636.
-rw-r--r-- | core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala | 16 |
1 files changed, 16 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala index cbd0fa9ec2..08155aa298 100644 --- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala @@ -104,6 +104,8 @@ private[spark] class StaticMemoryManager( private[spark] object StaticMemoryManager { + private val MIN_MEMORY_BYTES = 32 * 1024 * 1024 + /** * Return the total amount of memory available for the storage region, in bytes. */ @@ -119,6 +121,20 @@ private[spark] object StaticMemoryManager { */ private def getMaxExecutionMemory(conf: SparkConf): Long = { val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory) + + if (systemMaxMemory < MIN_MEMORY_BYTES) { + throw new IllegalArgumentException(s"System memory $systemMaxMemory must " + + s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " + + s"option or spark.driver.memory in Spark configuration.") + } + if (conf.contains("spark.executor.memory")) { + val executorMemory = conf.getSizeAsBytes("spark.executor.memory") + if (executorMemory < MIN_MEMORY_BYTES) { + throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " + + s"$MIN_MEMORY_BYTES. Please increase executor memory using the " + + s"--executor-memory option or spark.executor.memory in Spark configuration.") + } + } val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2) val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8) (systemMaxMemory * memoryFraction * safetyFraction).toLong |