Diffstat (limited to 'yarn')
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala    |  8
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala  | 12
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala       |  4
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala   | 28
4 files changed, 39 insertions, 13 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
index acf26505e4..9bd1719cb1 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
@@ -76,8 +76,12 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
resourceManager.finishApplicationMaster(finishReq)
}
- override def getProxyHostAndPort(conf: YarnConfiguration) =
- YarnConfiguration.getProxyHostAndPort(conf)
+ override def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String) = {
+ val proxy = YarnConfiguration.getProxyHostAndPort(conf)
+ val parts = proxy.split(":")
+ val uriBase = "http://" + proxy + proxyBase
+ Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
+ }
override def getMaxRegAttempts(conf: YarnConfiguration) =
conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
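
The alpha backend keeps the old single-proxy lookup but now packages the result as a map of filter parameters. A minimal, self-contained sketch of that construction; the proxy address and proxy base below are made-up sample values, not anything read from a cluster:

object AmIpFilterParamsSketch {
  // Build the AmIpFilter parameters from a single "host:port" proxy address,
  // mirroring the map construction in the alpha YarnRMClientImpl above.
  def buildParams(proxy: String, proxyBase: String): Map[String, String] = {
    val parts = proxy.split(":")                 // e.g. Array("rm-host", "8088")
    val uriBase = "http://" + proxy + proxyBase  // e.g. "http://rm-host:8088/proxy/app_1"
    Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
  }

  def main(args: Array[String]): Unit = {
    println(buildParams("rm-host:8088", "/proxy/application_1"))
    // Map(PROXY_HOST -> rm-host, PROXY_URI_BASE -> http://rm-host:8088/proxy/application_1)
  }
}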
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index b51daeb437..caceef5d4b 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -368,18 +368,14 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
/** Add the Yarn IP filter that is required for properly securing the UI. */
private def addAmIpFilter() = {
- val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
- val proxy = client.getProxyHostAndPort(yarnConf)
- val parts = proxy.split(":")
val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
- val uriBase = "http://" + proxy + proxyBase
- val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
-
+ val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+ val params = client.getAmIpFilterParams(yarnConf, proxyBase)
if (isDriver) {
System.setProperty("spark.ui.filters", amFilter)
- System.setProperty(s"spark.$amFilter.params", params)
+ params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) }
} else {
- actor ! AddWebUIFilter(amFilter, params, proxyBase)
+ actor ! AddWebUIFilter(amFilter, params.toMap, proxyBase)
}
}
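
In the driver branch, each filter parameter now gets its own "spark.<filterClass>.param.<NAME>" system property instead of one comma-joined "...params" string, presumably so that values which themselves contain commas (such as an HA PROXY_URI_BASES list) survive intact. A sketch of that naming scheme with made-up parameter values; a real AM would get the map from YarnRMClient.getAmIpFilterParams:

object UiFilterPropsSketch {
  def main(args: Array[String]): Unit = {
    val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
    // Sample parameters only; note the comma inside PROXY_URI_BASES.
    val params = Map(
      "PROXY_HOSTS" -> "rm1,rm2",
      "PROXY_URI_BASES" -> "http://rm1:8088/proxy/app_1,http://rm2:8088/proxy/app_1")

    System.setProperty("spark.ui.filters", amFilter)
    // One property per parameter, as in the driver branch of addAmIpFilter above.
    params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) }

    sys.props.toSeq.filter(_._1.startsWith("spark.")).sorted.foreach(println)
  }
}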
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index ed65e56b3e..943dc56202 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -59,8 +59,8 @@ trait YarnRMClient {
/** Returns the attempt ID. */
def getAttemptId(): ApplicationAttemptId
- /** Returns the RM's proxy host and port. */
- def getProxyHostAndPort(conf: YarnConfiguration): String
+ /** Returns the configuration for the AmIpFilter to add to the Spark UI. */
+ def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String]
/** Returns the maximum number of attempts to register the AM. */
def getMaxRegAttempts(conf: YarnConfiguration): Int
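
The trait change swaps the string-returning getProxyHostAndPort for a map-returning getAmIpFilterParams, leaving each backend (alpha or stable) free to decide how the filter configuration is built. A hypothetical, stripped-down stand-in showing the shape such an implementation takes; FilterParamsProvider and the stub values are illustrative and not part of the patch:

import org.apache.hadoop.yarn.conf.YarnConfiguration

trait FilterParamsProvider {
  /** Returns the configuration for the AmIpFilter to add to the Spark UI. */
  def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String]
}

// A stub backend, e.g. for a unit test, that ignores the configuration entirely.
object StubFilterParams extends FilterParamsProvider {
  override def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String) =
    Map("PROXY_HOST" -> "rm-host", "PROXY_URI_BASE" -> s"http://rm-host:8088$proxyBase")
}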
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
index 54bc6b14c4..b581790e15 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
@@ -17,8 +17,13 @@
package org.apache.spark.deploy.yarn
+import java.util.{List => JList}
+
import scala.collection.{Map, Set}
+import scala.collection.JavaConversions._
+import scala.util._
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.api.records._
@@ -69,7 +74,28 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
appAttemptId
}
- override def getProxyHostAndPort(conf: YarnConfiguration) = WebAppUtils.getProxyHostAndPort(conf)
+ override def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String) = {
+ // Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2,
+ // so not all stable releases have it.
+ val prefix = Try(classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration])
+ .invoke(null, conf).asInstanceOf[String]).getOrElse("http://")
+
+ // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses.
+ try {
+ val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter",
+ classOf[Configuration])
+ val proxies = method.invoke(null, conf).asInstanceOf[JList[String]]
+ val hosts = proxies.map { proxy => proxy.split(":")(0) }
+ val uriBases = proxies.map { proxy => prefix + proxy + proxyBase }
+ Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(","))
+ } catch {
+ case e: NoSuchMethodException =>
+ val proxy = WebAppUtils.getProxyHostAndPort(conf)
+ val parts = proxy.split(":")
+ val uriBase = prefix + proxy + proxyBase
+ Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
+ }
+ }
override def getMaxRegAttempts(conf: YarnConfiguration) =
conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
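
The stable backend probes WebAppUtils reflectively twice: with Try for the optional scheme-prefix accessor, and with an explicit NoSuchMethodException handler for the HA-aware proxy list, falling back to the single-proxy path on older Hadoop releases. The probe-and-fall-back pattern in isolation, with a deliberately missing method so the default branch is taken (the target object and method name below are only stand-ins):

import scala.util.Try

object OptionalMethodSketch {
  // Look up a zero-argument accessor by name and invoke it; if the method does not
  // exist on this class (e.g. an older library version), fall back to the default.
  def stringOrDefault(target: AnyRef, methodName: String, default: String): String =
    Try(target.getClass.getMethod(methodName).invoke(target).asInstanceOf[String])
      .getOrElse(default)

  def main(args: Array[String]): Unit = {
    // java.lang.String has no getHttpSchemePrefix, so the fallback is returned.
    println(stringOrDefault("some object", "getHttpSchemePrefix", "http://"))  // http://
  }
}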