author | CodingCat <zhunansjtu@gmail.com> | 2016-03-15 10:10:23 +0000
---|---|---
committer | Sean Owen <sowen@cloudera.com> | 2016-03-15 10:10:23 +0000
commit | bd5365bbe9ff6518cde9402ee8843ec1002fff5b |
tree | 9c72d2c8360dee506d8e1db5250cc905e3e6fb9d /core |
parent | dafd70fbfe70702502ef198f2a8f529ef7557592 |
[SPARK-13803] restore the changes in SPARK-3411
## What changes were proposed in this pull request?
This patch restores the logic that balances cluster-mode drivers across workers: instead of letting the first alive worker with enough capacity absorb every waiting driver, the Master now assigns drivers to the shuffled alive workers in round-robin order.
It restores the changes from https://github.com/apache/spark/pull/1106, which were inadvertently reverted when https://github.com/apache/spark/pull/731 was merged.
## How was this patch tested?
Tested with the existing test cases.
Author: CodingCat <zhunansjtu@gmail.com>
Closes #11702 from CodingCat/SPARK-13803.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 21 |
1 file changed, 17 insertions(+), 4 deletions(-)
```diff
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 6b9b1408ee..c97ad4d723 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -727,15 +727,28 @@ private[deploy] class Master(
    * every time a new app joins or resource availability changes.
    */
   private def schedule(): Unit = {
-    if (state != RecoveryState.ALIVE) { return }
+    if (state != RecoveryState.ALIVE) {
+      return
+    }
     // Drivers take strict precedence over executors
-    val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
-    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
-      for (driver <- waitingDrivers) {
+    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
+    val numWorkersAlive = shuffledAliveWorkers.size
+    var curPos = 0
+    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
+      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
+      // start from the last worker that was assigned a driver, and continue onwards until we have
+      // explored all alive workers.
+      var launched = false
+      var numWorkersVisited = 0
+      while (numWorkersVisited < numWorkersAlive && !launched) {
+        val worker = shuffledAliveWorkers(curPos)
+        numWorkersVisited += 1
         if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
           launchDriver(worker, driver)
           waitingDrivers -= driver
+          launched = true
         }
+        curPos = (curPos + 1) % numWorkersAlive
       }
     }
     startExecutorsOnWorkers()
```
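To experiment with the placement logic outside of the Master, here is a minimal, self-contained Scala sketch of the same round-robin technique. `Worker`, `Driver`, `assignDrivers`, and their fields are simplified stand-ins invented for this example; they are not Spark's internal `WorkerInfo`/`DriverInfo` API.

```scala
import scala.util.Random

object RoundRobinDemo {
  // Simplified, hypothetical stand-ins for Spark's internal worker/driver state.
  case class Worker(id: String, var coresFree: Int, var memoryFree: Int)
  case class Driver(id: String, cores: Int, mem: Int)

  /** Round-robin placement, mirroring the patched schedule() loop. */
  def assignDrivers(workers: Seq[Worker], waiting: Seq[Driver]): Map[String, String] = {
    // Shuffle once so the starting worker differs between scheduling rounds.
    val shuffled = Random.shuffle(workers)
    val numAlive = shuffled.size
    var curPos = 0
    val placements = Map.newBuilder[String, String]
    for (driver <- waiting) {
      // Visit each alive worker at most once per driver, resuming from the
      // position after the previous assignment.
      var launched = false
      var visited = 0
      while (visited < numAlive && !launched) {
        val w = shuffled(curPos)
        visited += 1
        if (w.memoryFree >= driver.mem && w.coresFree >= driver.cores) {
          w.memoryFree -= driver.mem   // reserve the driver's resources
          w.coresFree -= driver.cores
          placements += driver.id -> w.id
          launched = true
        }
        curPos = (curPos + 1) % numAlive
      }
    }
    placements.result()
  }

  def main(args: Array[String]): Unit = {
    val workers = Seq(Worker("w1", 4, 1024), Worker("w2", 4, 1024), Worker("w3", 4, 1024))
    val drivers = Seq(Driver("d1", 1, 512), Driver("d2", 1, 512), Driver("d3", 1, 512))
    // Each driver lands on a different worker, e.g. Map(d1 -> w2, d2 -> w3, d3 -> w1)
    println(assignDrivers(workers, drivers))
  }
}
```

Because `curPos` advances even after a successful launch, consecutive drivers land on different workers; the old nested-loop version instead launched every driver that fit onto the first shuffled alive worker.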