about | summary | refs | log | tree | commit | diff
path: root/yarn
diff options
context:
space:
mode:
author: Marcelo Vanzin <vanzin@cloudera.com> 2014-10-03 13:18:35 -0700
committer: Andrew Or <andrewor14@gmail.com> 2014-10-03 13:18:35 -0700
commit: 30abef154768e5c4c6062f3341933dbda990f6cc (patch)
tree: 2f8f515b738069b8baa4ada773533e4abf595fe8 /yarn
parent: e5566e05b1ac99aa6caf1701e47ebcdb68a002c6 (diff)
download: spark-30abef154768e5c4c6062f3341933dbda990f6cc.tar.gz
spark-30abef154768e5c4c6062f3341933dbda990f6cc.tar.bz2
spark-30abef154768e5c4c6062f3341933dbda990f6cc.zip
[SPARK-3606] [yarn] Correctly configure AmIpFilter for Yarn HA.
The existing code only considered one of the RMs when running in Yarn HA mode, so it was possible to get errors if the active RM was not registered in the filter.

The change makes use of a new API added to Yarn that returns all proxy addresses, and falls back to the old behavior if the API is not present. While there, I also made a change to look for the scheme (http or https) being used by Yarn when building the proxy URIs.

Since, in the case of multiple RMs, Yarn uses commas as a separator, it was no longer possible to use spark.filter.params to propagate this information (which used commas to delimit different config params). Instead, I added a new param (spark.filter.jsonParams) which expects a JSON string containing a map with the config data. I chose not to add it to the documentation at this point since I don't believe users will use it directly.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #2469 from vanzin/SPARK-3606 and squashes the following commits:

aeb458a [Marcelo Vanzin] Undelete needed import.
65e400d [Marcelo Vanzin] Remove unused import.
d121883 [Marcelo Vanzin] Use separate config for each param instead of json.
04bc156 [Marcelo Vanzin] Review feedback.
4d4d6b9 [Marcelo Vanzin] [SPARK-3606] [yarn] Correctly configure AmIpFilter for Yarn HA.
Diffstat (limited to 'yarn')
-rw-r--r-- yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala | 8
-rw-r--r-- yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 12
-rw-r--r-- yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala | 4
-rw-r--r-- yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala | 28
4 files changed, 39 insertions, 13 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
index acf26505e4..9bd1719cb1 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
@@ -76,8 +76,12 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
resourceManager.finishApplicationMaster(finishReq)
}
- override def getProxyHostAndPort(conf: YarnConfiguration) =
- YarnConfiguration.getProxyHostAndPort(conf)
+ override def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String) = {
+ val proxy = YarnConfiguration.getProxyHostAndPort(conf)
+ val parts = proxy.split(":")
+ val uriBase = "http://" + proxy + proxyBase
+ Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
+ }
override def getMaxRegAttempts(conf: YarnConfiguration) =
conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index b51daeb437..caceef5d4b 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -368,18 +368,14 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
/** Add the Yarn IP filter that is required for properly securing the UI. */
private def addAmIpFilter() = {
- val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
- val proxy = client.getProxyHostAndPort(yarnConf)
- val parts = proxy.split(":")
val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
- val uriBase = "http://" + proxy + proxyBase
- val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
-
+ val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+ val params = client.getAmIpFilterParams(yarnConf, proxyBase)
if (isDriver) {
System.setProperty("spark.ui.filters", amFilter)
- System.setProperty(s"spark.$amFilter.params", params)
+ params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) }
} else {
- actor ! AddWebUIFilter(amFilter, params, proxyBase)
+ actor ! AddWebUIFilter(amFilter, params.toMap, proxyBase)
}
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index ed65e56b3e..943dc56202 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -59,8 +59,8 @@ trait YarnRMClient {
/** Returns the attempt ID. */
def getAttemptId(): ApplicationAttemptId
- /** Returns the RM's proxy host and port. */
- def getProxyHostAndPort(conf: YarnConfiguration): String
+ /** Returns the configuration for the AmIpFilter to add to the Spark UI. */
+ def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String]
/** Returns the maximum number of attempts to register the AM. */
def getMaxRegAttempts(conf: YarnConfiguration): Int
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
index 54bc6b14c4..b581790e15 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
@@ -17,8 +17,13 @@
package org.apache.spark.deploy.yarn
+import java.util.{List => JList}
+
import scala.collection.{Map, Set}
+import scala.collection.JavaConversions._
+import scala.util._
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.api.records._
@@ -69,7 +74,28 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
appAttemptId
}
- override def getProxyHostAndPort(conf: YarnConfiguration) = WebAppUtils.getProxyHostAndPort(conf)
+ override def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String) = {
+ // Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2,
+ // so not all stable releases have it.
+ val prefix = Try(classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration])
+ .invoke(null, conf).asInstanceOf[String]).getOrElse("http://")
+
+ // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses.
+ try {
+ val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter",
+ classOf[Configuration])
+ val proxies = method.invoke(null, conf).asInstanceOf[JList[String]]
+ val hosts = proxies.map { proxy => proxy.split(":")(0) }
+ val uriBases = proxies.map { proxy => prefix + proxy + proxyBase }
+ Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(","))
+ } catch {
+ case e: NoSuchMethodException =>
+ val proxy = WebAppUtils.getProxyHostAndPort(conf)
+ val parts = proxy.split(":")
+ val uriBase = prefix + proxy + proxyBase
+ Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
+ }
+ }
override def getMaxRegAttempts(conf: YarnConfiguration) =
conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)