aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-12-21 21:08:13 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-12-25 01:19:01 -0800
commit6a4acc4c2d5c510cc76049dd8727cec76a2173e8 (patch)
tree17e6582913df202ce24928bdd5ea1b4bf09c4315 /core/src/main/scala/org/apache
parent1070b566d4cde4e9a69ccd318747b218f5a44dc7 (diff)
downloadspark-6a4acc4c2d5c510cc76049dd8727cec76a2173e8.tar.gz
spark-6a4acc4c2d5c510cc76049dd8727cec76a2173e8.tar.bz2
spark-6a4acc4c2d5c510cc76049dd8727cec76a2173e8.zip
Initial cut at driver submission.
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala43
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala27
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala129
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala38
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala34
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala152
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala42
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala178
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala60
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala36
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala39
16 files changed, 781 insertions, 53 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 275331724a..67435261e4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -20,10 +20,11 @@ package org.apache.spark.deploy
import scala.collection.immutable.List
import org.apache.spark.deploy.ExecutorState.ExecutorState
-import org.apache.spark.deploy.master.{WorkerInfo, ApplicationInfo}
+import org.apache.spark.deploy.master.{DriverInfo, WorkerInfo, ApplicationInfo}
import org.apache.spark.deploy.master.RecoveryState.MasterState
-import org.apache.spark.deploy.worker.ExecutorRunner
+import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
import org.apache.spark.util.Utils
+import org.apache.spark.deploy.master.DriverState.DriverState
private[deploy] sealed trait DeployMessage extends Serializable
@@ -54,7 +55,14 @@ private[deploy] object DeployMessages {
exitStatus: Option[Int])
extends DeployMessage
- case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription])
+ case class DriverStateChanged(
+ driverId: String,
+ state: DriverState,
+ exception: Option[Exception])
+ extends DeployMessage
+
+ case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription],
+ driverIds: Seq[String])
case class Heartbeat(workerId: String) extends DeployMessage
@@ -76,14 +84,19 @@ private[deploy] object DeployMessages {
sparkHome: String)
extends DeployMessage
- // Client to Master
+ case class LaunchDriver(driverId: String, jarUrl: String, mainClass: String, mem: Int)
+ extends DeployMessage
+
+ case class KillDriver(driverId: String) extends DeployMessage
+
+ // AppClient to Master
case class RegisterApplication(appDescription: ApplicationDescription)
extends DeployMessage
case class MasterChangeAcknowledged(appId: String)
- // Master to Client
+ // Master to AppClient
case class RegisteredApplication(appId: String, masterUrl: String) extends DeployMessage
@@ -97,11 +110,21 @@ private[deploy] object DeployMessages {
case class ApplicationRemoved(message: String)
- // Internal message in Client
+ // DriverClient <-> Master
+
+ case class RequestSubmitDriver(driverDescription: DriverDescription) extends DeployMessage
+
+ case class SubmitDriverResponse(success: Boolean, message: String) extends DeployMessage
+
+ case class RequestKillDriver(driverId: String) extends DeployMessage
+
+ case class KillDriverResponse(success: Boolean, message: String) extends DeployMessage
+
+ // Internal message in AppClient
- case object StopClient
+ case object StopAppClient
- // Master to Worker & Client
+ // Master to Worker & AppClient
case class MasterChanged(masterUrl: String, masterWebUiUrl: String)
@@ -112,6 +135,7 @@ private[deploy] object DeployMessages {
// Master to MasterWebUI
case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo],
+ activeDrivers: Array[DriverInfo], completedDrivers: Array[DriverInfo],
activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo],
status: MasterState) {
@@ -128,7 +152,8 @@ private[deploy] object DeployMessages {
// Worker to WorkerWebUI
case class WorkerStateResponse(host: String, port: Int, workerId: String,
- executors: List[ExecutorRunner], finishedExecutors: List[ExecutorRunner], masterUrl: String,
+ executors: List[ExecutorRunner], finishedExecutors: List[ExecutorRunner],
+ drivers: List[DriverRunner], finishedDrivers: List[DriverRunner], masterUrl: String,
cores: Int, memory: Int, coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String) {
Utils.checkHost(host, "Required hostname")
diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
new file mode 100644
index 0000000000..52f6b1b554
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+private[spark] class DriverDescription(
+ val jarUrl: String,
+ val mainClass: String,
+ val mem: Integer) // TODO: Should this be Long?
+ extends Serializable {
+
+ override def toString: String = s"DriverDescription ($mainClass)"
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index c5a0d1fd01..737ba09117 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -147,7 +147,7 @@ private[spark] class AppClient(
logWarning(s"Connection to $address failed; waiting for master to reconnect...")
markDisconnected()
- case StopClient =>
+ case StopAppClient =>
markDead()
sender ! true
context.stop(self)
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
new file mode 100644
index 0000000000..482bafd0e0
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.client
+
+import akka.actor._
+import akka.remote.{RemotingLifecycleEvent}
+
+import org.apache.spark.{SparkException, Logging}
+import org.apache.spark.deploy.{DeployMessage, DriverDescription}
+import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.deploy.master.{MasterArguments, Master}
+import akka.pattern.ask
+
+import org.apache.spark.util.{Utils, AkkaUtils}
+import scala.concurrent.duration.{FiniteDuration, Duration}
+import java.util.concurrent.TimeUnit
+import akka.util.Timeout
+import scala.concurrent.Await
+import akka.actor.Actor.emptyBehavior
+
+/**
+ * Parent class for actors that to send a single message to the standalone master and then die.
+ */
+private[spark] abstract class SingleMessageClient(
+ actorSystem: ActorSystem, master: String, message: DeployMessage)
+ extends Logging {
+
+ // Concrete child classes must implement
+ def handleResponse(response: Any)
+
+ var actor: ActorRef = actorSystem.actorOf(Props(new DriverActor()))
+
+ class DriverActor extends Actor with Logging {
+ override def preStart() {
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+ logInfo("Sending message to master " + master + "...")
+ val masterActor = context.actorSelection(Master.toAkkaUrl(master))
+ val timeoutDuration: FiniteDuration = Duration.create(
+ System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS)
+ val submitFuture = masterActor.ask(message)(timeoutDuration)
+ handleResponse(Await.result(submitFuture, timeoutDuration))
+ actorSystem.stop(actor)
+ actorSystem.shutdown()
+ }
+
+ override def receive = emptyBehavior
+ }
+}
+
+/**
+ * Submits a driver to the master.
+ */
+private[spark] class SubmissionClient(actorSystem: ActorSystem, master: String,
+ driverDescription: DriverDescription)
+ extends SingleMessageClient(actorSystem, master, RequestSubmitDriver(driverDescription)) {
+
+ override def handleResponse(response: Any) {
+ val resp = response.asInstanceOf[SubmitDriverResponse]
+ if (!resp.success) {
+ logError(s"Error submitting driver to $master")
+ logError(resp.message)
+ }
+ }
+}
+
+/**
+ * Terminates a client at the master.
+ */
+private[spark] class TerminationClient(actorSystem: ActorSystem, master: String, driverId: String)
+ extends SingleMessageClient(actorSystem, master, RequestKillDriver(driverId)) {
+
+ override def handleResponse(response: Any) {
+ val resp = response.asInstanceOf[KillDriverResponse]
+ if (!resp.success) {
+ logError(s"Error terminating $driverId at $master")
+ logError(resp.message)
+ }
+ }
+}
+
+/**
+ * Callable utility for starting and terminating drivers inside of the standalone scheduler.
+ */
+object DriverClient {
+
+ def main(args: Array[String]) {
+ if (args.size < 3) {
+ println("usage: DriverClient launch <active-master> <jar-url> <main-class>")
+ println("usage: DriverClient kill <active-master> <driver-id>")
+ System.exit(-1)
+ }
+
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
+ "driverSubmission", Utils.localHostName(), 0)
+
+ // TODO Should be configurable
+ val mem = 512
+
+ args(0) match {
+ case "launch" =>
+ val master = args(1)
+ val jarUrl = args(2)
+ val mainClass = args(3)
+ val driverDescription = new DriverDescription(jarUrl, mainClass, mem)
+ val client = new SubmissionClient(actorSystem, master, driverDescription)
+
+ case "kill" =>
+ val master = args(1)
+ val driverId = args(2)
+ val client = new TerminationClient(actorSystem, master, driverId)
+ }
+ actorSystem.awaitTermination()
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
new file mode 100644
index 0000000000..69d150aea5
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import org.apache.spark.deploy.{DriverDescription, ApplicationDescription}
+import java.util.Date
+import akka.actor.ActorRef
+import scala.collection.mutable
+
+private[spark] class DriverInfo(
+ val startTime: Long,
+ val id: String,
+ val desc: DriverDescription,
+ val submitDate: Date)
+ extends Serializable {
+
+ @transient var state: DriverState.Value = DriverState.SUBMITTED
+ /* If we fail when launching the driver, the exception is stored here. */
+ @transient var exception: Option[Exception] = None
+ /* Most recent worker assigned to this driver */
+ @transient var worker: Option[WorkerInfo] = None
+
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala b/core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala
new file mode 100644
index 0000000000..230dab1a19
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+private[spark] object DriverState extends Enumeration {
+
+ type DriverState = Value
+
+ // SUBMITTED: Submitted but not yet scheduled on a worker
+ // RUNNING: Has been allocated to a worker to run
+ // FINISHED: Previously ran and exited cleanly
+ // RELAUNCHING: Exited non-zero or due to worker failure, but has not yet started running again
+ // UNKNOWN: The state of the driver is temporarily not known due to master failure recovery
+ // KILLED: A user manually killed this driver
+ // FAILED: Unable to run due to an unrecoverable error (e.g. missing jar file)
+ val SUBMITTED, RUNNING, FINISHED, RELAUNCHING, UNKNOWN, KILLED, FAILED = Value
+
+ val MAX_NUM_RETRY = 10
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index 043945a211..44a046b4a4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -47,6 +47,15 @@ private[spark] class FileSystemPersistenceEngine(
new File(dir + File.separator + "app_" + app.id).delete()
}
+ override def addDriver(driver: DriverInfo) {
+ val driverFile = new File(dir + File.separator + "driver_" + driver.id)
+ serializeIntoFile(driverFile, driver)
+ }
+
+ override def removeDriver(driver: DriverInfo) {
+ new File(dir + File.separator + "driver_" + driver.id).delete()
+ }
+
override def addWorker(worker: WorkerInfo) {
val workerFile = new File(dir + File.separator + "worker_" + worker.id)
serializeIntoFile(workerFile, worker)
@@ -56,13 +65,15 @@ private[spark] class FileSystemPersistenceEngine(
new File(dir + File.separator + "worker_" + worker.id).delete()
}
- override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = {
+ override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
val sortedFiles = new File(dir).listFiles().sortBy(_.getName)
val appFiles = sortedFiles.filter(_.getName.startsWith("app_"))
val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
+ val driverFiles = sortedFiles.filter(_.getName.startsWith("driver_"))
+ val drivers = driverFiles.map(deserializeFromFile[DriverInfo])
val workerFiles = sortedFiles.filter(_.getName.startsWith("worker_"))
val workers = workerFiles.map(deserializeFromFile[WorkerInfo])
- (apps, workers)
+ (apps, drivers, workers)
}
private def serializeIntoFile(file: File, value: AnyRef) {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index eebd0794b8..76af332986 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -30,7 +30,7 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.serialization.SerializationExtension
import org.apache.spark.{Logging, SparkException}
-import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
+import org.apache.spark.deploy.{DriverDescription, ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
@@ -47,7 +47,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val RECOVERY_DIR = System.getProperty("spark.deploy.recoveryDirectory", "")
val RECOVERY_MODE = System.getProperty("spark.deploy.recoveryMode", "NONE")
- var nextAppNumber = 0
val workers = new HashSet[WorkerInfo]
val idToWorker = new HashMap[String, WorkerInfo]
val actorToWorker = new HashMap[ActorRef, WorkerInfo]
@@ -57,9 +56,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val idToApp = new HashMap[String, ApplicationInfo]
val actorToApp = new HashMap[ActorRef, ApplicationInfo]
val addressToApp = new HashMap[Address, ApplicationInfo]
-
val waitingApps = new ArrayBuffer[ApplicationInfo]
val completedApps = new ArrayBuffer[ApplicationInfo]
+ var nextAppNumber = 0
+
+ val drivers = new HashSet[DriverInfo]
+ val completedDrivers = new ArrayBuffer[DriverInfo]
+ val waitingDrivers = new ArrayBuffer[DriverInfo] // Drivers currently spooled for scheduling
+ var nextDriverNumber = 0
Utils.checkHost(host, "Expected hostname")
@@ -134,14 +138,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
override def receive = {
case ElectedLeader => {
- val (storedApps, storedWorkers) = persistenceEngine.readPersistedData()
- state = if (storedApps.isEmpty && storedWorkers.isEmpty)
+ val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
+ state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty)
RecoveryState.ALIVE
else
RecoveryState.RECOVERING
logInfo("I have been elected leader! New state: " + state)
if (state == RecoveryState.RECOVERING) {
- beginRecovery(storedApps, storedWorkers)
+ beginRecovery(storedApps, storedDrivers, storedWorkers)
context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
}
}
@@ -168,6 +172,52 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
}
+ case RequestSubmitDriver(description) => {
+ if (state == RecoveryState.STANDBY) {
+ sender ! SubmitDriverResponse(false, "Standby master cannot accept driver submission")
+ } else {
+ logInfo("Driver submitted " + description.mainClass)
+ val driver = createDriver(description)
+ persistenceEngine.addDriver(driver)
+ waitingDrivers += driver
+ drivers.add(driver)
+ schedule()
+
+ // TODO: It might be good to instead have the submission client poll the master to determine
+ // the current status of the driver. Since we may already want to expose this.
+
+ sender ! SubmitDriverResponse(true, "Driver successfully submitted")
+ }
+ }
+
+ case RequestKillDriver(driverId) => {
+ if (state == RecoveryState.STANDBY) {
+ sender ! KillDriverResponse(false, "Standby master cannot kill drivers")
+ } else {
+ logInfo("Asked to kill driver " + driverId)
+ val driver = drivers.find(_.id == driverId)
+ driver match {
+ case Some(d) =>
+ if (waitingDrivers.contains(d)) { waitingDrivers -= d }
+ else {
+ // We just notify the worker to kill the driver here. The final bookkeeping occurs
+ // on the return path when the worker submits a state change back to the master
+ // to notify it that the driver was successfully killed.
+ d.worker.foreach { w =>
+ w.actor ! KillDriver(driverId)
+ }
+ }
+ val msg = s"Kill request for $driverId submitted"
+ logInfo(msg)
+ sender ! KillDriverResponse(true, msg)
+ case None =>
+ val msg = s"Could not find running driver $driverId"
+ logWarning(msg)
+ sender ! KillDriverResponse(false, msg)
+ }
+ }
+ }
+
case RegisterApplication(description) => {
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
@@ -210,6 +260,25 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
}
+ case DriverStateChanged(driverId, state, exception) => {
+ if (!(state == DriverState.FAILED || state == DriverState.FINISHED ||
+ state == DriverState.KILLED)) {
+ throw new Exception(s"Received unexpected state update for driver $driverId: $state")
+ }
+ drivers.find(_.id == driverId) match {
+ case Some(driver) => {
+ drivers -= driver
+ completedDrivers += driver
+ persistenceEngine.removeDriver(driver)
+ driver.state = state
+ driver.exception = exception
+ driver.worker.foreach(w => w.removeDriver(driver))
+ }
+ case None =>
+ logWarning(s"Got driver update for unknown driver $driverId")
+ }
+ }
+
case Heartbeat(workerId) => {
idToWorker.get(workerId) match {
case Some(workerInfo) =>
@@ -231,7 +300,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
if (canCompleteRecovery) { completeRecovery() }
}
- case WorkerSchedulerStateResponse(workerId, executors) => {
+ case WorkerSchedulerStateResponse(workerId, executors, driverIds) => {
idToWorker.get(workerId) match {
case Some(worker) =>
logInfo("Worker has been re-registered: " + workerId)
@@ -244,6 +313,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
worker.addExecutor(execInfo)
execInfo.copyState(exec)
}
+
+ for (driverId <- driverIds) {
+ drivers.find(_.id == driverId).foreach { driver =>
+ driver.worker = Some(worker)
+ driver.state = DriverState.RUNNING
+ worker.drivers(driverId) = driver
+ }
+ }
case None =>
logWarning("Scheduler state from unknown worker: " + workerId)
}
@@ -260,8 +337,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
case RequestMasterState => {
- sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray,
- state)
+ sender ! MasterStateResponse(host, port, workers.toArray, drivers.toArray,
+ completedDrivers.toArray ,apps.toArray, completedApps.toArray, state)
}
case CheckForWorkerTimeOut => {
@@ -277,7 +354,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
workers.count(_.state == WorkerState.UNKNOWN) == 0 &&
apps.count(_.state == ApplicationState.UNKNOWN) == 0
- def beginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo]) {
+ def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
+ storedWorkers: Seq[WorkerInfo]) {
for (app <- storedApps) {
logInfo("Trying to recover app: " + app.id)
try {
@@ -289,6 +367,12 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
}
+ for (driver <- storedDrivers) {
+ // Here we just read in the list of drivers. Any drivers associated with now-lost workers
+ // will be re-launched when we detect that the worker is missing.
+ drivers += driver
+ }
+
for (worker <- storedWorkers) {
logInfo("Trying to recover worker: " + worker.id)
try {
@@ -312,6 +396,12 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
+ // Reschedule drivers which were not claimed by any workers
+ drivers.filter(_.worker.isEmpty).foreach { d =>
+ logWarning(s"Driver ${d.id} was not found after master recovery, re-launching")
+ relaunchDriver(d)
+ }
+
state = RecoveryState.ALIVE
schedule()
logInfo("Recovery complete - resuming operations!")
@@ -332,6 +422,16 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
*/
def schedule() {
if (state != RecoveryState.ALIVE) { return }
+ // First schedule drivers, they take strict precedence over applications
+ for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
+ for (driver <- Seq(waitingDrivers: _*)) {
+ if (worker.memoryFree > driver.desc.mem) {
+ launchDriver(worker, driver)
+ waitingDrivers -= driver
+ }
+ }
+ }
+
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
if (spreadOutApps) {
@@ -418,9 +518,19 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
exec.id, ExecutorState.LOST, Some("worker lost"), None)
exec.application.removeExecutor(exec)
}
+ for (driver <- worker.drivers.values) {
+ relaunchDriver(driver)
+ }
persistenceEngine.removeWorker(worker)
}
+ def relaunchDriver(driver: DriverInfo) {
+ driver.worker = None
+ driver.state = DriverState.RELAUNCHING
+ waitingDrivers += driver
+ schedule()
+ }
+
def createApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
@@ -499,6 +609,28 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
}
}
+
+ /** Generate a new driver ID given a driver's submission date */
+ def newDriverId(submitDate: Date): String = {
+ val appId = "driver-%s-%04d".format(DATE_FORMAT.format(submitDate), nextDriverNumber)
+ nextDriverNumber += 1
+ appId
+ }
+
+ def createDriver(desc: DriverDescription): DriverInfo = {
+ val now = System.currentTimeMillis()
+ val date = new Date(now)
+ new DriverInfo(now, newDriverId(date), desc, date)
+ }
+
+ def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
+ logInfo("Launching driver " + driver.id + " on worker " + worker.id)
+ worker.addDriver(driver)
+ driver.worker = Some(worker)
+ worker.actor ! LaunchDriver(driver.id, driver.desc.jarUrl, driver.desc.mainClass,
+ driver.desc.mem)
+ driver.state = DriverState.RUNNING
+ }
}
private[spark] object Master {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
index 94b986caf2..e3640ea4f7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
@@ -35,11 +35,15 @@ private[spark] trait PersistenceEngine {
def removeWorker(worker: WorkerInfo)
+ def addDriver(driver: DriverInfo)
+
+ def removeDriver(driver: DriverInfo)
+
/**
* Returns the persisted data sorted by their respective ids (which implies that they're
* sorted by time of creation).
*/
- def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo])
+ def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo])
def close() {}
}
@@ -49,5 +53,8 @@ private[spark] class BlackHolePersistenceEngine extends PersistenceEngine {
override def removeApplication(app: ApplicationInfo) {}
override def addWorker(worker: WorkerInfo) {}
override def removeWorker(worker: WorkerInfo) {}
- override def readPersistedData() = (Nil, Nil)
+ override def addDriver(driver: DriverInfo) {}
+ override def removeDriver(driver: DriverInfo) {}
+
+ override def readPersistedData() = (Nil, Nil, Nil)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index e05f587b58..27c2ff4b8c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -36,6 +36,7 @@ private[spark] class WorkerInfo(
assert (port > 0)
@transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // fullId => info
+ @transient var drivers: mutable.HashMap[String, DriverInfo] = _
@transient var state: WorkerState.Value = _
@transient var coresUsed: Int = _
@transient var memoryUsed: Int = _
@@ -54,6 +55,7 @@ private[spark] class WorkerInfo(
private def init() {
executors = new mutable.HashMap
+ drivers = new mutable.HashMap
state = WorkerState.ALIVE
coresUsed = 0
memoryUsed = 0
@@ -83,6 +85,18 @@ private[spark] class WorkerInfo(
executors.values.exists(_.application == app)
}
+ def addDriver(driver: DriverInfo) {
+ drivers(driver.id) = driver
+ memoryUsed += driver.desc.mem
+ coresUsed += 1
+ }
+
+ def removeDriver(driver: DriverInfo) {
+ drivers -= driver.id
+ memoryUsed -= driver.desc.mem
+ coresUsed -= 1
+ }
+
def webUiAddress : String = {
"http://" + this.publicAddress + ":" + this.webUiPort
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 825344b3bb..52df173850 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -49,6 +49,14 @@ class ZooKeeperPersistenceEngine(serialization: Serialization)
zk.delete(WORKING_DIR + "/app_" + app.id)
}
+ override def addDriver(driver: DriverInfo) {
+ serializeIntoFile(WORKING_DIR + "/driver_" + driver.id, driver)
+ }
+
+ override def removeDriver(driver: DriverInfo) {
+ zk.delete(WORKING_DIR + "/driver_" + driver.id)
+ }
+
override def addWorker(worker: WorkerInfo) {
serializeIntoFile(WORKING_DIR + "/worker_" + worker.id, worker)
}
@@ -61,13 +69,15 @@ class ZooKeeperPersistenceEngine(serialization: Serialization)
zk.close()
}
- override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = {
+ override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
val sortedFiles = zk.getChildren(WORKING_DIR).toList.sorted
val appFiles = sortedFiles.filter(_.startsWith("app_"))
val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
+ val driverFiles = sortedFiles.filter(_.startsWith("driver_"))
+ val drivers = driverFiles.map(deserializeFromFile[DriverInfo])
val workerFiles = sortedFiles.filter(_.startsWith("worker_"))
val workers = workerFiles.map(deserializeFromFile[WorkerInfo])
- (apps, workers)
+ (apps, drivers, workers)
}
private def serializeIntoFile(path: String, value: AnyRef) {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 4ef762892c..13903b4a1d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -26,7 +26,8 @@ import net.liftweb.json.JsonAST.JValue
import org.apache.spark.deploy.{DeployWebUI, JsonProtocol}
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
-import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
+import org.apache.spark.deploy.JsonProtocol
+import org.apache.spark.deploy.master.{DriverInfo, ApplicationInfo, WorkerInfo}
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
@@ -56,6 +57,12 @@ private[spark] class IndexPage(parent: MasterWebUI) {
val completedApps = state.completedApps.sortBy(_.endTime).reverse
val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)
+ val driverHeaders = Seq("ID", "Submitted Time", "Worker", "State", "Memory", "Main Class")
+ val activeDrivers = state.activeDrivers.sortBy(_.startTime).reverse
+ val activeDriversTable = UIUtils.listingTable(driverHeaders, driverRow, activeDrivers)
+ val completedDrivers = state.completedDrivers.sortBy(_.startTime).reverse
+ val completedDriversTable = UIUtils.listingTable(driverHeaders, driverRow, completedDrivers)
+
val content =
<div class="row-fluid">
<div class="span12">
@@ -70,6 +77,9 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<li><strong>Applications:</strong>
{state.activeApps.size} Running,
{state.completedApps.size} Completed </li>
+ <li><strong>Drivers:</strong>
+ {state.activeDrivers.size} Running,
+ {state.completedDrivers.size} Completed </li>
</ul>
</div>
</div>
@@ -94,7 +104,22 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<h4> Completed Applications </h4>
{completedAppsTable}
</div>
- </div>;
+ </div>
+
+ <div class="row-fluid">
+ <div class="span12">
+ <h4> Active Drivers </h4>
+
+ {activeDriversTable}
+ </div>
+ </div>
+
+ <div class="row-fluid">
+ <div class="span12">
+ <h4> Completed Drivers </h4>
+ {completedDriversTable}
+ </div>
+ </div>;
UIUtils.basicSparkPage(content, "Spark Master at " + state.uri)
}
@@ -134,4 +159,17 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<td>{DeployWebUI.formatDuration(app.duration)}</td>
</tr>
}
+
+ def driverRow(driver: DriverInfo): Seq[Node] = {
+ <tr>
+ <td>{driver.id} </td>
+ <td>{driver.submitDate}</td>
+ <td>{driver.worker.map(w => w.id.toString).getOrElse("None")}</td>
+ <td>{driver.state}</td>
+ <td sorttable_customkey={driver.desc.mem.toString}>
+ {Utils.megabytesToString(driver.desc.mem.toLong)}
+ </td>
+ <td>{driver.desc.mainClass}</td>
+ </tr>
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
new file mode 100644
index 0000000000..fccc36b660
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.worker
+
+import java.io._
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import org.apache.hadoop.fs.{FileUtil, Path}
+import org.apache.hadoop.conf.Configuration
+import akka.actor.{ActorRef, ActorSelection}
+import org.apache.spark.deploy.DeployMessages.DriverStateChanged
+import org.apache.spark.deploy.master.DriverState
+
+/**
+ * Manages the execution of one driver process.
+ */
+private[spark] class DriverRunner(
+ val driverId: String,
+ val jarUrl: String,
+ val mainClass: String,
+ val workDir: File,
+ val memory: Int,
+ val worker: ActorRef)
+ extends Logging {
+
+ var process: Option[Process] = None
+ @volatile var killed = false
+
+ /** Starts a thread to run and manage the driver. */
+ def start() = {
+ new Thread("DriverRunner for " + driverId) {
+ override def run() {
+ var exn: Option[Exception] = None
+
+ try {
+ val driverDir = createWorkingDirectory()
+ val localJarFilename = downloadUserJar(driverDir)
+ val command = Seq("java", "-cp", localJarFilename, mainClass)
+ runCommandWithRetry(command, driverDir)
+ }
+ catch {
+ case e: Exception => exn = Some(e)
+ }
+
+ val finalState =
+ if (killed) { DriverState.KILLED }
+ else if (exn.isDefined) { DriverState.FAILED }
+ else { DriverState.FINISHED }
+
+ worker ! DriverStateChanged(driverId, finalState, exn)
+ }
+ }.start()
+ }
+
+ /** Terminate this driver (or prevent it from ever starting if not yet started) */
+ def kill() {
+ killed = true
+ process.foreach(p => p.destroy())
+ }
+
+ /** Spawn a thread that will redirect a given stream to a file */
+ def redirectStream(in: InputStream, file: File) {
+ val out = new FileOutputStream(file, true)
+ new Thread("redirect output to " + file) {
+ override def run() {
+ try {
+ Utils.copyStream(in, out, true)
+ } catch {
+ case e: IOException =>
+ logInfo("Redirection to " + file + " closed: " + e.getMessage)
+ }
+ }
+ }.start()
+ }
+
+ /**
+ * Creates the working directory for this driver.
+ * Will throw an exception if there are errors preparing the directory.
+ */
+ def createWorkingDirectory(): File = {
+ val driverDir = new File(workDir, driverId)
+ if (!driverDir.exists() && !driverDir.mkdirs()) {
+ throw new IOException("Failed to create directory " + driverDir)
+ }
+ driverDir
+ }
+
+ /**
+ * Download the user jar into the supplied directory and return its local path.
+ * Will throw an exception if there are errors downloading the jar.
+ */
+ def downloadUserJar(driverDir: File): String = {
+
+ val jarPath = new Path(jarUrl)
+
+ val emptyConf = new Configuration() // TODO: In docs explain it needs to be full HDFS path
+ val jarFileSystem = jarPath.getFileSystem(emptyConf)
+
+ val destPath = new Path(driverDir.getAbsolutePath())
+ val destFileSystem = destPath.getFileSystem(emptyConf)
+ val jarFileName = jarPath.getName
+ val localJarFile = new File(driverDir, jarFileName)
+ val localJarFilename = localJarFile.getAbsolutePath
+
+ if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
+ logInfo(s"Copying user jar $jarPath to $destPath")
+ FileUtil.copy(jarFileSystem, jarPath, destFileSystem, destPath, false, false, emptyConf)
+ }
+
+ if (!localJarFile.exists()) { // Verify copy succeeded
+ throw new Exception(s"Did not see expected jar $jarFileName in $driverDir")
+ }
+
+ localJarFilename
+ }
+
+ /** Continue launching the supplied command until it exits zero. */
+ def runCommandWithRetry(command: Seq[String], baseDir: File) = {
+ /* Time to wait between submission retries. */
+ var waitSeconds = 1
+ // TODO: We should distinguish between "immediate" exits and cases where it was running
+ // for a long time and then exits.
+ var cleanExit = false
+
+ while (!cleanExit && !killed) {
+ Thread.sleep(waitSeconds * 1000)
+ val builder = new ProcessBuilder(command: _*).directory(baseDir)
+ logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
+
+ process = Some(builder.start())
+
+ // Redirect stdout and stderr to files
+ val stdout = new File(baseDir, "stdout")
+ redirectStream(process.get.getInputStream, stdout)
+
+ val stderr = new File(baseDir, "stderr")
+ val header = "Driver Command: %s\n%s\n\n".format(
+ command.mkString("\"", "\" \"", "\""), "=" * 40)
+ Files.write(header, stderr, Charsets.UTF_8)
+ redirectStream(process.get.getErrorStream, stderr)
+
+
+ val exitCode =
+ /* There is a race here I've elected to ignore for now because it's very unlikely and not
+ * simple to fix. This could see `killed=false` then the main thread gets a kill request
+ * and sets `killed=true` and destroys the not-yet-started process, then this thread
+ * launches the process. For now, in that case the user can just re-submit the kill
+ * request. */
+ if (killed) -1
+ else process.get.waitFor()
+
+ cleanExit = exitCode == 0
+ if (!cleanExit && !killed) {
+ waitSeconds = waitSeconds * 2 // exponential back-off
+ logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
+ }
+ }
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 87531b6719..a2b491a72f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -30,18 +30,10 @@ import akka.remote.{ DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.spark.{SparkException, Logging}
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
-import org.apache.spark.deploy.master.Master
+import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
-import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
-import org.apache.spark.deploy.DeployMessages.KillExecutor
-import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
-import org.apache.spark.deploy.DeployMessages.Heartbeat
-import org.apache.spark.deploy.DeployMessages.RegisteredWorker
-import org.apache.spark.deploy.DeployMessages.LaunchExecutor
-import org.apache.spark.deploy.DeployMessages.RegisterWorker
/**
* @param masterUrls Each url should look like spark://host:port.
@@ -83,6 +75,9 @@ private[spark] class Worker(
var workDir: File = null
val executors = new HashMap[String, ExecutorRunner]
val finishedExecutors = new HashMap[String, ExecutorRunner]
+ val drivers = new HashMap[String, DriverRunner]
+ val finishedDrivers = new HashMap[String, DriverRunner]
+
val publicAddress = {
val envVar = System.getenv("SPARK_PUBLIC_DNS")
if (envVar != null) envVar else host
@@ -193,7 +188,7 @@ private[spark] class Worker(
val execs = executors.values.
map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
- sender ! WorkerSchedulerStateResponse(workerId, execs.toList)
+ sender ! WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq)
case RegisterWorkerFailed(message) =>
if (!registered) {
@@ -247,13 +242,56 @@ private[spark] class Worker(
}
}
+ case LaunchDriver(driverId, jarUrl, mainClass, memory) => {
+ logInfo(s"Asked to launch driver $driverId")
+ val driver = new DriverRunner(driverId, jarUrl, mainClass, workDir, memory, self)
+ drivers(driverId) = driver
+ driver.start()
+
+ coresUsed += 1
+ memoryUsed += memory
+ }
+
+ case KillDriver(driverId) => {
+ logInfo(s"Asked to kill driver $driverId")
+
+ drivers.find(_._1 == driverId) match {
+ case Some((id, runner)) =>
+ runner.kill()
+ case None =>
+ logError(s"Asked to kill unknown driver $driverId")
+ }
+
+ }
+
+
+ case DriverStateChanged(driverId, state, exception) => {
+ state match {
+ case DriverState.FAILED =>
+ logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
+ case DriverState.FINISHED =>
+ logInfo(s"Driver $driverId exited successfully")
+ case DriverState.KILLED =>
+ logInfo(s"Driver $driverId was killed")
+ }
+ masterLock.synchronized {
+ master ! DriverStateChanged(driverId, state, exception)
+ }
+ val driver = drivers(driverId)
+ memoryUsed -= driver.memory
+ coresUsed -= 1
+ drivers -= driverId
+ finishedDrivers(driverId) = driver
+ }
+
case x: DisassociatedEvent if x.remoteAddress == masterAddress =>
logInfo(s"$x Disassociated !")
masterDisconnected()
case RequestWorkerState => {
sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
- finishedExecutors.values.toList, activeMasterUrl, cores, memory,
+ finishedExecutors.values.toList, drivers.values.toList,
+ finishedDrivers.values.toList, activeMasterUrl, cores, memory,
coresUsed, memoryUsed, activeMasterWebUiUrl)
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index 0d59048313..e233b82585 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -30,7 +30,7 @@ import net.liftweb.json.JsonAST.JValue
import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
-import org.apache.spark.deploy.worker.ExecutorRunner
+import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
@@ -56,6 +56,12 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
val finishedExecutorTable =
UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors)
+ val driverHeaders = Seq("DriverID", "Main Class", "Memory", "Logs")
+ val runningDriverTable =
+ UIUtils.listingTable(driverHeaders, driverRow, workerState.drivers)
+ def finishedDriverTable =
+ UIUtils.listingTable(driverHeaders, driverRow, workerState.finishedDrivers)
+
val content =
<div class="row-fluid"> <!-- Worker Details -->
<div class="span12">
@@ -84,6 +90,20 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
<h4> Finished Executors </h4>
{finishedExecutorTable}
</div>
+ </div>
+
+ <div class="row-fluid"> <!-- Running Drivers -->
+ <div class="span12">
+ <h4> Running Drivers {workerState.drivers.size} </h4>
+ {runningDriverTable}
+ </div>
+ </div>
+
+ <div class="row-fluid"> <!-- Finished Drivers -->
+ <div class="span12">
+ <h4> Finished Drivers </h4>
+ {finishedDriverTable}
+ </div>
</div>;
UIUtils.basicSparkPage(content, "Spark Worker at %s:%s".format(
@@ -111,6 +131,20 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
.format(executor.appId, executor.execId)}>stderr</a>
</td>
</tr>
+
}
+ def driverRow(driver: DriverRunner): Seq[Node] = {
+ <tr>
+ <td>{driver.driverId}</td>
+ <td>{driver.mainClass}</td>
+ <td sorttable_customkey={driver.memory.toString}>
+ {Utils.megabytesToString(driver.memory)}
+ </td>
+ <td>
+ <a href={s"logPage?driverId=${driver.driverId}&logType=stdout"}>stdout</a>
+ <a href={s"logPage?driverId=${driver.driverId}&logType=stderr"}>stderr</a>
+ </td>
+ </tr>
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 40d6bdb3fd..d128e58797 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -69,30 +69,44 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
def log(request: HttpServletRequest): String = {
val defaultBytes = 100 * 1024
- val appId = request.getParameter("appId")
- val executorId = request.getParameter("executorId")
+
+ val appId = Option(request.getParameter("appId"))
+ val executorId = Option(request.getParameter("executorId"))
+ val driverId = Option(request.getParameter("driverId"))
val logType = request.getParameter("logType")
val offset = Option(request.getParameter("offset")).map(_.toLong)
val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
- val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
+
+ val path = (appId, executorId, driverId) match {
+ case (Some(a), Some(e), None) =>
+ s"${workDir.getPath}/$appId/$executorId/$logType"
+ case (None, None, Some(d)) =>
+ s"${workDir.getPath}/$driverId/$logType"
+ }
val (startByte, endByte) = getByteRange(path, offset, byteLength)
val file = new File(path)
val logLength = file.length
- val pre = "==== Bytes %s-%s of %s of %s/%s/%s ====\n"
- .format(startByte, endByte, logLength, appId, executorId, logType)
+ val pre = s"==== Bytes $startByte-$endByte of $logLength of $path ====\n"
pre + Utils.offsetBytes(path, startByte, endByte)
}
def logPage(request: HttpServletRequest): Seq[scala.xml.Node] = {
val defaultBytes = 100 * 1024
- val appId = request.getParameter("appId")
- val executorId = request.getParameter("executorId")
+ val appId = Option(request.getParameter("appId"))
+ val executorId = Option(request.getParameter("executorId"))
+ val driverId = Option(request.getParameter("driverId"))
val logType = request.getParameter("logType")
val offset = Option(request.getParameter("offset")).map(_.toLong)
val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
- val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
+
+ val (path, params) = (appId, executorId, driverId) match {
+ case (Some(a), Some(e), None) =>
+ (s"${workDir.getPath}/$a/$e/$logType", s"appId=$a&executorId=$e")
+ case (None, None, Some(d)) =>
+ (s"${workDir.getPath}/$d/$logType", s"driverId=$d")
+ }
val (startByte, endByte) = getByteRange(path, offset, byteLength)
val file = new File(path)
@@ -106,9 +120,8 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
val backButton =
if (startByte > 0) {
- <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s"
- .format(appId, executorId, logType, math.max(startByte-byteLength, 0),
- byteLength)}>
+ <a href={"?%s&logType=%s&offset=%s&byteLength=%s"
+ .format(params, logType, math.max(startByte-byteLength, 0), byteLength)}>
<button type="button" class="btn btn-default">
Previous {Utils.bytesToString(math.min(byteLength, startByte))}
</button>
@@ -122,8 +135,8 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
val nextButton =
if (endByte < logLength) {
- <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s".
- format(appId, executorId, logType, endByte, byteLength)}>
+ <a href={"?%s&logType=%s&offset=%s&byteLength=%s".
+ format(params, logType, endByte, byteLength)}>
<button type="button" class="btn btn-default">
Next {Utils.bytesToString(math.min(byteLength, logLength-endByte))}
</button>