author     Sandy Ryza <sandy@cloudera.com>          2014-07-21 13:15:46 -0500
committer  Thomas Graves <tgraves@apache.org>       2014-07-21 13:15:46 -0500
commit     f89cf65d7aced0bb387c05586f9f51cb29865022 (patch)
tree       abf25e4025ad61c86bb04814abadc7710b3b13eb
parent     cd273a238144a9a436219cd01250369586f5638b (diff)
SPARK-1707. Remove unnecessary 3 second sleep in YarnClusterScheduler
Author: Sandy Ryza <sandy@cloudera.com>
Closes #634 from sryza/sandy-spark-1707 and squashes the following commits:
2f6e358 [Sandy Ryza] Default min registered executors ratio to .8 for YARN
354c630 [Sandy Ryza] Remove outdated comments
c744ef3 [Sandy Ryza] Take out waitForInitialAllocations
2a4329b [Sandy Ryza] SPARK-1707. Remove unnecessary 3 second sleep in YarnClusterScheduler
6 files changed, 11 insertions, 99 deletions
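For readers skimming the diff below: the mechanism being deleted is a counter-and-monitor handshake. The allocator loop bumped a shared counter once per 100 ms heartbeat, and the scheduler's postStartHook blocked until the counter crossed ALLOCATOR_LOOP_WAIT_COUNT (30 iterations), which is where the fixed ~3 second startup delay came from. A minimal, self-contained sketch of that pattern (simplified names, not the exact Spark source):

import java.util.concurrent.atomic.AtomicInteger

object AllocatorHandshake {
  // One notional heartbeat every 100 ms; the waiter is released after 30
  // of them, i.e. roughly 3 seconds.
  private val WAIT_COUNT = 30
  private val counter = new AtomicInteger(0)

  // Called from the allocator loop once per heartbeat (and with WAIT_COUNT
  // from the finally blocks, to force the waiter loose on errors).
  def increment(by: Int): Unit = {
    if (counter.getAndAdd(by) >= WAIT_COUNT) {
      counter.synchronized {
        counter.notifyAll()  // wake the waiting scheduler thread
      }
    }
  }

  // Called from the scheduler's postStartHook: blocks until enough
  // heartbeats have elapsed, re-checking every second so a missed notify
  // cannot hang the thread.
  def waitForInitialAllocations(): Unit = {
    counter.synchronized {
      while (counter.get() <= WAIT_COUNT) {
        counter.wait(1000L)
      }
    }
  }
}

Because the waiter keyed off elapsed heartbeats rather than registered executors, it paid the full delay even when containers came up immediately; the patch replaces it with the registration-ratio default shown in the scheduler backends below.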
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 062f946a9f..3ec36487dc 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -255,10 +255,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
           sparkContext.getConf)
       }
     }
-    } finally {
-      // in case of exceptions, etc - ensure that count is atleast ALLOCATOR_LOOP_WAIT_COUNT :
-      // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks
-      ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
     }
   }
 
@@ -277,13 +273,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       }
       yarnAllocator.allocateContainers(
         math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
-      ApplicationMaster.incrementAllocatorLoop(1)
       Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
     }
-    } finally {
-      // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
-      // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
-      ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
     }
     logInfo("All executors have launched.")
 
@@ -411,24 +402,10 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 }
 
 object ApplicationMaster extends Logging {
-  // Number of times to wait for the allocator loop to complete.
-  // Each loop iteration waits for 100ms, so maximum of 3 seconds.
-  // This is to ensure that we have reasonable number of containers before we start
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
-  private val ALLOCATOR_LOOP_WAIT_COUNT = 30
   private val ALLOCATE_HEARTBEAT_INTERVAL = 100
 
-  def incrementAllocatorLoop(by: Int) {
-    val count = yarnAllocatorLoop.getAndAdd(by)
-    if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
-      yarnAllocatorLoop.synchronized {
-        // to wake threads off wait ...
-        yarnAllocatorLoop.notifyAll()
-      }
-    }
-  }
-
   private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
 
   def register(master: ApplicationMaster) {
@@ -437,7 +414,6 @@ object ApplicationMaster extends Logging {
 
   val sparkContextRef: AtomicReference[SparkContext] =
     new AtomicReference[SparkContext](null /* initialValue */)
-  val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0)
 
   def sparkContextInitialized(sc: SparkContext): Boolean = {
     var modified = false
@@ -472,21 +448,6 @@ object ApplicationMaster extends Logging {
     modified
   }
 
-
-  /**
-   * Returns when we've either
-   * 1) received all the requested executors,
-   * 2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms,
-   * 3) hit an error that causes us to terminate trying to get containers.
-   */
-  def waitForInitialAllocations() {
-    yarnAllocatorLoop.synchronized {
-      while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
-        yarnAllocatorLoop.wait(1000L)
-      }
-    }
-  }
-
   def main(argStrings: Array[String]) {
     SignalLogger.register(log)
     val args = new ApplicationMasterArguments(argStrings)
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index 15e8c21aa5..3474112ded 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -37,14 +37,4 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configur
     val retval = YarnAllocationHandler.lookupRack(conf, host)
     if (retval != null) Some(retval) else None
   }
-
-  override def postStartHook() {
-
-    super.postStartHook()
-    // The yarn application is running, but the executor might not yet ready
-    // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
-    // TODO It needn't after waitBackendReady
-    Thread.sleep(2000L)
-    logInfo("YarnClientClusterScheduler.postStartHook done")
-  }
 }
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 1b37c4bb13..d8266f7b0c 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -30,6 +30,11 @@ private[spark] class YarnClientSchedulerBackend(
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
   with Logging {
 
+  if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
+    minRegisteredRatio = 0.8
+    ready = false
+  }
+
   var client: Client = null
   var appId: ApplicationId = null
 
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 9ee53d797c..9aeca4a637 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -47,14 +47,8 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
   }
 
   override def postStartHook() {
-    val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
+    ApplicationMaster.sparkContextInitialized(sc)
     super.postStartHook()
-    if (sparkContextInitialized){
-      ApplicationMaster.waitForInitialAllocations()
-      // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
-      // TODO It needn't after waitBackendReady
-      Thread.sleep(3000L)
-    }
     logInfo("YarnClusterScheduler.postStartHook done")
   }
 }
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index a04b08f43c..0ad1794d19 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -27,6 +27,11 @@ private[spark] class YarnClusterSchedulerBackend(
     sc: SparkContext)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
 
+  if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
+    minRegisteredRatio = 0.8
+    ready = false
+  }
+
   override def start() {
     super.start()
     var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 1a24ec759b..eaf594c8b4 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -234,10 +234,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
           sparkContext.getConf)
       }
     }
-    } finally {
-      // In case of exceptions, etc - ensure that the loop in
-      // ApplicationMaster#sparkContextInitialized() breaks.
-      ApplicationMaster.doneWithInitialAllocations()
     }
   }
 
@@ -254,16 +250,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
         checkNumExecutorsFailed()
         allocateMissingExecutor()
         yarnAllocator.allocateResources()
-        if (iters == ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT) {
-          ApplicationMaster.doneWithInitialAllocations()
-        }
         Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
         iters += 1
       }
-    } finally {
-      // In case of exceptions, etc - ensure that the loop in
-      // ApplicationMaster#sparkContextInitialized() breaks.
-      ApplicationMaster.doneWithInitialAllocations()
     }
     logInfo("All executors have launched.")
   }
@@ -365,12 +354,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 }
 
 object ApplicationMaster extends Logging {
-  // Number of times to wait for the allocator loop to complete.
-  // Each loop iteration waits for 100ms, so maximum of 3 seconds.
-  // This is to ensure that we have reasonable number of containers before we start
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
-  private val ALLOCATOR_LOOP_WAIT_COUNT = 30
   private val ALLOCATE_HEARTBEAT_INTERVAL = 100
 
   private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
@@ -378,20 +363,6 @@ object ApplicationMaster extends Logging {
   val sparkContextRef: AtomicReference[SparkContext] =
     new AtomicReference[SparkContext](null)
 
-  // Variable used to notify the YarnClusterScheduler that it should stop waiting
-  // for the initial set of executors to be started and get on with its business.
-  val doneWithInitialAllocationsMonitor = new Object()
-
-  @volatile var isDoneWithInitialAllocations = false
-
-  def doneWithInitialAllocations() {
-    isDoneWithInitialAllocations = true
-    doneWithInitialAllocationsMonitor.synchronized {
-      // to wake threads off wait ...
-      doneWithInitialAllocationsMonitor.notifyAll()
-    }
-  }
-
   def register(master: ApplicationMaster) {
     applicationMasters.add(master)
   }
@@ -434,20 +405,6 @@ object ApplicationMaster extends Logging {
     modified
   }
 
-  /**
-   * Returns when we've either
-   * 1) received all the requested executors,
-   * 2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms,
-   * 3) hit an error that causes us to terminate trying to get containers.
-   */
-  def waitForInitialAllocations() {
-    doneWithInitialAllocationsMonitor.synchronized {
-      while (!isDoneWithInitialAllocations) {
-        doneWithInitialAllocationsMonitor.wait(1000L)
-      }
-    }
-  }
-
   def getApplicationAttemptId(): ApplicationAttemptId = {
     val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
     val containerId = ConverterUtils.toContainerId(containerIdString)
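The replacement gate set up in both scheduler backends above (minRegisteredRatio = 0.8, ready = false) defers scheduling until a fraction of the requested executors have registered, rather than sleeping for a fixed interval. A sketch of how such a ratio check behaves; the class and field names here are illustrative, not Spark's actual CoarseGrainedSchedulerBackend internals:

// Illustrative stand-in for the readiness check behind
// spark.scheduler.minRegisteredExecutorsRatio; names are hypothetical.
class RegistrationGate(
    expectedExecutors: Int,
    minRegisteredRatio: Double,
    maxWaitMs: Long) {

  private val createTime = System.currentTimeMillis()
  @volatile private var registered = 0

  // The driver calls this each time an executor registers.
  def executorRegistered(): Unit = synchronized {
    registered += 1
  }

  // Scheduling may begin once the ratio is met, or once the safety-valve
  // timeout expires (so a lost executor cannot stall the job forever).
  def isReady: Boolean =
    registered >= expectedExecutors * minRegisteredRatio ||
      System.currentTimeMillis() - createTime >= maxWaitMs
}

With a ratio of 0.8, a job that requests 10 executors can start scheduling as soon as 8 register, typically well under the old flat 3-second sleep, while the time-based cap still bounds the wait if executors are slow to appear.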