about summary refs log tree commit diff
path: root/yarn/alpha
diff options
context:
space:
mode:
authorThomas Graves <tgraves@apache.org>2014-09-12 20:31:11 -0500
committerThomas Graves <tgraves@apache.org>2014-09-12 20:31:11 -0500
commit25311c2c545a60eb9dcf704814d4600987852155 (patch)
treee9ceb19e1a3281a4771c68c5861cb4e6b7ee7fc5 /yarn/alpha
parentaf2583826c15d2a4e2732017ea20feeff0fb79f6 (diff)
downloadspark-25311c2c545a60eb9dcf704814d4600987852155.tar.gz
spark-25311c2c545a60eb9dcf704814d4600987852155.tar.bz2
spark-25311c2c545a60eb9dcf704814d4600987852155.zip
[SPARK-3456] YarnAllocator on alpha can lose container requests to RM
Author: Thomas Graves <tgraves@apache.org>

Closes #2373 from tgravescs/SPARK-3456 and squashes the following commits: 77e9532 [Thomas Graves] [SPARK-3456] YarnAllocator on alpha can lose container requests to RM
Diffstat (limited to 'yarn/alpha')
-rw-r--r-- yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 11
1 file changed, 6 insertions, 5 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 5a1b42c1e1..6c93d85823 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -48,16 +48,17 @@ private[yarn] class YarnAllocationHandler(
private val lastResponseId = new AtomicInteger()
private val releaseList: CopyOnWriteArrayList[ContainerId] = new CopyOnWriteArrayList()
- override protected def allocateContainers(count: Int): YarnAllocateResponse = {
+ override protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse = {
var resourceRequests: List[ResourceRequest] = null
- logDebug("numExecutors: " + count)
+ logDebug("asking for additional executors: " + count + " with already pending: " + pending)
+ val totalNumAsk = count + pending
if (count <= 0) {
resourceRequests = List()
} else if (preferredHostToCount.isEmpty) {
logDebug("host preferences is empty")
resourceRequests = List(createResourceRequest(
- AllocationType.ANY, null, count, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
+ AllocationType.ANY, null, totalNumAsk, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
} else {
// request for all hosts in preferred nodes and for numExecutors -
// candidates.size, request by default allocation policy.
@@ -80,7 +81,7 @@ private[yarn] class YarnAllocationHandler(
val anyContainerRequests: ResourceRequest = createResourceRequest(
AllocationType.ANY,
resource = null,
- count,
+ totalNumAsk,
YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](
@@ -103,7 +104,7 @@ private[yarn] class YarnAllocationHandler(
req.addAllReleases(releasedContainerList)
if (count > 0) {
- logInfo("Allocating %d executor containers with %d of memory each.".format(count,
+ logInfo("Allocating %d executor containers with %d of memory each.".format(totalNumAsk,
executorMemory + memoryOverhead))
} else {
logDebug("Empty allocation req .. release : " + releasedContainerList)