diff options
author | Marcelo Vanzin <vanzin@cloudera.com> | 2014-10-17 00:53:15 -0700 |
---|---|---|
committer | Andrew Or <andrewor14@gmail.com> | 2014-10-17 00:53:15 -0700 |
commit | 0d958f163014e2b612ec445c80dfe69ff29d9f1a (patch) | |
tree | 371ddc13fade15738811fd074f42e64eacd387df /yarn/stable | |
parent | 35875e9ec6e63a8f28a0bdc66f83d9b623bf02bb (diff) | |
download | spark-0d958f163014e2b612ec445c80dfe69ff29d9f1a.tar.gz spark-0d958f163014e2b612ec445c80dfe69ff29d9f1a.tar.bz2 spark-0d958f163014e2b612ec445c80dfe69ff29d9f1a.zip |
[SPARK-3606] [yarn] Correctly configure AmIpFilter for Yarn HA (1.1 version).
This is a backport of SPARK-3606 to branch-1.1. Some of the code had to be
duplicated since branch-1.1 doesn't have the cleanup work that was done to
the Yarn codebase.
I don't know whether the version issue in yarn/alpha/pom.xml was intentional,
but I couldn't compile the code without fixing it.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #2497 from vanzin/SPARK-3606-1.1 and squashes the following commits:
4fd3c27 [Marcelo Vanzin] Remove unused imports.
75cde8c [Marcelo Vanzin] Scala is weird.
b27ebda [Marcelo Vanzin] Review feedback.
72ceafb [Marcelo Vanzin] Undelete needed import.
61162a6 [Marcelo Vanzin] Use separate config for each param instead of json.
3b7205f [Marcelo Vanzin] Review feedback.
b3b3e50 [Marcelo Vanzin] [SPARK-3606] [yarn] Correctly configure AmIpFilter for Yarn HA (1.1 version).
Diffstat (limited to 'yarn/stable')
3 files changed, 58 insertions, 14 deletions
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index e4d60c6396..378304f79c 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -33,7 +33,6 @@ import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.ConverterUtils -import org.apache.hadoop.yarn.webapp.util.WebAppUtils import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext} import org.apache.spark.deploy.SparkHadoopUtil @@ -130,14 +129,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, private def addAmIpFilter() { val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter" System.setProperty("spark.ui.filters", amFilter) - val proxy = WebAppUtils.getProxyHostAndPort(conf) - val parts : Array[String] = proxy.split(":") - val uriBase = "http://" + proxy + - System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV) - - val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase - System.setProperty( - "spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params) + val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV) + val params = YarnStableUtils.getAmIpFilterParams(yarnConf, proxyBase) + params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) } } private def registerApplicationMaster(): RegisterApplicationMasterResponse = { diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index e093fe4ae6..38e9f9c3fa 100644 --- 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -33,7 +33,6 @@ import org.apache.spark.scheduler.SplitInfo import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.hadoop.yarn.webapp.util.WebAppUtils /** * An application master that allocates executors on behalf of a driver that is running outside @@ -144,11 +143,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp // add the yarn amIpFilter that Yarn requires for properly securing the UI private def addAmIpFilter() { - val proxy = WebAppUtils.getProxyHostAndPort(conf) - val parts = proxy.split(":") val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV) - val uriBase = "http://" + proxy + proxyBase - val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase + val amFilter = YarnStableUtils.getAmIpFilterParams(yarnConf, proxyBase) val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter" actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase) } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnStableUtils.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnStableUtils.scala new file mode 100644 index 0000000000..ea81faf9a5 --- /dev/null +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnStableUtils.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.util.{List => JList} + +import scala.collection.JavaConversions._ +import scala.util.Try + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.webapp.util.WebAppUtils + +private[yarn] object YarnStableUtils { + + def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String] = { + // Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2, + // so not all stable releases have it. + val prefix = Try(classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration]) + .invoke(null, conf).asInstanceOf[String]).getOrElse("http://") + + // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses. + try { + val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter", + classOf[Configuration]) + val proxies = method.invoke(null, conf).asInstanceOf[JList[String]] + val hosts = proxies.map { proxy => proxy.split(":")(0) } + val uriBases = proxies.map { proxy => prefix + proxy + proxyBase } + Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(",")) + } catch { + case e: NoSuchMethodException => + val proxy = WebAppUtils.getProxyHostAndPort(conf) + val parts = proxy.split(":") + val uriBase = prefix + proxy + proxyBase + Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase) + } + } + +} |