aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author Nishkam Ravi <nravi@cloudera.com> 2015-07-25 22:56:25 -0700
committer Andrew Or <andrew@databricks.com> 2015-07-25 22:56:25 -0700
commit 41a7cdf85de2d583d8b8759941a9d6c6e98cae4d (patch)
tree c127eb02b81eb3f3f19b31013c65b3816494f5f9 /core
parent b1f4b4abfd8d038c3684685b245b5fd31b927da0 (diff)
download spark-41a7cdf85de2d583d8b8759941a9d6c6e98cae4d.tar.gz
spark-41a7cdf85de2d583d8b8759941a9d6c6e98cae4d.tar.bz2
spark-41a7cdf85de2d583d8b8759941a9d6c6e98cae4d.zip
[SPARK-8881] [SPARK-9260] Fix algorithm for scheduling executors on workers
The current scheduling algorithm allocates one core at a time and in doing so ends up ignoring spark.executor.cores. As a result, when spark.cores.max/spark.executor.cores (i.e., num_executors) < num_workers, executors are not launched and the app hangs. This PR fixes and refactors the scheduling algorithm. andrewor14 Author: Nishkam Ravi <nravi@cloudera.com> Author: nishkamravi2 <nishkamravi@gmail.com> Closes #7274 from nishkamravi2/master_scheduler and squashes the following commits: b998097 [nishkamravi2] Update Master.scala da0f491 [Nishkam Ravi] Update Master.scala 79084e8 [Nishkam Ravi] Update Master.scala 1daf25f [Nishkam Ravi] Update Master.scala f279cdf [Nishkam Ravi] Update Master.scala adec84b [Nishkam Ravi] Update Master.scala a06da76 [nishkamravi2] Update Master.scala 40c8f9f [nishkamravi2] Update Master.scala (to trigger retest) c11c689 [nishkamravi2] Update EventLoggingListenerSuite.scala 5d6a19c [nishkamravi2] Update Master.scala (for the purpose of issuing a retest) 2d6371c [Nishkam Ravi] Update Master.scala 66362d5 [nishkamravi2] Update Master.scala ee7cf0e [Nishkam Ravi] Improved scheduling algorithm for executors
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/master/Master.scala 112
1 files changed, 75 insertions, 37 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 4615febf17..029f94d102 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -541,6 +541,7 @@ private[master] class Master(
/**
* Schedule executors to be launched on the workers.
+ * Returns an array containing number of cores assigned to each worker.
*
* There are two modes of launching executors. The first attempts to spread out an application's
* executors on as many workers as possible, while the second does the opposite (i.e. launch them
@@ -551,39 +552,73 @@ private[master] class Master(
* multiple executors from the same application may be launched on the same worker if the worker
* has enough cores and memory. Otherwise, each executor grabs all the cores available on the
* worker by default, in which case only one executor may be launched on each worker.
+ *
+ * It is important to allocate coresPerExecutor cores on a worker at a time (instead of 1 core
+ * at a time). Consider the following example: a cluster has 4 workers with 16 cores each.
+ * A user requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is
+ * allocated at a time, 12 cores would be assigned on each worker.
+ * Since 12 < 16, no executor could be launched on any worker [SPARK-8881].
*/
- private def startExecutorsOnWorkers(): Unit = {
- // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
- // in the queue, then the second app, etc.
- if (spreadOutApps) {
- // Try to spread out each app among all the workers, until it has all its cores
- for (app <- waitingApps if app.coresLeft > 0) {
- val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
- .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
- worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1))
- .sortBy(_.coresFree).reverse
- val numUsable = usableWorkers.length
- val assigned = new Array[Int](numUsable) // Number of cores to give on each node
- var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
- var pos = 0
- while (toAssign > 0) {
- if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
- toAssign -= 1
- assigned(pos) += 1
+ private[master] def scheduleExecutorsOnWorkers(
+ app: ApplicationInfo,
+ usableWorkers: Array[WorkerInfo],
+ spreadOutApps: Boolean): Array[Int] = {
+ // If the number of cores per executor is not specified, then we can just schedule
+ // 1 core at a time since we expect a single executor to be launched on each worker
+ val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
+ val memoryPerExecutor = app.desc.memoryPerExecutorMB
+ val numUsable = usableWorkers.length
+ val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
+ val assignedMemory = new Array[Int](numUsable) // Amount of memory to give to each worker
+ var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
+ var freeWorkers = (0 until numUsable).toIndexedSeq
+
+ def canLaunchExecutor(pos: Int): Boolean = {
+ usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
+ usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor
+ }
+
+ while (coresToAssign >= coresPerExecutor && freeWorkers.nonEmpty) {
+ freeWorkers = freeWorkers.filter(canLaunchExecutor)
+ freeWorkers.foreach { pos =>
+ var keepScheduling = true
+ while (keepScheduling && canLaunchExecutor(pos) && coresToAssign >= coresPerExecutor) {
+ coresToAssign -= coresPerExecutor
+ assignedCores(pos) += coresPerExecutor
+ assignedMemory(pos) += memoryPerExecutor
+
+ // Spreading out an application means spreading out its executors across as
+ // many workers as possible. If we are not spreading out, then we should keep
+ // scheduling executors on this worker until we use all of its resources.
+ // Otherwise, just move on to the next worker.
+ if (spreadOutApps) {
+ keepScheduling = false
}
- pos = (pos + 1) % numUsable
- }
- // Now that we've decided how many cores to give on each node, let's actually give them
- for (pos <- 0 until numUsable if assigned(pos) > 0) {
- allocateWorkerResourceToExecutors(app, assigned(pos), usableWorkers(pos))
}
}
- } else {
- // Pack each app into as few workers as possible until we've assigned all its cores
- for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
- for (app <- waitingApps if app.coresLeft > 0) {
- allocateWorkerResourceToExecutors(app, app.coresLeft, worker)
- }
+ }
+ assignedCores
+ }
+
+ /**
+ * Schedule and launch executors on workers
+ */
+ private def startExecutorsOnWorkers(): Unit = {
+ // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
+ // in the queue, then the second app, etc.
+ for (app <- waitingApps if app.coresLeft > 0) {
+ val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
+ // Filter out workers that don't have enough resources to launch an executor
+ val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
+ .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
+ worker.coresFree >= coresPerExecutor.getOrElse(1))
+ .sortBy(_.coresFree).reverse
+ val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
+
+ // Now that we've decided how many cores to allocate on each worker, let's allocate them
+ for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
+ allocateWorkerResourceToExecutors(
+ app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
}
}
}
@@ -591,19 +626,22 @@ private[master] class Master(
/**
* Allocate a worker's resources to one or more executors.
* @param app the info of the application which the executors belong to
- * @param coresToAllocate cores on this worker to be allocated to this application
+ * @param assignedCores number of cores on this worker for this application
+ * @param coresPerExecutor number of cores per executor
* @param worker the worker info
*/
private def allocateWorkerResourceToExecutors(
app: ApplicationInfo,
- coresToAllocate: Int,
+ assignedCores: Int,
+ coresPerExecutor: Option[Int],
worker: WorkerInfo): Unit = {
- val memoryPerExecutor = app.desc.memoryPerExecutorMB
- val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate)
- var coresLeft = coresToAllocate
- while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) {
- val exec = app.addExecutor(worker, coresPerExecutor)
- coresLeft -= coresPerExecutor
+ // If the number of cores per executor is specified, we divide the cores assigned
+ // to this worker evenly among the executors with no remainder.
+ // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
+ val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
+ val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
+ for (i <- 1 to numExecutors) {
+ val exec = app.addExecutor(worker, coresToAssign)
launchExecutor(worker, exec)
app.state = ApplicationState.RUNNING
}