path: root/yarn/alpha
author     witgo <witgo@qq.com>                 2014-06-16 14:27:31 -0500
committer  Thomas Graves <tgraves@apache.org>   2014-06-16 14:27:31 -0500
commit     cdf2b04570871848442ca9f9e2316a37e4aaaae0 (patch)
tree       ef908cd3a8d0d5ad7adea9e180255453e1674e03 /yarn/alpha
parent     4fdb491775bb9c4afa40477dc0069ff6fcadfe25 (diff)
[SPARK-1930] The container is running beyond physical memory limits and is therefore killed
Author: witgo <witgo@qq.com>

Closes #894 from witgo/SPARK-1930 and squashes the following commits:

564307e [witgo] Update the running-on-yarn.md
3747515 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1930
172647b [witgo] add memoryOverhead docs
a0ff545 [witgo] leaving only two configs
a17bda2 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1930
478ca15 [witgo] Merge branch 'master' into SPARK-1930
d1244a1 [witgo] Merge branch 'master' into SPARK-1930
8b967ae [witgo] Merge branch 'master' into SPARK-1930
655a820 [witgo] review commit
71859a7 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1930
e3c531d [witgo] review commit
e16f190 [witgo] different memoryOverhead
ffa7569 [witgo] review commit
5c9581f [witgo] Merge branch 'master' into SPARK-1930
9a6bcf2 [witgo] review commit
8fae45a [witgo] fix NullPointerException
e0dcc16 [witgo] Adding configuration items
b6a989c [witgo] Fix container memory beyond limit, were killed
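The change below replaces the hard-coded YarnAllocationHandler.MEMORY_OVERHEAD constant with a user-settable configuration key. A minimal usage sketch (the values here are illustrative, not defaults):

    import org.apache.spark.SparkConf

    // Illustrative values, not defaults: 4 GB executor heaps plus 768 MB
    // of off-heap headroom reserved by YARN on top of each heap.
    val conf = new SparkConf()
      .set("spark.executor.memory", "4g")
      .set("spark.yarn.executor.memoryOverhead", "768") // MB; falls back to MEMORY_OVERHEAD when unset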
Diffstat (limited to 'yarn/alpha')
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                4
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala      4
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala 12
3 files changed, 13 insertions, 7 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 4ccddc214c..82f79d88a3 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -71,7 +71,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
// Memory for the ApplicationMaster.
- capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ capability.setMemory(args.amMemory + memoryOverhead)
amContainer.setResource(capability)
appContext.setQueue(args.amQueue)
@@ -115,7 +115,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
val minResMemory = newApp.getMinimumResourceCapability().getMemory()
val amMemory = ((args.amMemory / minResMemory) * minResMemory) +
((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
- YarnAllocationHandler.MEMORY_OVERHEAD)
+ memoryOverhead)
amMemory
}
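For orientation, the expression above rounds the requested AM memory up to the next multiple of YARN's minimum allocation and then subtracts the overhead. A worked sketch with made-up numbers:

    // Made-up numbers: YARN minimum allocation 1024 MB, requested AM
    // memory 1536 MB, overhead 384 MB. Integer division floors, so the
    // conditional term rounds the request up to a multiple of the minimum.
    def roundedAmMemory(requested: Int, minResMemory: Int, overhead: Int): Int =
      (requested / minResMemory) * minResMemory +
        (if (requested % minResMemory == 0) 0 else minResMemory) - overhead

    roundedAmMemory(1536, 1024, 384) // (1 * 1024) + 1024 - 384 == 1664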
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index b6ecae1e65..bfdb6232f5 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -92,13 +92,15 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
appAttemptId = getApplicationAttemptId()
resourceManager = registerWithResourceManager()
+
val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
// Compute number of threads for akka
val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
if (minimumMemory > 0) {
- val mem = args.executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+ YarnAllocationHandler.MEMORY_OVERHEAD)
val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
if (numCore > 0) {
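The thread count above is a ceiling division of the total memory (executor heap plus the now-configurable overhead) by YARN's minimum allocation. A standalone sketch, with illustrative numbers:

    // Ceiling division: how many minimum-sized allocations the executor's
    // total memory footprint spans. Numbers are illustrative only.
    def numCores(mem: Int, minimumMemory: Int): Int =
      (mem / minimumMemory) + (if (mem % minimumMemory != 0) 1 else 0)

    numCores(1024 + 384, 1024) // == 2: 1408 MB does not fit in one 1024 MB unit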
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 856391e52b..80e0162e9f 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -88,6 +88,10 @@ private[yarn] class YarnAllocationHandler(
// Containers to be released in next request to RM
private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
+ // Additional memory overhead - in mb.
+ private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+ YarnAllocationHandler.MEMORY_OVERHEAD)
+
private val numExecutorsRunning = new AtomicInteger()
// Used to generate a unique id per executor
private val executorIdCounter = new AtomicInteger()
@@ -99,7 +103,7 @@ private[yarn] class YarnAllocationHandler(
def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
def isResourceConstraintSatisfied(container: Container): Boolean = {
- container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ container.getResource.getMemory >= (executorMemory + memoryOverhead)
}
def allocateContainers(executorsToRequest: Int) {
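Taken together, the new memoryOverhead accessor and the predicate above mean a container offer is usable only if it covers the heap plus the configurable headroom. A condensed, self-contained sketch (the literal values are illustrative; 384 stands in for the MEMORY_OVERHEAD constant):

    import org.apache.spark.SparkConf

    val sparkConf = new SparkConf()
    val executorMemory = 1024 // MB; illustrative, normally from ClientArguments
    // 384 stands in for YarnAllocationHandler.MEMORY_OVERHEAD here.
    val memoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead", 384)

    def satisfied(containerMemory: Int): Boolean =
      containerMemory >= (executorMemory + memoryOverhead)

    satisfied(1536) // true: 1536 >= 1024 + 384
    satisfied(1024) // false: the offer leaves no room for the overhead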
@@ -229,7 +233,7 @@ private[yarn] class YarnAllocationHandler(
val containerId = container.getId
assert( container.getResource.getMemory >=
- (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+ (executorMemory + memoryOverhead))
if (numExecutorsRunningNow > maxExecutors) {
logInfo("""Ignoring container %s at host %s, since we already have the required number of
@@ -450,7 +454,7 @@ private[yarn] class YarnAllocationHandler(
if (numExecutors > 0) {
logInfo("Allocating %d executor containers with %d of memory each.".format(numExecutors,
- executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+ executorMemory + memoryOverhead))
} else {
logDebug("Empty allocation req .. release : " + releasedContainerList)
}
@@ -505,7 +509,7 @@ private[yarn] class YarnAllocationHandler(
val rsrcRequest = Records.newRecord(classOf[ResourceRequest])
val memCapability = Records.newRecord(classOf[Resource])
// There probably is some overhead here, let's reserve a bit more memory.
- memCapability.setMemory(executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ memCapability.setMemory(executorMemory + memoryOverhead)
rsrcRequest.setCapability(memCapability)
val pri = Records.newRecord(classOf[Priority])
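The page truncates the hunk here. For orientation only, a hedged sketch of how such an alpha-API resource request is typically completed; the setter calls below are assumptions based on the alpha-era ResourceRequest interface, not lines from this file:

    import org.apache.hadoop.yarn.api.records.{Priority, Resource, ResourceRequest}
    import org.apache.hadoop.yarn.util.Records

    // Hedged sketch, not the file's verbatim continuation: request one
    // container sized heap + overhead at an illustrative priority.
    val memCapability = Records.newRecord(classOf[Resource])
    memCapability.setMemory(executorMemory + memoryOverhead)

    val pri = Records.newRecord(classOf[Priority])
    pri.setPriority(1) // illustrative priority value

    val rsrcRequest = Records.newRecord(classOf[ResourceRequest])
    rsrcRequest.setCapability(memCapability)
    rsrcRequest.setPriority(pri)
    rsrcRequest.setHostName("*") // "*" = any host in the alpha API (assumption)
    rsrcRequest.setNumContainers(1)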