aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorlianhuiwang <lianhuiwang09@gmail.com>2015-02-06 10:48:31 -0800
committerAndrew Or <andrew@databricks.com>2015-02-06 10:48:31 -0800
commitcc6e53119d7a51b95b19244f50b25814088b4d11 (patch)
treebe8f5505e03fa5bf9808369a00bab065c0d131c9 /yarn
parent9ad56ad2a2a51df449040c4f4b7c66b104883312 (diff)
downloadspark-cc6e53119d7a51b95b19244f50b25814088b4d11.tar.gz
spark-cc6e53119d7a51b95b19244f50b25814088b4d11.tar.bz2
spark-cc6e53119d7a51b95b19244f50b25814088b4d11.zip
[SPARK-5653][YARN] In ApplicationMaster rename isDriver to isClusterMode
in ApplicationMaster rename isDriver to isClusterMode,because in Client it uses isClusterMode,ApplicationMaster should keep consistent with it and uses isClusterMode.Also isClusterMode is easier to understand. andrewor14 sryza Author: lianhuiwang <lianhuiwang09@gmail.com> Closes #4430 from lianhuiwang/am-isDriver-rename and squashes the following commits: f9f3ed0 [lianhuiwang] rename isDriver to isClusterMode
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala24
1 files changed, 12 insertions, 12 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 37e98e01fd..4cc320c5d5 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -54,7 +54,7 @@ private[spark] class ApplicationMaster(
private val sparkConf = new SparkConf()
private val yarnConf: YarnConfiguration = SparkHadoopUtil.get.newConfiguration(sparkConf)
.asInstanceOf[YarnConfiguration]
- private val isDriver = args.userClass != null
+ private val isClusterMode = args.userClass != null
// Default to numExecutors * 2, with minimum of 3
private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
@@ -81,7 +81,7 @@ private[spark] class ApplicationMaster(
try {
val appAttemptId = client.getAttemptId()
- if (isDriver) {
+ if (isClusterMode) {
// Set the web ui port to be ephemeral for yarn so we don't conflict with
// other spark processes running on the same box
System.setProperty("spark.ui.port", "0")
@@ -139,7 +139,7 @@ private[spark] class ApplicationMaster(
// doAs in order for the credentials to be passed on to the executor containers.
val securityMgr = new SecurityManager(sparkConf)
- if (isDriver) {
+ if (isClusterMode) {
runDriver(securityMgr)
} else {
runExecutorLauncher(securityMgr)
@@ -162,7 +162,7 @@ private[spark] class ApplicationMaster(
* from the application code.
*/
final def getDefaultFinalStatus() = {
- if (isDriver) {
+ if (isClusterMode) {
FinalApplicationStatus.SUCCEEDED
} else {
FinalApplicationStatus.UNDEFINED
@@ -243,7 +243,7 @@ private[spark] class ApplicationMaster(
private def runAMActor(
host: String,
port: String,
- isDriver: Boolean): Unit = {
+ isClusterMode: Boolean): Unit = {
val driverUrl = AkkaUtils.address(
AkkaUtils.protocol(actorSystem),
@@ -251,7 +251,7 @@ private[spark] class ApplicationMaster(
host,
port,
YarnSchedulerBackend.ACTOR_NAME)
- actor = actorSystem.actorOf(Props(new AMActor(driverUrl, isDriver)), name = "YarnAM")
+ actor = actorSystem.actorOf(Props(new AMActor(driverUrl, isClusterMode)), name = "YarnAM")
}
private def runDriver(securityMgr: SecurityManager): Unit = {
@@ -272,7 +272,7 @@ private[spark] class ApplicationMaster(
runAMActor(
sc.getConf.get("spark.driver.host"),
sc.getConf.get("spark.driver.port"),
- isDriver = true)
+ isClusterMode = true)
registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
userClassThread.join()
}
@@ -427,7 +427,7 @@ private[spark] class ApplicationMaster(
sparkConf.set("spark.driver.host", driverHost)
sparkConf.set("spark.driver.port", driverPort.toString)
- runAMActor(driverHost, driverPort.toString, isDriver = false)
+ runAMActor(driverHost, driverPort.toString, isClusterMode = false)
}
/** Add the Yarn IP filter that is required for properly securing the UI. */
@@ -435,7 +435,7 @@ private[spark] class ApplicationMaster(
val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
val params = client.getAmIpFilterParams(yarnConf, proxyBase)
- if (isDriver) {
+ if (isClusterMode) {
System.setProperty("spark.ui.filters", amFilter)
params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) }
} else {
@@ -491,7 +491,7 @@ private[spark] class ApplicationMaster(
/**
* An actor that communicates with the driver's scheduler backend.
*/
- private class AMActor(driverUrl: String, isDriver: Boolean) extends Actor {
+ private class AMActor(driverUrl: String, isClusterMode: Boolean) extends Actor {
var driver: ActorSelection = _
override def preStart() = {
@@ -503,7 +503,7 @@ private[spark] class ApplicationMaster(
driver ! RegisterClusterManager
// In cluster mode, the AM can directly monitor the driver status instead
// of trying to deduce it from the lifecycle of the driver's actor
- if (!isDriver) {
+ if (!isClusterMode) {
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
}
@@ -513,7 +513,7 @@ private[spark] class ApplicationMaster(
logInfo(s"Driver terminated or disconnected! Shutting down. $x")
// In cluster mode, do not rely on the disassociated event to exit
// This avoids potentially reporting incorrect exit codes if the driver fails
- if (!isDriver) {
+ if (!isClusterMode) {
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
}