aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
Diffstat (limited to 'yarn')
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala4
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala4
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala12
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala14
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala4
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala12
6 files changed, 32 insertions, 18 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8226207de4..e98287ca23 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -71,7 +71,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
// Memory for the ApplicationMaster.
- capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ capability.setMemory(args.amMemory + memoryOverhead)
amContainer.setResource(capability)
appContext.setQueue(args.amQueue)
@@ -116,7 +116,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
val minResMemory = newApp.getMinimumResourceCapability().getMemory()
val amMemory = ((args.amMemory / minResMemory) * minResMemory) +
((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
- YarnAllocationHandler.MEMORY_OVERHEAD)
+ memoryOverhead)
amMemory
}
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index b6ecae1e65..bfdb6232f5 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -92,13 +92,15 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
appAttemptId = getApplicationAttemptId()
resourceManager = registerWithResourceManager()
+
val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
// Compute number of threads for akka
val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
if (minimumMemory > 0) {
- val mem = args.executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+ YarnAllocationHandler.MEMORY_OVERHEAD)
val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
if (numCore > 0) {
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 856391e52b..80e0162e9f 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -88,6 +88,10 @@ private[yarn] class YarnAllocationHandler(
// Containers to be released in next request to RM
private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
+ // Additional memory overhead - in mb.
+ private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+ YarnAllocationHandler.MEMORY_OVERHEAD)
+
private val numExecutorsRunning = new AtomicInteger()
// Used to generate a unique id per executor
private val executorIdCounter = new AtomicInteger()
@@ -99,7 +103,7 @@ private[yarn] class YarnAllocationHandler(
def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
def isResourceConstraintSatisfied(container: Container): Boolean = {
- container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ container.getResource.getMemory >= (executorMemory + memoryOverhead)
}
def allocateContainers(executorsToRequest: Int) {
@@ -229,7 +233,7 @@ private[yarn] class YarnAllocationHandler(
val containerId = container.getId
assert( container.getResource.getMemory >=
- (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+ (executorMemory + memoryOverhead))
if (numExecutorsRunningNow > maxExecutors) {
logInfo("""Ignoring container %s at host %s, since we already have the required number of
@@ -450,7 +454,7 @@ private[yarn] class YarnAllocationHandler(
if (numExecutors > 0) {
logInfo("Allocating %d executor containers with %d of memory each.".format(numExecutors,
- executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+ executorMemory + memoryOverhead))
} else {
logDebug("Empty allocation req .. release : " + releasedContainerList)
}
@@ -505,7 +509,7 @@ private[yarn] class YarnAllocationHandler(
val rsrcRequest = Records.newRecord(classOf[ResourceRequest])
val memCapability = Records.newRecord(classOf[Resource])
// There probably is some overhead here, let's reserve a bit more memory.
- memCapability.setMemory(executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ memCapability.setMemory(executorMemory + memoryOverhead)
rsrcRequest.setCapability(memCapability)
val pri = Records.newRecord(classOf[Priority])
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index aeb3f0062d..1f543f7420 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -65,6 +65,10 @@ trait ClientBase extends Logging {
val APP_FILE_PERMISSION: FsPermission =
FsPermission.createImmutable(Integer.parseInt("644", 8).toShort)
+ // Additional memory overhead - in mb.
+ protected def memoryOverhead: Int = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
+ YarnAllocationHandler.MEMORY_OVERHEAD)
+
// TODO(harvey): This could just go in ClientArguments.
def validateArgs() = {
Map(
@@ -72,10 +76,10 @@ trait ClientBase extends Logging {
"Error: You must specify a user jar when running in standalone mode!"),
(args.userClass == null) -> "Error: You must specify a user class!",
(args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!",
- (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be" +
- "greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
- (args.executorMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Executor memory size" +
- "must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString)
+ (args.amMemory <= memoryOverhead) -> ("Error: AM memory size must be" +
+ "greater than: " + memoryOverhead),
+ (args.executorMemory <= memoryOverhead) -> ("Error: Executor memory size" +
+ "must be greater than: " + memoryOverhead.toString)
).foreach { case(cond, errStr) =>
if (cond) {
logError(errStr)
@@ -98,7 +102,7 @@ trait ClientBase extends Logging {
format(args.executorMemory, maxMem))
System.exit(1)
}
- val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ val amMem = args.amMemory + memoryOverhead
if (amMem > maxMem) {
logError("Required AM memory (%d) is above the max threshold (%d) of this cluster".
format(args.amMemory, maxMem))
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1b6bfb42a5..ac46bd39ce 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -84,7 +84,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
// Memory for the ApplicationMaster.
val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
- memoryResource.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ memoryResource.setMemory(args.amMemory + memoryOverhead)
appContext.setResource(memoryResource)
// Finally, submit and monitor the application.
@@ -118,7 +118,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
// val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory()
// var amMemory = ((args.amMemory / minResMemory) * minResMemory) +
// ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
- // YarnAllocationHandler.MEMORY_OVERHEAD)
+ // memoryOverhead )
args.amMemory
}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index a979fe4d62..29ccec2adc 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -90,6 +90,10 @@ private[yarn] class YarnAllocationHandler(
// Containers to be released in next request to RM
private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
+ // Additional memory overhead - in mb.
+ private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+ YarnAllocationHandler.MEMORY_OVERHEAD)
+
// Number of container requests that have been sent to, but not yet allocated by the
// ApplicationMaster.
private val numPendingAllocate = new AtomicInteger()
@@ -106,7 +110,7 @@ private[yarn] class YarnAllocationHandler(
def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
def isResourceConstraintSatisfied(container: Container): Boolean = {
- container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ container.getResource.getMemory >= (executorMemory + memoryOverhead)
}
def releaseContainer(container: Container) {
@@ -248,7 +252,7 @@ private[yarn] class YarnAllocationHandler(
val executorHostname = container.getNodeId.getHost
val containerId = container.getId
- val executorMemoryOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ val executorMemoryOverhead = (executorMemory + memoryOverhead)
assert(container.getResource.getMemory >= executorMemoryOverhead)
if (numExecutorsRunningNow > maxExecutors) {
@@ -477,7 +481,7 @@ private[yarn] class YarnAllocationHandler(
numPendingAllocate.addAndGet(numExecutors)
logInfo("Will Allocate %d executor containers, each with %d memory".format(
numExecutors,
- (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)))
+ (executorMemory + memoryOverhead)))
} else {
logDebug("Empty allocation request ...")
}
@@ -537,7 +541,7 @@ private[yarn] class YarnAllocationHandler(
priority: Int
): ArrayBuffer[ContainerRequest] = {
- val memoryRequest = executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ val memoryRequest = executorMemory + memoryOverhead
val resource = Resource.newInstance(memoryRequest, executorCores)
val prioritySetting = Records.newRecord(classOf[Priority])