Diffstat (limited to 'core/src')
3 files changed, 14 insertions, 10 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 6abf6d930c..fb8160abc5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -66,7 +66,7 @@ private[spark] object CoarseGrainedClusterMessages {
 
   case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
 
-  case class AddWebUIFilter(filterName:String, filterParams: String, proxyBase :String)
+  case class AddWebUIFilter(filterName:String, filterParams: Map[String, String], proxyBase :String)
     extends CoarseGrainedClusterMessage
 
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 89089e7d6f..59aed6b72f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -275,15 +275,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem)
   }
 
   // Add filters to the SparkUI
-  def addWebUIFilter(filterName: String, filterParams: String, proxyBase: String) {
+  def addWebUIFilter(filterName: String, filterParams: Map[String, String], proxyBase: String) {
     if (proxyBase != null && proxyBase.nonEmpty) {
       System.setProperty("spark.ui.proxyBase", proxyBase)
     }
 
-    if (Seq(filterName, filterParams).forall(t => t != null && t.nonEmpty)) {
+    val hasFilter = (filterName != null && filterName.nonEmpty &&
+      filterParams != null && filterParams.nonEmpty)
+    if (hasFilter) {
       logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
       conf.set("spark.ui.filters", filterName)
-      conf.set(s"spark.$filterName.params", filterParams)
+      filterParams.foreach { case (k, v) => conf.set(s"spark.$filterName.param.$k", v) }
       scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 6b46892910..2a27d49d2d 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -21,9 +21,7 @@ import java.net.{InetSocketAddress, URL}
 import javax.servlet.DispatcherType
 import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
 
-import scala.annotation.tailrec
 import scala.language.implicitConversions
-import scala.util.{Failure, Success, Try}
 import scala.xml.Node
 
 import org.eclipse.jetty.server.Server
@@ -147,15 +145,19 @@ private[spark] object JettyUtils extends Logging {
           val holder : FilterHolder = new FilterHolder()
           holder.setClassName(filter)
           // Get any parameters for each filter
-          val paramName = "spark." + filter + ".params"
-          val params = conf.get(paramName, "").split(',').map(_.trim()).toSet
-          params.foreach {
-            case param : String =>
+          conf.get("spark." + filter + ".params", "").split(',').map(_.trim()).toSet.foreach {
+            param: String =>
               if (!param.isEmpty) {
                 val parts = param.split("=")
                 if (parts.length == 2) holder.setInitParameter(parts(0), parts(1))
               }
           }
+
+          val prefix = s"spark.$filter.param."
+          conf.getAll
+            .filter { case (k, v) => k.length() > prefix.length() && k.startsWith(prefix) }
+            .foreach { case (k, v) => holder.setInitParameter(k.substring(prefix.length()), v) }
+
           val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.ERROR,
             DispatcherType.FORWARD, DispatcherType.INCLUDE, DispatcherType.REQUEST)
           handlers.foreach { case(handler) => handler.addFilter(holder, "/*", enumDispatcher) }
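
The net effect of the three changes: filter parameters travel from the cluster manager to the Web UI as a real map and are stored as one "spark.<filter>.param.<name>" conf entry per parameter, instead of a single comma-separated "spark.<filter>.params" string that addFilters had to re-split on ',' and '='. A minimal standalone sketch of that round trip follows. It is not part of the commit: a plain Map stands in for SparkConf, and the filter class and parameter values are illustrative (the AmIpFilter class and PROXY_HOSTS/PROXY_URI_BASES names exist in Hadoop; the host names are made up).

object FilterParamRoundTrip {
  def main(args: Array[String]): Unit = {
    // Illustrative filter and parameters; note both values contain ','
    // and would have been split apart by the old encoding.
    val filterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
    val filterParams = Map(
      "PROXY_HOSTS" -> "rm1.example.com,rm2.example.com",
      "PROXY_URI_BASES" ->
        "http://rm1.example.com:8088/proxy/app,http://rm2.example.com:8088/proxy/app")

    // What addWebUIFilter now writes into the conf: one key per parameter.
    val conf: Map[String, String] =
      Map("spark.ui.filters" -> filterName) ++
        filterParams.map { case (k, v) => s"spark.$filterName.param.$k" -> v }

    // What addFilters now reads back: scan for the per-filter prefix and
    // strip it, recovering each parameter value intact.
    val prefix = s"spark.$filterName.param."
    val recovered = conf
      .filter { case (k, _) => k.length > prefix.length && k.startsWith(prefix) }
      .map { case (k, v) => k.substring(prefix.length) -> v }

    assert(recovered == filterParams)
    recovered.foreach { case (k, v) => println(s"$k = $v") }
  }
}

Under the old scheme, split(',') in addFilters would have broken each of these values at the embedded commas, and entries without exactly one '=' were silently dropped (parts.length == 2), so multi-valued parameters such as a proxy-host list could not survive the round trip. The per-key encoding has no such restriction on parameter values.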