aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2014-01-08 16:53:04 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-01-08 16:56:26 -0800
commit0f9d2ace6baefeacb1abf9d51a457644b67f2f8d (patch)
treef01c8fbfe47e05304a7f799f9c5fc1e4d04c407f /core
parent62b08faac5278d289bdaefb42ff6f65b62ce48aa (diff)
downloadspark-0f9d2ace6baefeacb1abf9d51a457644b67f2f8d.tar.gz
spark-0f9d2ace6baefeacb1abf9d51a457644b67f2f8d.tar.bz2
spark-0f9d2ace6baefeacb1abf9d51a457644b67f2f8d.zip
Adding polling to driver submission client.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/Client.scala141
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala29
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala2
6 files changed, 132 insertions, 68 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 43b9b1cff9..e133893f6c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -22,60 +22,30 @@ import scala.collection.mutable.Map
import scala.concurrent._
import akka.actor._
+import akka.pattern.ask
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.DeployMessages._
-import org.apache.spark.deploy.master.Master
+import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.util.{AkkaUtils, Utils}
+import akka.actor.Actor.emptyBehavior
+import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
/**
- * Actor that sends a single message to the standalone master and returns the response in the
- * given promise.
+ * Proxy that relays messages to the driver.
*/
-class DriverActor(master: String, response: Promise[(Boolean, String)]) extends Actor with Logging {
- override def receive = {
- case SubmitDriverResponse(success, message) => {
- response.success((success, message))
- }
-
- case KillDriverResponse(success, message) => {
- response.success((success, message))
- }
+class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging {
+ var masterActor: ActorSelection = _
+ val timeout = AkkaUtils.askTimeout(conf)
- // Relay all other messages to the master.
- case message => {
- logInfo(s"Sending message to master $master...")
- val masterActor = context.actorSelection(Master.toAkkaUrl(master))
- masterActor ! message
- }
- }
-}
+ override def preStart() = {
+ masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master))
-/**
- * Executable utility for starting and terminating drivers inside of a standalone cluster.
- */
-object Client {
-
- def main(args: Array[String]) {
- val driverArgs = new ClientArguments(args)
- val conf = new SparkConf()
-
- if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
- conf.set("spark.akka.logLifecycleEvents", "true")
- }
- conf.set("spark.akka.askTimeout", "5")
- Logger.getRootLogger.setLevel(driverArgs.logLevel)
-
- // TODO: See if we can initialize akka so return messages are sent back using the same TCP
- // flow. Else, this (sadly) requires the DriverClient be routable from the Master.
- val (actorSystem, _) = AkkaUtils.createActorSystem(
- "driverClient", Utils.localHostName(), 0, false, conf)
- val master = driverArgs.master
- val response = promise[(Boolean, String)]
- val driver: ActorRef = actorSystem.actorOf(Props(new DriverActor(driverArgs.master, response)))
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}")
+
driverArgs.cmd match {
case "launch" =>
// TODO: We could add an env variable here and intercept it in `sc.addJar` that would
@@ -94,21 +64,88 @@ object Client {
driverArgs.cores,
driverArgs.supervise,
command)
- driver ! RequestSubmitDriver(driverDescription)
+
+ masterActor ! RequestSubmitDriver(driverDescription)
case "kill" =>
val driverId = driverArgs.driverId
- driver ! RequestKillDriver(driverId)
+ val killFuture = masterActor ! RequestKillDriver(driverId)
+ }
+ }
+
+ /* Find out driver status then exit the JVM */
+ def pollAndReportStatus(driverId: String) {
+ println(s"... waiting before polling master for driver state")
+ Thread.sleep(5000)
+ println("... polling master for driver state")
+ val statusFuture = (masterActor ? RequestDriverStatus(driverId))(timeout)
+ .mapTo[DriverStatusResponse]
+ val statusResponse = Await.result(statusFuture, timeout)
+
+ statusResponse.found match {
+ case false =>
+ println(s"ERROR: Cluster master did not recognize $driverId")
+ System.exit(-1)
+ case true =>
+ println(s"State of $driverId is ${statusResponse.state.get}")
+ // Worker node, if present
+ (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
+ case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
+ println(s"Driver running on $hostPort ($id)")
+ case _ =>
+ }
+ // Exception, if present
+ statusResponse.exception.map { e =>
+ println(s"Exception from cluster was: $e")
+ System.exit(-1)
+ }
+ System.exit(0)
}
+ }
+
+ override def receive = {
+
+ case SubmitDriverResponse(success, driverId, message) =>
+ println(message)
+ if (success) pollAndReportStatus(driverId.get) else System.exit(-1)
+
+ case KillDriverResponse(driverId, success, message) =>
+ println(message)
+ if (success) pollAndReportStatus(driverId) else System.exit(-1)
+
+ case DisassociatedEvent(_, remoteAddress, _) =>
+ println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
+ System.exit(-1)
+
+ case AssociationErrorEvent(cause, _, remoteAddress, _) =>
+ println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
+ println(s"Cause was: $cause")
+ System.exit(-1)
+ }
+}
+
+/**
+ * Executable utility for starting and terminating drivers inside of a standalone cluster.
+ */
+object Client {
+ def main(args: Array[String]) {
+ val conf = new SparkConf()
+ val driverArgs = new ClientArguments(args)
+
+ if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
+ conf.set("spark.akka.logLifecycleEvents", "true")
+ }
+ conf.set("spark.akka.askTimeout", "10")
+ conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
+ Logger.getRootLogger.setLevel(driverArgs.logLevel)
+
+ // TODO: See if we can initialize akka so return messages are sent back using the same TCP
+ // flow. Else, this (sadly) requires the DriverClient be routable from the Master.
+ val (actorSystem, _) = AkkaUtils.createActorSystem(
+ "driverClient", Utils.localHostName(), 0, false, conf)
+
+ actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
- val (success, message) =
- try {
- Await.result(response.future, AkkaUtils.askTimeout(conf))
- } catch {
- case e: TimeoutException => (false, s"Error: Timed out sending message to $master")
- }
- println(message)
- actorSystem.shutdown()
actorSystem.awaitTermination()
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 34460d359d..5e824e1a67 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -112,11 +112,18 @@ private[deploy] object DeployMessages {
case class RequestSubmitDriver(driverDescription: DriverDescription) extends DeployMessage
- case class SubmitDriverResponse(success: Boolean, message: String) extends DeployMessage
+ case class SubmitDriverResponse(success: Boolean, driverId: Option[String], message: String)
+ extends DeployMessage
case class RequestKillDriver(driverId: String) extends DeployMessage
- case class KillDriverResponse(success: Boolean, message: String) extends DeployMessage
+ case class KillDriverResponse(driverId: String, success: Boolean, message: String)
+ extends DeployMessage
+
+ case class RequestDriverStatus(driverId: String) extends DeployMessage
+
+ case class DriverStatusResponse(found: Boolean, state: Option[DriverState],
+ workerId: Option[String], workerHostPort: Option[String], exception: Option[Exception])
// Internal message in AppClient
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala b/core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala
index 93b260740e..26a68bade3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala
@@ -27,6 +27,7 @@ private[spark] object DriverState extends Enumeration {
// RELAUNCHING: Exited non-zero or due to worker failure, but has not yet started running again
// UNKNOWN: The state of the driver is temporarily not known due to master failure recovery
// KILLED: A user manually killed this driver
- // FAILED: Unable to run due to an unrecoverable error (e.g. missing jar file)
- val SUBMITTED, RUNNING, FINISHED, RELAUNCHING, UNKNOWN, KILLED, FAILED = Value
+ // FAILED: The driver exited non-zero and was not supervised
+ // ERROR: Unable to run or restart due to an unrecoverable error (e.g. missing jar file)
+ val SUBMITTED, RUNNING, FINISHED, RELAUNCHING, UNKNOWN, KILLED, FAILED, ERROR = Value
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index f62601fa6c..cd3f3ebefc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -186,7 +186,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
case RequestSubmitDriver(description) => {
if (state != RecoveryState.ALIVE) {
val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state."
- sender ! SubmitDriverResponse(false, msg)
+ sender ! SubmitDriverResponse(false, None, msg)
} else {
logInfo("Driver submitted " + description.command.mainClass)
val driver = createDriver(description)
@@ -198,14 +198,15 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
// TODO: It might be good to instead have the submission client poll the master to determine
// the current status of the driver. For now it's simply "fire and forget".
- sender ! SubmitDriverResponse(true, s"Driver successfully submitted as ${driver.id}")
+ sender ! SubmitDriverResponse(true, Some(driver.id),
+ s"Driver successfully submitted as ${driver.id}")
}
}
case RequestKillDriver(driverId) => {
if (state != RecoveryState.ALIVE) {
val msg = s"Can only kill drivers in ALIVE state. Current state: $state."
- sender ! KillDriverResponse(false, msg)
+ sender ! KillDriverResponse(driverId, success = false, msg)
} else {
logInfo("Asked to kill driver " + driverId)
val driver = drivers.find(_.id == driverId)
@@ -226,15 +227,25 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
// TODO: It would be nice for this to be a synchronous response
val msg = s"Kill request for $driverId submitted"
logInfo(msg)
- sender ! KillDriverResponse(true, msg)
+ sender ! KillDriverResponse(driverId, success = true, msg)
case None =>
- val msg = s"Could not find running driver $driverId"
+ val msg = s"Driver $driverId has already finished or does not exist"
logWarning(msg)
- sender ! KillDriverResponse(false, msg)
+ sender ! KillDriverResponse(driverId, success = false, msg)
}
}
}
+ case RequestDriverStatus(driverId) => {
+ (drivers ++ completedDrivers).find(_.id == driverId) match {
+ case Some(driver) =>
+ sender ! DriverStatusResponse(found = true, Some(driver.state),
+ driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception)
+ case None =>
+ sender ! DriverStatusResponse(found = false, None, None, None, None)
+ }
+ }
+
case RegisterApplication(description) => {
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
@@ -279,7 +290,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
case DriverStateChanged(driverId, state, exception) => {
state match {
- case DriverState.FAILED | DriverState.FINISHED | DriverState.KILLED =>
+ case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
removeDriver(driverId, state, exception)
case _ =>
throw new Exception(s"Received unexpected state update for driver $driverId: $state")
@@ -410,7 +421,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
logWarning(s"Re-launching ${d.id}")
relaunchDriver(d)
} else {
- removeDriver(d.id, DriverState.FAILED, None)
+ removeDriver(d.id, DriverState.ERROR, None)
logWarning(s"Did not re-launch ${d.id} because it was not supervised")
}
}
@@ -539,7 +550,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
relaunchDriver(driver)
} else {
logInfo(s"Not re-launching ${driver.id} because it was not supervised")
- removeDriver(driver.id, DriverState.FAILED, None)
+ removeDriver(driver.id, DriverState.ERROR, None)
}
}
persistenceEngine.removeWorker(worker)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index ad70345a7f..b4df1a0dd4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -52,6 +52,7 @@ private[spark] class DriverRunner(
// Populated once finished
var finalState: Option[DriverState] = None
var finalException: Option[Exception] = None
+ var finalExitCode: Option[Int] = None
// Decoupled for testing
private[deploy] def setClock(_clock: Clock) = clock = _clock
@@ -87,8 +88,14 @@ private[spark] class DriverRunner(
val state =
if (killed) { DriverState.KILLED }
- else if (finalException.isDefined) { DriverState.FAILED }
- else { DriverState.FINISHED }
+ else if (finalException.isDefined) { DriverState.ERROR }
+ else {
+ finalExitCode match {
+ case Some(0) => DriverState.FINISHED
+ case _ => DriverState.FAILED
+ }
+ }
+
finalState = Some(state)
worker ! DriverStateChanged(driverId, state, finalException)
@@ -200,6 +207,7 @@ private[spark] class DriverRunner(
}
keepTrying = supervise && exitCode != 0 && !killed
+ finalExitCode = Some(exitCode)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 2a2b7a3881..273bacded6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -272,7 +272,7 @@ private[spark] class Worker(
case DriverStateChanged(driverId, state, exception) => {
state match {
- case DriverState.FAILED =>
+ case DriverState.ERROR =>
logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
case DriverState.FINISHED =>
logInfo(s"Driver $driverId exited successfully")