diff options
author | WangTaoTheTonic <barneystinson@aliyun.com> | 2014-09-10 13:06:47 -0700 |
---|---|---|
committer | Andrew Or <andrewor14@gmail.com> | 2014-09-10 13:06:47 -0700 |
commit | 558962a83fb0758ab5c13ff4ea58cc96c29cbbcc (patch) | |
tree | 72075030d47d0c71217890ff7c4393ffd125a70e /core | |
parent | e4f4886d7148bf48f9e3462b83bfb1ecc7edbe31 (diff) | |
download | spark-558962a83fb0758ab5c13ff4ea58cc96c29cbbcc.tar.gz spark-558962a83fb0758ab5c13ff4ea58cc96c29cbbcc.tar.bz2 spark-558962a83fb0758ab5c13ff4ea58cc96c29cbbcc.zip |
[SPARK-3411] Improve load-balancing of concurrently-submitted drivers across workers
If the waiting driver array is too big, the drivers in it will be dispatched to the first worker we get(if it has enough resources), with or without the Randomization.
We should do randomization every time we dispatch a driver, in order to better balance drivers.
Author: WangTaoTheTonic <barneystinson@aliyun.com>
Author: WangTao <barneystinson@aliyun.com>
Closes #1106 from WangTaoTheTonic/fixBalanceDrivers and squashes the following commits:
d1a928b [WangTaoTheTonic] Minor adjustment
b6560cf [WangTaoTheTonic] solve the shuffle problem for HashSet
f674e59 [WangTaoTheTonic] add comment and minor fix
2835929 [WangTao] solve the failed test and avoid filtering
2ca3091 [WangTao] fix checkstyle
bc91bb1 [WangTao] Avoid shuffle every time we schedule the driver using round robin
bbc7087 [WangTaoTheTonic] Optimize the schedule in Master
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 18 |
1 files changed, 15 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index a3909d6ea9..2a3bd6ba0b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -487,13 +487,25 @@ private[spark] class Master( if (state != RecoveryState.ALIVE) { return } // First schedule drivers, they take strict precedence over applications - val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers - for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { - for (driver <- List(waitingDrivers: _*)) { // iterate over a copy of waitingDrivers + // Randomization helps balance drivers + val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE)) + val aliveWorkerNum = shuffledAliveWorkers.size + var curPos = 0 + for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers + // We assign workers to each waiting driver in a round-robin fashion. For each driver, we + // start from the last worker that was assigned a driver, and continue onwards until we have + // explored all alive workers. + curPos = (curPos + 1) % aliveWorkerNum + val startPos = curPos + var launched = false + while (curPos != startPos && !launched) { + val worker = shuffledAliveWorkers(curPos) if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { launchDriver(worker, driver) waitingDrivers -= driver + launched = true } + curPos = (curPos + 1) % aliveWorkerNum } } |