diff options
Diffstat (limited to 'yarn')
6 files changed, 32 insertions, 18 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 8226207de4..e98287ca23 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -71,7 +71,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource] // Memory for the ApplicationMaster. - capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + capability.setMemory(args.amMemory + memoryOverhead) amContainer.setResource(capability) appContext.setQueue(args.amQueue) @@ -116,7 +116,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa val minResMemory = newApp.getMinimumResourceCapability().getMemory() val amMemory = ((args.amMemory / minResMemory) * minResMemory) + ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) - - YarnAllocationHandler.MEMORY_OVERHEAD) + memoryOverhead) amMemory } diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index b6ecae1e65..bfdb6232f5 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -92,13 +92,15 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp appAttemptId = getApplicationAttemptId() resourceManager = registerWithResourceManager() + val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster() // Compute number of threads for akka val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory() if (minimumMemory > 0) { - val mem = args.executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD + val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead", + YarnAllocationHandler.MEMORY_OVERHEAD) val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0) if (numCore > 0) { diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 856391e52b..80e0162e9f 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -88,6 +88,10 @@ private[yarn] class YarnAllocationHandler( // Containers to be released in next request to RM private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean] + // Additional memory overhead - in mb. + private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead", + YarnAllocationHandler.MEMORY_OVERHEAD) + private val numExecutorsRunning = new AtomicInteger() // Used to generate a unique id per executor private val executorIdCounter = new AtomicInteger() @@ -99,7 +103,7 @@ private[yarn] class YarnAllocationHandler( def getNumExecutorsFailed: Int = numExecutorsFailed.intValue def isResourceConstraintSatisfied(container: Container): Boolean = { - container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + container.getResource.getMemory >= (executorMemory + memoryOverhead) } def allocateContainers(executorsToRequest: Int) { @@ -229,7 +233,7 @@ private[yarn] class YarnAllocationHandler( val containerId = container.getId assert( container.getResource.getMemory >= - (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)) + (executorMemory + memoryOverhead)) if (numExecutorsRunningNow > maxExecutors) { logInfo("""Ignoring container %s at host %s, since we already have the required number of @@ -450,7 +454,7 @@ private[yarn] class YarnAllocationHandler( if (numExecutors > 0) { logInfo("Allocating %d executor containers with %d of memory each.".format(numExecutors, - executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)) + executorMemory + memoryOverhead)) } else { logDebug("Empty allocation req .. release : " + releasedContainerList) } @@ -505,7 +509,7 @@ private[yarn] class YarnAllocationHandler( val rsrcRequest = Records.newRecord(classOf[ResourceRequest]) val memCapability = Records.newRecord(classOf[Resource]) // There probably is some overhead here, let's reserve a bit more memory. - memCapability.setMemory(executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + memCapability.setMemory(executorMemory + memoryOverhead) rsrcRequest.setCapability(memCapability) val pri = Records.newRecord(classOf[Priority]) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index aeb3f0062d..1f543f7420 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -65,6 +65,10 @@ trait ClientBase extends Logging { val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(Integer.parseInt("644", 8).toShort) + // Additional memory overhead - in mb. + protected def memoryOverhead: Int = sparkConf.getInt("spark.yarn.driver.memoryOverhead", + YarnAllocationHandler.MEMORY_OVERHEAD) + // TODO(harvey): This could just go in ClientArguments. def validateArgs() = { Map( @@ -72,10 +76,10 @@ trait ClientBase extends Logging { "Error: You must specify a user jar when running in standalone mode!"), (args.userClass == null) -> "Error: You must specify a user class!", (args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!", - (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be" + - "greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD), - (args.executorMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Executor memory size" + - "must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString) + (args.amMemory <= memoryOverhead) -> ("Error: AM memory size must be" + + "greater than: " + memoryOverhead), + (args.executorMemory <= memoryOverhead) -> ("Error: Executor memory size" + + "must be greater than: " + memoryOverhead.toString) ).foreach { case(cond, errStr) => if (cond) { logError(errStr) @@ -98,7 +102,7 @@ trait ClientBase extends Logging { format(args.executorMemory, maxMem)) System.exit(1) } - val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD + val amMem = args.amMemory + memoryOverhead if (amMem > maxMem) { logError("Required AM memory (%d) is above the max threshold (%d) of this cluster". format(args.amMemory, maxMem)) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 1b6bfb42a5..ac46bd39ce 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -84,7 +84,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa // Memory for the ApplicationMaster. val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource] - memoryResource.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + memoryResource.setMemory(args.amMemory + memoryOverhead) appContext.setResource(memoryResource) // Finally, submit and monitor the application. @@ -118,7 +118,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa // val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory() // var amMemory = ((args.amMemory / minResMemory) * minResMemory) + // ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) - - // YarnAllocationHandler.MEMORY_OVERHEAD) + // memoryOverhead ) args.amMemory } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index a979fe4d62..29ccec2adc 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -90,6 +90,10 @@ private[yarn] class YarnAllocationHandler( // Containers to be released in next request to RM private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean] + // Additional memory overhead - in mb. + private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead", + YarnAllocationHandler.MEMORY_OVERHEAD) + // Number of container requests that have been sent to, but not yet allocated by the // ApplicationMaster. private val numPendingAllocate = new AtomicInteger() @@ -106,7 +110,7 @@ private[yarn] class YarnAllocationHandler( def getNumExecutorsFailed: Int = numExecutorsFailed.intValue def isResourceConstraintSatisfied(container: Container): Boolean = { - container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + container.getResource.getMemory >= (executorMemory + memoryOverhead) } def releaseContainer(container: Container) { @@ -248,7 +252,7 @@ private[yarn] class YarnAllocationHandler( val executorHostname = container.getNodeId.getHost val containerId = container.getId - val executorMemoryOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + val executorMemoryOverhead = (executorMemory + memoryOverhead) assert(container.getResource.getMemory >= executorMemoryOverhead) if (numExecutorsRunningNow > maxExecutors) { @@ -477,7 +481,7 @@ private[yarn] class YarnAllocationHandler( numPendingAllocate.addAndGet(numExecutors) logInfo("Will Allocate %d executor containers, each with %d memory".format( numExecutors, - (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))) + (executorMemory + memoryOverhead))) } else { logDebug("Empty allocation request ...") } @@ -537,7 +541,7 @@ private[yarn] class YarnAllocationHandler( priority: Int ): ArrayBuffer[ContainerRequest] = { - val memoryRequest = executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD + val memoryRequest = executorMemory + memoryOverhead val resource = Resource.newInstance(memoryRequest, executorCores) val prioritySetting = Records.newRecord(classOf[Priority]) |