aboutsummaryrefslogtreecommitdiff
path: root/yarn/stable
diff options
context:
space:
mode:
authorwitgo <witgo@qq.com>2014-06-16 14:27:31 -0500
committerThomas Graves <tgraves@apache.org>2014-06-16 14:27:59 -0500
commitd7f94b9348c5289c3954023fc1593769851bcd36 (patch)
treec39ddbc82d9b8c7c65ce8aaf4c3bfb7b50e7d08d /yarn/stable
parent33db842b6fe1c0b60bff764a9e226452bf1914a8 (diff)
downloadspark-d7f94b9348c5289c3954023fc1593769851bcd36.tar.gz
spark-d7f94b9348c5289c3954023fc1593769851bcd36.tar.bz2
spark-d7f94b9348c5289c3954023fc1593769851bcd36.zip
[SPARK-1930] The Container is running beyond physical memory limits, so as to be killed
Author: witgo <witgo@qq.com> Closes #894 from witgo/SPARK-1930 and squashes the following commits: 564307e [witgo] Update the running-on-yarn.md 3747515 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1930 172647b [witgo] add memoryOverhead docs a0ff545 [witgo] leaving only two configs a17bda2 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1930 478ca15 [witgo] Merge branch 'master' into SPARK-1930 d1244a1 [witgo] Merge branch 'master' into SPARK-1930 8b967ae [witgo] Merge branch 'master' into SPARK-1930 655a820 [witgo] review commit 71859a7 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1930 e3c531d [witgo] review commit e16f190 [witgo] different memoryOverhead ffa7569 [witgo] review commit 5c9581f [witgo] Merge branch 'master' into SPARK-1930 9a6bcf2 [witgo] review commit 8fae45a [witgo] fix NullPointerException e0dcc16 [witgo] Adding configuration items b6a989c [witgo] Fix container memory beyond limit, were killed (cherry picked from commit cdf2b04570871848442ca9f9e2316a37e4aaaae0) Signed-off-by: Thomas Graves <tgraves@apache.org>
Diffstat (limited to 'yarn/stable')
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala4
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala12
2 files changed, 10 insertions, 6 deletions
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1b6bfb42a5..ac46bd39ce 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -84,7 +84,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
// Memory for the ApplicationMaster.
val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
- memoryResource.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ memoryResource.setMemory(args.amMemory + memoryOverhead)
appContext.setResource(memoryResource)
// Finally, submit and monitor the application.
@@ -118,7 +118,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
// val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory()
// var amMemory = ((args.amMemory / minResMemory) * minResMemory) +
// ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
- // YarnAllocationHandler.MEMORY_OVERHEAD)
+ // memoryOverhead )
args.amMemory
}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index a979fe4d62..29ccec2adc 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -90,6 +90,10 @@ private[yarn] class YarnAllocationHandler(
// Containers to be released in next request to RM
private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
+ // Additional memory overhead - in mb.
+ private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+ YarnAllocationHandler.MEMORY_OVERHEAD)
+
// Number of container requests that have been sent to, but not yet allocated by the
// ApplicationMaster.
private val numPendingAllocate = new AtomicInteger()
@@ -106,7 +110,7 @@ private[yarn] class YarnAllocationHandler(
def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
def isResourceConstraintSatisfied(container: Container): Boolean = {
- container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ container.getResource.getMemory >= (executorMemory + memoryOverhead)
}
def releaseContainer(container: Container) {
@@ -248,7 +252,7 @@ private[yarn] class YarnAllocationHandler(
val executorHostname = container.getNodeId.getHost
val containerId = container.getId
- val executorMemoryOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ val executorMemoryOverhead = (executorMemory + memoryOverhead)
assert(container.getResource.getMemory >= executorMemoryOverhead)
if (numExecutorsRunningNow > maxExecutors) {
@@ -477,7 +481,7 @@ private[yarn] class YarnAllocationHandler(
numPendingAllocate.addAndGet(numExecutors)
logInfo("Will Allocate %d executor containers, each with %d memory".format(
numExecutors,
- (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)))
+ (executorMemory + memoryOverhead)))
} else {
logDebug("Empty allocation request ...")
}
@@ -537,7 +541,7 @@ private[yarn] class YarnAllocationHandler(
priority: Int
): ArrayBuffer[ContainerRequest] = {
- val memoryRequest = executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ val memoryRequest = executorMemory + memoryOverhead
val resource = Resource.newInstance(memoryRequest, executorCores)
val prioritySetting = Records.newRecord(classOf[Priority])