author    zsxwing <zsxwing@gmail.com>  2015-04-04 11:52:05 -0700
committer Reynold Xin <rxin@databricks.com>  2015-04-04 11:52:05 -0700
commit    f15806a8f8ca34288ddb2d74b9ff1972c8374b59 (patch)
tree      88abe5de9fadf078e57951450cb3368d0fb7cb64 /yarn
parent    7bca62f79056e592cf07b49d8b8d04c59dea25fc (diff)
[SPARK-6602][Core] Replace direct use of Akka with Spark RPC interface - part 1
This PR replaces the following `Actor`s with `RpcEndpoint`s:

1. HeartbeatReceiver
1. ExecutorActor
1. BlockManagerMasterActor
1. BlockManagerSlaveActor
1. CoarseGrainedExecutorBackend and subclasses
1. CoarseGrainedSchedulerBackend.DriverActor

This is the first PR. I will split the work of SPARK-6602 into several PRs for code review.

Author: zsxwing <zsxwing@gmail.com>

Closes #5268 from zsxwing/rpc-rewrite and squashes the following commits:

287e9f8 [zsxwing] Fix the code style
26c56b7 [zsxwing] Merge branch 'master' into rpc-rewrite
9cc825a [zsxwing] Rmove setupThreadSafeEndpoint and add ThreadSafeRpcEndpoint
30a9036 [zsxwing] Make self return null after stopping RpcEndpointRef; fix docs and error messages
705245d [zsxwing] Fix some bugs after rebasing the changes on the master
003cf80 [zsxwing] Update CoarseGrainedExecutorBackend and CoarseGrainedSchedulerBackend to use RpcEndpoint
7d0e6dc [zsxwing] Update BlockManagerSlaveActor to use RpcEndpoint
f5d6543 [zsxwing] Update BlockManagerMaster to use RpcEndpoint
30e3f9f [zsxwing] Update ExecutorActor to use RpcEndpoint
478b443 [zsxwing] Update HeartbeatReceiver to use RpcEndpoint
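For orientation, the mapping this PR applies in the yarn module is: `actorSystem.actorOf` becomes `rpcEnv.setupEndpoint`, `actorSelection` becomes `rpcEnv.setupEndpointRef`, `actor ! msg` becomes `RpcEndpointRef.send`, `preStart` becomes `onStart`, and `sender ! result` becomes `context.reply` inside `receiveAndReply`. Below is a minimal sketch of that pattern, using only the calls visible in this diff; the `Greet`/`GreeterEndpoint` names are invented for illustration, and since `org.apache.spark.rpc` is `private[spark]` at this point, the sketch assumes it is compiled inside the Spark source tree.

```scala
package org.apache.spark.deploy.yarn

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc._
import org.apache.spark.util.Utils

// Hypothetical message type, not part of Spark.
case class Greet(name: String)

class GreeterEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {

  // Replaces Actor.preStart
  override def onStart(): Unit = println("greeter endpoint started")

  // One-way messages: the old `actor ! msg` becomes `ref.send(msg)`
  override def receive: PartialFunction[Any, Unit] = {
    case Greet(name) => println(s"hello, $name")
  }

  // Request-reply messages: the old `sender ! result` becomes `context.reply(result)`
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case Greet(name) => context.reply(s"hello, $name")
  }
}

object GreeterExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // Same creation call the patch uses for the ExecutorLauncher's RpcEnv
    val rpcEnv = RpcEnv.create("greeter", Utils.localHostName, 0, conf, new SecurityManager(conf))
    val ref: RpcEndpointRef = rpcEnv.setupEndpoint("Greeter", new GreeterEndpoint(rpcEnv))
    ref.send(Greet("yarn"))  // fire-and-forget, like the old `!`
    Thread.sleep(1000)       // give the dispatcher time to deliver; shutdown omitted in this sketch
  }
}
```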
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala  86
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala        2
2 files changed, 39 insertions, 49 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 455554eea0..24a1e02795 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -24,22 +24,20 @@ import java.lang.reflect.InvocationTargetException
import java.net.{Socket, URL}
import java.util.concurrent.atomic.AtomicReference
-import akka.actor._
-import akka.remote._
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.util.ShutdownHookManager
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.spark.rpc._
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.SparkException
import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.scheduler.cluster.YarnSchedulerBackend
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{AkkaUtils, ChildFirstURLClassLoader, MutableURLClassLoader,
- SignalLogger, Utils}
+import org.apache.spark.util._
/**
* Common application master functionality for Spark on Yarn.
@@ -72,8 +70,8 @@ private[spark] class ApplicationMaster(
@volatile private var allocator: YarnAllocator = _
// Fields used in client mode.
- private var actorSystem: ActorSystem = null
- private var actor: ActorRef = _
+ private var rpcEnv: RpcEnv = null
+ private var amEndpoint: RpcEndpointRef = _
// Fields used in cluster mode.
private val sparkContextRef = new AtomicReference[SparkContext](null)
@@ -240,22 +238,21 @@ private[spark] class ApplicationMaster(
}
/**
- * Create an actor that communicates with the driver.
+ * Create an [[RpcEndpoint]] that communicates with the driver.
*
* In cluster mode, the AM and the driver belong to same process
- * so the AM actor need not monitor lifecycle of the driver.
+ * so the AMEndpoint need not monitor lifecycle of the driver.
*/
- private def runAMActor(
+ private def runAMEndpoint(
host: String,
port: String,
isClusterMode: Boolean): Unit = {
- val driverUrl = AkkaUtils.address(
- AkkaUtils.protocol(actorSystem),
+ val driverEndpont = rpcEnv.setupEndpointRef(
SparkEnv.driverActorSystemName,
- host,
- port,
- YarnSchedulerBackend.ACTOR_NAME)
- actor = actorSystem.actorOf(Props(new AMActor(driverUrl, isClusterMode)), name = "YarnAM")
+ RpcAddress(host, port.toInt),
+ YarnSchedulerBackend.ENDPOINT_NAME)
+ amEndpoint =
+ rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpont, isClusterMode))
}
private def runDriver(securityMgr: SecurityManager): Unit = {
@@ -272,8 +269,8 @@ private[spark] class ApplicationMaster(
ApplicationMaster.EXIT_SC_NOT_INITED,
"Timed out waiting for SparkContext.")
} else {
- actorSystem = sc.env.actorSystem
- runAMActor(
+ rpcEnv = sc.env.rpcEnv
+ runAMEndpoint(
sc.getConf.get("spark.driver.host"),
sc.getConf.get("spark.driver.port"),
isClusterMode = true)
@@ -283,8 +280,7 @@ private[spark] class ApplicationMaster(
}
private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
- actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
- conf = sparkConf, securityManager = securityMgr)._1
+ rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, 0, sparkConf, securityMgr)
waitForSparkDriver()
addAmIpFilter()
registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
@@ -431,7 +427,7 @@ private[spark] class ApplicationMaster(
sparkConf.set("spark.driver.host", driverHost)
sparkConf.set("spark.driver.port", driverPort.toString)
- runAMActor(driverHost, driverPort.toString, isClusterMode = false)
+ runAMEndpoint(driverHost, driverPort.toString, isClusterMode = false)
}
/** Add the Yarn IP filter that is required for properly securing the UI. */
@@ -443,7 +439,7 @@ private[spark] class ApplicationMaster(
System.setProperty("spark.ui.filters", amFilter)
params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) }
} else {
- actor ! AddWebUIFilter(amFilter, params.toMap, proxyBase)
+ amEndpoint.send(AddWebUIFilter(amFilter, params.toMap, proxyBase))
}
}
@@ -505,44 +501,29 @@ private[spark] class ApplicationMaster(
}
/**
- * An actor that communicates with the driver's scheduler backend.
+ * An [[RpcEndpoint]] that communicates with the driver's scheduler backend.
*/
- private class AMActor(driverUrl: String, isClusterMode: Boolean) extends Actor {
- var driver: ActorSelection = _
-
- override def preStart(): Unit = {
- logInfo("Listen to driver: " + driverUrl)
- driver = context.actorSelection(driverUrl)
- // Send a hello message to establish the connection, after which
- // we can monitor Lifecycle Events.
- driver ! "Hello"
- driver ! RegisterClusterManager
- // In cluster mode, the AM can directly monitor the driver status instead
- // of trying to deduce it from the lifecycle of the driver's actor
- if (!isClusterMode) {
- context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
- }
+ private class AMEndpoint(
+ override val rpcEnv: RpcEnv, driver: RpcEndpointRef, isClusterMode: Boolean)
+ extends RpcEndpoint with Logging {
+
+ override def onStart(): Unit = {
+ driver.send(RegisterClusterManager(self))
}
override def receive: PartialFunction[Any, Unit] = {
- case x: DisassociatedEvent =>
- logInfo(s"Driver terminated or disconnected! Shutting down. $x")
- // In cluster mode, do not rely on the disassociated event to exit
- // This avoids potentially reporting incorrect exit codes if the driver fails
- if (!isClusterMode) {
- finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
- }
-
case x: AddWebUIFilter =>
logInfo(s"Add WebUI Filter. $x")
- driver ! x
+ driver.send(x)
+ }
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RequestExecutors(requestedTotal) =>
Option(allocator) match {
case Some(a) => a.requestTotalExecutors(requestedTotal)
case None => logWarning("Container allocator is not ready to request executors yet.")
}
- sender ! true
+ context.reply(true)
case KillExecutors(executorIds) =>
logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(", ")}.")
@@ -550,7 +531,16 @@ private[spark] class ApplicationMaster(
case Some(a) => executorIds.foreach(a.killExecutor)
case None => logWarning("Container allocator is not ready to kill executors yet.")
}
- sender ! true
+ context.reply(true)
+ }
+
+ override def onDisconnected(remoteAddress: RpcAddress): Unit = {
+ logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
+ // In cluster mode, do not rely on the disassociated event to exit
+ // This avoids potentially reporting incorrect exit codes if the driver fails
+ if (!isClusterMode) {
+ finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+ }
}
}
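The `context.reply(true)` answers above pair with an ask-style call on the driver side, which holds the `RpcEndpointRef` delivered by `RegisterClusterManager(self)`. A rough sketch of that request-reply path follows; it is not part of this patch, and the `ask[T]` method returning a `Future` (as well as the helper names) is an assumption for illustration, since only `send` and `context.reply` appear in the diff.

```scala
package org.apache.spark.scheduler.cluster

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RequestExecutors

// Hypothetical driver-side helper, not part of this patch. `amEndpoint` is the
// ref the AM hands over via RegisterClusterManager(self); `ask[T]` returning a
// Future is an assumed RpcEndpointRef method -- only `send` appears in the diff.
object AmAskSketch {
  def requestExecutors(amEndpoint: RpcEndpointRef, total: Int): Boolean = {
    val response: Future[Boolean] = amEndpoint.ask[Boolean](RequestExecutors(total))
    // AMEndpoint.receiveAndReply answers RequestExecutors with context.reply(true)
    Await.result(response, 30.seconds)
  }
}
```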
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index c98763e15b..b8f42dadcb 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -112,7 +112,7 @@ private[yarn] class YarnAllocator(
SparkEnv.driverActorSystemName,
sparkConf.get("spark.driver.host"),
sparkConf.get("spark.driver.port"),
- CoarseGrainedSchedulerBackend.ACTOR_NAME)
+ CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
// For testing
private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true)
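In `YarnAllocator`, only the trailing name constant changes; the surrounding (unchanged) call still builds an Akka-style driver URL via `AkkaUtils.address`, which launched executors use to reach the driver's `CoarseGrainedSchedulerBackend`. A standalone sketch of the resulting URL shape is below; the concrete values `sparkDriver` and `CoarseGrainedScheduler` are assumptions about what `SparkEnv.driverActorSystemName` and `CoarseGrainedSchedulerBackend.ENDPOINT_NAME` resolve to, not something shown in this diff.

```scala
// Standalone sketch of the driver URL shape handed to executors by YarnAllocator.
// "sparkDriver" and "CoarseGrainedScheduler" are assumed values for
// SparkEnv.driverActorSystemName and CoarseGrainedSchedulerBackend.ENDPOINT_NAME;
// the akka.tcp protocol assumes a non-SSL setup.
object DriverUrlSketch {
  def driverUrl(host: String, port: String): String =
    s"akka.tcp://sparkDriver@$host:$port/user/CoarseGrainedScheduler"

  def main(args: Array[String]): Unit =
    println(driverUrl("driver-host.example.com", "56789"))
    // akka.tcp://sparkDriver@driver-host.example.com:56789/user/CoarseGrainedScheduler
}
```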