aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-12-26 12:11:52 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-12-26 12:11:52 -0800
commitda20270b839cc10d4459848f5b485ca566cd2dfb (patch)
treed1df785d7c26976a752a3c043dde08a7d2204bdb /core/src/main/scala/org/apache
parenta97ad55c45f5903c85c7c15b27177316473f2c0c (diff)
parent61372b11f4a4460b8ade8997d7478234bba64f7e (diff)
downloadspark-da20270b839cc10d4459848f5b485ca566cd2dfb.tar.gz
spark-da20270b839cc10d4459848f5b485ca566cd2dfb.tar.bz2
spark-da20270b839cc10d4459848f5b485ca566cd2dfb.zip
Merge pull request #1 from aarondav/driver
Refactor DriverClient to be more Actor-based
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala93
1 files changed, 31 insertions, 62 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
index 28c851bcfe..d2f3c092fb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
@@ -17,78 +17,35 @@
package org.apache.spark.deploy.client
-import java.util.concurrent.TimeUnit
-
-import scala.concurrent.Await
-import scala.concurrent.duration.{Duration, FiniteDuration}
+import scala.concurrent._
import akka.actor._
-import akka.actor.Actor.emptyBehavior
-import akka.pattern.ask
-import akka.remote.RemotingLifecycleEvent
import org.apache.spark.Logging
-import org.apache.spark.deploy.{DeployMessage, DriverDescription}
+import org.apache.spark.deploy.DriverDescription
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.util.{AkkaUtils, Utils}
/**
- * Actor that sends a single message to the standalone master and then shuts down.
+ * Actor that sends a single message to the standalone master and returns the response in the
+ * given promise.
*/
-private[spark] abstract class SingleMessageClient(
- actorSystem: ActorSystem, master: String, message: DeployMessage)
- extends Logging {
-
- // Concrete child classes must implement
- def handleResponse(response: Any)
-
- var actor: ActorRef = actorSystem.actorOf(Props(new DriverActor()))
-
- class DriverActor extends Actor with Logging {
- override def preStart() {
- context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
- logInfo("Sending message to master " + master + "...")
- val masterActor = context.actorSelection(Master.toAkkaUrl(master))
- val timeoutDuration: FiniteDuration = Duration.create(
- System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS)
- val submitFuture = masterActor.ask(message)(timeoutDuration)
- handleResponse(Await.result(submitFuture, timeoutDuration))
- actorSystem.stop(actor)
- actorSystem.shutdown()
+class DriverActor(master: String, response: Promise[(Boolean, String)]) extends Actor with Logging {
+ override def receive = {
+ case SubmitDriverResponse(success, message) => {
+ response.success((success, message))
}
- override def receive = emptyBehavior
- }
-}
-
-/**
- * Submits a driver to the master.
- */
-private[spark] class SubmissionClient(actorSystem: ActorSystem, master: String,
- driverDescription: DriverDescription)
- extends SingleMessageClient(actorSystem, master, RequestSubmitDriver(driverDescription)) {
-
- override def handleResponse(response: Any) {
- val resp = response.asInstanceOf[SubmitDriverResponse]
- if (!resp.success) {
- logError(s"Error submitting driver to $master")
- logError(resp.message)
+ case KillDriverResponse(success, message) => {
+ response.success((success, message))
}
- }
-}
-/**
- * Terminates a client at the master.
- */
-private[spark] class TerminationClient(actorSystem: ActorSystem, master: String, driverId: String)
- extends SingleMessageClient(actorSystem, master, RequestKillDriver(driverId)) {
-
- override def handleResponse(response: Any) {
- val resp = response.asInstanceOf[KillDriverResponse]
- if (!resp.success) {
- logError(s"Error terminating $driverId at $master")
- logError(resp.message)
+ // Relay all other messages to the server.
+ case message => {
+ logInfo(s"Sending message to master $master...")
+ val masterActor = context.actorSelection(Master.toAkkaUrl(master))
+ masterActor ! message
}
}
}
@@ -96,7 +53,7 @@ private[spark] class TerminationClient(actorSystem: ActorSystem, master: String,
/**
* Executable utility for starting and terminating drivers inside of a standalone cluster.
*/
-object DriverClient {
+object DriverClient extends Logging {
def main(args: Array[String]) {
val driverArgs = new DriverClientArguments(args)
@@ -105,6 +62,9 @@ object DriverClient {
// flow. Else, this (sadly) requires the DriverClient be routable from the Master.
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
"driverClient", Utils.localHostName(), 0)
+ val master = driverArgs.master
+ val response = promise[(Boolean, String)]
+ val driver: ActorRef = actorSystem.actorOf(Props(new DriverActor(driverArgs.master, response)))
driverArgs.cmd match {
case "launch" =>
@@ -116,13 +76,22 @@ object DriverClient {
driverArgs.driverOptions,
driverArgs.driverJavaOptions,
driverArgs.driverEnvVars)
- val client = new SubmissionClient(actorSystem, driverArgs.master, driverDescription)
+ driver ! RequestSubmitDriver(driverDescription)
case "kill" =>
- val master = driverArgs.master
val driverId = driverArgs.driverId
- val client = new TerminationClient(actorSystem, master, driverId)
+ driver ! RequestKillDriver(driverId)
}
+
+ val (success, message) =
+ try {
+ Await.result(response.future, AkkaUtils.askTimeout)
+ } catch {
+ case e: TimeoutException => (false, s"Master $master failed to respond in time")
+ }
+ if (success) logInfo(message) else logError(message)
+ actorSystem.stop(driver)
+ actorSystem.shutdown()
actorSystem.awaitTermination()
}
}