aboutsummaryrefslogtreecommitdiff
path: root/yarn/alpha
diff options
context:
space:
mode:
authorSandy Ryza <sandy@cloudera.com>2014-06-11 07:57:28 -0500
committerThomas Graves <tgraves@apache.org>2014-06-11 07:57:28 -0500
commit2a4225dd944441d3f735625bb6bae6fca8fd06ca (patch)
treec539c35c9eb73bc757f9e2d1b1213844e6dd5dd6 /yarn/alpha
parent6e11930310e3864790d0e30f0df7bf691cbeb85d (diff)
downloadspark-2a4225dd944441d3f735625bb6bae6fca8fd06ca.tar.gz
spark-2a4225dd944441d3f735625bb6bae6fca8fd06ca.tar.bz2
spark-2a4225dd944441d3f735625bb6bae6fca8fd06ca.zip
SPARK-1639. Tidy up some Spark on YARN code
This contains a bunch of small tidyings of the Spark on YARN code. I focused on the yarn stable code. @tgravescs, let me know if you'd like me to make these for the alpha code as well. Author: Sandy Ryza <sandy@cloudera.com> Closes #561 from sryza/sandy-spark-1639 and squashes the following commits: 72b6a02 [Sandy Ryza] Fix comment and set name on driver thread c2190b2 [Sandy Ryza] SPARK-1639. Tidy up some Spark on YARN code
Diffstat (limited to 'yarn/alpha')
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala16
1 files changed, 13 insertions, 3 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 8f0ecb8557..1cc9c33cd2 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -277,7 +277,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
yarnAllocator.allocateContainers(
math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
ApplicationMaster.incrementAllocatorLoop(1)
- Thread.sleep(100)
+ Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
}
} finally {
// In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
@@ -416,6 +416,7 @@ object ApplicationMaster {
// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
// optimal as more containers are available. Might need to handle this better.
private val ALLOCATOR_LOOP_WAIT_COUNT = 30
+ private val ALLOCATE_HEARTBEAT_INTERVAL = 100
def incrementAllocatorLoop(by: Int) {
val count = yarnAllocatorLoop.getAndAdd(by)
@@ -467,13 +468,22 @@ object ApplicationMaster {
})
}
- // Wait for initialization to complete and atleast 'some' nodes can get allocated.
+ modified
+ }
+
+
+ /**
+ * Returns when we've either
+ * 1) received all the requested executors,
+ * 2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms,
+ * 3) hit an error that causes us to terminate trying to get containers.
+ */
+ def waitForInitialAllocations() {
yarnAllocatorLoop.synchronized {
while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
yarnAllocatorLoop.wait(1000L)
}
}
- modified
}
def main(argStrings: Array[String]) {