author    Patrick Wendell <pwendell@gmail.com>  2013-12-26 12:37:01 -0800
committer Patrick Wendell <pwendell@gmail.com>  2013-12-26 12:38:39 -0800
commit    c23d640516e05a32f1380cdd3d35bf948c92cd60 (patch)
tree      3377f864ae2bf34462c6bec6534bd9b410871bc8 /core/src/main/scala/org/apache
parent    da20270b839cc10d4459848f5b485ca566cd2dfb (diff)
Addressing smaller changes from Aaron's review
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala                 5
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala             1
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala                30
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala             4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala           4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala                12
7 files changed, 31 insertions(+), 27 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 82bb33a2ec..7bfc3779ab 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -83,8 +83,7 @@ private[deploy] object DeployMessages {
sparkHome: String)
extends DeployMessage
- case class LaunchDriver(driverId: String, driverDesc: DriverDescription)
- extends DeployMessage
+ case class LaunchDriver(driverId: String, driverDesc: DriverDescription) extends DeployMessage
case class KillDriver(driverId: String) extends DeployMessage
@@ -134,8 +133,8 @@ private[deploy] object DeployMessages {
// Master to MasterWebUI
case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo],
- activeDrivers: Array[DriverInfo], completedDrivers: Array[DriverInfo],
activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo],
+ activeDrivers: Array[DriverInfo], completedDrivers: Array[DriverInfo],
status: MasterState) {
Utils.checkHost(host, "Required hostname")
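
For context on the reordering above: the application arrays now precede the driver arrays in MasterStateResponse, so every construction site must pass arguments in the new order (the Master.scala hunk below updates the one call site accordingly). A minimal standalone sketch of the new parameter order, using simplified stand-in types rather than the real Spark classes:

object MasterStateOrderSketch {
  // Simplified stand-ins for the real WorkerInfo / ApplicationInfo / DriverInfo.
  case class WorkerInfo(id: String)
  case class ApplicationInfo(id: String)
  case class DriverInfo(id: String)
  sealed trait MasterState
  case object Alive extends MasterState

  // Mirrors the reordered signature: apps before drivers.
  case class MasterStateResponse(
      host: String, port: Int, workers: Array[WorkerInfo],
      activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo],
      activeDrivers: Array[DriverInfo], completedDrivers: Array[DriverInfo],
      status: MasterState)

  def main(args: Array[String]): Unit = {
    val resp = MasterStateResponse("localhost", 7077, Array(WorkerInfo("worker-1")),
      Array(ApplicationInfo("app-1")), Array.empty[ApplicationInfo],
      Array(DriverInfo("driver-1")), Array.empty[DriverInfo], Alive)
    println(s"${resp.activeApps.length} active app(s), ${resp.activeDrivers.length} active driver(s)")
  }
}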
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
index 60e6549188..28bc54962e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
@@ -89,7 +89,7 @@ private[spark] class DriverClientArguments(args: Array[String]) {
*/
def printUsageAndExit(exitCode: Int) {
System.err.println(
- "usage: DriverClient launch [options] <active-master> <jar-url> <main-class> " +
+ "usage: DriverClient [options] launch <active-master> <jar-url> <main-class> " +
"[driver options]\n" +
"usage: DriverClient kill <active-master> <driver-id>\n\n" +
"Options:\n" +
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
index 052c474d2c..33377931d6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
@@ -33,5 +33,4 @@ private[spark] class DriverInfo(
@transient var exception: Option[Exception] = None
/* Most recent worker assigned to this driver */
@transient var worker: Option[WorkerInfo] = None
-
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 0528ef43a1..7f9ad8a7ef 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -19,11 +19,11 @@ package org.apache.spark.deploy.master
import java.text.SimpleDateFormat
import java.util.Date
-import java.util.concurrent.TimeUnit
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Await
import scala.concurrent.duration._
+import scala.util.Random
import akka.actor._
import akka.pattern.ask
@@ -174,8 +174,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
case RequestSubmitDriver(description) => {
- if (state == RecoveryState.STANDBY) {
- sender ! SubmitDriverResponse(false, "Standby master cannot accept driver submission")
+ if (state != RecoveryState.ALIVE) {
+ val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state."
+ sender ! SubmitDriverResponse(false, msg)
} else {
logInfo("Driver submitted " + description.mainClass)
val driver = createDriver(description)
@@ -192,14 +193,18 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
case RequestKillDriver(driverId) => {
- if (state == RecoveryState.STANDBY) {
- sender ! KillDriverResponse(false, "Standby master cannot kill drivers")
+ if (state != RecoveryState.ALIVE) {
+ val msg = s"Can only kill drivers in ALIVE state. Current state: $state."
+ sender ! KillDriverResponse(false, msg)
} else {
logInfo("Asked to kill driver " + driverId)
val driver = drivers.find(_.id == driverId)
driver match {
case Some(d) =>
- if (waitingDrivers.contains(d)) { waitingDrivers -= d }
+ if (waitingDrivers.contains(d)) {
+ waitingDrivers -= d
+ self ! DriverStateChanged(driverId, DriverState.KILLED, None)
+ }
else {
// We just notify the worker to kill the driver here. The final bookkeeping occurs
// on the return path when the worker submits a state change back to the master
@@ -208,6 +213,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
w.actor ! KillDriver(driverId)
}
}
+ // TODO: It would be nice for this to be a synchronous response
val msg = s"Kill request for $driverId submitted"
logInfo(msg)
sender ! KillDriverResponse(true, msg)
@@ -338,8 +344,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
case RequestMasterState => {
- sender ! MasterStateResponse(host, port, workers.toArray, drivers.toArray,
- completedDrivers.toArray ,apps.toArray, completedApps.toArray, state)
+ sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray,
+ drivers.toArray, completedDrivers.toArray, state)
}
case CheckForWorkerTimeOut => {
@@ -423,10 +429,12 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
*/
def schedule() {
if (state != RecoveryState.ALIVE) { return }
+
// First schedule drivers, they take strict precedence over applications
- for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
- for (driver <- Seq(waitingDrivers: _*)) {
- if (worker.memoryFree > driver.desc.mem && worker.coresFree > driver.desc.cores) {
+ val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
+ for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
+ for (driver <- waitingDrivers) {
+ if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
}
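
For context on the scheduling change above: a self-contained sketch of the same idea, with simplified Driver/Worker stand-ins rather than the real Spark types. Shuffling the worker list spreads drivers across the cluster instead of always packing the head of the list, and the >= comparisons (replacing >) let a driver claim exactly the remaining memory or cores:

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object DriverScheduleSketch {
  case class Driver(id: String, mem: Int, cores: Int)
  class Worker(val id: String, var memoryFree: Int, var coresFree: Int)

  def schedule(workers: Seq[Worker], waitingDrivers: ArrayBuffer[Driver]): Unit = {
    // Randomization helps balance drivers across workers.
    for (worker <- Random.shuffle(workers)) {
      // Iterate over a snapshot so removing launched drivers is safe.
      for (driver <- waitingDrivers.toList) {
        if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
          worker.memoryFree -= driver.mem
          worker.coresFree -= driver.cores
          waitingDrivers -= driver
          println(s"Launched ${driver.id} on ${worker.id}")
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val workers = Seq(new Worker("worker-1", 1024, 4), new Worker("worker-2", 1024, 4))
    val waiting = ArrayBuffer(Driver("driver-1", 512, 2), Driver("driver-2", 1024, 4))
    schedule(workers, waiting)
    println(s"${waiting.size} driver(s) still waiting")
  }
}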
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index 28cd46359c..c5fa9cf7d7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -37,8 +37,8 @@ private[spark] class WorkerInfo(
Utils.checkHost(host, "Expected hostname")
assert (port > 0)
- @transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // fullId => info
- @transient var drivers: mutable.HashMap[String, DriverInfo] = _
+ @transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // executorId => info
+ @transient var drivers: mutable.HashMap[String, DriverInfo] = _ // driverId => info
@transient var state: WorkerState.Value = _
@transient var coresUsed: Int = _
@transient var memoryUsed: Int = _
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 3c6fca3780..951fc679ef 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -94,7 +94,6 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<div class="row-fluid">
<div class="span12">
<h4> Running Applications </h4>
-
{activeAppsTable}
</div>
</div>
@@ -109,7 +108,6 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<div class="row-fluid">
<div class="span12">
<h4> Active Drivers </h4>
-
{activeDriversTable}
</div>
</div>
@@ -167,7 +165,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<td>{driver.worker.map(w => w.id.toString).getOrElse("None")}</td>
<td>{driver.state}</td>
<td sorttable_customkey={driver.desc.cores.toString}>
- {driver.desc.cores.toString}
+ {driver.desc.cores}
</td>
<td sorttable_customkey={driver.desc.mem.toString}>
{Utils.megabytesToString(driver.desc.mem.toLong)}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 42c28cf22d..21ec881f46 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -222,8 +222,8 @@ private[spark] class Worker(
logInfo("Executor " + fullId + " finished with state " + state +
message.map(" message " + _).getOrElse("") +
exitStatus.map(" exitStatus " + _).getOrElse(""))
- finishedExecutors(fullId) = executor
executors -= fullId
+ finishedExecutors(fullId) = executor
coresUsed -= executor.cores
memoryUsed -= executor.memory
}
@@ -248,8 +248,8 @@ private[spark] class Worker(
drivers(driverId) = driver
driver.start()
- coresUsed += 1
- memoryUsed += memory
+ coresUsed += driverDesc.cores
+ memoryUsed += driverDesc.mem
}
case KillDriver(driverId) => {
@@ -269,16 +269,16 @@ private[spark] class Worker(
case DriverState.FINISHED =>
logInfo(s"Driver $driverId exited successfully")
case DriverState.KILLED =>
- logInfo(s"Driver $driverId was killed")
+ logInfo(s"Driver $driverId was killed by user")
}
masterLock.synchronized {
master ! DriverStateChanged(driverId, state, exception)
}
val driver = drivers(driverId)
- memoryUsed -= driver.driverDesc.mem
- coresUsed -= driver.driverDesc.cores
drivers -= driverId
finishedDrivers(driverId) = driver
+ memoryUsed -= driver.driverDesc.mem
+ coresUsed -= driver.driverDesc.cores
}
case x: DisassociatedEvent if x.remoteAddress == masterAddress =>
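
For context on the two Worker.scala accounting fixes above: charging the driver's declared cores and memory at launch (rather than a hard-coded single core) and releasing the same amounts on termination keeps the worker's usage counters balanced. A minimal standalone sketch with simplified stand-ins for the real Worker fields:

object WorkerAccountingSketch {
  case class DriverDescription(mem: Int, cores: Int)

  var coresUsed = 0
  var memoryUsed = 0

  def launchDriver(desc: DriverDescription): Unit = {
    coresUsed += desc.cores   // previously coresUsed += 1, which undercounted
    memoryUsed += desc.mem
  }

  def driverTerminated(desc: DriverDescription): Unit = {
    coresUsed -= desc.cores
    memoryUsed -= desc.mem
  }

  def main(args: Array[String]): Unit = {
    val desc = DriverDescription(mem = 512, cores = 2)
    launchDriver(desc)
    assert(coresUsed == 2 && memoryUsed == 512)
    driverTerminated(desc)
    assert(coresUsed == 0 && memoryUsed == 0)
    println("accounting balanced")
  }
}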