author    CodingCat <zhunansjtu@gmail.com>    2016-03-15 10:10:23 +0000
committer Sean Owen <sowen@cloudera.com>     2016-03-15 10:10:23 +0000
commit    bd5365bbe9ff6518cde9402ee8843ec1002fff5b
tree      9c72d2c8360dee506d8e1db5250cc905e3e6fb9d
parent    dafd70fbfe70702502ef198f2a8f529ef7557592
[SPARK-13803] restore the changes in SPARK-3411
## What changes were proposed in this pull request?

This patch adds the functionality to balance the load of cluster-mode drivers among workers. It restores the changes from https://github.com/apache/spark/pull/1106, which were erased by the merge of https://github.com/apache/spark/pull/731.

## How was this patch tested?

Tested with the existing test cases.

Author: CodingCat <zhunansjtu@gmail.com>

Closes #11702 from CodingCat/SPARK-13803.
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 21
1 file changed, 17 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 6b9b1408ee..c97ad4d723 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -727,15 +727,28 @@ private[deploy] class Master(
* every time a new app joins or resource availability changes.
*/
private def schedule(): Unit = {
- if (state != RecoveryState.ALIVE) { return }
+ if (state != RecoveryState.ALIVE) {
+ return
+ }
// Drivers take strict precedence over executors
- val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
- for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
- for (driver <- waitingDrivers) {
+ val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
+ val numWorkersAlive = shuffledAliveWorkers.size
+ var curPos = 0
+ for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
+ // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
+ // start from the last worker that was assigned a driver, and continue onwards until we have
+ // explored all alive workers.
+ var launched = false
+ var numWorkersVisited = 0
+ while (numWorkersVisited < numWorkersAlive && !launched) {
+ val worker = shuffledAliveWorkers(curPos)
+ numWorkersVisited += 1
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
+ launched = true
}
+ curPos = (curPos + 1) % numWorkersAlive
}
}
startExecutorsOnWorkers()
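The hunk above replaces the old nesting, which iterated workers in the outer loop and drivers in the inner one, so the first alive worker with spare capacity would accumulate every waiting driver. The new loop shuffles the alive workers once and keeps a cursor (curPos) that persists across drivers, visiting at most one full lap of workers per driver. As a minimal, self-contained sketch of the same technique, assuming hypothetical simplified types (Worker, Driver, and assignDrivers here are illustrations, not Spark's actual WorkerInfo / DriverDescription API):

```scala
import scala.collection.mutable
import scala.util.Random

// Minimal sketch of the round-robin placement in the hunk above.
// Worker, Driver, and assignDrivers are hypothetical simplifications,
// not Spark's actual WorkerInfo / DriverDescription API.
object RoundRobinSketch {
  case class Worker(id: String, var coresFree: Int, var memoryFree: Int)
  case class Driver(id: String, cores: Int, mem: Int)

  def assignDrivers(workers: Seq[Worker], drivers: Seq[Driver]): Map[String, String] = {
    val shuffled = Random.shuffle(workers)   // randomize to avoid biasing toward list order
    val numAlive = shuffled.size
    val placements = mutable.Map.empty[String, String] // driver id -> worker id
    var curPos = 0                           // persists across drivers: the round-robin cursor
    for (driver <- drivers) {
      var launched = false
      var visited = 0
      while (visited < numAlive && !launched) { // give up after one full lap
        val w = shuffled(curPos)
        visited += 1
        if (w.memoryFree >= driver.mem && w.coresFree >= driver.cores) {
          w.memoryFree -= driver.mem         // reserve, so later drivers see the capacity as used
          w.coresFree -= driver.cores
          placements(driver.id) = w.id
          launched = true
        }
        curPos = (curPos + 1) % numAlive
      }
    }
    placements.toMap
  }

  def main(args: Array[String]): Unit = {
    val ws = Seq(Worker("w1", 8, 16384), Worker("w2", 8, 16384))
    val ds = Seq(Driver("d1", 1, 1024), Driver("d2", 1, 1024))
    // With two equally capable workers, the two drivers land on different ones.
    println(assignDrivers(ws, ds))
  }
}
```

Two design points carry over from the patch: the cursor is deliberately not reset between drivers, so consecutive drivers spread across workers instead of all filling the first one that fits, and the worker list is reshuffled on each scheduling pass, so no worker is systematically favored by its position in the list.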