diff options
author | Thomas Graves <tgraves@apache.org> | 2014-09-12 20:31:11 -0500 |
---|---|---|
committer | Thomas Graves <tgraves@apache.org> | 2014-09-12 20:31:11 -0500 |
commit | 25311c2c545a60eb9dcf704814d4600987852155 (patch) | |
tree | e9ceb19e1a3281a4771c68c5861cb4e6b7ee7fc5 /yarn | |
parent | af2583826c15d2a4e2732017ea20feeff0fb79f6 (diff) | |
download | spark-25311c2c545a60eb9dcf704814d4600987852155.tar.gz spark-25311c2c545a60eb9dcf704814d4600987852155.tar.bz2 spark-25311c2c545a60eb9dcf704814d4600987852155.zip |
[SPARK-3456] YarnAllocator on alpha can lose container requests to RM
Author: Thomas Graves <tgraves@apache.org>
Closes #2373 from tgravescs/SPARK-3456 and squashes the following commits:
77e9532 [Thomas Graves] [SPARK-3456] YarnAllocator on alpha can lose container requests to RM
Diffstat (limited to 'yarn')
3 files changed, 14 insertions, 8 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 5a1b42c1e1..6c93d85823 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -48,16 +48,17 @@ private[yarn] class YarnAllocationHandler( private val lastResponseId = new AtomicInteger() private val releaseList: CopyOnWriteArrayList[ContainerId] = new CopyOnWriteArrayList() - override protected def allocateContainers(count: Int): YarnAllocateResponse = { + override protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse = { var resourceRequests: List[ResourceRequest] = null - logDebug("numExecutors: " + count) + logDebug("asking for additional executors: " + count + " with already pending: " + pending) + val totalNumAsk = count + pending if (count <= 0) { resourceRequests = List() } else if (preferredHostToCount.isEmpty) { logDebug("host preferences is empty") resourceRequests = List(createResourceRequest( - AllocationType.ANY, null, count, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)) + AllocationType.ANY, null, totalNumAsk, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)) } else { // request for all hosts in preferred nodes and for numExecutors - // candidates.size, request by default allocation policy. @@ -80,7 +81,7 @@ private[yarn] class YarnAllocationHandler( val anyContainerRequests: ResourceRequest = createResourceRequest( AllocationType.ANY, resource = null, - count, + totalNumAsk, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY) val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest]( @@ -103,7 +104,7 @@ private[yarn] class YarnAllocationHandler( req.addAllReleases(releasedContainerList) if (count > 0) { - logInfo("Allocating %d executor containers with %d of memory each.".format(count, + logInfo("Allocating %d executor containers with %d of memory each.".format(totalNumAsk, executorMemory + memoryOverhead)) } else { logDebug("Empty allocation req .. release : " + releasedContainerList) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 0b8744f4b8..299e38a5eb 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -112,6 +112,9 @@ private[yarn] abstract class YarnAllocator( def allocateResources() = { val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get() + // this is needed by alpha, do it here since we add numPending right after this + val executorsPending = numPendingAllocate.get() + if (missing > 0) { numPendingAllocate.addAndGet(missing) logInfo("Will Allocate %d executor containers, each with %d memory".format( @@ -121,7 +124,7 @@ private[yarn] abstract class YarnAllocator( logDebug("Empty allocation request ...") } - val allocateResponse = allocateContainers(missing) + val allocateResponse = allocateContainers(missing, executorsPending) val allocatedContainers = allocateResponse.getAllocatedContainers() if (allocatedContainers.size > 0) { @@ -435,9 +438,10 @@ private[yarn] abstract class YarnAllocator( * * @param count Number of containers to allocate. * If zero, should still contact RM (as a heartbeat). + * @param pending Number of containers pending allocate. Only used on alpha. * @return Response to the allocation request. */ - protected def allocateContainers(count: Int): YarnAllocateResponse + protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse /** Called to release a previously allocated container. */ protected def releaseContainer(container: Container): Unit diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 5438f151ac..e44a8db41b 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -47,7 +47,8 @@ private[yarn] class YarnAllocationHandler( amClient.releaseAssignedContainer(container.getId()) } - override protected def allocateContainers(count: Int): YarnAllocateResponse = { + // pending isn't used on stable as the AMRMClient handles incremental asks + override protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse = { addResourceRequests(count) // We have already set the container request. Poll the ResourceManager for a response. |