about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala18
1 file changed, 15 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index a3909d6ea9..2a3bd6ba0b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -487,13 +487,25 @@ private[spark] class Master(
if (state != RecoveryState.ALIVE) { return }
// First schedule drivers, they take strict precedence over applications.
// Randomization helps balance drivers across the cluster.
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0
for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
  // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
  // start from the position after the last worker that was assigned a driver, and continue
  // onwards until we have explored all alive workers.
  //
  // NOTE(review): the original hunk advanced curPos, captured startPos = curPos, and then
  // looped on `while (curPos != startPos && !launched)`. That condition is false on entry,
  // so the loop body never ran and no driver was ever launched. Counting visited workers
  // fixes this, and also avoids the `% 0` division when no workers are alive.
  var launched = false
  var numWorkersVisited = 0
  while (numWorkersVisited < numWorkersAlive && !launched) {
    val worker = shuffledAliveWorkers(curPos)
    numWorkersVisited += 1
    if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
      launchDriver(worker, driver)
      waitingDrivers -= driver
      launched = true
    }
    curPos = (curPos + 1) % numWorkersAlive
  }
}