author    lianhuiwang <lianhuiwang09@gmail.com>    2015-01-28 12:50:57 -0800
committer Andrew Or <andrew@databricks.com>    2015-01-28 12:51:15 -0800
commit    81f8f3406284c391dfad14fb70147fa8e20692a8 (patch)
tree      0a9b18c69a32d4b4c5b7bf5c31c0f2dfa56bbda9 /yarn
parent    456c11f15aec809044d8bdbdcce0ae05533fb44b (diff)
[SPARK-4955] With executor dynamic scaling enabled, executors should be added or killed in yarn-cluster mode.
With executor dynamic scaling enabled, executors should also be added or killed in yarn-cluster mode. So in yarn-cluster mode, the ApplicationMaster starts an AMActor that adds or kills executors, and the YarnSchedulerActor in YarnSchedulerBackend sends messages to the AM's AMActor. andrewor14 ChengXiangLi tdas

Author: lianhuiwang <lianhuiwang09@gmail.com>

Closes #3962 from lianhuiwang/SPARK-4955 and squashes the following commits:

48d9ebb [lianhuiwang] update with andrewor14's comments
12426af [lianhuiwang] refactor am's code
45da3b0 [lianhuiwang] remove unrelated code
9318fc1 [lianhuiwang] update with andrewor14's comments
08ba473 [lianhuiwang] address andrewor14's comments
265c36d [lianhuiwang] fix small change
f43bda8 [lianhuiwang] fix address andrewor14's comments
7a7767a [lianhuiwang] fix address andrewor14's comments
bbc4d5a [lianhuiwang] address andrewor14's comments
1b029a4 [lianhuiwang] fix bug
7d33791 [lianhuiwang] in AM create a new actorSystem
2164ea8 [lianhuiwang] fix a min bug
6dfeeec [lianhuiwang] in yarn-cluster mode, executor number can be added or killed.
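To make the message flow concrete, here is a minimal, self-contained Akka sketch of the pattern this patch introduces, not Spark's actual code: the RequestExecutors/KillExecutors case classes and SketchAMActor below are simplified stand-ins defined for illustration (Spark's real messages live elsewhere, in CoarseGrainedClusterMessages), and only the "YarnAM" actor name mirrors the patch.

import akka.actor.{Actor, ActorSystem, Props}

// Hypothetical, simplified messages -- Spark defines richer equivalents.
case class RequestExecutors(requestedTotal: Int)
case class KillExecutors(executorIds: Seq[String])

// Stand-in for the patch's AMActor: in yarn-cluster mode it lives in the
// same process as the driver, so no remote lifecycle monitoring is needed.
class SketchAMActor extends Actor {
  def receive = {
    case RequestExecutors(total) =>
      println(s"AM: requesting $total executors total from YARN")
    case KillExecutors(ids) =>
      println(s"AM: releasing containers for executors ${ids.mkString(", ")}")
  }
}

object AMActorSketch extends App {
  val system = ActorSystem("sparkYarnAM")
  // "YarnAM" matches the actor name registered by runAMActor in this patch.
  val am = system.actorOf(Props[SketchAMActor], name = "YarnAM")
  // In Spark, the driver-side YarnSchedulerActor sends these on behalf of
  // SparkContext.requestExecutors / SparkContext.killExecutors.
  am ! RequestExecutors(4)
  am ! KillExecutors(Seq("1", "2"))
  Thread.sleep(500)  // let the messages print before shutting down
  system.shutdown()  // Akka 2.3-era API, matching Spark 1.x
}

In the real patch, the driver side reaches the "YarnAM" actor through YarnSchedulerBackend, and the ApplicationMaster now registers that actor in both yarn-client and yarn-cluster mode via runAMActor, as the diff below shows.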
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 57
1 file changed, 43 insertions(+), 14 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 902bdda598..d3e327b249 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -43,8 +43,11 @@ import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
/**
* Common application master functionality for Spark on Yarn.
*/
-private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
- client: YarnRMClient) extends Logging {
+private[spark] class ApplicationMaster(
+ args: ApplicationMasterArguments,
+ client: YarnRMClient)
+ extends Logging {
+
// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
// optimal as more containers are available. Might need to handle this better.
@@ -231,6 +234,24 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
reporterThread = launchReporterThread()
}
+ /**
+ * Create an actor that communicates with the driver.
+ *
+ * In cluster mode, the AM and the driver belong to the same process
+ * so the AM actor need not monitor the lifecycle of the driver.
+ */
+ private def runAMActor(
+ host: String,
+ port: String,
+ isDriver: Boolean): Unit = {
+ val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+ SparkEnv.driverActorSystemName,
+ host,
+ port,
+ YarnSchedulerBackend.ACTOR_NAME)
+ actor = actorSystem.actorOf(Props(new AMActor(driverUrl, isDriver)), name = "YarnAM")
+ }
+
private def runDriver(securityMgr: SecurityManager): Unit = {
addAmIpFilter()
userClassThread = startUserClass()
@@ -245,6 +266,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
ApplicationMaster.EXIT_SC_NOT_INITED,
"Timed out waiting for SparkContext.")
} else {
+ actorSystem = sc.env.actorSystem
+ runAMActor(
+ sc.getConf.get("spark.driver.host"),
+ sc.getConf.get("spark.driver.port"),
+ isDriver = true)
registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
userClassThread.join()
}
@@ -253,7 +279,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
conf = sparkConf, securityManager = securityMgr)._1
- actor = waitForSparkDriver()
+ waitForSparkDriver()
addAmIpFilter()
registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
@@ -367,7 +393,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
}
}
- private def waitForSparkDriver(): ActorRef = {
+ private def waitForSparkDriver(): Unit = {
logInfo("Waiting for Spark driver to be reachable.")
var driverUp = false
val hostport = args.userArgs(0)
@@ -399,12 +425,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
sparkConf.set("spark.driver.host", driverHost)
sparkConf.set("spark.driver.port", driverPort.toString)
- val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
- SparkEnv.driverActorSystemName,
- driverHost,
- driverPort.toString,
- YarnSchedulerBackend.ACTOR_NAME)
- actorSystem.actorOf(Props(new AMActor(driverUrl)), name = "YarnAM")
+ runAMActor(driverHost, driverPort.toString, isDriver = false)
}
/** Add the Yarn IP filter that is required for properly securing the UI. */
@@ -462,9 +483,9 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
}
/**
- * Actor that communicates with the driver in client deploy mode.
+ * An actor that communicates with the driver's scheduler backend.
*/
- private class AMActor(driverUrl: String) extends Actor {
+ private class AMActor(driverUrl: String, isDriver: Boolean) extends Actor {
var driver: ActorSelection = _
override def preStart() = {
@@ -474,13 +495,21 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
// we can monitor Lifecycle Events.
driver ! "Hello"
driver ! RegisterClusterManager
- context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+ // In cluster mode, the AM can directly monitor the driver status instead
+ // of trying to deduce it from the lifecycle of the driver's actor
+ if (!isDriver) {
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+ }
}
override def receive = {
case x: DisassociatedEvent =>
logInfo(s"Driver terminated or disconnected! Shutting down. $x")
- finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+ // In cluster mode, do not rely on the disassociated event to exit
+ // This avoids potentially reporting incorrect exit codes if the driver fails
+ if (!isDriver) {
+ finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+ }
case x: AddWebUIFilter =>
logInfo(s"Add WebUI Filter. $x")