path: root/yarn
author     Sandy Ryza <sandy@cloudera.com>          2014-07-21 13:15:46 -0500
committer  Thomas Graves <tgraves@apache.org>       2014-07-21 13:15:46 -0500
commit     f89cf65d7aced0bb387c05586f9f51cb29865022 (patch)
tree       abf25e4025ad61c86bb04814abadc7710b3b13eb /yarn
parent     cd273a238144a9a436219cd01250369586f5638b (diff)
download   spark-f89cf65d7aced0bb387c05586f9f51cb29865022.tar.gz
           spark-f89cf65d7aced0bb387c05586f9f51cb29865022.tar.bz2
           spark-f89cf65d7aced0bb387c05586f9f51cb29865022.zip
SPARK-1707. Remove unnecessary 3 second sleep in YarnClusterScheduler
Author: Sandy Ryza <sandy@cloudera.com>

Closes #634 from sryza/sandy-spark-1707 and squashes the following commits:

2f6e358 [Sandy Ryza] Default min registered executors ratio to .8 for YARN
354c630 [Sandy Ryza] Remove outdated comments
c744ef3 [Sandy Ryza] Take out waitForInitialAllocations
2a4329b [Sandy Ryza] SPARK-1707. Remove unnecessary 3 second sleep in YarnClusterScheduler
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala                  | 39
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala  | 10
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala  |  5
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala        |  8
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala |  5
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala                 | 43
6 files changed, 11 insertions(+), 99 deletions(-)
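
What the diff does: it removes the ApplicationMaster's allocator-loop bookkeeping (ALLOCATOR_LOOP_WAIT_COUNT, waitForInitialAllocations) and the fixed 2-3 second sleeps in the YARN schedulers' postStartHook, and instead has the YARN scheduler backends default spark.scheduler.minRegisteredExecutorsRatio to 0.8, marking themselves not ready until roughly 80% of the requested executors have registered. Below is a minimal, self-contained sketch of that gating idea; the class, the maxWaitMs safety cap, and the numbers are illustrative assumptions, not Spark's actual CoarseGrainedSchedulerBackend code.

// Hypothetical, simplified illustration of the min-registered-executors gate; the field and
// method names echo the backends touched in this diff, but this is not Spark's actual code.
object ReadinessGateSketch {
  class Backend(totalExpectedExecutors: Int, minRegisteredRatio: Double, maxWaitMs: Long) {
    private val createTime = System.currentTimeMillis()
    @volatile private var registeredExecutors = 0
    // Starts out not ready whenever a ratio is in force, mirroring the added `ready = false`.
    @volatile var ready: Boolean = minRegisteredRatio <= 0

    def executorRegistered(): Unit = synchronized {
      registeredExecutors += 1
      if (registeredExecutors >= totalExpectedExecutors * minRegisteredRatio) {
        ready = true
      }
    }

    // Schedulers poll this instead of sleeping a fixed 3 seconds; the maxWaitMs cap is an
    // assumed safety valve so a short cluster never blocks startup indefinitely.
    def isReady(): Boolean =
      ready || (System.currentTimeMillis() - createTime) >= maxWaitMs
  }

  def main(args: Array[String]): Unit = {
    val backend = new Backend(totalExpectedExecutors = 10, minRegisteredRatio = 0.8, maxWaitMs = 30000)
    (1 to 8).foreach(_ => backend.executorRegistered())
    println(s"ready = ${backend.isReady()}")  // true: 8 of 10 executors meets the 0.8 threshold
  }
}
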
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 062f946a9f..3ec36487dc 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -255,10 +255,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
sparkContext.getConf)
}
}
- } finally {
- // in case of exceptions, etc - ensure that count is atleast ALLOCATOR_LOOP_WAIT_COUNT :
- // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks
- ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
}
}
@@ -277,13 +273,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
yarnAllocator.allocateContainers(
math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
- ApplicationMaster.incrementAllocatorLoop(1)
Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
}
- } finally {
- // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
- // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
- ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
}
logInfo("All executors have launched.")
@@ -411,24 +402,10 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
object ApplicationMaster extends Logging {
- // Number of times to wait for the allocator loop to complete.
- // Each loop iteration waits for 100ms, so maximum of 3 seconds.
- // This is to ensure that we have reasonable number of containers before we start
// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
// optimal as more containers are available. Might need to handle this better.
- private val ALLOCATOR_LOOP_WAIT_COUNT = 30
private val ALLOCATE_HEARTBEAT_INTERVAL = 100
- def incrementAllocatorLoop(by: Int) {
- val count = yarnAllocatorLoop.getAndAdd(by)
- if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
- yarnAllocatorLoop.synchronized {
- // to wake threads off wait ...
- yarnAllocatorLoop.notifyAll()
- }
- }
- }
-
private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
def register(master: ApplicationMaster) {
@@ -437,7 +414,6 @@ object ApplicationMaster extends Logging {
val sparkContextRef: AtomicReference[SparkContext] =
new AtomicReference[SparkContext](null /* initialValue */)
- val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0)
def sparkContextInitialized(sc: SparkContext): Boolean = {
var modified = false
@@ -472,21 +448,6 @@ object ApplicationMaster extends Logging {
modified
}
-
- /**
- * Returns when we've either
- * 1) received all the requested executors,
- * 2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms,
- * 3) hit an error that causes us to terminate trying to get containers.
- */
- def waitForInitialAllocations() {
- yarnAllocatorLoop.synchronized {
- while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
- yarnAllocatorLoop.wait(1000L)
- }
- }
- }
-
def main(argStrings: Array[String]) {
SignalLogger.register(log)
val args = new ApplicationMasterArguments(argStrings)
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index 15e8c21aa5..3474112ded 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -37,14 +37,4 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configur
val retval = YarnAllocationHandler.lookupRack(conf, host)
if (retval != null) Some(retval) else None
}
-
- override def postStartHook() {
-
- super.postStartHook()
- // The yarn application is running, but the executor might not yet ready
- // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
- // TODO It needn't after waitBackendReady
- Thread.sleep(2000L)
- logInfo("YarnClientClusterScheduler.postStartHook done")
- }
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 1b37c4bb13..d8266f7b0c 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -30,6 +30,11 @@ private[spark] class YarnClientSchedulerBackend(
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
with Logging {
+ if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
+ minRegisteredRatio = 0.8
+ ready = false
+ }
+
var client: Client = null
var appId: ApplicationId = null
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 9ee53d797c..9aeca4a637 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -47,14 +47,8 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
}
override def postStartHook() {
- val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
+ ApplicationMaster.sparkContextInitialized(sc)
super.postStartHook()
- if (sparkContextInitialized){
- ApplicationMaster.waitForInitialAllocations()
- // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
- // TODO It needn't after waitBackendReady
- Thread.sleep(3000L)
- }
logInfo("YarnClusterScheduler.postStartHook done")
}
}
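
Both removed postStartHook sleeps carried the same TODO: once the scheduler waits on the backend's readiness, there is nothing left for a fixed sleep to cover. A minimal sketch of that polling, assuming an isReady() predicate like the one the backends gate on (the helper name and the 100 ms interval are assumptions, not Spark's exact TaskSchedulerImpl code):

object WaitBackendReadySketch {
  // Poll until the backend reports ready, instead of sleeping an arbitrary 2-3 seconds.
  def waitBackendReady(isReady: () => Boolean, pollIntervalMs: Long = 100): Unit = {
    while (!isReady()) {
      Thread.sleep(pollIntervalMs)
    }
  }

  def main(args: Array[String]): Unit = {
    val readyAt = System.currentTimeMillis() + 500
    waitBackendReady(() => System.currentTimeMillis() >= readyAt)
    println("backend ready, scheduler can start placing tasks")
  }
}
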
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index a04b08f43c..0ad1794d19 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -27,6 +27,11 @@ private[spark] class YarnClusterSchedulerBackend(
sc: SparkContext)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+ if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
+ minRegisteredRatio = 0.8
+ ready = false
+ }
+
override def start() {
super.start()
var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 1a24ec759b..eaf594c8b4 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -234,10 +234,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
sparkContext.getConf)
}
}
- } finally {
- // In case of exceptions, etc - ensure that the loop in
- // ApplicationMaster#sparkContextInitialized() breaks.
- ApplicationMaster.doneWithInitialAllocations()
}
}
@@ -254,16 +250,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
checkNumExecutorsFailed()
allocateMissingExecutor()
yarnAllocator.allocateResources()
- if (iters == ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT) {
- ApplicationMaster.doneWithInitialAllocations()
- }
Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
iters += 1
}
- } finally {
- // In case of exceptions, etc - ensure that the loop in
- // ApplicationMaster#sparkContextInitialized() breaks.
- ApplicationMaster.doneWithInitialAllocations()
}
logInfo("All executors have launched.")
}
@@ -365,12 +354,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
object ApplicationMaster extends Logging {
- // Number of times to wait for the allocator loop to complete.
- // Each loop iteration waits for 100ms, so maximum of 3 seconds.
- // This is to ensure that we have reasonable number of containers before we start
// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
// optimal as more containers are available. Might need to handle this better.
- private val ALLOCATOR_LOOP_WAIT_COUNT = 30
private val ALLOCATE_HEARTBEAT_INTERVAL = 100
private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
@@ -378,20 +363,6 @@ object ApplicationMaster extends Logging {
val sparkContextRef: AtomicReference[SparkContext] =
new AtomicReference[SparkContext](null)
- // Variable used to notify the YarnClusterScheduler that it should stop waiting
- // for the initial set of executors to be started and get on with its business.
- val doneWithInitialAllocationsMonitor = new Object()
-
- @volatile var isDoneWithInitialAllocations = false
-
- def doneWithInitialAllocations() {
- isDoneWithInitialAllocations = true
- doneWithInitialAllocationsMonitor.synchronized {
- // to wake threads off wait ...
- doneWithInitialAllocationsMonitor.notifyAll()
- }
- }
-
def register(master: ApplicationMaster) {
applicationMasters.add(master)
}
@@ -434,20 +405,6 @@ object ApplicationMaster extends Logging {
modified
}
- /**
- * Returns when we've either
- * 1) received all the requested executors,
- * 2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms,
- * 3) hit an error that causes us to terminate trying to get containers.
- */
- def waitForInitialAllocations() {
- doneWithInitialAllocationsMonitor.synchronized {
- while (!isDoneWithInitialAllocations) {
- doneWithInitialAllocationsMonitor.wait(1000L)
- }
- }
- }
-
def getApplicationAttemptId(): ApplicationAttemptId = {
val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
val containerId = ConverterUtils.toContainerId(containerIdString)
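
Since the backends only install the 0.8 default when the key is unset, applications keep control of the threshold. A hedged usage sketch; the 0.95 value and app name are arbitrary, while the configuration key is the one the diff checks:

import org.apache.spark.{SparkConf, SparkContext}

object OverrideRatioExample {
  def main(args: Array[String]): Unit = {
    // Setting the key explicitly bypasses the YARN backends' 0.8 default.
    // The master URL is expected to come from spark-submit (--master yarn) in this scenario.
    val conf = new SparkConf()
      .setAppName("ratio-override-example")
      .set("spark.scheduler.minRegisteredExecutorsRatio", "0.95")
    val sc = new SparkContext(conf)
    // ... run the job ...
    sc.stop()
  }
}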