-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala | 10
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala | 9
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 8
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala | 9
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 5
10 files changed, 47 insertions(+), 29 deletions(-)
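
Taken together, these hunks give Spark's Akka actor systems distinct names: the driver's
system becomes "sparkDriver" and each executor's becomes "sparkExecutor", replacing the
single generic name "spark". Every call site that hand-builds an akka.tcp:// URL for the
driver must switch to the new name in lockstep, which is what the per-file changes below
do. A minimal sketch of the URL scheme the diff converges on; "sparkDriver" comes from
the diff, while the host, port, and the assumption that ACTOR_NAME is
"CoarseGrainedScheduler" are illustrative:

    // Sketch only: the driver-URL format after this change. Host/port and the
    // example actor name are placeholders, not values from the diff.
    object DriverUrlSketch {
      val driverActorSystemName = "sparkDriver"  // SparkEnv.driverActorSystemName

      def driverUrl(host: String, port: Int, actorName: String): String =
        s"akka.tcp://$driverActorSystemName@$host:$port/user/$actorName"

      def main(args: Array[String]): Unit =
        // Prints: akka.tcp://sparkDriver@192.168.0.1:7077/user/CoarseGrainedScheduler
        println(driverUrl("192.168.0.1", 7077, "CoarseGrainedScheduler"))
    }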
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index fc36e37c53..72716567ca 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -111,6 +111,9 @@ object SparkEnv extends Logging {
private val env = new ThreadLocal[SparkEnv]
@volatile private var lastSetSparkEnv : SparkEnv = _
+ private[spark] val driverActorSystemName = "sparkDriver"
+ private[spark] val executorActorSystemName = "sparkExecutor"
+
def set(e: SparkEnv) {
lastSetSparkEnv = e
env.set(e)
@@ -146,9 +149,9 @@ object SparkEnv extends Logging {
}
val securityManager = new SecurityManager(conf)
-
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf,
- securityManager = securityManager)
+ val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
+ actorSystemName, hostname, port, conf, securityManager)
// Figure out which port Akka actually bound to in case the original port is 0 or occupied.
// This is so that we tell the executors the correct port to connect to.
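
This SparkEnv change is the root of the rename: the two constants define the well-known
system names, and createActorSystem now receives "sparkDriver" or "sparkExecutor"
depending on which side is starting up. A minimal sketch of just the selection logic
(createActorSystem itself is elided; its real signature lives in
org.apache.spark.util.AkkaUtils):

    // The names are the constants added in the hunk above; the rest is a stub.
    object ActorSystemNaming {
      val driverActorSystemName = "sparkDriver"
      val executorActorSystemName = "sparkExecutor"

      def nameFor(isDriver: Boolean): String =
        if (isDriver) driverActorSystemName else executorActorSystemName
    }

    // ActorSystemNaming.nameFor(isDriver = true)   // "sparkDriver"
    // ActorSystemNaming.nameFor(isDriver = false)  // "sparkExecutor"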
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index d99c76117c..4f7133c4bc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -20,7 +20,7 @@ package org.apache.spark.scheduler.cluster
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
-import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.{Logging, SparkContext, SparkEnv}
import org.apache.spark.scheduler.TaskSchedulerImpl
private[spark] class SimrSchedulerBackend(
@@ -38,8 +38,10 @@ private[spark] class SimrSchedulerBackend(
override def start() {
super.start()
- val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
- sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port"),
+ val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+ SparkEnv.driverActorSystemName,
+ sc.conf.get("spark.driver.host"),
+ sc.conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val conf = new Configuration()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 589dba2e40..32138e5246 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -17,7 +17,7 @@
package org.apache.spark.scheduler.cluster
-import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{AppClient, AppClientListener}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
@@ -42,8 +42,10 @@ private[spark] class SparkDeploySchedulerBackend(
super.start()
// The endpoint for executors to talk to us
- val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
- conf.get("spark.driver.host"), conf.get("spark.driver.port"),
+ val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+ SparkEnv.driverActorSystemName,
+ conf.get("spark.driver.host"),
+ conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 9f45400bcf..f0172504c5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -28,7 +28,7 @@ import org.apache.mesos.{Scheduler => MScheduler}
import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
-import org.apache.spark.{Logging, SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -130,7 +130,8 @@ private[spark] class CoarseMesosSchedulerBackend(
}
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
- val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
+ val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+ SparkEnv.driverActorSystemName,
conf.get("spark.driver.host"),
conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index d6afb73b74..e2d32c859b 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -27,7 +27,7 @@ import akka.pattern.ask
import com.typesafe.config.ConfigFactory
import org.apache.log4j.{Level, Logger}
-import org.apache.spark.{SparkException, Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException}
/**
* Various utility classes for working with Akka.
@@ -192,10 +192,11 @@ private[spark] object AkkaUtils extends Logging {
}
def makeDriverRef(name: String, conf: SparkConf, actorSystem: ActorSystem): ActorRef = {
+ val driverActorSystemName = SparkEnv.driverActorSystemName
val driverHost: String = conf.get("spark.driver.host", "localhost")
val driverPort: Int = conf.getInt("spark.driver.port", 7077)
Utils.checkHost(driverHost, "Expected hostname")
- val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
+ val url = s"akka.tcp://$driverActorSystemName@$driverHost:$driverPort/user/$name"
val timeout = AkkaUtils.lookupTimeout(conf)
logInfo(s"Connecting to $name: $url")
Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
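
makeDriverRef is the one place in the diff that resolves the URL to a live ActorRef
rather than just formatting a string, so the renamed system has to appear here too or
lookups would fail. A self-contained sketch of the same lookup pattern, with the timeout
and connection details as placeholders for what AkkaUtils actually reads from SparkConf:

    // Sketch of the resolve-by-URL pattern used by makeDriverRef. "sparkDriver"
    // is fixed by the diff; host, port, and the 30s timeout are illustrative.
    import scala.concurrent.Await
    import scala.concurrent.duration._

    import akka.actor.{ActorRef, ActorSystem}

    object ResolveDriverRef {
      def makeDriverRef(system: ActorSystem, name: String,
                        host: String = "localhost", port: Int = 7077): ActorRef = {
        val url = s"akka.tcp://sparkDriver@$host:$port/user/$name"
        val timeout = 30.seconds
        // actorSelection is lazy; resolveOne performs the lookup and fails the
        // returned future if nothing is registered at the path.
        Await.result(system.actorSelection(url).resolveOne(timeout), timeout)
      }
    }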
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index d934b9cbfc..53a3e6200e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -20,22 +20,21 @@ package org.apache.spark.streaming.receiver
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong
-import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Await
import akka.actor.{Actor, Props}
import akka.pattern.ask
+import com.google.common.base.Throwables
+
import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.scheduler._
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.scheduler.DeregisterReceiver
import org.apache.spark.streaming.scheduler.AddBlock
-import scala.Some
import org.apache.spark.streaming.scheduler.RegisterReceiver
-import com.google.common.base.Throwables
/**
* Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
@@ -56,7 +55,8 @@ private[streaming] class ReceiverSupervisorImpl(
private val trackerActor = {
val ip = env.conf.get("spark.driver.host", "localhost")
val port = env.conf.getInt("spark.driver.port", 7077)
- val url = "akka.tcp://spark@%s:%s/user/ReceiverTracker".format(ip, port)
+ val url = "akka.tcp://%s@%s:%s/user/ReceiverTracker".format(
+ SparkEnv.driverActorSystemName, ip, port)
env.actorSystem.actorSelection(url)
}
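
On the receiver side the pattern is slightly different: the supervisor keeps an
ActorSelection for the driver's ReceiverTracker and talks to it with the ask pattern,
rather than resolving a concrete ActorRef up front. A sketch of that shape, with the
message type and connection details as placeholders ("sparkDriver" and the
"ReceiverTracker" actor name are fixed by the diff):

    import scala.concurrent.Await
    import scala.concurrent.duration._

    import akka.actor.ActorSystem
    import akka.pattern.ask
    import akka.util.Timeout

    // Placeholder standing in for the real RegisterReceiver message in
    // org.apache.spark.streaming.scheduler.
    case class RegisterReceiver(streamId: Int, host: String)

    object TrackerLookup {
      def tracker(system: ActorSystem, ip: String, port: Int) =
        system.actorSelection(s"akka.tcp://sparkDriver@$ip:$port/user/ReceiverTracker")

      def register(system: ActorSystem): Any = {
        implicit val timeout: Timeout = Timeout(10.seconds)
        // An ActorSelection supports ask directly; no resolveOne is needed when
        // the path is stable and only messages are exchanged.
        Await.result(
          tracker(system, "localhost", 7077) ? RegisterReceiver(0, "localhost"),
          timeout.duration)
      }
    }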
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index c3310fbc24..155dd88aa2 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import akka.actor._
import akka.remote._
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
@@ -210,8 +210,11 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
sparkConf.set("spark.driver.host", driverHost)
sparkConf.set("spark.driver.port", driverPort.toString)
- val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
- driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
+ val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+ SparkEnv.driverActorSystemName,
+ driverHost,
+ driverPort.toString,
+ CoarseGrainedSchedulerBackend.ACTOR_NAME)
actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
}
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 80e0162e9f..568a6ef932 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -26,7 +26,7 @@ import scala.collection
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.{Logging, SparkConf, SparkEnv}
import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
@@ -245,8 +245,10 @@ private[yarn] class YarnAllocationHandler(
// Deallocate + allocate can result in reusing id's wrongly - so use a different counter
// (executorIdCounter)
val executorId = executorIdCounter.incrementAndGet().toString
- val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
- sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"),
+ val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+ SparkEnv.driverActorSystemName,
+ sparkConf.get("spark.driver.host"),
+ sparkConf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
logInfo("launching container on " + containerId + " host " + executorHostname)
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 45925f1fea..e093fe4ae6 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import akka.actor._
import akka.remote._
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
@@ -174,8 +174,11 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
sparkConf.set("spark.driver.host", driverHost)
sparkConf.set("spark.driver.port", driverPort.toString)
- val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
- driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
+ val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+ SparkEnv.driverActorSystemName,
+ driverHost,
+ driverPort.toString,
+ CoarseGrainedSchedulerBackend.ACTOR_NAME)
actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 29ccec2adc..0a461749c8 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -26,7 +26,7 @@ import scala.collection
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.{Logging, SparkConf, SparkEnv}
import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
@@ -262,7 +262,8 @@ private[yarn] class YarnAllocationHandler(
numExecutorsRunning.decrementAndGet()
} else {
val executorId = executorIdCounter.incrementAndGet().toString
- val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
+ val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+ SparkEnv.driverActorSystemName,
sparkConf.get("spark.driver.host"),
sparkConf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)