diff options
author | Thomas Graves <tgraves@apache.org> | 2014-09-12 20:31:11 -0500 |
---|---|---|
committer | Thomas Graves <tgraves@apache.org> | 2014-09-12 20:31:11 -0500 |
commit | 25311c2c545a60eb9dcf704814d4600987852155 (patch) | |
tree | e9ceb19e1a3281a4771c68c5861cb4e6b7ee7fc5 /yarn/alpha | |
parent | af2583826c15d2a4e2732017ea20feeff0fb79f6 (diff) | |
download | spark-25311c2c545a60eb9dcf704814d4600987852155.tar.gz spark-25311c2c545a60eb9dcf704814d4600987852155.tar.bz2 spark-25311c2c545a60eb9dcf704814d4600987852155.zip |
[SPARK-3456] YarnAllocator on alpha can lose container requests to RM
Author: Thomas Graves <tgraves@apache.org>
Closes #2373 from tgravescs/SPARK-3456 and squashes the following commits:
77e9532 [Thomas Graves] [SPARK-3456] YarnAllocator on alpha can lose container requests to RM
Diffstat (limited to 'yarn/alpha')
-rw-r--r-- | yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 11 |
1 files changed, 6 insertions, 5 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 5a1b42c1e1..6c93d85823 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -48,16 +48,17 @@ private[yarn] class YarnAllocationHandler( private val lastResponseId = new AtomicInteger() private val releaseList: CopyOnWriteArrayList[ContainerId] = new CopyOnWriteArrayList() - override protected def allocateContainers(count: Int): YarnAllocateResponse = { + override protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse = { var resourceRequests: List[ResourceRequest] = null - logDebug("numExecutors: " + count) + logDebug("asking for additional executors: " + count + " with already pending: " + pending) + val totalNumAsk = count + pending if (count <= 0) { resourceRequests = List() } else if (preferredHostToCount.isEmpty) { logDebug("host preferences is empty") resourceRequests = List(createResourceRequest( - AllocationType.ANY, null, count, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)) + AllocationType.ANY, null, totalNumAsk, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)) } else { // request for all hosts in preferred nodes and for numExecutors - // candidates.size, request by default allocation policy. @@ -80,7 +81,7 @@ private[yarn] class YarnAllocationHandler( val anyContainerRequests: ResourceRequest = createResourceRequest( AllocationType.ANY, resource = null, - count, + totalNumAsk, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY) val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest]( @@ -103,7 +104,7 @@ private[yarn] class YarnAllocationHandler( req.addAllReleases(releasedContainerList) if (count > 0) { - logInfo("Allocating %d executor containers with %d of memory each.".format(count, + logInfo("Allocating %d executor containers with %d of memory each.".format(totalNumAsk, executorMemory + memoryOverhead)) } else { logDebug("Empty allocation req .. release : " + releasedContainerList) |