 yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 24 ++++++++++++------------
 1 file changed, 12 insertions(+), 12 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 37e98e01fd..4cc320c5d5 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -54,7 +54,7 @@ private[spark] class ApplicationMaster(
private val sparkConf = new SparkConf()
private val yarnConf: YarnConfiguration = SparkHadoopUtil.get.newConfiguration(sparkConf)
.asInstanceOf[YarnConfiguration]
- private val isDriver = args.userClass != null
+ private val isClusterMode = args.userClass != null
// Default to numExecutors * 2, with minimum of 3
private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
@@ -81,7 +81,7 @@ private[spark] class ApplicationMaster(
try {
val appAttemptId = client.getAttemptId()
- if (isDriver) {
+ if (isClusterMode) {
// Set the web ui port to be ephemeral for yarn so we don't conflict with
// other spark processes running on the same box
System.setProperty("spark.ui.port", "0")
@@ -139,7 +139,7 @@ private[spark] class ApplicationMaster(
// doAs in order for the credentials to be passed on to the executor containers.
val securityMgr = new SecurityManager(sparkConf)
- if (isDriver) {
+ if (isClusterMode) {
runDriver(securityMgr)
} else {
runExecutorLauncher(securityMgr)
@@ -162,7 +162,7 @@ private[spark] class ApplicationMaster(
* from the application code.
*/
final def getDefaultFinalStatus() = {
- if (isDriver) {
+ if (isClusterMode) {
FinalApplicationStatus.SUCCEEDED
} else {
FinalApplicationStatus.UNDEFINED
@@ -243,7 +243,7 @@ private[spark] class ApplicationMaster(
private def runAMActor(
host: String,
port: String,
- isDriver: Boolean): Unit = {
+ isClusterMode: Boolean): Unit = {
val driverUrl = AkkaUtils.address(
AkkaUtils.protocol(actorSystem),
@@ -251,7 +251,7 @@ private[spark] class ApplicationMaster(
host,
port,
YarnSchedulerBackend.ACTOR_NAME)
- actor = actorSystem.actorOf(Props(new AMActor(driverUrl, isDriver)), name = "YarnAM")
+ actor = actorSystem.actorOf(Props(new AMActor(driverUrl, isClusterMode)), name = "YarnAM")
}
private def runDriver(securityMgr: SecurityManager): Unit = {
@@ -272,7 +272,7 @@ private[spark] class ApplicationMaster(
runAMActor(
sc.getConf.get("spark.driver.host"),
sc.getConf.get("spark.driver.port"),
- isDriver = true)
+ isClusterMode = true)
registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
userClassThread.join()
}
@@ -427,7 +427,7 @@ private[spark] class ApplicationMaster(
sparkConf.set("spark.driver.host", driverHost)
sparkConf.set("spark.driver.port", driverPort.toString)
- runAMActor(driverHost, driverPort.toString, isDriver = false)
+ runAMActor(driverHost, driverPort.toString, isClusterMode = false)
}
/** Add the Yarn IP filter that is required for properly securing the UI. */
@@ -435,7 +435,7 @@ private[spark] class ApplicationMaster(
val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
val params = client.getAmIpFilterParams(yarnConf, proxyBase)
- if (isDriver) {
+ if (isClusterMode) {
System.setProperty("spark.ui.filters", amFilter)
params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) }
} else {
@@ -491,7 +491,7 @@ private[spark] class ApplicationMaster(
/**
* An actor that communicates with the driver's scheduler backend.
*/
- private class AMActor(driverUrl: String, isDriver: Boolean) extends Actor {
+ private class AMActor(driverUrl: String, isClusterMode: Boolean) extends Actor {
var driver: ActorSelection = _
override def preStart() = {
@@ -503,7 +503,7 @@ private[spark] class ApplicationMaster(
driver ! RegisterClusterManager
// In cluster mode, the AM can directly monitor the driver status instead
// of trying to deduce it from the lifecycle of the driver's actor
- if (!isDriver) {
+ if (!isClusterMode) {
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
}
@@ -513,7 +513,7 @@ private[spark] class ApplicationMaster(
logInfo(s"Driver terminated or disconnected! Shutting down. $x")
// In cluster mode, do not rely on the disassociated event to exit
// This avoids potentially reporting incorrect exit codes if the driver fails
- if (!isDriver) {
+ if (!isClusterMode) {
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
}