From c23d640516e05a32f1380cdd3d35bf948c92cd60 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Thu, 26 Dec 2013 12:37:01 -0800
Subject: Addressing smaller changes from Aaron's review

---
 .../org/apache/spark/deploy/DeployMessage.scala    |  5 ++--
 .../deploy/client/DriverClientArguments.scala      |  2 +-
 .../apache/spark/deploy/master/DriverInfo.scala    |  1 -
 .../org/apache/spark/deploy/master/Master.scala    | 30 ++++++++++++++--------
 .../apache/spark/deploy/master/WorkerInfo.scala    |  4 +--
 .../apache/spark/deploy/master/ui/IndexPage.scala  |  4 +--
 .../org/apache/spark/deploy/worker/Worker.scala    | 12 ++++-----
 7 files changed, 31 insertions(+), 27 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 82bb33a2ec..7bfc3779ab 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -83,8 +83,7 @@ private[deploy] object DeployMessages {
       sparkHome: String)
     extends DeployMessage
 
-  case class LaunchDriver(driverId: String, driverDesc: DriverDescription)
-    extends DeployMessage
+  case class LaunchDriver(driverId: String, driverDesc: DriverDescription) extends DeployMessage
 
   case class KillDriver(driverId: String) extends DeployMessage
 
@@ -134,8 +133,8 @@ private[deploy] object DeployMessages {
   // Master to MasterWebUI
 
   case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo],
-    activeDrivers: Array[DriverInfo], completedDrivers: Array[DriverInfo],
     activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo],
+    activeDrivers: Array[DriverInfo], completedDrivers: Array[DriverInfo],
     status: MasterState) {
 
     Utils.checkHost(host, "Required hostname")
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
index 60e6549188..28bc54962e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
@@ -89,7 +89,7 @@ private[spark] class DriverClientArguments(args: Array[String]) {
    */
   def printUsageAndExit(exitCode: Int) {
     System.err.println(
-      "usage: DriverClient launch [options] <active-master> <jar-url> <main-class> " +
+      "usage: DriverClient [options] launch <active-master> <jar-url> <main-class> " +
         "[driver options]\n" +
       "usage: DriverClient kill <active-master> <driver-id>\n\n" +
       "Options:\n" +
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
index 052c474d2c..33377931d6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
@@ -33,5 +33,4 @@ private[spark] class DriverInfo(
   @transient var exception: Option[Exception] = None
   /* Most recent worker assigned to this driver */
   @transient var worker: Option[WorkerInfo] = None
-
 }
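Note: the MasterStateResponse reorder above is easy to read as cosmetic, but the constructor takes several look-alike Array parameters positionally, so the declaration order and every call site (see the RequestMasterState handler in Master.scala below) have to change together. A minimal sketch of the pitfall, using hypothetical stand-in types rather than Spark's real classes:

    // Two parameters of the same type: the compiler cannot catch a swap.
    case class StateResponse(activeApps: Array[String], activeDrivers: Array[String])

    object OrderingPitfall {
      def main(args: Array[String]): Unit = {
        val apps = Array("app-1")
        val drivers = Array("driver-1")
        val right = StateResponse(apps, drivers)
        val wrong = StateResponse(drivers, apps) // compiles, silently swapped
        println(right.activeApps.mkString + " / " + wrong.activeApps.mkString)
      }
    }

Named arguments (StateResponse(activeApps = apps, activeDrivers = drivers)) are the usual defense while a positional signature is in flux.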
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 0528ef43a1..7f9ad8a7ef 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -19,11 +19,11 @@ package org.apache.spark.deploy.master
 
 import java.text.SimpleDateFormat
 import java.util.Date
-import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.Await
 import scala.concurrent.duration._
+import scala.util.Random
 
 import akka.actor._
 import akka.pattern.ask
@@ -174,8 +174,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     }
 
     case RequestSubmitDriver(description) => {
-      if (state == RecoveryState.STANDBY) {
-        sender ! SubmitDriverResponse(false, "Standby master cannot accept driver submission")
+      if (state != RecoveryState.ALIVE) {
+        val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state."
+        sender ! SubmitDriverResponse(false, msg)
       } else {
         logInfo("Driver submitted " + description.mainClass)
         val driver = createDriver(description)
@@ -192,14 +193,18 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     }
 
     case RequestKillDriver(driverId) => {
-      if (state == RecoveryState.STANDBY) {
-        sender ! KillDriverResponse(false, "Standby master cannot kill drivers")
+      if (state != RecoveryState.ALIVE) {
+        val msg = s"Can only kill drivers in ALIVE state. Current state: $state."
+        sender ! KillDriverResponse(false, msg)
       } else {
         logInfo("Asked to kill driver " + driverId)
         val driver = drivers.find(_.id == driverId)
         driver match {
           case Some(d) =>
-            if (waitingDrivers.contains(d)) { waitingDrivers -= d }
+            if (waitingDrivers.contains(d)) {
+              waitingDrivers -= d
+              self ! DriverStateChanged(driverId, DriverState.KILLED, None)
+            }
             else {
               // We just notify the worker to kill the driver here. The final bookkeeping occurs
               // on the return path when the worker submits a state change back to the master
@@ -208,6 +213,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
                 w.actor ! KillDriver(driverId)
               }
             }
+            // TODO: It would be nice for this to be a synchronous response
             val msg = s"Kill request for $driverId submitted"
             logInfo(msg)
             sender ! KillDriverResponse(true, msg)
@@ -338,8 +344,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     }
 
     case RequestMasterState => {
-      sender ! MasterStateResponse(host, port, workers.toArray, drivers.toArray,
-        completedDrivers.toArray ,apps.toArray, completedApps.toArray, state)
+      sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray,
+        drivers.toArray, completedDrivers.toArray, state)
     }
 
     case CheckForWorkerTimeOut => {
@@ -423,10 +429,12 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
    */
   def schedule() {
     if (state != RecoveryState.ALIVE) { return }
+
     // First schedule drivers, they take strict precedence over applications
-    for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
-      for (driver <- Seq(waitingDrivers: _*)) {
-        if (worker.memoryFree > driver.desc.mem && worker.coresFree > driver.desc.cores) {
+    val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
+    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
+      for (driver <- waitingDrivers) {
+        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
           launchDriver(worker, driver)
           waitingDrivers -= driver
         }
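Note: the schedule() hunk changes two things: workers are shuffled so consecutive driver submissions do not all land on the first registered worker, and the resource checks use >= instead of >, so a driver may claim exactly what a worker has free. A self-contained sketch of that policy with hypothetical stand-ins (not Spark's Worker/Driver classes):

    import scala.collection.mutable
    import scala.util.Random

    final case class DriverStub(id: String, mem: Int, cores: Int)
    final class WorkerStub(val id: String, var memoryFree: Int, var coresFree: Int)

    object DriverScheduling {
      // Returns (driverId, workerId) placements, decrementing each worker's
      // free resources as drivers are assigned to it.
      def schedule(workers: Seq[WorkerStub],
                   waiting: mutable.Buffer[DriverStub]): Seq[(String, String)] = {
        val placements = mutable.ArrayBuffer.empty[(String, String)]
        // Shuffle so load spreads instead of filling workers in registration order.
        for (worker <- Random.shuffle(workers); driver <- waiting.toList) {
          // >= (not >) lets a driver take exactly the worker's remaining resources.
          if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
            worker.memoryFree -= driver.mem
            worker.coresFree -= driver.cores
            waiting -= driver
            placements += ((driver.id, worker.id))
          }
        }
        placements.toSeq
      }
    }

With two workers at 1024 MB / 4 cores free and two waiting drivers that each need exactly 1024 MB and 4 cores, both drivers are placed, one per worker; under the old strict > comparison neither would have fit anywhere.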
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index 28cd46359c..c5fa9cf7d7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -37,8 +37,8 @@ private[spark] class WorkerInfo(
   Utils.checkHost(host, "Expected hostname")
   assert (port > 0)
 
-  @transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // fullId => info
-  @transient var drivers: mutable.HashMap[String, DriverInfo] = _
+  @transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // executorId => info
+  @transient var drivers: mutable.HashMap[String, DriverInfo] = _ // driverId => info
   @transient var state: WorkerState.Value = _
   @transient var coresUsed: Int = _
   @transient var memoryUsed: Int = _
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 3c6fca3780..951fc679ef 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -94,7 +94,6 @@ private[spark] class IndexPage(parent: MasterWebUI) {
         <div class="row-fluid">
           <div class="span12">
             <h4> Running Applications </h4>
-
             {activeAppsTable}
           </div>
         </div>
@@ -109,7 +108,6 @@ private[spark] class IndexPage(parent: MasterWebUI) {
         <div class="row-fluid">
          <div class="span12">
             <h4> Active Drivers </h4>
-
             {activeDriversTable}
           </div>
         </div>
@@ -167,7 +165,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
       <td>{driver.worker.map(w => w.id.toString).getOrElse("None")}</td>
       <td>{driver.state}</td>
-      <td>{driver.desc.cores.toString}</td>
+      <td>{driver.desc.cores}</td>
       <td>{Utils.megabytesToString(driver.desc.mem.toLong)}</td>
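Note: the last IndexPage hunk drops a redundant .toString. Values interpolated into Scala XML literals are rendered through their toString anyway, so {driver.desc.cores} and {driver.desc.cores.toString} produce identical markup. A quick stand-alone check (assumes the scala.xml literals available to this code; nothing Spark-specific):

    object XmlInterpolation {
      def main(args: Array[String]): Unit = {
        val cores = 4
        // Both elements render as <td>4</td>.
        println(<td>{cores}</td>.toString == <td>{cores.toString}</td>.toString) // true
      }
    }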
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 42c28cf22d..21ec881f46 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -222,8 +222,8 @@ private[spark] class Worker(
         logInfo("Executor " + fullId + " finished with state " + state +
           message.map(" message " + _).getOrElse("") +
           exitStatus.map(" exitStatus " + _).getOrElse(""))
-        finishedExecutors(fullId) = executor
         executors -= fullId
+        finishedExecutors(fullId) = executor
         coresUsed -= executor.cores
         memoryUsed -= executor.memory
       }
@@ -248,8 +248,8 @@ private[spark] class Worker(
       drivers(driverId) = driver
       driver.start()
 
-      coresUsed += 1
-      memoryUsed += memory
+      coresUsed += driverDesc.cores
+      memoryUsed += driverDesc.mem
     }
 
     case KillDriver(driverId) => {
@@ -269,16 +269,16 @@ private[spark] class Worker(
         case DriverState.FINISHED =>
           logInfo(s"Driver $driverId exited successfully")
         case DriverState.KILLED =>
-          logInfo(s"Driver $driverId was killed")
+          logInfo(s"Driver $driverId was killed by user")
       }
       masterLock.synchronized {
         master ! DriverStateChanged(driverId, state, exception)
       }
       val driver = drivers(driverId)
-      memoryUsed -= driver.driverDesc.mem
-      coresUsed -= driver.driverDesc.cores
       drivers -= driverId
       finishedDrivers(driverId) = driver
+      memoryUsed -= driver.driverDesc.mem
+      coresUsed -= driver.driverDesc.cores
     }
 
     case x: DisassociatedEvent if x.remoteAddress == masterAddress =>
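Note: the Worker changes are mostly bookkeeping-order fixes: a finished executor or driver now leaves the active map before it is parked in the finished map, and driver accounting charges the driver's declared footprint (driverDesc.cores / driverDesc.mem) rather than a hard-coded single core. A minimal sketch of that pattern, with a hypothetical stand-in for the runner record (not Spark's DriverRunner):

    import scala.collection.mutable

    final case class RunnerStub(id: String, cores: Int, mem: Int)

    final class ResourceBook {
      val active = mutable.HashMap.empty[String, RunnerStub]
      val finished = mutable.HashMap.empty[String, RunnerStub]
      var coresUsed = 0
      var memoryUsed = 0

      def launch(r: RunnerStub): Unit = {
        active(r.id) = r
        coresUsed += r.cores // charge the declared cores/mem, not a fixed 1 core
        memoryUsed += r.mem
      }

      def finish(id: String): Unit = {
        val r = active(id)
        active -= id       // drop from the active map first...
        finished(id) = r   // ...then record it as finished
        coresUsed -= r.cores
        memoryUsed -= r.mem
      }
    }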