author    Marcelo Vanzin <vanzin@cloudera.com>  2017-02-22 14:37:53 -0800
committer Marcelo Vanzin <vanzin@cloudera.com>  2017-02-22 14:37:53 -0800
commit    4661d30b988bf773ab45a15b143efb2908d33743 (patch)
tree      466912c594ddabc6093755ab080b8d42a2357769 /resource-managers
parent    37112fcfcd64db8f84f437e5c54cc3ea039c68f6 (diff)
[SPARK-19554][UI,YARN] Allow SHS URL to be used for tracking in YARN RM.
Allow an application to use the History Server URL as the tracking URL in the YARN RM, so there's still a link to the web UI somewhere in YARN even if the driver's UI is disabled. This is useful, for example, if an admin wants to disable the driver UI by default for applications, since it's harder to secure (it involves non-trivial SSL certificate and auth management that admins may not want to expose to user apps).

This needs to be opt-in, because of the way the YARN proxy works, so a new configuration option was added to enable it.

The YARN RM proxies requests to live AMs instead of redirecting the client, so pages in the SHS UI would not render correctly: they would reference invalid paths in the RM UI. The proxy base support in the SHS cannot be used either, since that would prevent direct access to the SHS. So, for the feature to work end-to-end, a new YARN-specific filter was added that detects whether requests come from the proxy and redirects the client appropriately. The SHS admin has to add this filter manually for the feature to work.

Tested with a new unit test, and by running with the documented configuration set in a test cluster. Also verified the driver UI is used when it's enabled.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #16946 from vanzin/SPARK-19554.
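For context, a hedged sketch of the end-to-end configuration the message describes. The application-side flag is the one added in config.scala below; registering the filter in the SHS through "spark.ui.filters" is an assumption based on Spark's generic servlet-filter support, not something this diff wires up:

    import org.apache.spark.SparkConf

    // Application side: driver UI off, SHS allowed as the YARN tracking URL.
    val appConf = new SparkConf()
      .set("spark.ui.enabled", "false")
      .set("spark.yarn.historyServer.allowTracking", "true")

    // History Server side: the admin registers the redirect filter manually
    // (assumption: via Spark's standard filter hook, spark.ui.filters).
    val shsConf = new SparkConf()
      .set("spark.ui.filters", "org.apache.spark.deploy.yarn.YarnProxyRedirectFilter")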
Diffstat (limited to 'resource-managers')
-rw-r--r--  resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala            |  7
-rw-r--r--  resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnProxyRedirectFilter.scala      | 81
-rw-r--r--  resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala                 |  8
-rw-r--r--  resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala                       |  7
-rw-r--r--  resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnProxyRedirectFilterSuite.scala | 55
5 files changed, 152 insertions(+), 6 deletions(-)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 9df43aea3f..864c834d11 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -332,7 +332,7 @@ private[spark] class ApplicationMaster(
_sparkConf: SparkConf,
_rpcEnv: RpcEnv,
driverRef: RpcEndpointRef,
- uiAddress: String,
+ uiAddress: Option[String],
securityMgr: SecurityManager) = {
val appId = client.getAttemptId().getApplicationId().toString()
val attemptId = client.getAttemptId().getAttemptId().toString()
@@ -408,8 +408,7 @@ private[spark] class ApplicationMaster(
sc.getConf.get("spark.driver.host"),
sc.getConf.get("spark.driver.port"),
isClusterMode = true)
- registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl).getOrElse(""),
- securityMgr)
+ registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr)
} else {
// Sanity check; should never happen in normal operation, since sc should only be null
// if the user app did not create a SparkContext.
@@ -435,7 +434,7 @@ private[spark] class ApplicationMaster(
clientMode = true)
val driverRef = waitForSparkDriver()
addAmIpFilter()
- registerAM(sparkConf, rpcEnv, driverRef, sparkConf.get("spark.driver.appUIAddress", ""),
+ registerAM(sparkConf, rpcEnv, driverRef, sparkConf.getOption("spark.driver.appUIAddress"),
securityMgr)
// In client mode the actor will stop the reporter thread.
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnProxyRedirectFilter.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnProxyRedirectFilter.scala
new file mode 100644
index 0000000000..ae625df753
--- /dev/null
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnProxyRedirectFilter.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import javax.servlet._
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import org.apache.spark.internal.Logging
+
+/**
+ * A filter to be used in the Spark History Server for redirecting YARN proxy requests to the
+ * main SHS address. This is useful for applications that are using the history server as the
+ * tracking URL, since the SHS-generated pages cannot be rendered in that case without extra
+ * configuration to set up a proxy base URI (meaning the SHS cannot ever be used directly).
+ */
+class YarnProxyRedirectFilter extends Filter with Logging {
+
+ import YarnProxyRedirectFilter._
+
+ override def destroy(): Unit = { }
+
+ override def init(config: FilterConfig): Unit = { }
+
+ override def doFilter(req: ServletRequest, res: ServletResponse, chain: FilterChain): Unit = {
+ val hreq = req.asInstanceOf[HttpServletRequest]
+
+ // The YARN proxy will send a request with the "proxy-user" cookie set to the YARN client's
+ // user name. We don't expect any other clients to set this cookie, since the SHS does not
+ // use cookies for anything.
+ Option(hreq.getCookies()).flatMap(_.find(_.getName() == COOKIE_NAME)) match {
+ case Some(_) =>
+ doRedirect(hreq, res.asInstanceOf[HttpServletResponse])
+
+ case _ =>
+ chain.doFilter(req, res)
+ }
+ }
+
+ private def doRedirect(req: HttpServletRequest, res: HttpServletResponse): Unit = {
+ val redirect = req.getRequestURL().toString()
+
+ // Need a client-side redirect instead of an HTTP one, otherwise the YARN proxy itself
+ // will handle the redirect and get into an infinite loop.
+ val content = s"""
+ |<html xmlns="http://www.w3.org/1999/xhtml">
+ |<head>
+ | <title>Spark History Server Redirect</title>
+ | <meta http-equiv="refresh" content="0;URL='$redirect'" />
+ |</head>
+ |<body>
+ | <p>The requested page can be found at: <a href="$redirect">$redirect</a>.</p>
+ |</body>
+ |</html>
+ """.stripMargin
+
+ logDebug(s"Redirecting YARN proxy request to $redirect.")
+ res.setStatus(HttpServletResponse.SC_OK)
+ res.setContentType("text/html")
+ res.getWriter().write(content)
+ }
+
+}
+
+private[spark] object YarnProxyRedirectFilter {
+ val COOKIE_NAME = "proxy-user"
+}
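The detection heuristic above hinges on the YARN web proxy attaching a "proxy-user" cookie to forwarded requests, which ordinary SHS clients never carry. A standalone sketch of the same check (the helper name is illustrative, not part of the patch):

    import javax.servlet.http.Cookie

    // Hypothetical distillation of the filter's proxy test.
    def viaYarnProxy(cookies: Array[Cookie]): Boolean =
      Option(cookies).exists(_.exists(_.getName() == YarnProxyRedirectFilter.COOKIE_NAME))

    viaYarnProxy(Array(new Cookie("proxy-user", "dr.who")))  // true  -> serve meta-refresh page
    viaYarnProxy(null)                                       // false -> pass down the chain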
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 163dfb5a60..53fb467f64 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -55,7 +55,7 @@ private[spark] class YarnRMClient extends Logging {
driverRef: RpcEndpointRef,
conf: YarnConfiguration,
sparkConf: SparkConf,
- uiAddress: String,
+ uiAddress: Option[String],
uiHistoryAddress: String,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource]
@@ -65,9 +65,13 @@ private[spark] class YarnRMClient extends Logging {
amClient.start()
this.uiHistoryAddress = uiHistoryAddress
+ val trackingUrl = uiAddress.getOrElse {
+ if (sparkConf.get(ALLOW_HISTORY_SERVER_TRACKING_URL)) uiHistoryAddress else ""
+ }
+
logInfo("Registering the ApplicationMaster")
synchronized {
- amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
+ amClient.registerApplicationMaster(Utils.localHostName(), 0, trackingUrl)
registered = true
}
new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
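The registration change boils down to a three-way choice of tracking URL. A small illustrative distillation (function and argument names are invented for clarity):

    // Mirrors the new logic: live driver UI if present, otherwise the SHS URL
    // when the opt-in flag is set, otherwise no tracking URL at all.
    def chooseTrackingUrl(
        uiAddress: Option[String],
        allowHistoryTracking: Boolean,
        historyAddress: String): String =
      uiAddress.getOrElse(if (allowHistoryTracking) historyAddress else "")

    chooseTrackingUrl(Some("http://driver:4040"), true, "http://shs:18080")  // driver UI wins
    chooseTrackingUrl(None, true, "http://shs:18080")                        // SHS fallback
    chooseTrackingUrl(None, false, "http://shs:18080")                       // "" (old behavior)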
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index f19a5b22a7..d8c96c35ca 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -82,6 +82,13 @@ package object config {
.stringConf
.createOptional
+ private[spark] val ALLOW_HISTORY_SERVER_TRACKING_URL =
+ ConfigBuilder("spark.yarn.historyServer.allowTracking")
+ .doc("Allow using the History Server URL for the application as the tracking URL for the " +
+ "application when the Web UI is not enabled.")
+ .booleanConf
+ .createWithDefault(false)
+
/* File distribution. */
private[spark] val SPARK_ARCHIVE = ConfigBuilder("spark.yarn.archive")
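Since the new entry is a typed boolean config, it is read back through Spark's internal config API, exactly as the YarnRMClient hunk above does; a minimal sketch:

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.config._

    val enabled: Boolean = new SparkConf()
      .set("spark.yarn.historyServer.allowTracking", "true")
      .get(ALLOW_HISTORY_SERVER_TRACKING_URL)  // true here; defaults to false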
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnProxyRedirectFilterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnProxyRedirectFilterSuite.scala
new file mode 100644
index 0000000000..54dbe9d50a
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnProxyRedirectFilterSuite.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.{PrintWriter, StringWriter}
+import javax.servlet.FilterChain
+import javax.servlet.http.{Cookie, HttpServletRequest, HttpServletResponse}
+
+import org.mockito.Mockito._
+
+import org.apache.spark.SparkFunSuite
+
+class YarnProxyRedirectFilterSuite extends SparkFunSuite {
+
+ test("redirect proxied requests, pass-through others") {
+ val requestURL = "http://example.com:1234/foo?"
+ val filter = new YarnProxyRedirectFilter()
+ val cookies = Array(new Cookie(YarnProxyRedirectFilter.COOKIE_NAME, "dr.who"))
+
+ val req = mock(classOf[HttpServletRequest])
+
+ // Mockito returns the stubbed values consecutively: the first getCookies() call mocks a
+ // YARN proxy request (cookie set), the second call returns no cookies.
+ when(req.getCookies()).thenReturn(cookies, null)
+ when(req.getRequestURL()).thenReturn(new StringBuffer(requestURL))
+
+ val res = mock(classOf[HttpServletResponse])
+ when(res.getWriter()).thenReturn(new PrintWriter(new StringWriter()))
+
+ val chain = mock(classOf[FilterChain])
+
+ // First request is proxied.
+ filter.doFilter(req, res, chain)
+ verify(chain, never()).doFilter(req, res)
+
+ // Second request is not, so should invoke the filter chain.
+ filter.doFilter(req, res, chain)
+ verify(chain, times(1)).doFilter(req, res)
+ }
+
+}