about summary refs log tree commit diff
path: root/yarn/alpha
diff options
context:
space:
mode:
Diffstat (limited to 'yarn/alpha')
-rw-r--r-- yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 39
1 files changed, 0 insertions, 39 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 062f946a9f..3ec36487dc 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -255,10 +255,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
sparkContext.getConf)
}
}
- } finally {
- // in case of exceptions, etc - ensure that count is atleast ALLOCATOR_LOOP_WAIT_COUNT :
- // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks
- ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
}
}
@@ -277,13 +273,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
yarnAllocator.allocateContainers(
math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
- ApplicationMaster.incrementAllocatorLoop(1)
Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
}
- } finally {
- // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
- // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
- ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
}
logInfo("All executors have launched.")
@@ -411,24 +402,10 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
object ApplicationMaster extends Logging {
- // Number of times to wait for the allocator loop to complete.
- // Each loop iteration waits for 100ms, so maximum of 3 seconds.
- // This is to ensure that we have reasonable number of containers before we start
// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
// optimal as more containers are available. Might need to handle this better.
- private val ALLOCATOR_LOOP_WAIT_COUNT = 30
private val ALLOCATE_HEARTBEAT_INTERVAL = 100
- def incrementAllocatorLoop(by: Int) {
- val count = yarnAllocatorLoop.getAndAdd(by)
- if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
- yarnAllocatorLoop.synchronized {
- // to wake threads off wait ...
- yarnAllocatorLoop.notifyAll()
- }
- }
- }
-
private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
def register(master: ApplicationMaster) {
@@ -437,7 +414,6 @@ object ApplicationMaster extends Logging {
val sparkContextRef: AtomicReference[SparkContext] =
new AtomicReference[SparkContext](null /* initialValue */)
- val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0)
def sparkContextInitialized(sc: SparkContext): Boolean = {
var modified = false
@@ -472,21 +448,6 @@ object ApplicationMaster extends Logging {
modified
}
-
- /**
- * Returns when we've either
- * 1) received all the requested executors,
- * 2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms,
- * 3) hit an error that causes us to terminate trying to get containers.
- */
- def waitForInitialAllocations() {
- yarnAllocatorLoop.synchronized {
- while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
- yarnAllocatorLoop.wait(1000L)
- }
- }
- }
-
def main(argStrings: Array[String]) {
SignalLogger.register(log)
val args = new ApplicationMasterArguments(argStrings)