author     jerryshao <saisai.shao@intel.com>    2015-06-05 12:28:37 -0700
committer  Sandy Ryza <sandy@cloudera.com>      2015-06-05 12:28:37 -0700
commit     3f80bc841ab155925fb0530eef5927990f4a5793 (patch)
tree       9b70e8e9164f290f96283448df2c78566478fbd1 /core/src
parent     0992a0a77d38081c6c206bb34333013125d85376 (diff)
[SPARK-7699] [CORE] Lazy start the scheduler for dynamic allocation
This patch proposes to lazily start the scheduler for dynamic allocation, to avoid ramping executor numbers down too quickly when the load is light. This implementation will:

1. Start the scheduler immediately if `numExecutorsTarget` is 0; this is the expected behavior.
2. If `numExecutorsTarget` is not zero, delay starting the scheduler until that number of executors is satisfied. If the load is light, these initially started executors will last for at least 60 seconds, so the user has a window to submit a job without the executors being torn down and ramped up again.
3. If `numExecutorsTarget` is not satisfied by the timeout, resources are insufficient; the scheduler then starts at the timeout rather than waiting indefinitely.

Please help to review, thanks a lot.

Author: jerryshao <saisai.shao@intel.com>

Closes #6430 from jerryshao/SPARK-7699 and squashes the following commits:

02cac8e [jerryshao] Address the comments
7242450 [jerryshao] Remove the useless import
ecc0b00 [jerryshao] Address the comments
6f75f00 [jerryshao] Style changes
8b8decc [jerryshao] change the test name
fb822ca [jerryshao] Change the solution according to comments
1cc74e5 [jerryshao] Lazy start the scheduler for dynamic allocation
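As a quick usage sketch (the application name and executor counts below are hypothetical; the configuration keys are the existing dynamic-allocation settings this patch builds on), an application configured like the following now keeps its initial executors until the first stage is submitted or the executor idle timeout elapses, instead of having them cancelled on the first scheduling pass:

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative sketch only: dynamic allocation requires a cluster manager
    // that supports it (e.g. YARN) with the external shuffle service enabled.
    val conf = new SparkConf()
      .setAppName("lazy-start-example")                           // hypothetical app name
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "2")
      .set("spark.dynamicAllocation.maxExecutors", "10")
      .set("spark.dynamicAllocation.initialExecutors", "5")       // kept until first job or idle timeout
      .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
    val sc = new SparkContext(conf)

With this change, the allocation manager's new `initializing` flag suppresses target recalculation until a stage is submitted or an executor idle timeout fires, so the five initial executors above are not immediately ramped down to the minimum of two.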
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala        17
-rw-r--r--  core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala   90
2 files changed, 89 insertions, 18 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index f7323a4d9d..9939103bb0 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -150,6 +150,13 @@ private[spark] class ExecutorAllocationManager(
// Metric source for ExecutorAllocationManager to expose internal status to MetricsSystem.
val executorAllocationManagerSource = new ExecutorAllocationManagerSource
+ // Whether we are still waiting for the initial set of executors to be allocated.
+ // While this is true, we will not cancel outstanding executor requests. This is
+ // set to false when:
+ // (1) a stage is submitted, or
+ // (2) an executor idle timeout has elapsed.
+ @volatile private var initializing: Boolean = true
+
/**
* Verify that the settings specified through the config are valid.
* If not, throw an appropriate exception.
@@ -240,6 +247,7 @@ private[spark] class ExecutorAllocationManager(
removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
if (expired) {
+ initializing = false
removeExecutor(executorId)
}
!expired
@@ -261,7 +269,11 @@ private[spark] class ExecutorAllocationManager(
private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
val maxNeeded = maxNumExecutorsNeeded
- if (maxNeeded < numExecutorsTarget) {
+ if (initializing) {
+ // Do not change our target while we are still initializing.
+ // Otherwise the first job may have to ramp up unnecessarily.
+ 0
+ } else if (maxNeeded < numExecutorsTarget) {
// The target number exceeds the number we actually need, so stop adding new
// executors and inform the cluster manager to cancel the extra pending requests
val oldNumExecutorsTarget = numExecutorsTarget
@@ -271,7 +283,7 @@ private[spark] class ExecutorAllocationManager(
// If the new target has not changed, avoid sending a message to the cluster manager
if (numExecutorsTarget < oldNumExecutorsTarget) {
client.requestTotalExecutors(numExecutorsTarget)
- logInfo(s"Lowering target number of executors to $numExecutorsTarget (previously " +
+ logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
}
numExecutorsTarget - oldNumExecutorsTarget
@@ -481,6 +493,7 @@ private[spark] class ExecutorAllocationManager(
private var numRunningTasks: Int = _
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
+ initializing = false
val stageId = stageSubmitted.stageInfo.stageId
val numTasks = stageSubmitted.stageInfo.numTasks
allocationManager.synchronized {
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 1c2b681f0b..803e1831bb 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -90,7 +90,7 @@ class ExecutorAllocationManagerSuite
}
test("add executors") {
- sc = createSparkContext(1, 10)
+ sc = createSparkContext(1, 10, 1)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
@@ -135,7 +135,7 @@ class ExecutorAllocationManagerSuite
}
test("add executors capped by num pending tasks") {
- sc = createSparkContext(0, 10)
+ sc = createSparkContext(0, 10, 0)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 5)))
@@ -186,7 +186,7 @@ class ExecutorAllocationManagerSuite
}
test("cancel pending executors when no longer needed") {
- sc = createSparkContext(0, 10)
+ sc = createSparkContext(0, 10, 0)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 5)))
@@ -213,7 +213,7 @@ class ExecutorAllocationManagerSuite
}
test("remove executors") {
- sc = createSparkContext(5, 10)
+ sc = createSparkContext(5, 10, 5)
val manager = sc.executorAllocationManager.get
(1 to 10).map(_.toString).foreach { id => onExecutorAdded(manager, id) }
@@ -263,7 +263,7 @@ class ExecutorAllocationManagerSuite
}
test ("interleaving add and remove") {
- sc = createSparkContext(5, 10)
+ sc = createSparkContext(5, 10, 5)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
@@ -331,7 +331,7 @@ class ExecutorAllocationManagerSuite
}
test("starting/canceling add timer") {
- sc = createSparkContext(2, 10)
+ sc = createSparkContext(2, 10, 2)
val clock = new ManualClock(8888L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -363,7 +363,7 @@ class ExecutorAllocationManagerSuite
}
test("starting/canceling remove timers") {
- sc = createSparkContext(2, 10)
+ sc = createSparkContext(2, 10, 2)
val clock = new ManualClock(14444L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -410,7 +410,7 @@ class ExecutorAllocationManagerSuite
}
test("mock polling loop with no events") {
- sc = createSparkContext(0, 20)
+ sc = createSparkContext(0, 20, 0)
val manager = sc.executorAllocationManager.get
val clock = new ManualClock(2020L)
manager.setClock(clock)
@@ -436,7 +436,7 @@ class ExecutorAllocationManagerSuite
}
test("mock polling loop add behavior") {
- sc = createSparkContext(0, 20)
+ sc = createSparkContext(0, 20, 0)
val clock = new ManualClock(2020L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -486,7 +486,7 @@ class ExecutorAllocationManagerSuite
}
test("mock polling loop remove behavior") {
- sc = createSparkContext(1, 20)
+ sc = createSparkContext(1, 20, 1)
val clock = new ManualClock(2020L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -547,7 +547,7 @@ class ExecutorAllocationManagerSuite
}
test("listeners trigger add executors correctly") {
- sc = createSparkContext(2, 10)
+ sc = createSparkContext(2, 10, 2)
val manager = sc.executorAllocationManager.get
assert(addTime(manager) === NOT_SET)
@@ -577,7 +577,7 @@ class ExecutorAllocationManagerSuite
}
test("listeners trigger remove executors correctly") {
- sc = createSparkContext(2, 10)
+ sc = createSparkContext(2, 10, 2)
val manager = sc.executorAllocationManager.get
assert(removeTimes(manager).isEmpty)
@@ -608,7 +608,7 @@ class ExecutorAllocationManagerSuite
}
test("listeners trigger add and remove executor callbacks correctly") {
- sc = createSparkContext(2, 10)
+ sc = createSparkContext(2, 10, 2)
val manager = sc.executorAllocationManager.get
assert(executorIds(manager).isEmpty)
assert(removeTimes(manager).isEmpty)
@@ -641,7 +641,7 @@ class ExecutorAllocationManagerSuite
}
test("SPARK-4951: call onTaskStart before onBlockManagerAdded") {
- sc = createSparkContext(2, 10)
+ sc = createSparkContext(2, 10, 2)
val manager = sc.executorAllocationManager.get
assert(executorIds(manager).isEmpty)
assert(removeTimes(manager).isEmpty)
@@ -677,7 +677,7 @@ class ExecutorAllocationManagerSuite
}
test("avoid ramp up when target < running executors") {
- sc = createSparkContext(0, 100000)
+ sc = createSparkContext(0, 100000, 0)
val manager = sc.executorAllocationManager.get
val stage1 = createStageInfo(0, 1000)
sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1))
@@ -701,13 +701,67 @@ class ExecutorAllocationManagerSuite
assert(numExecutorsTarget(manager) === 16)
}
- private def createSparkContext(minExecutors: Int = 1, maxExecutors: Int = 5): SparkContext = {
+ test("avoid ramp down initial executors until first job is submitted") {
+ sc = createSparkContext(2, 5, 3)
+ val manager = sc.executorAllocationManager.get
+ val clock = new ManualClock(10000L)
+ manager.setClock(clock)
+
+ // Verify the initial number of executors
+ assert(numExecutorsTarget(manager) === 3)
+ schedule(manager)
+ // Verify the initial number of executors is kept when there are no pending tasks
+ assert(numExecutorsTarget(manager) === 3)
+
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2)))
+ clock.advance(100L)
+
+ assert(maxNumExecutorsNeeded(manager) === 2)
+ schedule(manager)
+
+ // Verify that the current number of executors is ramped down when the first job is submitted
+ assert(numExecutorsTarget(manager) === 2)
+ }
+
+ test("avoid ramp down initial executors until idle executor is timeout") {
+ sc = createSparkContext(2, 5, 3)
+ val manager = sc.executorAllocationManager.get
+ val clock = new ManualClock(10000L)
+ manager.setClock(clock)
+
+ // Verify the initial number of executors
+ assert(numExecutorsTarget(manager) === 3)
+ schedule(manager)
+ // Verify the initial number of executors is kept when there are no pending tasks
+ assert(numExecutorsTarget(manager) === 3)
+ (0 until 3).foreach { i =>
+ onExecutorAdded(manager, s"executor-$i")
+ }
+
+ clock.advance(executorIdleTimeout * 1000)
+
+ assert(maxNumExecutorsNeeded(manager) === 0)
+ schedule(manager)
+ // Verify the executor has timed out but numExecutorsTarget is not yet recalculated
+ assert(numExecutorsTarget(manager) === 3)
+
+ // Schedule again to recalculate numExecutorsTarget after the executor has timed out
+ schedule(manager)
+ // Verify that the current number of executors is ramped down once the executor has timed out
+ assert(numExecutorsTarget(manager) === 2)
+ }
+
+ private def createSparkContext(
+ minExecutors: Int = 1,
+ maxExecutors: Int = 5,
+ initialExecutors: Int = 1): SparkContext = {
val conf = new SparkConf()
.setMaster("local")
.setAppName("test-executor-allocation-manager")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
.set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
+ .set("spark.dynamicAllocation.initialExecutors", initialExecutors.toString)
.set("spark.dynamicAllocation.schedulerBacklogTimeout",
s"${schedulerBacklogTimeout.toString}s")
.set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout",
@@ -791,6 +845,10 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
manager invokePrivate _schedule()
}
+ private def maxNumExecutorsNeeded(manager: ExecutorAllocationManager): Int = {
+ manager invokePrivate _maxNumExecutorsNeeded()
+ }
+
private def addExecutors(manager: ExecutorAllocationManager): Int = {
val maxNumExecutorsNeeded = manager invokePrivate _maxNumExecutorsNeeded()
manager invokePrivate _addExecutors(maxNumExecutorsNeeded)