diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/HttpServer.scala | 21 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 58 |
2 files changed, 38 insertions, 41 deletions
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index d14693cb78..cb5df25fa4 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -21,9 +21,10 @@ import java.io.File import org.eclipse.jetty.util.security.{Constraint, Password} import org.eclipse.jetty.security.authentication.DigestAuthenticator -import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService} +import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService, SecurityHandler} -import org.eclipse.jetty.server.{Server, ServerConnector} +import org.eclipse.jetty.server.Server +import org.eclipse.jetty.server.bio.SocketConnector import org.eclipse.jetty.server.handler.{DefaultHandler, HandlerList, ResourceHandler} import org.eclipse.jetty.util.thread.QueuedThreadPool @@ -42,7 +43,7 @@ private[spark] class ServerStateException(message: String) extends Exception(mes */ private[spark] class HttpServer(resourceBase: File, securityManager: SecurityManager) extends Logging { - private var server: Server = _ + private var server: Server = null private var port: Int = -1 def start() { @@ -50,16 +51,16 @@ private[spark] class HttpServer(resourceBase: File, securityManager: SecurityMan throw new ServerStateException("Server is already started") } else { logInfo("Starting HTTP Server") - val threadPool = new QueuedThreadPool - threadPool.setDaemon(true) - - server = new Server(threadPool) - val connector = new ServerConnector(server) - connector.setIdleTimeout(60 * 1000) + server = new Server() + val connector = new SocketConnector + connector.setMaxIdleTime(60*1000) connector.setSoLingerTime(-1) connector.setPort(0) server.addConnector(connector) + val threadPool = new QueuedThreadPool + threadPool.setDaemon(true) + server.setThreadPool(threadPool) val resHandler = new ResourceHandler resHandler.setResourceBase(resourceBase.getAbsolutePath) @@ -78,7 +79,7 @@ private[spark] class HttpServer(resourceBase: File, securityManager: SecurityMan } server.start() - port = connector.getLocalPort + port = server.getConnectors()(0).getLocalPort() } } diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index cc73249396..7c35cd165a 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -19,7 +19,6 @@ package org.apache.spark.ui import java.net.InetSocketAddress import java.net.URL -import javax.servlet.DispatcherType import javax.servlet.http.{HttpServlet, HttpServletResponse, HttpServletRequest} import scala.annotation.tailrec @@ -29,7 +28,7 @@ import scala.xml.Node import org.json4s.JValue import org.json4s.jackson.JsonMethods.{pretty, render} -import org.eclipse.jetty.server.{NetworkConnector, Server} +import org.eclipse.jetty.server.{DispatcherType, Server} import org.eclipse.jetty.server.handler.HandlerList import org.eclipse.jetty.servlet.{DefaultServlet, FilterHolder, ServletContextHandler, ServletHolder} import org.eclipse.jetty.util.thread.QueuedThreadPool @@ -61,7 +60,8 @@ private[spark] object JettyUtils extends Logging { def createServlet[T <% AnyRef](servletParams: ServletParams[T], securityMgr: SecurityManager): HttpServlet = { new HttpServlet { - override def doGet(request: HttpServletRequest, response: HttpServletResponse) { + override def doGet(request: HttpServletRequest, + response: HttpServletResponse) { if (securityMgr.checkUIViewPermissions(request.getRemoteUser())) { response.setContentType("%s;charset=utf-8".format(servletParams.contentType)) response.setStatus(HttpServletResponse.SC_OK) @@ -72,7 +72,7 @@ private[spark] object JettyUtils extends Logging { response.setStatus(HttpServletResponse.SC_UNAUTHORIZED) response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate") response.sendError(HttpServletResponse.SC_UNAUTHORIZED, - "User is not authorized to access this page.") + "User is not authorized to access this page."); } } } @@ -120,25 +120,26 @@ private[spark] object JettyUtils extends Logging { private def addFilters(handlers: Seq[ServletContextHandler], conf: SparkConf) { val filters: Array[String] = conf.get("spark.ui.filters", "").split(',').map(_.trim()) - filters.foreach { filter => - if (!filter.isEmpty) { - logInfo("Adding filter: " + filter) - val holder : FilterHolder = new FilterHolder() - holder.setClassName(filter) - // get any parameters for each filter - val paramName = "spark." + filter + ".params" - val params = conf.get(paramName, "").split(',').map(_.trim()).toSet - params.foreach { - case param : String => - if (!param.isEmpty) { - val parts = param.split("=") - if (parts.length == 2) holder.setInitParameter(parts(0), parts(1)) - } + filters.foreach { + case filter : String => + if (!filter.isEmpty) { + logInfo("Adding filter: " + filter) + val holder : FilterHolder = new FilterHolder() + holder.setClassName(filter) + // get any parameters for each filter + val paramName = "spark." + filter + ".params" + val params = conf.get(paramName, "").split(',').map(_.trim()).toSet + params.foreach { + case param : String => + if (!param.isEmpty) { + val parts = param.split("=") + if (parts.length == 2) holder.setInitParameter(parts(0), parts(1)) + } + } + val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.ERROR, + DispatcherType.FORWARD, DispatcherType.INCLUDE, DispatcherType.REQUEST) + handlers.foreach { case(handler) => handler.addFilter(holder, "/*", enumDispatcher) } } - val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.ERROR, - DispatcherType.FORWARD, DispatcherType.INCLUDE, DispatcherType.REQUEST) - handlers.foreach { handler => handler.addFilter(holder, "/*", enumDispatcher) } - } } } @@ -149,10 +150,7 @@ private[spark] object JettyUtils extends Logging { * If the desired port number is contented, continues incrementing ports until a free port is * found. Returns the chosen port and the jetty Server object. */ - def startJettyServer( - hostName: String, - port: Int, - handlers: Seq[ServletContextHandler], + def startJettyServer(hostName: String, port: Int, handlers: Seq[ServletContextHandler], conf: SparkConf): (Server, Int) = { addFilters(handlers, conf) @@ -162,18 +160,16 @@ private[spark] object JettyUtils extends Logging { @tailrec def connect(currentPort: Int): (Server, Int) = { val server = new Server(new InetSocketAddress(hostName, currentPort)) - // Unfortunately Jetty 9 doesn't allow us to set both the thread pool and the port number in - // constructor. But fortunately the pool allocated by Jetty is always a QueuedThreadPool. - val pool = server.getThreadPool.asInstanceOf[QueuedThreadPool] + val pool = new QueuedThreadPool pool.setDaemon(true) - + server.setThreadPool(pool) server.setHandler(handlerList) Try { server.start() } match { case s: Success[_] => - (server, server.getConnectors.head.asInstanceOf[NetworkConnector].getLocalPort) + (server, server.getConnectors.head.getLocalPort) case f: Failure[_] => server.stop() logInfo("Failed to create UI at port, %s. Trying again.".format(currentPort)) |