diff options
author | Sandy Ryza <sandy@cloudera.com> | 2014-06-11 07:57:28 -0500 |
---|---|---|
committer | Thomas Graves <tgraves@apache.org> | 2014-06-11 07:57:28 -0500 |
commit | 2a4225dd944441d3f735625bb6bae6fca8fd06ca (patch) | |
tree | c539c35c9eb73bc757f9e2d1b1213844e6dd5dd6 /yarn/alpha | |
parent | 6e11930310e3864790d0e30f0df7bf691cbeb85d (diff) | |
download | spark-2a4225dd944441d3f735625bb6bae6fca8fd06ca.tar.gz spark-2a4225dd944441d3f735625bb6bae6fca8fd06ca.tar.bz2 spark-2a4225dd944441d3f735625bb6bae6fca8fd06ca.zip |
SPARK-1639. Tidy up some Spark on YARN code
This contains a bunch of small tidyings of the Spark on YARN code.
I focused on the yarn stable code. @tgravescs, let me know if you'd like me to make these for the alpha code as well.
Author: Sandy Ryza <sandy@cloudera.com>
Closes #561 from sryza/sandy-spark-1639 and squashes the following commits:
72b6a02 [Sandy Ryza] Fix comment and set name on driver thread
c2190b2 [Sandy Ryza] SPARK-1639. Tidy up some Spark on YARN code
Diffstat (limited to 'yarn/alpha')
-rw-r--r-- | yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 16 |
1 file changed, 13 insertions, 3 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 8f0ecb8557..1cc9c33cd2 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -277,7 +277,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, yarnAllocator.allocateContainers( math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0)) ApplicationMaster.incrementAllocatorLoop(1) - Thread.sleep(100) + Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL) } } finally { // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT, @@ -416,6 +416,7 @@ object ApplicationMaster { // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be // optimal as more containers are available. Might need to handle this better. private val ALLOCATOR_LOOP_WAIT_COUNT = 30 + private val ALLOCATE_HEARTBEAT_INTERVAL = 100 def incrementAllocatorLoop(by: Int) { val count = yarnAllocatorLoop.getAndAdd(by) @@ -467,13 +468,22 @@ object ApplicationMaster { }) } - // Wait for initialization to complete and atleast 'some' nodes can get allocated. + modified + } + + + /** + * Returns when we've either + * 1) received all the requested executors, + * 2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms, + * 3) hit an error that causes us to terminate trying to get containers. + */ + def waitForInitialAllocations() { yarnAllocatorLoop.synchronized { while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) { yarnAllocatorLoop.wait(1000L) } } - modified } def main(argStrings: Array[String]) { |