aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWangTaoTheTonic <barneystinson@aliyun.com>2014-09-10 13:06:47 -0700
committerAndrew Or <andrewor14@gmail.com>2014-09-10 13:06:47 -0700
commit558962a83fb0758ab5c13ff4ea58cc96c29cbbcc (patch)
tree72075030d47d0c71217890ff7c4393ffd125a70e
parente4f4886d7148bf48f9e3462b83bfb1ecc7edbe31 (diff)
downloadspark-558962a83fb0758ab5c13ff4ea58cc96c29cbbcc.tar.gz
spark-558962a83fb0758ab5c13ff4ea58cc96c29cbbcc.tar.bz2
spark-558962a83fb0758ab5c13ff4ea58cc96c29cbbcc.zip
[SPARK-3411] Improve load-balancing of concurrently-submitted drivers across workers
If the waiting driver array is too big, the drivers in it will be dispatched to the first worker we get(if it has enough resources), with or without the Randomization. We should do randomization every time we dispatch a driver, in order to better balance drivers. Author: WangTaoTheTonic <barneystinson@aliyun.com> Author: WangTao <barneystinson@aliyun.com> Closes #1106 from WangTaoTheTonic/fixBalanceDrivers and squashes the following commits: d1a928b [WangTaoTheTonic] Minor adjustment b6560cf [WangTaoTheTonic] solve the shuffle problem for HashSet f674e59 [WangTaoTheTonic] add comment and minor fix 2835929 [WangTao] solve the failed test and avoid filtering 2ca3091 [WangTao] fix checkstyle bc91bb1 [WangTao] Avoid shuffle every time we schedule the driver using round robin bbc7087 [WangTaoTheTonic] Optimize the schedule in Master
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala18
1 files changed, 15 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index a3909d6ea9..2a3bd6ba0b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -487,13 +487,25 @@ private[spark] class Master(
if (state != RecoveryState.ALIVE) { return }
// First schedule drivers, they take strict precedence over applications
- val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
- for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
- for (driver <- List(waitingDrivers: _*)) { // iterate over a copy of waitingDrivers
+ // Randomization helps balance drivers
+ val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
+ val aliveWorkerNum = shuffledAliveWorkers.size
+ var curPos = 0
+ for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
+ // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
+ // start from the last worker that was assigned a driver, and continue onwards until we have
+ // explored all alive workers.
+ curPos = (curPos + 1) % aliveWorkerNum
+ val startPos = curPos
+ var launched = false
+ while (curPos != startPos && !launched) {
+ val worker = shuffledAliveWorkers(curPos)
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
+ launched = true
}
+ curPos = (curPos + 1) % aliveWorkerNum
}
}