author    Marcelo Vanzin <vanzin@cloudera.com>  2014-10-03 13:18:35 -0700
committer Andrew Or <andrewor14@gmail.com>      2014-10-03 13:18:35 -0700
commit    30abef154768e5c4c6062f3341933dbda990f6cc (patch)
tree      2f8f515b738069b8baa4ada773533e4abf595fe8 /core
parent    e5566e05b1ac99aa6caf1701e47ebcdb68a002c6 (diff)
download  spark-30abef154768e5c4c6062f3341933dbda990f6cc.tar.gz
          spark-30abef154768e5c4c6062f3341933dbda990f6cc.tar.bz2
          spark-30abef154768e5c4c6062f3341933dbda990f6cc.zip
[SPARK-3606] [yarn] Correctly configure AmIpFilter for Yarn HA.
The existing code only considered one of the RMs when running in Yarn HA mode, so it was possible to get errors if the active RM was not registered in the filter. The change makes use of a new API added to Yarn that returns all proxy addresses, and falls back to the old behavior if the API is not present. While there, I also made a change to look at the scheme (http or https) used by Yarn when building the proxy URIs.

Since, in the case of multiple RMs, Yarn uses commas as a separator, it was no longer possible to use spark.filter.params to propagate this information (that setting used commas to delimit different config params). Instead, I added a new param (spark.filter.jsonParams) which expects a JSON string containing a map with the config data. I chose not to add it to the documentation at this point, since I don't believe users will use it directly. (As the squashed commits below note, the JSON approach was later replaced with a separate config entry per parameter, spark.<filterName>.param.<key>, which is what the final diff implements.)

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #2469 from vanzin/SPARK-3606 and squashes the following commits:

aeb458a [Marcelo Vanzin] Undelete needed import.
65e400d [Marcelo Vanzin] Remove unused import.
d121883 [Marcelo Vanzin] Use separate config for each param instead of json.
04bc156 [Marcelo Vanzin] Review feedback.
4d4d6b9 [Marcelo Vanzin] [SPARK-3606] [yarn] Correctly configure AmIpFilter for Yarn HA.
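To make the mechanism concrete, here is a minimal, self-contained sketch (not part of the commit) of the per-parameter scheme the final diff settles on: each filter parameter becomes its own SparkConf entry under spark.<filterName>.param.<key>, so a value that itself contains commas (one proxy address per RM) survives intact. The host names and application path below are illustrative assumptions, as are the PROXY_HOSTS/PROXY_URI_BASES keys.

    import org.apache.spark.SparkConf

    object FilterParamsSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(loadDefaults = false)
        // Illustrative values: in Yarn HA there is one proxy address per RM,
        // and the comma-joined value would break the old comma-delimited config.
        val filterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
        val filterParams = Map(
          "PROXY_HOSTS" -> "rm1.example.com,rm2.example.com",
          "PROXY_URI_BASES" ->
            "http://rm1.example.com:8088/proxy/app,http://rm2.example.com:8088/proxy/app")

        // Producer side (what addWebUIFilter does below): one entry per parameter.
        conf.set("spark.ui.filters", filterName)
        filterParams.foreach { case (k, v) => conf.set(s"spark.$filterName.param.$k", v) }

        // Consumer side (what JettyUtils.addFilters does below): recover by prefix.
        val prefix = s"spark.$filterName.param."
        val recovered = conf.getAll
          .filter { case (k, _) => k.startsWith(prefix) && k.length > prefix.length }
          .map { case (k, v) => k.substring(prefix.length) -> v }
          .toMap
        assert(recovered == filterParams)
      }
    }

Splitting the map into individual entries keeps each value opaque to SparkConf, which is what lets the change avoid the JSON encoding the commit message describes.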
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala    2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala  8
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/JettyUtils.scala                                   14
3 files changed, 14 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 6abf6d930c..fb8160abc5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -66,7 +66,7 @@ private[spark] object CoarseGrainedClusterMessages {
   case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
-  case class AddWebUIFilter(filterName:String, filterParams: String, proxyBase :String)
+  case class AddWebUIFilter(filterName:String, filterParams: Map[String, String], proxyBase :String)
     extends CoarseGrainedClusterMessage
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 89089e7d6f..59aed6b72f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -275,15 +275,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   }
   // Add filters to the SparkUI
-  def addWebUIFilter(filterName: String, filterParams: String, proxyBase: String) {
+  def addWebUIFilter(filterName: String, filterParams: Map[String, String], proxyBase: String) {
     if (proxyBase != null && proxyBase.nonEmpty) {
       System.setProperty("spark.ui.proxyBase", proxyBase)
     }
-    if (Seq(filterName, filterParams).forall(t => t != null && t.nonEmpty)) {
+    val hasFilter = (filterName != null && filterName.nonEmpty &&
+      filterParams != null && filterParams.nonEmpty)
+    if (hasFilter) {
       logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
       conf.set("spark.ui.filters", filterName)
-      conf.set(s"spark.$filterName.params", filterParams)
+      filterParams.foreach { case (k, v) => conf.set(s"spark.$filterName.param.$k", v) }
       scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 6b46892910..2a27d49d2d 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -21,9 +21,7 @@ import java.net.{InetSocketAddress, URL}
import javax.servlet.DispatcherType
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
-import scala.annotation.tailrec
import scala.language.implicitConversions
-import scala.util.{Failure, Success, Try}
import scala.xml.Node
import org.eclipse.jetty.server.Server
@@ -147,15 +145,19 @@ private[spark] object JettyUtils extends Logging {
           val holder : FilterHolder = new FilterHolder()
           holder.setClassName(filter)
           // Get any parameters for each filter
-          val paramName = "spark." + filter + ".params"
-          val params = conf.get(paramName, "").split(',').map(_.trim()).toSet
-          params.foreach {
-            case param : String =>
+          conf.get("spark." + filter + ".params", "").split(',').map(_.trim()).toSet.foreach {
+            param: String =>
               if (!param.isEmpty) {
                 val parts = param.split("=")
                 if (parts.length == 2) holder.setInitParameter(parts(0), parts(1))
               }
           }
+
+          val prefix = s"spark.$filter.param."
+          conf.getAll
+            .filter { case (k, v) => k.length() > prefix.length() && k.startsWith(prefix) }
+            .foreach { case (k, v) => holder.setInitParameter(k.substring(prefix.length()), v) }
+
           val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.ERROR,
             DispatcherType.FORWARD, DispatcherType.INCLUDE, DispatcherType.REQUEST)
           handlers.foreach { case(handler) => handler.addFilter(holder, "/*", enumDispatcher) }
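For contrast, a small sketch (assumptions mine) of the legacy spark.<filter>.params format that the hunk above still accepts: comma-separated key=value pairs, which is exactly why values containing commas needed the new per-key scheme.

    // Legacy format: "k1=v1,k2=v2"; malformed entries are skipped silently,
    // mirroring the parts.length == 2 check retained in the diff above.
    val legacy = "param1=value1,param2=value2"
    val pairs = legacy.split(',').map(_.trim).filter(_.nonEmpty)
      .map(_.split("="))
      .collect { case Array(k, v) => k -> v }
      .toMap
    // pairs == Map("param1" -> "value1", "param2" -> "value2")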