diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2013-12-03 11:27:45 +0530 |
---|---|---|
committer | Prashant Sharma <prashant.s@imaginea.com> | 2013-12-03 11:27:45 +0530 |
commit | 09e8be9a6225203337a01e618851e807a1482603 (patch) | |
tree | 0c0762e8254bb62266ad37e86dc0963d72bca0e4 /core | |
parent | 0f24576c08a361f323b7ad9babfd5d8431d57df0 (diff) | |
download | spark-09e8be9a6225203337a01e618851e807a1482603.tar.gz spark-09e8be9a6225203337a01e618851e807a1482603.tar.bz2 spark-09e8be9a6225203337a01e618851e807a1482603.zip |
Made running SparkActorSystem specific to executors only.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala | 3 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 11 |
2 files changed, 11 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index dcb12bed4e..406e015f08 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -97,7 +97,8 @@ private[spark] object CoarseGrainedExecutorBackend { // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor // before getting started with all our system properties, etc - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0) + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0, + useSparkAS = true) // set it val sparkHostPort = hostname + ":" + boundPort System.setProperty("spark.hostPort", sparkHostPort) diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 407e9ffe90..f3e2644a58 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -35,7 +35,9 @@ private[spark] object AkkaUtils { * Note: the `name` parameter is important, as even if a client sends a message to right * host + port, if the system name is incorrect, Akka will drop the message. */ - def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = { + def createActorSystem(name: String, host: String, port: Int, + useSparkAS: Boolean = false): (ActorSystem, Int) = { + val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt @@ -70,7 +72,12 @@ private[spark] object AkkaUtils { |akka.remote.log-remote-lifecycle-events = $lifecycleEvents """.stripMargin) - val actorSystem = SparkActorSystem(name, akkaConf) + val actorSystem = if (useSparkAS) { + SparkActorSystem(name, akkaConf) + } + else { + ActorSystem(name, akkaConf) + } val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider val boundPort = provider.getDefaultAddress.port.get |