From f89cf65d7aced0bb387c05586f9f51cb29865022 Mon Sep 17 00:00:00 2001
From: Sandy Ryza
Date: Mon, 21 Jul 2014 13:15:46 -0500
Subject: SPARK-1707. Remove unnecessary 3 second sleep in YarnClusterScheduler

Author: Sandy Ryza

Closes #634 from sryza/sandy-spark-1707 and squashes the following commits:

2f6e358 [Sandy Ryza] Default min registered executors ratio to .8 for YARN
354c630 [Sandy Ryza] Remove outdated comments
c744ef3 [Sandy Ryza] Take out waitForInitialAllocations
2a4329b [Sandy Ryza] SPARK-1707. Remove unnecessary 3 second sleep in YarnClusterScheduler
---
 .../spark/deploy/yarn/ApplicationMaster.scala | 39 ----------------------
 1 file changed, 39 deletions(-)

(limited to 'yarn/alpha')

diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 062f946a9f..3ec36487dc 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -255,10 +255,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
             sparkContext.getConf)
         }
       }
-    } finally {
-      // in case of exceptions, etc - ensure that count is atleast ALLOCATOR_LOOP_WAIT_COUNT :
-      // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks
-      ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
     }
   }
 
@@ -277,13 +273,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
         }
         yarnAllocator.allocateContainers(
           math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
-        ApplicationMaster.incrementAllocatorLoop(1)
         Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
       }
-    } finally {
-      // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
-      // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
-      ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
     }
 
     logInfo("All executors have launched.")
@@ -411,24 +402,10 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 }
 
 object ApplicationMaster extends Logging {
-  // Number of times to wait for the allocator loop to complete.
-  // Each loop iteration waits for 100ms, so maximum of 3 seconds.
-  // This is to ensure that we have reasonable number of containers before we start
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
-  private val ALLOCATOR_LOOP_WAIT_COUNT = 30
   private val ALLOCATE_HEARTBEAT_INTERVAL = 100
 
-  def incrementAllocatorLoop(by: Int) {
-    val count = yarnAllocatorLoop.getAndAdd(by)
-    if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
-      yarnAllocatorLoop.synchronized {
-        // to wake threads off wait ...
-        yarnAllocatorLoop.notifyAll()
-      }
-    }
-  }
-
   private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
 
   def register(master: ApplicationMaster) {
@@ -437,7 +414,6 @@ object ApplicationMaster extends Logging {
 
   val sparkContextRef: AtomicReference[SparkContext] =
     new AtomicReference[SparkContext](null /* initialValue */)
-  val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0)
 
   def sparkContextInitialized(sc: SparkContext): Boolean = {
     var modified = false
@@ -472,21 +448,6 @@ object ApplicationMaster extends Logging {
     modified
   }
 
-
-  /**
-   * Returns when we've either
-   * 1) received all the requested executors,
-   * 2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms,
-   * 3) hit an error that causes us to terminate trying to get containers.
-   */
-  def waitForInitialAllocations() {
-    yarnAllocatorLoop.synchronized {
-      while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
-        yarnAllocatorLoop.wait(1000L)
-      }
-    }
-  }
-
  def main(argStrings: Array[String]) {
     SignalLogger.register(log)
     val args = new ApplicationMasterArguments(argStrings)
--
cgit v1.2.3
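
[Note for readers, placed below the signature line so `git am` ignores it: the deleted code amounts to a counter-based barrier. The allocation loop bumped an AtomicInteger once per 100 ms heartbeat, the finally blocks force-bumped it by ALLOCATOR_LOOP_WAIT_COUNT, and waitForInitialAllocations() parked until the count passed 30, so the scheduler's startup hook typically blocked for roughly 3 seconds. The standalone Scala sketch below reproduces the same wait/notify shape outside Spark to make that floor visible; every name in it (BarrierSketch, WAIT_COUNT, HEARTBEAT_MS, increment) is illustrative, not Spark API.]

import java.util.concurrent.atomic.AtomicInteger

// Illustrative sketch only -- same wait/notify shape as the removed code.
object BarrierSketch {
  private val WAIT_COUNT = 30            // mirrors ALLOCATOR_LOOP_WAIT_COUNT
  private val HEARTBEAT_MS = 100L        // mirrors ALLOCATE_HEARTBEAT_INTERVAL
  private val loopCount = new AtomicInteger(0)

  // Called by the allocator thread: once per heartbeat with by = 1, and
  // with by = WAIT_COUNT from a finally block to force the barrier open.
  def increment(by: Int): Unit = {
    val before = loopCount.getAndAdd(by)
    if (before >= WAIT_COUNT) {
      loopCount.synchronized {
        loopCount.notifyAll()            // wake any thread parked below
      }
    }
  }

  // Called from the scheduler's startup hook: parks until the counter
  // passes WAIT_COUNT, i.e. ~30 heartbeats (~3 s) in the common case.
  def waitForInitialAllocations(): Unit = {
    loopCount.synchronized {
      while (loopCount.get() <= WAIT_COUNT) {
        loopCount.wait(1000L)            // timed wait guards against lost wakeups
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val allocator = new Thread(new Runnable {
      def run(): Unit = {
        try {
          var i = 0
          while (i < 50) {               // stand-in for the allocation loop
            increment(1)
            Thread.sleep(HEARTBEAT_MS)
            i += 1
          }
        } finally {
          increment(WAIT_COUNT)          // mirrors the removed finally blocks
        }
      }
    })
    allocator.start()
    val start = System.nanoTime()
    waitForInitialAllocations()
    println(s"released after ${(System.nanoTime() - start) / 1000000L} ms")
  }
}

[Run as-is, the sketch prints a release time of roughly 3000 ms, which is the floor the commit title calls unnecessary. Per squashed commit 2f6e358, the replacement gate is the scheduler's minimum-registered-executors ratio, defaulted to 0.8 for YARN, which releases as soon as 80% of the requested executors have registered rather than after a fixed number of heartbeats; as an assumption here, the relevant property in this era was spark.scheduler.minRegisteredExecutorsRatio.]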