author     Matei Zaharia <matei@eecs.berkeley.edu>   2012-11-08 23:13:12 -0800
committer  Matei Zaharia <matei@eecs.berkeley.edu>   2012-11-08 23:13:12 -0800
commit     6607f546ccadf307b0a862f1b52ab0b12316420d (patch)
tree       02c592bb76dcb27c68f940b33662c0795e1cc56d
parent     66cbdee941ee12eac5eea38709d542938bba575a (diff)
Added an option to spread out jobs in the standalone mode.
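
The flag is read by the master as a Java system property with a default of "false" (see the
Master.scala change below). A minimal sketch of turning it on, assuming the property is set in
the master's JVM before the Master actor starts; where exactly to set it (e.g. the master's
java options) is an assumption and not part of this patch:

    // Hypothetical usage: enable round-robin ("spread out") scheduling.
    // The property name and default come from the Master.scala change below.
    System.setProperty("spark.deploy.spreadOut", "true")

Equivalently, -Dspark.deploy.spreadOut=true can be passed on the master's JVM command line.
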
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala        | 63
-rw-r--r--  core/src/main/scala/spark/deploy/master/WorkerInfo.scala    |  4
-rw-r--r--  core/src/main/twirl/spark/deploy/master/job_row.scala.html  |  7
3 files changed, 56 insertions(+), 18 deletions(-)
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 5ef7411f4d..7e5cd6b171 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -31,6 +31,11 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
   val waitingJobs = new ArrayBuffer[JobInfo]
   val completedJobs = new ArrayBuffer[JobInfo]
 
+  // As a temporary workaround before better ways of configuring memory, we allow users to set
+  // a flag that will perform round-robin scheduling across the nodes (spreading out each job
+  // among all the nodes) instead of trying to consolidate each job onto a small # of nodes.
+  val spreadOutJobs = System.getProperty("spark.deploy.spreadOut", "false").toBoolean
+
   override def preStart() {
     logInfo("Starting Spark master at spark://" + ip + ":" + port)
     // Listen for remote client disconnection events, since they don't go through Akka's watch()
@@ -128,23 +133,57 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
   }
 
   /**
+   * Can a job use the given worker? True if the worker has enough memory and we haven't already
+   * launched an executor for the job on it (right now the standalone backend doesn't like having
+   * two executors on the same worker).
+   */
+  def canUse(job: JobInfo, worker: WorkerInfo): Boolean = {
+    worker.memoryFree >= job.desc.memoryPerSlave && !worker.hasExecutor(job)
+  }
+
+  /**
    * Schedule the currently available resources among waiting jobs. This method will be called
    * every time a new job joins or resource availability changes.
    */
   def schedule() {
-    // Right now this is a very simple FIFO scheduler. We keep looking through the jobs
-    // in order of submission time and launching the first one that fits on each node.
-    for (worker <- workers if worker.coresFree > 0) {
-      for (job <- waitingJobs.clone()) {
-        val jobMemory = job.desc.memoryPerSlave
-        if (worker.memoryFree >= jobMemory) {
-          val coresToUse = math.min(worker.coresFree, job.coresLeft)
-          val exec = job.addExecutor(worker, coresToUse)
-          launchExecutor(worker, exec)
+    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first job
+    // in the queue, then the second job, etc.
+    if (spreadOutJobs) {
+      // Try to spread out each job among all the nodes, until it has all its cores
+      for (job <- waitingJobs if job.coresLeft > 0) {
+        val usableWorkers = workers.toArray.filter(canUse(job, _)).sortBy(_.coresFree).reverse
+        val numUsable = usableWorkers.length
+        val assigned = new Array[Int](numUsable) // Number of cores to give on each node
+        var toAssign = math.min(job.coresLeft, usableWorkers.map(_.coresFree).sum)
+        var pos = 0
+        while (toAssign > 0) {
+          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
+            toAssign -= 1
+            assigned(pos) += 1
+          }
+          pos = (pos + 1) % numUsable
         }
-        if (job.coresLeft == 0) {
-          waitingJobs -= job
-          job.state = JobState.RUNNING
+        // Now that we've decided how many cores to give on each node, let's actually give them
+        for (pos <- 0 until numUsable) {
+          if (assigned(pos) > 0) {
+            val exec = job.addExecutor(usableWorkers(pos), assigned(pos))
+            launchExecutor(usableWorkers(pos), exec)
+            job.state = JobState.RUNNING
+          }
+        }
+      }
+    } else {
+      // Pack each job into as few nodes as possible until we've assigned all its cores
+      for (worker <- workers if worker.coresFree > 0) {
+        for (job <- waitingJobs if job.coresLeft > 0) {
+          if (canUse(job, worker)) {
+            val coresToUse = math.min(worker.coresFree, job.coresLeft)
+            if (coresToUse > 0) {
+              val exec = job.addExecutor(worker, coresToUse)
+              launchExecutor(worker, exec)
+              job.state = JobState.RUNNING
+            }
+          }
         }
       }
     }
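
For illustration only (not part of the patch): a self-contained sketch of the round-robin
core assignment that the new spreadOutJobs branch performs, reduced to plain integers.
"freeCores" stands in for the coresFree values of the workers that pass canUse, and
"coresWanted" stands in for job.coresLeft; the object and method names here are made up.

object SpreadOutSketch {
  // Hand out up to coresWanted cores, one at a time, cycling over the workers
  // so that the job is spread across them instead of packed onto the first one.
  def assignCores(coresWanted: Int, freeCores: Array[Int]): Array[Int] = {
    val assigned = new Array[Int](freeCores.length) // cores given to each worker
    var toAssign = math.min(coresWanted, freeCores.sum)
    var pos = 0
    while (toAssign > 0) {
      if (freeCores(pos) - assigned(pos) > 0) { // this worker still has a spare core
        toAssign -= 1
        assigned(pos) += 1
      }
      pos = (pos + 1) % freeCores.length // move on to the next worker
    }
    assigned
  }

  def main(args: Array[String]) {
    // Three workers with 4, 2 and 1 free cores and a job asking for 5 cores:
    // prints "2, 2, 1" rather than packing 4 cores onto the first worker.
    println(assignCores(5, Array(4, 2, 1)).mkString(", "))
  }
}

When spreadOutJobs is false, the else branch above instead gives each worker as many of the
job's remaining cores as it can before moving on, keeping the job on as few nodes as possible.
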
diff --git a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
index 16b3f9b653..706b1453aa 100644
--- a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
@@ -33,6 +33,10 @@ private[spark] class WorkerInfo(
       memoryUsed -= exec.memory
     }
   }
+
+  def hasExecutor(job: JobInfo): Boolean = {
+    executors.values.exists(_.job == job)
+  }
 
   def webUiAddress : String = {
     "http://" + this.host + ":" + this.webUiPort
diff --git a/core/src/main/twirl/spark/deploy/master/job_row.scala.html b/core/src/main/twirl/spark/deploy/master/job_row.scala.html
index fff7953e7d..7c466a6a2c 100644
--- a/core/src/main/twirl/spark/deploy/master/job_row.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/job_row.scala.html
@@ -10,12 +10,7 @@
   </td>
   <td>@job.desc.name</td>
   <td>
-    @job.coresGranted Granted
-    @if(job.desc.cores == Integer.MAX_VALUE) {
-
-    } else {
-      , @job.coresLeft
-    }
+    @job.coresGranted
   </td>
   <td>@Utils.memoryMegabytesToString(job.desc.memoryPerSlave)</td>
   <td>@formatDate(job.submitDate)</td>