diff options
author | Aaron Davidson <aaron@databricks.com> | 2013-12-25 10:55:09 -0800 |
---|---|---|
committer | Aaron Davidson <aaron@databricks.com> | 2013-12-25 10:55:25 -0800 |
commit | 61372b11f4a4460b8ade8997d7478234bba64f7e (patch) | |
tree | f48fb1ff1b07c61b87a0451eec2b849fa5621a49 | |
parent | bbc362833b3bc34014a13be0592deca39cfd88bd (diff) | |
download | spark-61372b11f4a4460b8ade8997d7478234bba64f7e.tar.gz spark-61372b11f4a4460b8ade8997d7478234bba64f7e.tar.bz2 spark-61372b11f4a4460b8ade8997d7478234bba64f7e.zip |
Refactor DriverClient to be more Actor-based
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala | 93 |
1 files changed, 31 insertions, 62 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala index 28c851bcfe..d2f3c092fb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala @@ -17,78 +17,35 @@ package org.apache.spark.deploy.client -import java.util.concurrent.TimeUnit - -import scala.concurrent.Await -import scala.concurrent.duration.{Duration, FiniteDuration} +import scala.concurrent._ import akka.actor._ -import akka.actor.Actor.emptyBehavior -import akka.pattern.ask -import akka.remote.RemotingLifecycleEvent import org.apache.spark.Logging -import org.apache.spark.deploy.{DeployMessage, DriverDescription} +import org.apache.spark.deploy.DriverDescription import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.Master import org.apache.spark.util.{AkkaUtils, Utils} /** - * Actor that sends a single message to the standalone master and then shuts down. + * Actor that sends a single message to the standalone master and returns the response in the + * given promise. */ -private[spark] abstract class SingleMessageClient( - actorSystem: ActorSystem, master: String, message: DeployMessage) - extends Logging { - - // Concrete child classes must implement - def handleResponse(response: Any) - - var actor: ActorRef = actorSystem.actorOf(Props(new DriverActor())) - - class DriverActor extends Actor with Logging { - override def preStart() { - context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) - logInfo("Sending message to master " + master + "...") - val masterActor = context.actorSelection(Master.toAkkaUrl(master)) - val timeoutDuration: FiniteDuration = Duration.create( - System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS) - val submitFuture = masterActor.ask(message)(timeoutDuration) - handleResponse(Await.result(submitFuture, timeoutDuration)) - actorSystem.stop(actor) - actorSystem.shutdown() +class DriverActor(master: String, response: Promise[(Boolean, String)]) extends Actor with Logging { + override def receive = { + case SubmitDriverResponse(success, message) => { + response.success((success, message)) } - override def receive = emptyBehavior - } -} - -/** - * Submits a driver to the master. - */ -private[spark] class SubmissionClient(actorSystem: ActorSystem, master: String, - driverDescription: DriverDescription) - extends SingleMessageClient(actorSystem, master, RequestSubmitDriver(driverDescription)) { - - override def handleResponse(response: Any) { - val resp = response.asInstanceOf[SubmitDriverResponse] - if (!resp.success) { - logError(s"Error submitting driver to $master") - logError(resp.message) + case KillDriverResponse(success, message) => { + response.success((success, message)) } - } -} -/** - * Terminates a client at the master. - */ -private[spark] class TerminationClient(actorSystem: ActorSystem, master: String, driverId: String) - extends SingleMessageClient(actorSystem, master, RequestKillDriver(driverId)) { - - override def handleResponse(response: Any) { - val resp = response.asInstanceOf[KillDriverResponse] - if (!resp.success) { - logError(s"Error terminating $driverId at $master") - logError(resp.message) + // Relay all other messages to the server. + case message => { + logInfo(s"Sending message to master $master...") + val masterActor = context.actorSelection(Master.toAkkaUrl(master)) + masterActor ! message } } } @@ -96,7 +53,7 @@ private[spark] class TerminationClient(actorSystem: ActorSystem, master: String, /** * Executable utility for starting and terminating drivers inside of a standalone cluster. */ -object DriverClient { +object DriverClient extends Logging { def main(args: Array[String]) { val driverArgs = new DriverClientArguments(args) @@ -105,6 +62,9 @@ object DriverClient { // flow. Else, this (sadly) requires the DriverClient be routable from the Master. val (actorSystem, boundPort) = AkkaUtils.createActorSystem( "driverClient", Utils.localHostName(), 0) + val master = driverArgs.master + val response = promise[(Boolean, String)] + val driver: ActorRef = actorSystem.actorOf(Props(new DriverActor(driverArgs.master, response))) driverArgs.cmd match { case "launch" => @@ -116,13 +76,22 @@ object DriverClient { driverArgs.driverOptions, driverArgs.driverJavaOptions, driverArgs.driverEnvVars) - val client = new SubmissionClient(actorSystem, driverArgs.master, driverDescription) + driver ! RequestSubmitDriver(driverDescription) case "kill" => - val master = driverArgs.master val driverId = driverArgs.driverId - val client = new TerminationClient(actorSystem, master, driverId) + driver ! RequestKillDriver(driverId) } + + val (success, message) = + try { + Await.result(response.future, AkkaUtils.askTimeout) + } catch { + case e: TimeoutException => (false, s"Master $master failed to respond in time") + } + if (success) logInfo(message) else logError(message) + actorSystem.stop(driver) + actorSystem.shutdown() actorSystem.awaitTermination() } } |