aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-05-26 15:28:49 -0700
committerAndrew Or <andrew@databricks.com>2015-05-26 15:28:49 -0700
commit9f742241cbf07e5e2dadfee8dcc9b382bb2dbea1 (patch)
treedf4190414602fead033a5aa3b1e17c2821a1cda5 /yarn
parent2e9a5f229e1a2ccffa74fa59fa6a55b2704d9c1a (diff)
downloadspark-9f742241cbf07e5e2dadfee8dcc9b382bb2dbea1.tar.gz
spark-9f742241cbf07e5e2dadfee8dcc9b382bb2dbea1.tar.bz2
spark-9f742241cbf07e5e2dadfee8dcc9b382bb2dbea1.zip
[SPARK-6602] [CORE] Remove some places in core that calling SparkEnv.actorSystem
Author: zsxwing <zsxwing@gmail.com> Closes #6333 from zsxwing/remove-actor-system-usage and squashes the following commits: f125aa6 [zsxwing] Fix YarnAllocatorSuite ceadcf6 [zsxwing] Change the "port" parameter type of "AkkaUtils.address" to "int"; update ApplicationMaster and YarnAllocator to get the driverUrl from RpcEnv 3239380 [zsxwing] Remove some places in core that calling SparkEnv.actorSystem
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala18
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala12
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala3
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala1
4 files changed, 17 insertions, 17 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index af4927b0e4..760e458972 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -34,7 +34,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, Spar
import org.apache.spark.SparkException
import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
import org.apache.spark.deploy.history.HistoryServer
-import org.apache.spark.scheduler.cluster.YarnSchedulerBackend
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util._
@@ -220,7 +220,7 @@ private[spark] class ApplicationMaster(
sparkContextRef.compareAndSet(sc, null)
}
- private def registerAM(uiAddress: String, securityMgr: SecurityManager) = {
+ private def registerAM(_rpcEnv: RpcEnv, uiAddress: String, securityMgr: SecurityManager) = {
val sc = sparkContextRef.get()
val appId = client.getAttemptId().getApplicationId().toString()
@@ -231,8 +231,14 @@ private[spark] class ApplicationMaster(
.map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}/${attemptId}" }
.getOrElse("")
- allocator = client.register(yarnConf,
- if (sc != null) sc.getConf else sparkConf,
+ val _sparkConf = if (sc != null) sc.getConf else sparkConf
+ val driverUrl = _rpcEnv.uriOf(
+ SparkEnv.driverActorSystemName,
+ RpcAddress(_sparkConf.get("spark.driver.host"), _sparkConf.get("spark.driver.port").toInt),
+ CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
+ allocator = client.register(driverUrl,
+ yarnConf,
+ _sparkConf,
if (sc != null) sc.preferredNodeLocationData else Map(),
uiAddress,
historyAddress,
@@ -279,7 +285,7 @@ private[spark] class ApplicationMaster(
sc.getConf.get("spark.driver.host"),
sc.getConf.get("spark.driver.port"),
isClusterMode = true)
- registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
+ registerAM(rpcEnv, sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
userClassThread.join()
}
}
@@ -289,7 +295,7 @@ private[spark] class ApplicationMaster(
rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr)
waitForSparkDriver()
addAmIpFilter()
- registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
+ registerAM(rpcEnv, sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
// In client mode the actor will stop the reporter thread.
reporterThread.join()
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 8a08f561a2..21193e7c62 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -34,10 +34,8 @@ import org.apache.hadoop.yarn.util.RackResolver
import org.apache.log4j.{Level, Logger}
-import org.apache.spark.{SparkEnv, Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.AkkaUtils
/**
* YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -53,6 +51,7 @@ import org.apache.spark.util.AkkaUtils
* synchronized.
*/
private[yarn] class YarnAllocator(
+ driverUrl: String,
conf: Configuration,
sparkConf: SparkConf,
amClient: AMRMClient[ContainerRequest],
@@ -107,13 +106,6 @@ private[yarn] class YarnAllocator(
new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())
launcherPool.allowCoreThreadTimeOut(true)
- private val driverUrl = AkkaUtils.address(
- AkkaUtils.protocol(securityMgr.akkaSSLOptions.enabled),
- SparkEnv.driverActorSystemName,
- sparkConf.get("spark.driver.host"),
- sparkConf.get("spark.driver.port"),
- CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
-
// For testing
private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index ffe71dfd7d..7f533ee55e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -55,6 +55,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
* @param uiHistoryAddress Address of the application on the History Server.
*/
def register(
+ driverUrl: String,
conf: YarnConfiguration,
sparkConf: SparkConf,
preferredNodeLocations: Map[String, Set[SplitInfo]],
@@ -72,7 +73,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
registered = true
}
- new YarnAllocator(conf, sparkConf, amClient, getAttemptId(), args, securityMgr)
+ new YarnAllocator(driverUrl, conf, sparkConf, amClient, getAttemptId(), args, securityMgr)
}
/**
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 455f1019d8..b343cbb0c7 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -90,6 +90,7 @@ class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach
"--jar", "somejar.jar",
"--class", "SomeClass")
new YarnAllocator(
+ "not used",
conf,
sparkConf,
rmClient,