aboutsummaryrefslogtreecommitdiff
path: root/yarn/stable
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2014-10-17 00:53:15 -0700
committerAndrew Or <andrewor14@gmail.com>2014-10-17 00:53:15 -0700
commit0d958f163014e2b612ec445c80dfe69ff29d9f1a (patch)
tree371ddc13fade15738811fd074f42e64eacd387df /yarn/stable
parent35875e9ec6e63a8f28a0bdc66f83d9b623bf02bb (diff)
downloadspark-0d958f163014e2b612ec445c80dfe69ff29d9f1a.tar.gz
spark-0d958f163014e2b612ec445c80dfe69ff29d9f1a.tar.bz2
spark-0d958f163014e2b612ec445c80dfe69ff29d9f1a.zip
[SPARK-3606] [yarn] Correctly configure AmIpFilter for Yarn HA (1.1 vers...
...ion). This is a backport of SPARK-3606 to branch-1.1. Some of the code had to be duplicated since branch-1.1 doesn't have the cleanup work that was done to the Yarn codebase. I don't know whether the version issue in yarn/alpha/pom.xml was intentional, but I couldn't compile the code without fixing it. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #2497 from vanzin/SPARK-3606-1.1 and squashes the following commits: 4fd3c27 [Marcelo Vanzin] Remove unused imports. 75cde8c [Marcelo Vanzin] Scala is weird. b27ebda [Marcelo Vanzin] Review feedback. 72ceafb [Marcelo Vanzin] Undelete needed import. 61162a6 [Marcelo Vanzin] Use separate config for each param instead of json. 3b7205f [Marcelo Vanzin] Review feedback. b3b3e50 [Marcelo Vanzin] [SPARK-3606] [yarn] Correctly configure AmIpFilter for Yarn HA (1.1 version).
Diffstat (limited to 'yarn/stable')
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala12
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala6
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnStableUtils.scala54
3 files changed, 58 insertions, 14 deletions
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e4d60c6396..378304f79c 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -33,7 +33,6 @@ import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.ConverterUtils
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
@@ -130,14 +129,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private def addAmIpFilter() {
val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
System.setProperty("spark.ui.filters", amFilter)
- val proxy = WebAppUtils.getProxyHostAndPort(conf)
- val parts : Array[String] = proxy.split(":")
- val uriBase = "http://" + proxy +
- System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
-
- val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
- System.setProperty(
- "spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
+ val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+ val params = YarnStableUtils.getAmIpFilterParams(yarnConf, proxyBase)
+ params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) }
}
private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index e093fe4ae6..38e9f9c3fa 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -33,7 +33,6 @@ import org.apache.spark.scheduler.SplitInfo
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils
/**
* An application master that allocates executors on behalf of a driver that is running outside
@@ -144,11 +143,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
// add the yarn amIpFilter that Yarn requires for properly securing the UI
private def addAmIpFilter() {
- val proxy = WebAppUtils.getProxyHostAndPort(conf)
- val parts = proxy.split(":")
val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
- val uriBase = "http://" + proxy + proxyBase
- val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+ val amFilter = YarnStableUtils.getAmIpFilterParams(yarnConf, proxyBase)
val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnStableUtils.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnStableUtils.scala
new file mode 100644
index 0000000000..ea81faf9a5
--- /dev/null
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnStableUtils.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils
+
+private[yarn] object YarnStableUtils {
+
+ def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String] = {
+ // Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2,
+ // so not all stable releases have it.
+ val prefix = Try(classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration])
+ .invoke(null, conf).asInstanceOf[String]).getOrElse("http://")
+
+ // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses.
+ try {
+ val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter",
+ classOf[Configuration])
+ val proxies = method.invoke(null, conf).asInstanceOf[JList[String]]
+ val hosts = proxies.map { proxy => proxy.split(":")(0) }
+ val uriBases = proxies.map { proxy => prefix + proxy + proxyBase }
+ Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(","))
+ } catch {
+ case e: NoSuchMethodException =>
+ val proxy = WebAppUtils.getProxyHostAndPort(conf)
+ val parts = proxy.split(":")
+ val uriBase = prefix + proxy + proxyBase
+ Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
+ }
+ }
+
+}