aboutsummaryrefslogtreecommitdiff
path: root/yarn/common
diff options
context:
space:
mode:
authorThomas Graves <tgraves@apache.org>2014-09-12 20:31:11 -0500
committerThomas Graves <tgraves@apache.org>2014-09-12 20:31:11 -0500
commit25311c2c545a60eb9dcf704814d4600987852155 (patch)
treee9ceb19e1a3281a4771c68c5861cb4e6b7ee7fc5 /yarn/common
parentaf2583826c15d2a4e2732017ea20feeff0fb79f6 (diff)
downloadspark-25311c2c545a60eb9dcf704814d4600987852155.tar.gz
spark-25311c2c545a60eb9dcf704814d4600987852155.tar.bz2
spark-25311c2c545a60eb9dcf704814d4600987852155.zip
[SPARK-3456] YarnAllocator on alpha can lose container requests to RM
Author: Thomas Graves <tgraves@apache.org> Closes #2373 from tgravescs/SPARK-3456 and squashes the following commits: 77e9532 [Thomas Graves] [SPARK-3456] YarnAllocator on alpha can lose container requests to RM
Diffstat (limited to 'yarn/common')
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala8
1 files changed, 6 insertions, 2 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 0b8744f4b8..299e38a5eb 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -112,6 +112,9 @@ private[yarn] abstract class YarnAllocator(
def allocateResources() = {
val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()
+ // this is needed by alpha, do it here since we add numPending right after this
+ val executorsPending = numPendingAllocate.get()
+
if (missing > 0) {
numPendingAllocate.addAndGet(missing)
logInfo("Will Allocate %d executor containers, each with %d memory".format(
@@ -121,7 +124,7 @@ private[yarn] abstract class YarnAllocator(
logDebug("Empty allocation request ...")
}
- val allocateResponse = allocateContainers(missing)
+ val allocateResponse = allocateContainers(missing, executorsPending)
val allocatedContainers = allocateResponse.getAllocatedContainers()
if (allocatedContainers.size > 0) {
@@ -435,9 +438,10 @@ private[yarn] abstract class YarnAllocator(
*
* @param count Number of containers to allocate.
* If zero, should still contact RM (as a heartbeat).
+ * @param pending Number of containers pending allocate. Only used on alpha.
* @return Response to the allocation request.
*/
- protected def allocateContainers(count: Int): YarnAllocateResponse
+ protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse
/** Called to release a previously allocated container. */
protected def releaseContainer(container: Container): Unit