diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-12-26 12:37:01 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-12-26 12:38:39 -0800 |
commit | c23d640516e05a32f1380cdd3d35bf948c92cd60 (patch) | |
tree | 3377f864ae2bf34462c6bec6534bd9b410871bc8 | |
parent | da20270b839cc10d4459848f5b485ca566cd2dfb (diff) | |
download | spark-c23d640516e05a32f1380cdd3d35bf948c92cd60.tar.gz spark-c23d640516e05a32f1380cdd3d35bf948c92cd60.tar.bz2 spark-c23d640516e05a32f1380cdd3d35bf948c92cd60.zip |
Addressing smaller changes from Aaron's review
7 files changed, 31 insertions, 27 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index 82bb33a2ec..7bfc3779ab 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -83,8 +83,7 @@ private[deploy] object DeployMessages { sparkHome: String) extends DeployMessage - case class LaunchDriver(driverId: String, driverDesc: DriverDescription) - extends DeployMessage + case class LaunchDriver(driverId: String, driverDesc: DriverDescription) extends DeployMessage case class KillDriver(driverId: String) extends DeployMessage @@ -134,8 +133,8 @@ private[deploy] object DeployMessages { // Master to MasterWebUI case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo], - activeDrivers: Array[DriverInfo], completedDrivers: Array[DriverInfo], activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo], + activeDrivers: Array[DriverInfo], completedDrivers: Array[DriverInfo], status: MasterState) { Utils.checkHost(host, "Required hostname") diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala index 60e6549188..28bc54962e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala @@ -89,7 +89,7 @@ private[spark] class DriverClientArguments(args: Array[String]) { */ def printUsageAndExit(exitCode: Int) { System.err.println( - "usage: DriverClient launch [options] <active-master> <jar-url> <main-class> " + + "usage: DriverClient [options] launch <active-master> <jar-url> <main-class> " + "[driver options]\n" + "usage: DriverClient kill <active-master> <driver-id>\n\n" + "Options:\n" + diff --git a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala index 052c474d2c..33377931d6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala @@ -33,5 +33,4 @@ private[spark] class DriverInfo( @transient var exception: Option[Exception] = None /* Most recent worker assigned to this driver */ @transient var worker: Option[WorkerInfo] = None - } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 0528ef43a1..7f9ad8a7ef 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -19,11 +19,11 @@ package org.apache.spark.deploy.master import java.text.SimpleDateFormat import java.util.Date -import java.util.concurrent.TimeUnit import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.concurrent.Await import scala.concurrent.duration._ +import scala.util.Random import akka.actor._ import akka.pattern.ask @@ -174,8 +174,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } case RequestSubmitDriver(description) => { - if (state == RecoveryState.STANDBY) { - sender ! SubmitDriverResponse(false, "Standby master cannot accept driver submission") + if (state != RecoveryState.ALIVE) { + val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state." + sender ! SubmitDriverResponse(false, msg) } else { logInfo("Driver submitted " + description.mainClass) val driver = createDriver(description) @@ -192,14 +193,18 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } case RequestKillDriver(driverId) => { - if (state == RecoveryState.STANDBY) { - sender ! KillDriverResponse(false, "Standby master cannot kill drivers") + if (state != RecoveryState.ALIVE) { + val msg = s"Can only kill drivers in ALIVE state. Current state: $state." + sender ! KillDriverResponse(false, msg) } else { logInfo("Asked to kill driver " + driverId) val driver = drivers.find(_.id == driverId) driver match { case Some(d) => - if (waitingDrivers.contains(d)) { waitingDrivers -= d } + if (waitingDrivers.contains(d)) { + waitingDrivers -= d + self ! DriverStateChanged(driverId, DriverState.KILLED, None) + } else { // We just notify the worker to kill the driver here. The final bookkeeping occurs // on the return path when the worker submits a state change back to the master @@ -208,6 +213,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act w.actor ! KillDriver(driverId) } } + // TODO: It would be nice for this to be a synchronous response val msg = s"Kill request for $driverId submitted" logInfo(msg) sender ! KillDriverResponse(true, msg) @@ -338,8 +344,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } case RequestMasterState => { - sender ! MasterStateResponse(host, port, workers.toArray, drivers.toArray, - completedDrivers.toArray ,apps.toArray, completedApps.toArray, state) + sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray, + drivers.toArray, completedDrivers.toArray, state) } case CheckForWorkerTimeOut => { @@ -423,10 +429,12 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act */ def schedule() { if (state != RecoveryState.ALIVE) { return } + // First schedule drivers, they take strict precedence over applications - for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) { - for (driver <- Seq(waitingDrivers: _*)) { - if (worker.memoryFree > driver.desc.mem && worker.coresFree > driver.desc.cores) { + val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers + for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { + for (driver <- waitingDrivers) { + if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { launchDriver(worker, driver) waitingDrivers -= driver } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala index 28cd46359c..c5fa9cf7d7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala @@ -37,8 +37,8 @@ private[spark] class WorkerInfo( Utils.checkHost(host, "Expected hostname") assert (port > 0) - @transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // fullId => info - @transient var drivers: mutable.HashMap[String, DriverInfo] = _ + @transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // executorId => info + @transient var drivers: mutable.HashMap[String, DriverInfo] = _ // driverId => info @transient var state: WorkerState.Value = _ @transient var coresUsed: Int = _ @transient var memoryUsed: Int = _ diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala index 3c6fca3780..951fc679ef 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala @@ -94,7 +94,6 @@ private[spark] class IndexPage(parent: MasterWebUI) { <div class="row-fluid"> <div class="span12"> <h4> Running Applications </h4> - {activeAppsTable} </div> </div> @@ -109,7 +108,6 @@ private[spark] class IndexPage(parent: MasterWebUI) { <div class="row-fluid"> <div class="span12"> <h4> Active Drivers </h4> - {activeDriversTable} </div> </div> @@ -167,7 +165,7 @@ private[spark] class IndexPage(parent: MasterWebUI) { <td>{driver.worker.map(w => w.id.toString).getOrElse("None")}</td> <td>{driver.state}</td> <td sorttable_customkey={driver.desc.cores.toString}> - {driver.desc.cores.toString} + {driver.desc.cores} </td> <td sorttable_customkey={driver.desc.mem.toString}> {Utils.megabytesToString(driver.desc.mem.toLong)} diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 42c28cf22d..21ec881f46 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -222,8 +222,8 @@ private[spark] class Worker( logInfo("Executor " + fullId + " finished with state " + state + message.map(" message " + _).getOrElse("") + exitStatus.map(" exitStatus " + _).getOrElse("")) - finishedExecutors(fullId) = executor executors -= fullId + finishedExecutors(fullId) = executor coresUsed -= executor.cores memoryUsed -= executor.memory } @@ -248,8 +248,8 @@ private[spark] class Worker( drivers(driverId) = driver driver.start() - coresUsed += 1 - memoryUsed += memory + coresUsed += driverDesc.cores + memoryUsed += driverDesc.mem } case KillDriver(driverId) => { @@ -269,16 +269,16 @@ private[spark] class Worker( case DriverState.FINISHED => logInfo(s"Driver $driverId exited successfully") case DriverState.KILLED => - logInfo(s"Driver $driverId was killed") + logInfo(s"Driver $driverId was killed by user") } masterLock.synchronized { master ! DriverStateChanged(driverId, state, exception) } val driver = drivers(driverId) - memoryUsed -= driver.driverDesc.mem - coresUsed -= driver.driverDesc.cores drivers -= driverId finishedDrivers(driverId) = driver + memoryUsed -= driver.driverDesc.mem + coresUsed -= driver.driverDesc.cores } case x: DisassociatedEvent if x.remoteAddress == masterAddress => |