author    Andrew Or <andrewor14@gmail.com>    2014-08-25 23:36:09 -0700
committer Andrew Or <andrewor14@gmail.com>    2014-08-25 23:36:09 -0700
commit b21ae5bbb9baa966f69303a30659aa8bbb2098da (patch)
tree   93b422b0e63076694ad5f5accae4fc6598b2288c /core
parent 52fbdc2deddcdba02bf5945a36e15870021ec890 (diff)
[SPARK-2886] Use more specific actor system name than "spark"
As of #1777 we log the name of the actor system when it binds to a port. The current name "spark" is super general and does not convey any meaning. For instance, the following lines are taken from my driver log after setting `spark.driver.port` to 5001.

```
14/08/13 19:33:29 INFO Remoting: Remoting started; listening on addresses: [akka.tcp://spark@andrews-mbp:5001]
14/08/13 19:33:29 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@andrews-mbp:5001]
14/08/06 13:40:05 INFO Utils: Successfully started service 'spark' on port 5001.
```

This commit renames the actor system to "sparkDriver" on the driver and "sparkExecutor" on the executors. The goal of this unambitious PR is simply to make the logged information more explicit without introducing any change in functionality.

Author: Andrew Or <andrewor14@gmail.com>

Closes #1810 from andrewor14/service-name and squashes the following commits:

8c459ed [Andrew Or] Use a common variable for driver/executor actor system names
3a92843 [Andrew Or] Change actor name to sparkDriver and sparkExecutor
921363e [Andrew Or] Merge branch 'master' of github.com:apache/spark into service-name
c8c6a62 [Andrew Or] Do not include hyphens in actor name
1c1b42e [Andrew Or] Avoid spaces in akka system name
f644b55 [Andrew Or] Use more specific service name
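For illustration, a minimal Scala sketch of the URL shape the rename affects; the host, port, and actor name are placeholders (the actor name stands in for CoarseGrainedSchedulerBackend.ACTOR_NAME), not values prescribed by this patch:

```scala
// Sketch only: host, port, and actorName are illustrative placeholders.
val host = "andrews-mbp"
val port = 5001
val actorName = "CoarseGrainedScheduler"

// Before this patch, every actor system was named "spark":
val oldUrl = s"akka.tcp://spark@$host:$port/user/$actorName"

// After it, the driver system is "sparkDriver" (executors use "sparkExecutor"):
val driverActorSystemName = "sparkDriver" // SparkEnv.driverActorSystemName
val newUrl = s"akka.tcp://$driverActorSystemName@$host:$port/user/$actorName"

println(oldUrl) // akka.tcp://spark@andrews-mbp:5001/user/CoarseGrainedScheduler
println(newUrl) // akka.tcp://sparkDriver@andrews-mbp:5001/user/CoarseGrainedScheduler
```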
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 5
5 files changed, 22 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index fc36e37c53..72716567ca 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -111,6 +111,9 @@ object SparkEnv extends Logging {
private val env = new ThreadLocal[SparkEnv]
@volatile private var lastSetSparkEnv : SparkEnv = _
+ private[spark] val driverActorSystemName = "sparkDriver"
+ private[spark] val executorActorSystemName = "sparkExecutor"
+
def set(e: SparkEnv) {
lastSetSparkEnv = e
env.set(e)
@@ -146,9 +149,9 @@ object SparkEnv extends Logging {
}
val securityManager = new SecurityManager(conf)
-
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf,
- securityManager = securityManager)
+ val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
+ actorSystemName, hostname, port, conf, securityManager)
// Figure out which port Akka actually bound to in case the original port is 0 or occupied.
// This is so that we tell the executors the correct port to connect to.
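As an aside on the port comment above, a self-contained sketch of the underlying mechanism, using a plain java.net.ServerSocket rather than Spark's AkkaUtils:

```scala
// Illustrative only: binding to port 0 lets the OS pick any free port, so the
// actual port must be read back after binding -- the same reason SparkEnv keeps
// boundPort from createActorSystem rather than the port that was requested.
import java.net.ServerSocket

val socket = new ServerSocket(0)    // request "any free port"
val boundPort = socket.getLocalPort // the port the OS actually assigned
println(s"requested 0, bound to $boundPort")
socket.close()
```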
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index d99c76117c..4f7133c4bc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -20,7 +20,7 @@ package org.apache.spark.scheduler.cluster
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
-import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.{Logging, SparkContext, SparkEnv}
import org.apache.spark.scheduler.TaskSchedulerImpl
private[spark] class SimrSchedulerBackend(
@@ -38,8 +38,10 @@ private[spark] class SimrSchedulerBackend(
override def start() {
super.start()
- val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
- sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port"),
+ val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+ SparkEnv.driverActorSystemName,
+ sc.conf.get("spark.driver.host"),
+ sc.conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val conf = new Configuration()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 589dba2e40..32138e5246 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -17,7 +17,7 @@
package org.apache.spark.scheduler.cluster
-import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{AppClient, AppClientListener}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
@@ -42,8 +42,10 @@ private[spark] class SparkDeploySchedulerBackend(
super.start()
// The endpoint for executors to talk to us
- val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
- conf.get("spark.driver.host"), conf.get("spark.driver.port"),
+ val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+ SparkEnv.driverActorSystemName,
+ conf.get("spark.driver.host"),
+ conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 9f45400bcf..f0172504c5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -28,7 +28,7 @@ import org.apache.mesos.{Scheduler => MScheduler}
import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
-import org.apache.spark.{Logging, SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -130,7 +130,8 @@ private[spark] class CoarseMesosSchedulerBackend(
}
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
- val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
+ val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+ SparkEnv.driverActorSystemName,
conf.get("spark.driver.host"),
conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index d6afb73b74..e2d32c859b 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -27,7 +27,7 @@ import akka.pattern.ask
import com.typesafe.config.ConfigFactory
import org.apache.log4j.{Level, Logger}
-import org.apache.spark.{SparkException, Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException}
/**
* Various utility classes for working with Akka.
@@ -192,10 +192,11 @@ private[spark] object AkkaUtils extends Logging {
}
def makeDriverRef(name: String, conf: SparkConf, actorSystem: ActorSystem): ActorRef = {
+ val driverActorSystemName = SparkEnv.driverActorSystemName
val driverHost: String = conf.get("spark.driver.host", "localhost")
val driverPort: Int = conf.getInt("spark.driver.port", 7077)
Utils.checkHost(driverHost, "Expected hostname")
- val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
+ val url = s"akka.tcp://$driverActorSystemName@$driverHost:$driverPort/user/$name"
val timeout = AkkaUtils.lookupTimeout(conf)
logInfo(s"Connecting to $name: $url")
Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
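To close, a hedged usage sketch of the updated makeDriverRef lookup path. It assumes code compiled inside the org.apache.spark package (AkkaUtils is private[spark]), a driver already listening on the configured host and port, and an illustrative actor name ("MapOutputTracker"); none of these specifics are prescribed by the patch:

```scala
package org.apache.spark // AkkaUtils is private[spark], so callers live here

import org.apache.spark.util.AkkaUtils

object MakeDriverRefSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.driver.host", "localhost")
      .set("spark.driver.port", "7077")

    // Stand up a local actor system the way SparkEnv does for executors.
    val (actorSystem, _) = AkkaUtils.createActorSystem(
      "sparkExecutor", "localhost", 0, conf, new SecurityManager(conf))

    // After this patch the URL resolved here is
    // akka.tcp://sparkDriver@localhost:7077/user/MapOutputTracker,
    // so a driver must actually be listening for resolveOne to succeed.
    val driverRef = AkkaUtils.makeDriverRef("MapOutputTracker", conf, actorSystem)
    println(s"resolved driver actor: $driverRef")

    actorSystem.shutdown()
  }
}
```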