author     Sandy Ryza <sandy@cloudera.com>       2014-11-14 15:51:05 -0800
committer  Andrew Or <andrew@databricks.com>     2014-11-14 15:51:40 -0800
commit     ad42b283246b93654c5fd731cd618fee74d8c4da (patch)
tree       afbfda5380f5e8ba9e3cb199041bbf63b57076b0 /core
parent     37482ce5a7b875f17d32a5e8c561cc8e9772c9b3 (diff)
SPARK-4214. With dynamic allocation, avoid outstanding requests for more
executors than pending tasks need.

WIP. Still need to add and fix tests.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #3204 from sryza/sandy-spark-4214 and squashes the following commits:

35cf0e0 [Sandy Ryza] Add comment
13b53df [Sandy Ryza] Review feedback
067465f [Sandy Ryza] Whitespace fix
6ae080c [Sandy Ryza] Add tests and get num pending tasks from ExecutorAllocationListener
531e2b6 [Sandy Ryza] SPARK-4214. With dynamic allocation, avoid outstanding requests for more executors than pending tasks need.
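To see the intent of the change before reading the diff, here is a small standalone sketch of the capping policy; the object name and all numbers are hypothetical and not part of the patch. With 5 pending tasks, one task slot per executor, and a configured maximum of 10 executors, the exponential ramp requests 1, then 2, then 2 more executors and stops at 5 rather than climbing to the configured maximum:

    // Illustrative sketch only: the exponential ramp-up is capped by both the
    // configured maximum and the number of executors that pending tasks can use.
    object RampUpSketch extends App {
      val maxNumExecutors = 10    // hypothetical configured upper bound
      val pendingTasks = 5        // hypothetical number of pending tasks
      val tasksPerExecutor = 1    // hypothetical task slots per executor
      // Executors that pending tasks could actually fill, rounded up
      val maxFromPending = (pendingTasks + tasksPerExecutor - 1) / tasksPerExecutor

      var existing = 0            // executors already requested or running
      var numExecutorsToAdd = 1   // doubles each round until a cap is hit
      while (existing < math.min(maxFromPending, maxNumExecutors)) {
        val cap = math.min(maxFromPending, maxNumExecutors) - existing
        val granted = math.min(numExecutorsToAdd, cap)
        existing += granted
        println(s"requested $granted more executors, total now $existing")
        numExecutorsToAdd = if (granted == numExecutorsToAdd) numExecutorsToAdd * 2 else 1
      }
    }

The same 1, 2, 2 sequence shows up in the "add executors capped by num pending tasks" test further down.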
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala      | 55
-rw-r--r--  core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala | 48
2 files changed, 94 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index ef93009a07..88adb89299 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -28,7 +28,9 @@ import org.apache.spark.scheduler._
* the scheduler queue is not drained in N seconds, then new executors are added. If the queue
* persists for another M seconds, then more executors are added and so on. The number added
* in each round increases exponentially from the previous round until an upper bound on the
- * number of executors has been reached.
+ * number of executors has been reached. The upper bound is based both on a configured property
+ * and on the number of tasks pending: the policy will never increase the number of executor
+ * requests past the number needed to handle all pending tasks.
*
* The rationale for the exponential increase is twofold: (1) Executors should be added slowly
* in the beginning in case the number of extra executors needed turns out to be small. Otherwise,
@@ -82,6 +84,12 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
// During testing, the methods to actually kill and add executors are mocked out
private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
+ // TODO: The default value of 1 for spark.executor.cores works right now because dynamic
+ // allocation is only supported for YARN and the default number of cores per executor in YARN is
+ // 1, but it might need to be attained differently for different cluster managers
+ private val tasksPerExecutor =
+ conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)
+
validateSettings()
// Number of executors to add in the next round
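The integer division that defines tasksPerExecutor can be checked in a Scala REPL with hypothetical values; both configs default to 1, which is why the YARN-only assumption in the TODO above holds for now:

    // Hypothetical values, only to illustrate the division above.
    val executorCores = 4                             // spark.executor.cores
    val taskCpus = 2                                  // spark.task.cpus
    val tasksPerExecutor = executorCores / taskCpus   // 4 / 2 = 2 tasks fit per executor

    // If spark.executor.cores were smaller than spark.task.cpus, the integer
    // division would truncate to 0; that misconfiguration is exactly what the
    // validateSettings() check added in a later hunk rejects.
    assert(1 / 2 == 0)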
@@ -110,6 +118,9 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
// Clock used to schedule when executors should be added and removed
private var clock: Clock = new RealClock
+ // Listener for Spark events that impact the allocation policy
+ private val listener = new ExecutorAllocationListener(this)
+
/**
* Verify that the settings specified through the config are valid.
* If not, throw an appropriate exception.
@@ -141,6 +152,9 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
throw new SparkException("Dynamic allocation of executors requires the external " +
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
+ if (tasksPerExecutor == 0) {
+ throw new SparkException("spark.executor.cores must not be less than spark.task.cpus.cores")
+ }
}
/**
@@ -154,7 +168,6 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
* Register for scheduler callbacks to decide when to add and remove executors.
*/
def start(): Unit = {
- val listener = new ExecutorAllocationListener(this)
sc.addSparkListener(listener)
startPolling()
}
@@ -218,13 +231,27 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
return 0
}
- // Request executors with respect to the upper bound
- val actualNumExecutorsToAdd =
- if (numExistingExecutors + numExecutorsToAdd <= maxNumExecutors) {
- numExecutorsToAdd
- } else {
- maxNumExecutors - numExistingExecutors
- }
+ // The number of executors needed to satisfy all pending tasks is the number of tasks pending
+ // divided by the number of tasks each executor can fit, rounded up.
+ val maxNumExecutorsPending =
+ (listener.totalPendingTasks() + tasksPerExecutor - 1) / tasksPerExecutor
+ if (numExecutorsPending >= maxNumExecutorsPending) {
+ logDebug(s"Not adding executors because there are already $numExecutorsPending " +
+ s"pending and pending tasks could only fill $maxNumExecutorsPending")
+ numExecutorsToAdd = 1
+ return 0
+ }
+
+ // It's never useful to request more executors than could satisfy all the pending tasks, so
+ // cap request at that amount.
+ // Also cap request with respect to the configured upper bound.
+ val maxNumExecutorsToAdd = math.min(
+ maxNumExecutorsPending - numExecutorsPending,
+ maxNumExecutors - numExistingExecutors)
+ assert(maxNumExecutorsToAdd > 0)
+
+ val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd)
+
val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
val addRequestAcknowledged = testing || sc.requestExecutors(actualNumExecutorsToAdd)
if (addRequestAcknowledged) {
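A worked example of the new capping arithmetic, using made-up numbers (8 pending tasks, 2 task slots per executor, 3 executor requests already outstanding, 5 existing executors, a configured maximum of 10, and an exponential step of 4), can be pasted into a Scala REPL:

    // Hypothetical values mirroring the variables in the hunk above.
    val totalPendingTasks = 8      // reported by the listener
    val tasksPerExecutor = 2
    val numExecutorsPending = 3    // executors requested but not yet allocated
    val numExistingExecutors = 5
    val maxNumExecutors = 10       // configured upper bound
    val numExecutorsToAdd = 4      // current exponential step

    // Ceiling division: 8 tasks over 2 slots each => at most 4 outstanding requests are useful
    val maxNumExecutorsPending = (totalPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
    assert(maxNumExecutorsPending == 4)

    // Cap by the pending-task bound (4 - 3 = 1) and by the configured bound (10 - 5 = 5)
    val maxNumExecutorsToAdd = math.min(
      maxNumExecutorsPending - numExecutorsPending,
      maxNumExecutors - numExistingExecutors)
    val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd)
    assert(actualNumExecutorsToAdd == 1)   // only one more request is worthwhile this round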
@@ -445,6 +472,16 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = {
allocationManager.onExecutorRemoved(blockManagerRemoved.blockManagerId.executorId)
}
+
+ /**
+ * An estimate of the total number of pending tasks remaining for currently running stages. Does
+ * not account for tasks which may have failed and been resubmitted.
+ */
+ def totalPendingTasks(): Int = {
+ stageIdToNumTasks.map { case (stageId, numTasks) =>
+ numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
+ }.sum
+ }
}
}
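The sum in totalPendingTasks() can be pictured with a hypothetical listener state: stage 0 has 5 tasks, none started, and stage 1 has 3 tasks of which indices 0 and 2 have started. The snippet below (REPL-friendly, illustrative only) reproduces the computation:

    // Hypothetical per-stage bookkeeping, matching the shape of the listener's maps.
    val stageIdToNumTasks = Map(0 -> 5, 1 -> 3)
    val stageIdToTaskIndices = Map(1 -> Set(0, 2))   // stage 0 has no started tasks yet

    val totalPendingTasks = stageIdToNumTasks.map { case (stageId, numTasks) =>
      numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
    }.sum
    assert(totalPendingTasks == 6)   // 5 from stage 0 plus (3 - 2) = 1 from stage 1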
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 66cf60d25f..4b27477790 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -76,6 +76,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
test("add executors") {
sc = createSparkContext(1, 10)
val manager = sc.executorAllocationManager.get
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
// Keep adding until the limit is reached
assert(numExecutorsPending(manager) === 0)
@@ -117,6 +118,51 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(numExecutorsToAdd(manager) === 1)
}
+ test("add executors capped by num pending tasks") {
+ sc = createSparkContext(1, 10)
+ val manager = sc.executorAllocationManager.get
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 5)))
+
+ // Verify that we're capped at number of tasks in the stage
+ assert(numExecutorsPending(manager) === 0)
+ assert(numExecutorsToAdd(manager) === 1)
+ assert(addExecutors(manager) === 1)
+ assert(numExecutorsPending(manager) === 1)
+ assert(numExecutorsToAdd(manager) === 2)
+ assert(addExecutors(manager) === 2)
+ assert(numExecutorsPending(manager) === 3)
+ assert(numExecutorsToAdd(manager) === 4)
+ assert(addExecutors(manager) === 2)
+ assert(numExecutorsPending(manager) === 5)
+ assert(numExecutorsToAdd(manager) === 1)
+
+ // Verify that running a task reduces the cap
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3)))
+ sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
+ assert(addExecutors(manager) === 1)
+ assert(numExecutorsPending(manager) === 6)
+ assert(numExecutorsToAdd(manager) === 2)
+ assert(addExecutors(manager) === 1)
+ assert(numExecutorsPending(manager) === 7)
+ assert(numExecutorsToAdd(manager) === 1)
+
+ // Verify that re-running a task doesn't reduce the cap further
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 3)))
+ sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, "executor-1")))
+ sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, "executor-1")))
+ assert(addExecutors(manager) === 1)
+ assert(numExecutorsPending(manager) === 8)
+ assert(numExecutorsToAdd(manager) === 2)
+ assert(addExecutors(manager) === 1)
+ assert(numExecutorsPending(manager) === 9)
+ assert(numExecutorsToAdd(manager) === 1)
+
+ // Verify that running a task once we're at our limit doesn't blow things up
+ sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 1, "executor-1")))
+ assert(addExecutors(manager) === 0)
+ assert(numExecutorsPending(manager) === 9)
+ }
+
test("remove executors") {
sc = createSparkContext(5, 10)
val manager = sc.executorAllocationManager.get
@@ -170,6 +216,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
test ("interleaving add and remove") {
sc = createSparkContext(5, 10)
val manager = sc.executorAllocationManager.get
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
// Add a few executors
assert(addExecutors(manager) === 1)
@@ -343,6 +390,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
val clock = new TestClock(2020L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
// Scheduler queue backlogged
onSchedulerBacklogged(manager)