author     Peter Ableda <peter.ableda@cloudera.com>   2016-04-25 10:42:49 +0200
committer  Sean Owen <sowen@cloudera.com>              2016-04-25 10:42:49 +0200
commit     cef77d1f68afab56c9de8690133241dc0563e55c (patch)
tree       173445317e5aff045cd01fa76fb70ac231ccfba7
parent     e6f954a5799d0996bf9f22e0fb67a2f0568b57a4 (diff)
download   spark-cef77d1f68afab56c9de8690133241dc0563e55c.tar.gz
           spark-cef77d1f68afab56c9de8690133241dc0563e55c.tar.bz2
           spark-cef77d1f68afab56c9de8690133241dc0563e55c.zip
[SPARK-14636] Add minimum memory checks for drivers and executors
## What changes were proposed in this pull request?

Implement for the StaticMemoryManager (legacy mode) the same minimum memory size validations that the UnifiedMemoryManager already performs.

## How was this patch tested?

Manual tests were done on a CDH cluster.

Test with small executor memory:

```
spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --deploy-mode client \
  --master yarn \
  --executor-memory 15m \
  --conf spark.memory.useLegacyMode=true \
  /opt/cloudera/parcels/CDH/lib/spark/examples/lib/spark-examples*.jar 10
```

Exception thrown:

```
ERROR spark.SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: Executor memory 15728640 must be at least 471859200. Please increase executor memory using the --executor-memory option or spark.executor.memory in Spark configuration.
	at org.apache.spark.memory.StaticMemoryManager$.org$apache$spark$memory$StaticMemoryManager$$getMaxExecutionMemory(StaticMemoryManager.scala:127)
	at org.apache.spark.memory.StaticMemoryManager.<init>(StaticMemoryManager.scala:46)
	at org.apache.spark.SparkEnv$.create(SparkEnv.scala:352)
	at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
	at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:289)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:462)
	at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:29)
	at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```

Author: Peter Ableda <peter.ableda@cloudera.com>

Closes #12395 from peterableda/SPARK-14636.
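For context, the UnifiedMemoryManager check being mirrored looks roughly like the sketch below. This is paraphrased from Spark 1.6-era sources, not quoted verbatim, and the names may not match the code exactly. There the floor is derived as 1.5 × the 300 MB reserved system memory, which is the 471859200 bytes quoted in the stack trace above; the merged patch below settles on a fixed 32 MB constant instead.

```scala
// Paraphrased sketch of UnifiedMemoryManager.getMaxMemory (Spark 1.6-era);
// illustrative only, not the verbatim upstream code.
private def getMaxMemory(conf: SparkConf): Long = {
  val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
  val reservedMemory = conf.getLong("spark.testing.reservedMemory",
    if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
  // 300 MB reserved * 1.5 = 471859200 bytes, the minimum seen in the trace above
  val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
  if (systemMemory < minSystemMemory) {
    throw new IllegalArgumentException(s"System memory $systemMemory must " +
      s"be at least $minSystemMemory. Please increase heap size using the " +
      s"--driver-memory option or spark.driver.memory in Spark configuration.")
  }
  if (conf.contains("spark.executor.memory")) {
    val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
    if (executorMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
        s"$minSystemMemory. Please increase executor memory using the " +
        s"--executor-memory option or spark.executor.memory in Spark configuration.")
    }
  }
  // remaining size computation elided
  val usableMemory = systemMemory - reservedMemory
  (usableMemory * conf.getDouble("spark.memory.fraction", 0.75)).toLong
}
```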
-rw-r--r--  core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala  16
1 file changed, 16 insertions(+), 0 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index cbd0fa9ec2..08155aa298 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -104,6 +104,8 @@ private[spark] class StaticMemoryManager(

private[spark] object StaticMemoryManager {

+ private val MIN_MEMORY_BYTES = 32 * 1024 * 1024
+
/**
* Return the total amount of memory available for the storage region, in bytes.
*/
@@ -119,6 +121,20 @@ private[spark] object StaticMemoryManager {
*/
private def getMaxExecutionMemory(conf: SparkConf): Long = {
val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
+
+ if (systemMaxMemory < MIN_MEMORY_BYTES) {
+ throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
+ s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " +
+ s"option or spark.driver.memory in Spark configuration.")
+ }
+ if (conf.contains("spark.executor.memory")) {
+ val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
+ if (executorMemory < MIN_MEMORY_BYTES) {
+ throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
+ s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
+ s"--executor-memory option or spark.executor.memory in Spark configuration.")
+ }
+ }
val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
(systemMaxMemory * memoryFraction * safetyFraction).toLong
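Not part of the patch, but for illustration: a minimal test sketch of how the new fail-fast checks could be exercised through the spark.testing.memory override that getMaxExecutionMemory reads. It assumes Spark's internal SparkFunSuite test harness and the private[spark] auxiliary constructor StaticMemoryManager(conf, numCores) seen in the stack trace, so it would have to live inside the Spark source tree; the suite name is invented.

```scala
package org.apache.spark.memory

import org.apache.spark.{SparkConf, SparkFunSuite}

// Hypothetical suite name; placed in org.apache.spark.memory so the
// private[spark] StaticMemoryManager API is reachable.
class StaticMemoryManagerMinMemorySuite extends SparkFunSuite {

  test("system memory below the 32 MB floor fails fast") {
    val conf = new SparkConf()
      .set("spark.testing.memory", (16 * 1024 * 1024).toString) // 16 MB < 32 MB
    val e = intercept[IllegalArgumentException] {
      new StaticMemoryManager(conf, numCores = 1) // invokes getMaxExecutionMemory
    }
    assert(e.getMessage.contains("System memory"))
  }

  test("spark.executor.memory below the 32 MB floor fails fast") {
    val conf = new SparkConf()
      .set("spark.testing.memory", (64 * 1024 * 1024).toString) // passes system check
      .set("spark.executor.memory", "15m")                      // 15 MB < 32 MB
    val e = intercept[IllegalArgumentException] {
      new StaticMemoryManager(conf, numCores = 1)
    }
    assert(e.getMessage.contains("Executor memory"))
  }
}
```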