 core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala  | 7 ++++---
 core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala | 5 +++--
 core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala        | 2 +-
 3 files changed, 8 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
index e319e75bae..1cd5d99bcf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
@@ -23,7 +23,7 @@ import scala.concurrent._
 
 import akka.actor._
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.deploy.{Command, DriverDescription}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
@@ -59,11 +59,12 @@ object DriverClient extends Logging {
 
   def main(args: Array[String]) {
     val driverArgs = new DriverClientArguments(args)
+    val conf = new SparkConf()
 
     // TODO: See if we can initialize akka so return messages are sent back using the same TCP
     // flow. Else, this (sadly) requires the DriverClient be routable from the Master.
     val (actorSystem, _) = AkkaUtils.createActorSystem(
-      "driverClient", Utils.localHostName(), 0)
+      "driverClient", Utils.localHostName(), 0, false, conf)
     val master = driverArgs.master
     val response = promise[(Boolean, String)]
     val driver: ActorRef = actorSystem.actorOf(Props(new DriverActor(driverArgs.master, response)))
@@ -95,7 +96,7 @@ object DriverClient extends Logging {
 
     val (success, message) =
       try {
-        Await.result(response.future, AkkaUtils.askTimeout)
+        Await.result(response.future, AkkaUtils.askTimeout(conf))
       } catch {
         case e: TimeoutException => (false, s"Master $master failed to respond in time")
       }
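For orientation, the two call-site changes above assume AkkaUtils helpers that take the SparkConf explicitly instead of reading global state. The sketch below is an assumption about their shape; the parameter names, the spark.akka.askTimeout key and its 30-second default are illustrative, not taken from this commit.

// A minimal sketch, not the actual Spark source.
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor.ActorSystem
import org.apache.spark.SparkConf

object AkkaUtilsSketch {
  // The extra Boolean before the conf mirrors the literal `false` passed by DriverClient.
  def createActorSystem(name: String, host: String, port: Int,
      indestructible: Boolean, conf: SparkConf): (ActorSystem, Int) = {
    ???  // the real helper builds an Akka config from spark.akka.* settings in `conf`
  }

  // The ask timeout is now derived from the passed conf rather than a shared constant.
  def askTimeout(conf: SparkConf): FiniteDuration =
    Duration.create(conf.get("spark.akka.askTimeout", "30").toLong, TimeUnit.SECONDS)
}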
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index 2deb21aac6..1640d5fee0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -2,6 +2,7 @@ package org.apache.spark.deploy.worker
 
 import akka.actor._
 
+import org.apache.spark.SparkConf
 import org.apache.spark.util.{AkkaUtils, Utils}
 
 /**
@@ -12,7 +13,7 @@ object DriverWrapper {
     args.toList match {
       case workerUrl :: mainClass :: extraArgs =>
         val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
-          Utils.localHostName(), 0)
+          Utils.localHostName(), 0, false, new SparkConf())
         actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")
 
         // Delegate to supplied main class
@@ -20,7 +21,7 @@ object DriverWrapper {
         val mainMethod = clazz.getMethod("main", classOf[Array[String]])
         mainMethod.invoke(null, extraArgs.toArray[String])
 
-        actorSystem.awaitTermination()
+        actorSystem.shutdown()
 
       case _ =>
         System.err.println("Usage: DriverWrapper <workerUrl> <driverMainClass> [options]")
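The awaitTermination-to-shutdown switch changes who ends the actor system: previously the wrapper blocked until something else terminated it, now it shuts the system down itself once the delegated main method returns. A toy standalone illustration (names here are illustrative, not from the Spark code):

import akka.actor.ActorSystem

object ShutdownSketch {
  def main(args: Array[String]) {
    val system = ActorSystem("driverWrapperSketch")
    try {
      // ...reflectively invoke the wrapped application's main(...) here...
      println("wrapped main finished")
    } finally {
      // Was: system.awaitTermination(), which blocks until the system is stopped
      // elsewhere. shutdown() initiates termination so the JVM can exit.
      system.shutdown()
    }
  }
}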
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 2072f00fae..4546e3892f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -335,7 +335,7 @@ private[spark] object Worker {
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
       conf = conf)
     actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
-      masterUrls, workDir, conf), name = actorName)
+      masterUrls, systemName, actorName, workDir, conf), name = actorName)
     (actorSystem, boundPort)
   }
 }
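The two extra arguments only work because Akka matches Props(classOf[...], args...) against a constructor reflectively when the actor is created, so this call site and the Worker constructor must stay in sync. A toy illustration of that mechanism (WorkerSketch stands in for the real Worker class; the argument values are made up):

import akka.actor.{Actor, ActorSystem, Props}

class WorkerSketch(host: String, port: Int, systemName: String, actorName: String)
  extends Actor {
  def receive = {
    case msg => println(s"$systemName/$actorName on $host:$port received $msg")
  }
}

object PropsSketch {
  def main(args: Array[String]) {
    val system = ActorSystem("sparkWorkerSketch")
    // Dropping or reordering an argument here fails at actorOf time because no
    // matching constructor is found.
    system.actorOf(
      Props(classOf[WorkerSketch], "localhost", 7078, "sparkWorkerSketch", "Worker"),
      name = "Worker")
    system.shutdown()
  }
}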