diff options
author | witgo <witgo@qq.com> | 2014-06-16 14:27:31 -0500 |
---|---|---|
committer | Thomas Graves <tgraves@apache.org> | 2014-06-16 14:27:31 -0500 |
commit | cdf2b04570871848442ca9f9e2316a37e4aaaae0 (patch) | |
tree | ef908cd3a8d0d5ad7adea9e180255453e1674e03 /yarn/alpha/src | |
parent | 4fdb491775bb9c4afa40477dc0069ff6fcadfe25 (diff) | |
download | spark-cdf2b04570871848442ca9f9e2316a37e4aaaae0.tar.gz spark-cdf2b04570871848442ca9f9e2316a37e4aaaae0.tar.bz2 spark-cdf2b04570871848442ca9f9e2316a37e4aaaae0.zip |
[SPARK-1930] The Container is running beyond physical memory limits, so as to be killed
Author: witgo <witgo@qq.com>
Closes #894 from witgo/SPARK-1930 and squashes the following commits:
564307e [witgo] Update the running-on-yarn.md
3747515 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1930
172647b [witgo] add memoryOverhead docs
a0ff545 [witgo] leaving only two configs
a17bda2 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1930
478ca15 [witgo] Merge branch 'master' into SPARK-1930
d1244a1 [witgo] Merge branch 'master' into SPARK-1930
8b967ae [witgo] Merge branch 'master' into SPARK-1930
655a820 [witgo] review commit
71859a7 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1930
e3c531d [witgo] review commit
e16f190 [witgo] different memoryOverhead
ffa7569 [witgo] review commit
5c9581f [witgo] Merge branch 'master' into SPARK-1930
9a6bcf2 [witgo] review commit
8fae45a [witgo] fix NullPointerException
e0dcc16 [witgo] Adding configuration items
b6a989c [witgo] Fix container memory beyond limit, were killed
Diffstat (limited to 'yarn/alpha/src')
3 files changed, 13 insertions, 7 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 4ccddc214c..82f79d88a3 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -71,7 +71,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource] // Memory for the ApplicationMaster. - capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + capability.setMemory(args.amMemory + memoryOverhead) amContainer.setResource(capability) appContext.setQueue(args.amQueue) @@ -115,7 +115,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa val minResMemory = newApp.getMinimumResourceCapability().getMemory() val amMemory = ((args.amMemory / minResMemory) * minResMemory) + ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) - - YarnAllocationHandler.MEMORY_OVERHEAD) + memoryOverhead) amMemory } diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index b6ecae1e65..bfdb6232f5 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -92,13 +92,15 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp appAttemptId = getApplicationAttemptId() resourceManager = registerWithResourceManager() + val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster() // Compute number of threads for akka val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory() if (minimumMemory > 0) { - val mem = args.executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD + val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead", + YarnAllocationHandler.MEMORY_OVERHEAD) val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0) if (numCore > 0) { diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 856391e52b..80e0162e9f 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -88,6 +88,10 @@ private[yarn] class YarnAllocationHandler( // Containers to be released in next request to RM private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean] + // Additional memory overhead - in mb. + private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead", + YarnAllocationHandler.MEMORY_OVERHEAD) + private val numExecutorsRunning = new AtomicInteger() // Used to generate a unique id per executor private val executorIdCounter = new AtomicInteger() @@ -99,7 +103,7 @@ private[yarn] class YarnAllocationHandler( def getNumExecutorsFailed: Int = numExecutorsFailed.intValue def isResourceConstraintSatisfied(container: Container): Boolean = { - container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + container.getResource.getMemory >= (executorMemory + memoryOverhead) } def allocateContainers(executorsToRequest: Int) { @@ -229,7 +233,7 @@ private[yarn] class YarnAllocationHandler( val containerId = container.getId assert( container.getResource.getMemory >= - (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)) + (executorMemory + memoryOverhead)) if (numExecutorsRunningNow > maxExecutors) { logInfo("""Ignoring container %s at host %s, since we already have the required number of @@ -450,7 +454,7 @@ private[yarn] class YarnAllocationHandler( if (numExecutors > 0) { logInfo("Allocating %d executor containers with %d of memory each.".format(numExecutors, - executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)) + executorMemory + memoryOverhead)) } else { logDebug("Empty allocation req .. release : " + releasedContainerList) } @@ -505,7 +509,7 @@ private[yarn] class YarnAllocationHandler( val rsrcRequest = Records.newRecord(classOf[ResourceRequest]) val memCapability = Records.newRecord(classOf[Resource]) // There probably is some overhead here, let's reserve a bit more memory. - memCapability.setMemory(executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + memCapability.setMemory(executorMemory + memoryOverhead) rsrcRequest.setCapability(memCapability) val pri = Records.newRecord(classOf[Priority]) |