aboutsummaryrefslogtreecommitdiff
path: root/yarn/alpha
diff options
context:
space:
mode:
Diffstat (limited to 'yarn/alpha')
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala22
1 files changed, 19 insertions, 3 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index bfdb6232f5..a86ad256df 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -32,6 +32,7 @@ import akka.actor.Terminated
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
import org.apache.spark.scheduler.SplitInfo
import org.apache.spark.deploy.SparkHadoopUtil
@@ -81,6 +82,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
case x: DisassociatedEvent =>
logInfo(s"Driver terminated or disconnected! Shutting down. $x")
driverClosed = true
+ case x: AddWebUIFilter =>
+ logInfo(s"Add WebUI Filter. $x")
+ driver ! x
}
}
@@ -111,7 +115,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
}
waitForSparkMaster()
-
+ addAmIpFilter()
// Allocate all containers
allocateExecutors()
@@ -171,7 +175,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
}
private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
- logInfo("Registering the ApplicationMaster")
+ val appUIAddress = sparkConf.get("spark.driver.appUIAddress", "")
+ logInfo(s"Registering the ApplicationMaster with appUIAddress: $appUIAddress")
val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
.asInstanceOf[RegisterApplicationMasterRequest]
appMasterRequest.setApplicationAttemptId(appAttemptId)
@@ -180,10 +185,21 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
appMasterRequest.setHost(Utils.localHostName())
appMasterRequest.setRpcPort(0)
// What do we provide here ? Might make sense to expose something sensible later ?
- appMasterRequest.setTrackingUrl("")
+ appMasterRequest.setTrackingUrl(appUIAddress)
resourceManager.registerApplicationMaster(appMasterRequest)
}
+ // add the yarn amIpFilter that Yarn requires for properly securing the UI
+ private def addAmIpFilter() {
+ val proxy = YarnConfiguration.getProxyHostAndPort(conf)
+ val parts = proxy.split(":")
+ val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+ val uriBase = "http://" + proxy + proxyBase
+ val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+ val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+ actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
+ }
+
private def waitForSparkMaster() {
logInfo("Waiting for spark driver to be reachable.")
var driverUp = false