aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-03-13 12:16:04 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-03-13 12:16:04 -0700
commitca4bf8c572c2f70b484830f1db414b5073744ab6 (patch)
tree35151d6f99283b7286f574b06e4f650cf556014a /core
parent698373211ef3cdf841c82d48168cd5dbe00a57b4 (diff)
downloadspark-ca4bf8c572c2f70b484830f1db414b5073744ab6.tar.gz
spark-ca4bf8c572c2f70b484830f1db414b5073744ab6.tar.bz2
spark-ca4bf8c572c2f70b484830f1db414b5073744ab6.zip
SPARK-1236 - Upgrade Jetty to 9.1.3.v20140225.
Author: Reynold Xin <rxin@apache.org> Closes #113 from rxin/jetty9 and squashes the following commits: 867a2ce [Reynold Xin] Updated Jetty version to 9.1.3.v20140225 in Maven build file. d7c97ca [Reynold Xin] Return the correctly bound port. d14706f [Reynold Xin] Upgrade Jetty to 9.1.3.v20140225.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/HttpServer.scala21
-rw-r--r--core/src/main/scala/org/apache/spark/ui/JettyUtils.scala58
2 files changed, 41 insertions, 38 deletions
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala
index cb5df25fa4..d14693cb78 100644
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -21,10 +21,9 @@ import java.io.File
import org.eclipse.jetty.util.security.{Constraint, Password}
import org.eclipse.jetty.security.authentication.DigestAuthenticator
-import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService, SecurityHandler}
+import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService}
-import org.eclipse.jetty.server.Server
-import org.eclipse.jetty.server.bio.SocketConnector
+import org.eclipse.jetty.server.{Server, ServerConnector}
import org.eclipse.jetty.server.handler.{DefaultHandler, HandlerList, ResourceHandler}
import org.eclipse.jetty.util.thread.QueuedThreadPool
@@ -43,7 +42,7 @@ private[spark] class ServerStateException(message: String) extends Exception(mes
*/
private[spark] class HttpServer(resourceBase: File, securityManager: SecurityManager)
extends Logging {
- private var server: Server = null
+ private var server: Server = _
private var port: Int = -1
def start() {
@@ -51,16 +50,16 @@ private[spark] class HttpServer(resourceBase: File, securityManager: SecurityMan
throw new ServerStateException("Server is already started")
} else {
logInfo("Starting HTTP Server")
- server = new Server()
- val connector = new SocketConnector
- connector.setMaxIdleTime(60*1000)
+ val threadPool = new QueuedThreadPool
+ threadPool.setDaemon(true)
+
+ server = new Server(threadPool)
+ val connector = new ServerConnector(server)
+ connector.setIdleTimeout(60 * 1000)
connector.setSoLingerTime(-1)
connector.setPort(0)
server.addConnector(connector)
- val threadPool = new QueuedThreadPool
- threadPool.setDaemon(true)
- server.setThreadPool(threadPool)
val resHandler = new ResourceHandler
resHandler.setResourceBase(resourceBase.getAbsolutePath)
@@ -79,7 +78,7 @@ private[spark] class HttpServer(resourceBase: File, securityManager: SecurityMan
}
server.start()
- port = server.getConnectors()(0).getLocalPort()
+ port = connector.getLocalPort
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 7c35cd165a..cc73249396 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -19,6 +19,7 @@ package org.apache.spark.ui
import java.net.InetSocketAddress
import java.net.URL
+import javax.servlet.DispatcherType
import javax.servlet.http.{HttpServlet, HttpServletResponse, HttpServletRequest}
import scala.annotation.tailrec
@@ -28,7 +29,7 @@ import scala.xml.Node
import org.json4s.JValue
import org.json4s.jackson.JsonMethods.{pretty, render}
-import org.eclipse.jetty.server.{DispatcherType, Server}
+import org.eclipse.jetty.server.{NetworkConnector, Server}
import org.eclipse.jetty.server.handler.HandlerList
import org.eclipse.jetty.servlet.{DefaultServlet, FilterHolder, ServletContextHandler, ServletHolder}
import org.eclipse.jetty.util.thread.QueuedThreadPool
@@ -60,8 +61,7 @@ private[spark] object JettyUtils extends Logging {
def createServlet[T <% AnyRef](servletParams: ServletParams[T],
securityMgr: SecurityManager): HttpServlet = {
new HttpServlet {
- override def doGet(request: HttpServletRequest,
- response: HttpServletResponse) {
+ override def doGet(request: HttpServletRequest, response: HttpServletResponse) {
if (securityMgr.checkUIViewPermissions(request.getRemoteUser())) {
response.setContentType("%s;charset=utf-8".format(servletParams.contentType))
response.setStatus(HttpServletResponse.SC_OK)
@@ -72,7 +72,7 @@ private[spark] object JettyUtils extends Logging {
response.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
response.sendError(HttpServletResponse.SC_UNAUTHORIZED,
- "User is not authorized to access this page.");
+ "User is not authorized to access this page.")
}
}
}
@@ -120,26 +120,25 @@ private[spark] object JettyUtils extends Logging {
private def addFilters(handlers: Seq[ServletContextHandler], conf: SparkConf) {
val filters: Array[String] = conf.get("spark.ui.filters", "").split(',').map(_.trim())
- filters.foreach {
- case filter : String =>
- if (!filter.isEmpty) {
- logInfo("Adding filter: " + filter)
- val holder : FilterHolder = new FilterHolder()
- holder.setClassName(filter)
- // get any parameters for each filter
- val paramName = "spark." + filter + ".params"
- val params = conf.get(paramName, "").split(',').map(_.trim()).toSet
- params.foreach {
- case param : String =>
- if (!param.isEmpty) {
- val parts = param.split("=")
- if (parts.length == 2) holder.setInitParameter(parts(0), parts(1))
- }
- }
- val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.ERROR,
- DispatcherType.FORWARD, DispatcherType.INCLUDE, DispatcherType.REQUEST)
- handlers.foreach { case(handler) => handler.addFilter(holder, "/*", enumDispatcher) }
+ filters.foreach { filter =>
+ if (!filter.isEmpty) {
+ logInfo("Adding filter: " + filter)
+ val holder : FilterHolder = new FilterHolder()
+ holder.setClassName(filter)
+ // get any parameters for each filter
+ val paramName = "spark." + filter + ".params"
+ val params = conf.get(paramName, "").split(',').map(_.trim()).toSet
+ params.foreach {
+ case param : String =>
+ if (!param.isEmpty) {
+ val parts = param.split("=")
+ if (parts.length == 2) holder.setInitParameter(parts(0), parts(1))
+ }
}
+ val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.ERROR,
+ DispatcherType.FORWARD, DispatcherType.INCLUDE, DispatcherType.REQUEST)
+ handlers.foreach { handler => handler.addFilter(holder, "/*", enumDispatcher) }
+ }
}
}
@@ -150,7 +149,10 @@ private[spark] object JettyUtils extends Logging {
* If the desired port number is contented, continues incrementing ports until a free port is
* found. Returns the chosen port and the jetty Server object.
*/
- def startJettyServer(hostName: String, port: Int, handlers: Seq[ServletContextHandler],
+ def startJettyServer(
+ hostName: String,
+ port: Int,
+ handlers: Seq[ServletContextHandler],
conf: SparkConf): (Server, Int) = {
addFilters(handlers, conf)
@@ -160,16 +162,18 @@ private[spark] object JettyUtils extends Logging {
@tailrec
def connect(currentPort: Int): (Server, Int) = {
val server = new Server(new InetSocketAddress(hostName, currentPort))
- val pool = new QueuedThreadPool
+ // Unfortunately Jetty 9 doesn't allow us to set both the thread pool and the port number in
+ // constructor. But fortunately the pool allocated by Jetty is always a QueuedThreadPool.
+ val pool = server.getThreadPool.asInstanceOf[QueuedThreadPool]
pool.setDaemon(true)
- server.setThreadPool(pool)
+
server.setHandler(handlerList)
Try {
server.start()
} match {
case s: Success[_] =>
- (server, server.getConnectors.head.getLocalPort)
+ (server, server.getConnectors.head.asInstanceOf[NetworkConnector].getLocalPort)
case f: Failure[_] =>
server.stop()
logInfo("Failed to create UI at port, %s. Trying again.".format(currentPort))