From d7f94b9348c5289c3954023fc1593769851bcd36 Mon Sep 17 00:00:00 2001
From: witgo
Date: Mon, 16 Jun 2014 14:27:31 -0500
Subject: [SPARK-1930] The Container is running beyond physical memory limits,
 so as to be killed

Author: witgo

Closes #894 from witgo/SPARK-1930 and squashes the following commits:

564307e [witgo] Update the running-on-yarn.md
3747515 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1930
172647b [witgo] add memoryOverhead docs
a0ff545 [witgo] leaving only two configs
a17bda2 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1930
478ca15 [witgo] Merge branch 'master' into SPARK-1930
d1244a1 [witgo] Merge branch 'master' into SPARK-1930
8b967ae [witgo] Merge branch 'master' into SPARK-1930
655a820 [witgo] review commit
71859a7 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1930
e3c531d [witgo] review commit
e16f190 [witgo] different memoryOverhead
ffa7569 [witgo] review commit
5c9581f [witgo] Merge branch 'master' into SPARK-1930
9a6bcf2 [witgo] review commit
8fae45a [witgo] fix NullPointerException
e0dcc16 [witgo] Adding configuration items
b6a989c [witgo] Fix container memory beyond limit, were killed

(cherry picked from commit cdf2b04570871848442ca9f9e2316a37e4aaaae0)
Signed-off-by: Thomas Graves
---
 docs/running-on-yarn.md                                    | 14 ++++++++++++++
 .../main/scala/org/apache/spark/deploy/yarn/Client.scala   |  4 ++--
 .../org/apache/spark/deploy/yarn/ExecutorLauncher.scala    |  4 +++-
 .../apache/spark/deploy/yarn/YarnAllocationHandler.scala   | 12 ++++++++----
 .../scala/org/apache/spark/deploy/yarn/ClientBase.scala    | 14 +++++++++-----
 .../main/scala/org/apache/spark/deploy/yarn/Client.scala   |  4 ++--
 .../apache/spark/deploy/yarn/YarnAllocationHandler.scala   | 12 ++++++++----
 7 files changed, 46 insertions(+), 18 deletions(-)

diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index af1788f2aa..4243ef480b 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -67,6 +67,20 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
     The address of the Spark history server (i.e. host.com:18080). The address should not contain a scheme (http://). Defaults to not being set since the history server is an optional service. This address is given to the YARN ResourceManager when the Spark application finishes to link the application from the ResourceManager UI to the Spark history server UI.
   </td>
 </tr>
+<tr>
+  <td>spark.yarn.executor.memoryOverhead</td>
+  <td>384</td>
+  <td>
+    The amount of off heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
+  </td>
+</tr>
+<tr>
+  <td>spark.yarn.driver.memoryOverhead</td>
+  <td>384</td>
+  <td>
+    The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
+  </td>
+</tr>
 </table>

 By default, Spark on YARN will use a Spark jar installed locally, but the Spark JAR can also be in a world-readable location on HDFS. This allows YARN to cache it on nodes so that it doesn't need to be distributed each time an application runs. To point to a JAR on HDFS, `export SPARK_JAR=hdfs:///some/path`.
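Both properties above are read through SparkConf, so they can be supplied like any other Spark setting. In yarn-client mode, for example, they can be set on the SparkConf before the SparkContext is created; a minimal sketch follows, in which the application name, the chosen overhead values, and the toy job are illustrative assumptions, not part of this patch:

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative sketch: raise the off-heap allowances when YARN keeps
    // killing containers for running beyond physical memory limits.
    object MemoryOverheadExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("yarn-client")                          // assumes a configured YARN cluster
          .setAppName("MemoryOverheadExample")               // hypothetical name
          .set("spark.yarn.executor.memoryOverhead", "512")  // MB; the patch's default is 384
          .set("spark.yarn.driver.memoryOverhead", "1024")   // MB; the patch's default is 384
        val sc = new SparkContext(conf)
        try {
          // Trivial job, just to exercise the executor containers.
          println(sc.parallelize(1 to 1000).sum())
        } finally {
          sc.stop()
        }
      }
    }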
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8226207de4..e98287ca23 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -71,7 +71,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa

     val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
     // Memory for the ApplicationMaster.
-    capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    capability.setMemory(args.amMemory + memoryOverhead)
     amContainer.setResource(capability)

     appContext.setQueue(args.amQueue)
@@ -116,7 +116,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
     val minResMemory = newApp.getMinimumResourceCapability().getMemory()
     val amMemory = ((args.amMemory / minResMemory) * minResMemory) +
       ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
-        YarnAllocationHandler.MEMORY_OVERHEAD)
+        memoryOverhead)
     amMemory
   }

diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index b6ecae1e65..bfdb6232f5 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -92,13 +92,15 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp

     appAttemptId = getApplicationAttemptId()
     resourceManager = registerWithResourceManager()
+
     val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()

     // Compute number of threads for akka
     val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()

     if (minimumMemory > 0) {
-      val mem = args.executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+      val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+        YarnAllocationHandler.MEMORY_OVERHEAD)
       val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)

       if (numCore > 0) {
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 856391e52b..80e0162e9f 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -88,6 +88,10 @@ private[yarn] class YarnAllocationHandler(
   // Containers to be released in next request to RM
   private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]

+  // Additional memory overhead - in mb.
+  private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+    YarnAllocationHandler.MEMORY_OVERHEAD)
+
   private val numExecutorsRunning = new AtomicInteger()
   // Used to generate a unique id per executor
   private val executorIdCounter = new AtomicInteger()
@@ -99,7 +103,7 @@ private[yarn] class YarnAllocationHandler(
   def getNumExecutorsFailed: Int = numExecutorsFailed.intValue

   def isResourceConstraintSatisfied(container: Container): Boolean = {
-    container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    container.getResource.getMemory >= (executorMemory + memoryOverhead)
   }

   def allocateContainers(executorsToRequest: Int) {
@@ -229,7 +233,7 @@ private[yarn] class YarnAllocationHandler(
         val containerId = container.getId
         assert(
           container.getResource.getMemory >=
-            (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+            (executorMemory + memoryOverhead))

         if (numExecutorsRunningNow > maxExecutors) {
           logInfo("""Ignoring container %s at host %s, since we already have the required number of
@@ -450,7 +454,7 @@ private[yarn] class YarnAllocationHandler(

     if (numExecutors > 0) {
       logInfo("Allocating %d executor containers with %d of memory each.".format(numExecutors,
-        executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+        executorMemory + memoryOverhead))
     } else {
       logDebug("Empty allocation req ..  release : " + releasedContainerList)
     }
@@ -505,7 +509,7 @@ private[yarn] class YarnAllocationHandler(
     val rsrcRequest = Records.newRecord(classOf[ResourceRequest])
     val memCapability = Records.newRecord(classOf[Resource])
     // There probably is some overhead here, let's reserve a bit more memory.
-    memCapability.setMemory(executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    memCapability.setMemory(executorMemory + memoryOverhead)
     rsrcRequest.setCapability(memCapability)

     val pri = Records.newRecord(classOf[Priority])
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index aeb3f0062d..1f543f7420 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -65,6 +65,10 @@ trait ClientBase extends Logging {
   val APP_FILE_PERMISSION: FsPermission =
     FsPermission.createImmutable(Integer.parseInt("644", 8).toShort)

+  // Additional memory overhead - in mb.
+  protected def memoryOverhead: Int = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
+    YarnAllocationHandler.MEMORY_OVERHEAD)
+
   // TODO(harvey): This could just go in ClientArguments.
   def validateArgs() = {
     Map(
@@ -72,10 +76,10 @@ trait ClientBase extends Logging {
         "Error: You must specify a user jar when running in standalone mode!"),
       (args.userClass == null) -> "Error: You must specify a user class!",
       (args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!",
-      (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be" +
-        "greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
-      (args.executorMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Executor memory size" +
-        "must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString)
+      (args.amMemory <= memoryOverhead) -> ("Error: AM memory size must be " +
+        "greater than: " + memoryOverhead),
+      (args.executorMemory <= memoryOverhead) -> ("Error: Executor memory size " +
+        "must be greater than: " + memoryOverhead.toString)
     ).foreach { case(cond, errStr) =>
       if (cond) {
         logError(errStr)
@@ -98,7 +102,7 @@ trait ClientBase extends Logging {
         format(args.executorMemory, maxMem))
       System.exit(1)
     }
-    val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+    val amMem = args.amMemory + memoryOverhead
     if (amMem > maxMem) {
       logError("Required AM memory (%d) is above the max threshold (%d) of this cluster".
         format(args.amMemory, maxMem))
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1b6bfb42a5..ac46bd39ce 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -84,7 +84,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa

     // Memory for the ApplicationMaster.
     val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
-    memoryResource.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    memoryResource.setMemory(args.amMemory + memoryOverhead)
     appContext.setResource(memoryResource)

     // Finally, submit and monitor the application.
@@ -118,7 +118,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
     // val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory()
     // var amMemory = ((args.amMemory / minResMemory) * minResMemory) +
     //  ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
-    //  YarnAllocationHandler.MEMORY_OVERHEAD)
+    //  memoryOverhead )
     args.amMemory
   }

diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index a979fe4d62..29ccec2adc 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -90,6 +90,10 @@ private[yarn] class YarnAllocationHandler(
   // Containers to be released in next request to RM
   private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]

+  // Additional memory overhead - in mb.
+  private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+    YarnAllocationHandler.MEMORY_OVERHEAD)
+
   // Number of container requests that have been sent to, but not yet allocated by the
   // ApplicationMaster.
   private val numPendingAllocate = new AtomicInteger()
@@ -106,7 +110,7 @@ private[yarn] class YarnAllocationHandler(
   def getNumExecutorsFailed: Int = numExecutorsFailed.intValue

   def isResourceConstraintSatisfied(container: Container): Boolean = {
-    container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    container.getResource.getMemory >= (executorMemory + memoryOverhead)
   }

   def releaseContainer(container: Container) {
@@ -248,7 +252,7 @@ private[yarn] class YarnAllocationHandler(
         val executorHostname = container.getNodeId.getHost
         val containerId = container.getId

-        val executorMemoryOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+        val executorMemoryOverhead = (executorMemory + memoryOverhead)
         assert(container.getResource.getMemory >= executorMemoryOverhead)

         if (numExecutorsRunningNow > maxExecutors) {
@@ -477,7 +481,7 @@ private[yarn] class YarnAllocationHandler(
       numPendingAllocate.addAndGet(numExecutors)
       logInfo("Will Allocate %d executor containers, each with %d memory".format(
         numExecutors,
-        (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)))
+        (executorMemory + memoryOverhead)))
     } else {
       logDebug("Empty allocation request ...")
     }
@@ -537,7 +541,7 @@ private[yarn] class YarnAllocationHandler(
       priority: Int
     ): ArrayBuffer[ContainerRequest] = {
-    val memoryRequest = executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+    val memoryRequest = executorMemory + memoryOverhead
     val resource = Resource.newInstance(memoryRequest, executorCores)

     val prioritySetting = Records.newRecord(classOf[Priority])
--
cgit v1.2.3
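Taken together, on both the alpha and stable code paths the patch sizes every container request as the JVM heap plus a configurable overhead, read from spark.yarn.executor.memoryOverhead (spark.yarn.driver.memoryOverhead for the ApplicationMaster) and falling back to the old fixed YarnAllocationHandler.MEMORY_OVERHEAD of 384 MB. The standalone Scala sketch below mirrors that lookup-and-add rule outside the patch; the object and method names are illustrative assumptions, not Spark APIs:

    import org.apache.spark.SparkConf

    // Standalone sketch of the sizing rule the patch applies:
    // container memory = JVM heap + overhead, where the overhead comes
    // from configuration with a 384 MB fallback.
    object ContainerSizing {
      val MemoryOverheadDefault = 384 // MB; mirrors YarnAllocationHandler.MEMORY_OVERHEAD

      // Memory (MB) to request from YARN for one executor container.
      def executorContainerMemory(conf: SparkConf, executorMemory: Int): Int =
        executorMemory +
          conf.getInt("spark.yarn.executor.memoryOverhead", MemoryOverheadDefault)

      // Memory (MB) to request for the ApplicationMaster container.
      def amContainerMemory(conf: SparkConf, amMemory: Int): Int =
        amMemory +
          conf.getInt("spark.yarn.driver.memoryOverhead", MemoryOverheadDefault)

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(loadDefaults = false)
          .set("spark.yarn.executor.memoryOverhead", "512")
        println(executorContainerMemory(conf, 4096)) // 4096 + 512 = 4608
        println(amContainerMemory(conf, 1024))       // 1024 + 384 = 1408 (fallback)
      }
    }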