diff options
3 files changed, 8 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala index e319e75bae..1cd5d99bcf 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala @@ -23,7 +23,7 @@ import scala.concurrent._ import akka.actor._ -import org.apache.spark.Logging +import org.apache.spark.{SparkConf, Logging} import org.apache.spark.deploy.{Command, DriverDescription} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.Master @@ -59,11 +59,12 @@ object DriverClient extends Logging { def main(args: Array[String]) { val driverArgs = new DriverClientArguments(args) + val conf = new SparkConf() // TODO: See if we can initialize akka so return messages are sent back using the same TCP // flow. Else, this (sadly) requires the DriverClient be routable from the Master. val (actorSystem, _) = AkkaUtils.createActorSystem( - "driverClient", Utils.localHostName(), 0) + "driverClient", Utils.localHostName(), 0, false, conf) val master = driverArgs.master val response = promise[(Boolean, String)] val driver: ActorRef = actorSystem.actorOf(Props(new DriverActor(driverArgs.master, response))) @@ -95,7 +96,7 @@ object DriverClient extends Logging { val (success, message) = try { - Await.result(response.future, AkkaUtils.askTimeout) + Await.result(response.future, AkkaUtils.askTimeout(conf)) } catch { case e: TimeoutException => (false, s"Master $master failed to respond in time") } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala index 2deb21aac6..1640d5fee0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala @@ -2,6 +2,7 @@ package org.apache.spark.deploy.worker import akka.actor._ +import org.apache.spark.SparkConf import org.apache.spark.util.{AkkaUtils, Utils} /** @@ -12,7 +13,7 @@ object DriverWrapper { args.toList match { case workerUrl :: mainClass :: extraArgs => val (actorSystem, _) = AkkaUtils.createActorSystem("Driver", - Utils.localHostName(), 0) + Utils.localHostName(), 0, false, new SparkConf()) actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher") // Delegate to supplied main class @@ -20,7 +21,7 @@ object DriverWrapper { val mainMethod = clazz.getMethod("main", classOf[Array[String]]) mainMethod.invoke(null, extraArgs.toArray[String]) - actorSystem.awaitTermination() + actorSystem.shutdown() case _ => System.err.println("Usage: DriverWrapper <workerUrl> <driverMainClass> [options]") diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 2072f00fae..4546e3892f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -335,7 +335,7 @@ private[spark] object Worker { val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf) actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory, - masterUrls, workDir, conf), name = actorName) + masterUrls, systemName, actorName, workDir, conf), name = actorName) (actorSystem, boundPort) } |