diff options
author | witgo <witgo@qq.com> | 2014-07-15 13:52:56 -0500 |
---|---|---|
committer | Thomas Graves <tgraves@apache.org> | 2014-07-15 13:52:56 -0500 |
commit | 72ea56da8e383c61c6f18eeefef03b9af00f5158 (patch) | |
tree | 2bc7f4c045366d398b9fbdf38c00f0869514d675 /yarn | |
parent | 9dd635eb5df52835b3b7f4f2b9c789da9e813c71 (diff) | |
download | spark-72ea56da8e383c61c6f18eeefef03b9af00f5158.tar.gz spark-72ea56da8e383c61c6f18eeefef03b9af00f5158.tar.bz2 spark-72ea56da8e383c61c6f18eeefef03b9af00f5158.zip |
SPARK-1291: Link the spark UI to RM ui in yarn-client mode
Author: witgo <witgo@qq.com>
Closes #1112 from witgo/SPARK-1291 and squashes the following commits:
6022bcd [witgo] review commit
1fbb925 [witgo] add addAmIpFilter to yarn alpha
210299c [witgo] review commit
1b92a07 [witgo] review commit
6896586 [witgo] Add comments to addWebUIFilter
3e9630b [witgo] review commit
142ee29 [witgo] review commit
1fe7710 [witgo] Link the spark UI to RM ui in yarn-client mode
Diffstat (limited to 'yarn')
3 files changed, 40 insertions, 6 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index bfdb6232f5..a86ad256df 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -32,6 +32,7 @@ import akka.actor.Terminated import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter import org.apache.spark.scheduler.SplitInfo import org.apache.spark.deploy.SparkHadoopUtil @@ -81,6 +82,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp case x: DisassociatedEvent => logInfo(s"Driver terminated or disconnected! Shutting down. $x") driverClosed = true + case x: AddWebUIFilter => + logInfo(s"Add WebUI Filter. $x") + driver ! x } } @@ -111,7 +115,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp } waitForSparkMaster() - + addAmIpFilter() // Allocate all containers allocateExecutors() @@ -171,7 +175,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp } private def registerApplicationMaster(): RegisterApplicationMasterResponse = { - logInfo("Registering the ApplicationMaster") + val appUIAddress = sparkConf.get("spark.driver.appUIAddress", "") + logInfo(s"Registering the ApplicationMaster with appUIAddress: $appUIAddress") val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest]) .asInstanceOf[RegisterApplicationMasterRequest] appMasterRequest.setApplicationAttemptId(appAttemptId) @@ -180,10 +185,21 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp appMasterRequest.setHost(Utils.localHostName()) appMasterRequest.setRpcPort(0) // What do we provide here ? Might make sense to expose something sensible later ? - appMasterRequest.setTrackingUrl("") + appMasterRequest.setTrackingUrl(appUIAddress) resourceManager.registerApplicationMaster(appMasterRequest) } + // add the yarn amIpFilter that Yarn requires for properly securing the UI + private def addAmIpFilter() { + val proxy = YarnConfiguration.getProxyHostAndPort(conf) + val parts = proxy.split(":") + val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV) + val uriBase = "http://" + proxy + proxyBase + val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase + val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter" + actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase) + } + private def waitForSparkMaster() { logInfo("Waiting for spark driver to be reachable.") var driverUp = false diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 0f9fdcfcb6..1b37c4bb13 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -48,6 +48,7 @@ private[spark] class YarnClientSchedulerBackend( val driverHost = conf.get("spark.driver.host") val driverPort = conf.get("spark.driver.port") val hostport = driverHost + ":" + driverPort + conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort) val argsArrayBuf = new ArrayBuffer[String]() argsArrayBuf += ( diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index f71ad036ce..5ac95f3798 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -31,10 +31,12 @@ import akka.actor.Terminated import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter import org.apache.spark.scheduler.SplitInfo import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.hadoop.yarn.webapp.util.WebAppUtils /** * An application master that allocates executors on behalf of a driver that is running outside @@ -82,6 +84,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp case x: DisassociatedEvent => logInfo(s"Driver terminated or disconnected! Shutting down. $x") driverClosed = true + case x: AddWebUIFilter => + logInfo(s"Add WebUI Filter. $x") + driver ! x } } @@ -99,6 +104,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp registerApplicationMaster() waitForSparkMaster() + addAmIpFilter() // Allocate all containers allocateExecutors() @@ -142,9 +148,20 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp } private def registerApplicationMaster(): RegisterApplicationMasterResponse = { - logInfo("Registering the ApplicationMaster") - // TODO: Find out client's Spark UI address and fill in here? - amClient.registerApplicationMaster(Utils.localHostName(), 0, "") + val appUIAddress = sparkConf.get("spark.driver.appUIAddress", "") + logInfo(s"Registering the ApplicationMaster with appUIAddress: $appUIAddress") + amClient.registerApplicationMaster(Utils.localHostName(), 0, appUIAddress) + } + + // add the yarn amIpFilter that Yarn requires for properly securing the UI + private def addAmIpFilter() { + val proxy = WebAppUtils.getProxyHostAndPort(conf) + val parts = proxy.split(":") + val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV) + val uriBase = "http://" + proxy + proxyBase + val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase + val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter" + actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase) } private def waitForSparkMaster() { |