aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/HttpServer.scala181
-rw-r--r--repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala53
2 files changed, 0 insertions, 234 deletions
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala
deleted file mode 100644
index 982b6d6b61..0000000000
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io.File
-
-import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService}
-import org.eclipse.jetty.security.authentication.DigestAuthenticator
-import org.eclipse.jetty.server.Server
-import org.eclipse.jetty.server.bio.SocketConnector
-import org.eclipse.jetty.server.ssl.SslSocketConnector
-import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler, ServletHolder}
-import org.eclipse.jetty.util.component.LifeCycle
-import org.eclipse.jetty.util.security.{Constraint, Password}
-import org.eclipse.jetty.util.thread.QueuedThreadPool
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
-
-/**
- * Exception type thrown by HttpServer when it is in the wrong state for an operation.
- */
-private[spark] class ServerStateException(message: String) extends Exception(message)
-
-/**
- * An HTTP server for static content used to allow worker nodes to access JARs added to SparkContext
- * as well as classes created by the interpreter when the user types in code. This is just a wrapper
- * around a Jetty server.
- */
-private[spark] class HttpServer(
- conf: SparkConf,
- resourceBase: File,
- securityManager: SecurityManager,
- requestedPort: Int = 0,
- serverName: String = "HTTP server")
- extends Logging {
-
- private var server: Server = null
- private var port: Int = requestedPort
- private val servlets = {
- val handler = new ServletContextHandler()
- handler.setContextPath("/")
- handler
- }
-
- def start() {
- if (server != null) {
- throw new ServerStateException("Server is already started")
- } else {
- logInfo("Starting HTTP Server")
- val (actualServer, actualPort) =
- Utils.startServiceOnPort[Server](requestedPort, doStart, conf, serverName)
- server = actualServer
- port = actualPort
- }
- }
-
- def addDirectory(contextPath: String, resourceBase: String): Unit = {
- val holder = new ServletHolder()
- holder.setInitParameter("resourceBase", resourceBase)
- holder.setInitParameter("pathInfoOnly", "true")
- holder.setServlet(new DefaultServlet())
- servlets.addServlet(holder, contextPath.stripSuffix("/") + "/*")
- }
-
- /**
- * Actually start the HTTP server on the given port.
- *
- * Note that this is only best effort in the sense that we may end up binding to a nearby port
- * in the event of port collision. Return the bound server and the actual port used.
- */
- private def doStart(startPort: Int): (Server, Int) = {
- val server = new Server()
-
- val connector = securityManager.fileServerSSLOptions.createJettySslContextFactory()
- .map(new SslSocketConnector(_)).getOrElse(new SocketConnector)
-
- connector.setMaxIdleTime(60 * 1000)
- connector.setSoLingerTime(-1)
- connector.setPort(startPort)
- server.addConnector(connector)
-
- val threadPool = new QueuedThreadPool
- threadPool.setDaemon(true)
- server.setThreadPool(threadPool)
- addDirectory("/", resourceBase.getAbsolutePath)
-
- if (securityManager.isAuthenticationEnabled()) {
- logDebug("HttpServer is using security")
- val sh = setupSecurityHandler(securityManager)
- // make sure we go through security handler to get resources
- sh.setHandler(servlets)
- server.setHandler(sh)
- } else {
- logDebug("HttpServer is not using security")
- server.setHandler(servlets)
- }
-
- server.start()
- val actualPort = server.getConnectors()(0).getLocalPort
-
- (server, actualPort)
- }
-
- /**
- * Setup Jetty to the HashLoginService using a single user with our
- * shared secret. Configure it to use DIGEST-MD5 authentication so that the password
- * isn't passed in plaintext.
- */
- private def setupSecurityHandler(securityMgr: SecurityManager): ConstraintSecurityHandler = {
- val constraint = new Constraint()
- // use DIGEST-MD5 as the authentication mechanism
- constraint.setName(Constraint.__DIGEST_AUTH)
- constraint.setRoles(Array("user"))
- constraint.setAuthenticate(true)
- constraint.setDataConstraint(Constraint.DC_NONE)
-
- val cm = new ConstraintMapping()
- cm.setConstraint(constraint)
- cm.setPathSpec("/*")
- val sh = new ConstraintSecurityHandler()
-
- // the hashLoginService lets us do a single user and
- // secret right now. This could be changed to use the
- // JAASLoginService for other options.
- val hashLogin = new HashLoginService()
-
- val userCred = new Password(securityMgr.getSecretKey())
- if (userCred == null) {
- throw new Exception("Error: secret key is null with authentication on")
- }
- hashLogin.putUser(securityMgr.getHttpUser(), userCred, Array("user"))
- sh.setLoginService(hashLogin)
- sh.setAuthenticator(new DigestAuthenticator());
- sh.setConstraintMappings(Array(cm))
- sh
- }
-
- def stop() {
- if (server == null) {
- throw new ServerStateException("Server is already stopped")
- } else {
- server.stop()
- // Stop the ThreadPool if it supports stop() method (through LifeCycle).
- // It is needed because stopping the Server won't stop the ThreadPool it uses.
- val threadPool = server.getThreadPool
- if (threadPool != null && threadPool.isInstanceOf[LifeCycle]) {
- threadPool.asInstanceOf[LifeCycle].stop
- }
- port = -1
- server = null
- }
- }
-
- /**
- * Get the URI of this HTTP server (http://host:port or https://host:port)
- */
- def uri: String = {
- if (server == null) {
- throw new ServerStateException("Server is not started")
- } else {
- val scheme = if (securityManager.fileServerSSLOptions.enabled) "https" else "http"
- s"$scheme://${Utils.localHostNameForURI()}:$port"
- }
- }
-}
diff --git a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
index 9a143ee36f..12e98565dc 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
@@ -57,7 +57,6 @@ class ExecutorClassLoaderSuite
var tempDir2: File = _
var url1: String = _
var urls2: Array[URL] = _
- var classServer: HttpServer = _
override def beforeAll() {
super.beforeAll()
@@ -74,9 +73,6 @@ class ExecutorClassLoaderSuite
override def afterAll() {
try {
- if (classServer != null) {
- classServer.stop()
- }
Utils.deleteRecursively(tempDir1)
Utils.deleteRecursively(tempDir2)
SparkEnv.set(null)
@@ -137,55 +133,6 @@ class ExecutorClassLoaderSuite
assert(fileReader.readLine().contains("resource"), "File doesn't contain 'resource'")
}
- test("failing to fetch classes from HTTP server should not leak resources (SPARK-6209)") {
- // This is a regression test for SPARK-6209, a bug where each failed attempt to load a class
- // from the driver's class server would leak a HTTP connection, causing the class server's
- // thread / connection pool to be exhausted.
- val conf = new SparkConf()
- val securityManager = new SecurityManager(conf)
- classServer = new HttpServer(conf, tempDir1, securityManager)
- classServer.start()
- // ExecutorClassLoader uses SparkEnv's SecurityManager, so we need to mock this
- val mockEnv = mock[SparkEnv]
- when(mockEnv.securityManager).thenReturn(securityManager)
- SparkEnv.set(mockEnv)
- // Create an ExecutorClassLoader that's configured to load classes from the HTTP server
- val parentLoader = new URLClassLoader(Array.empty, null)
- val classLoader = new ExecutorClassLoader(conf, null, classServer.uri, parentLoader, false)
- classLoader.httpUrlConnectionTimeoutMillis = 500
- // Check that this class loader can actually load classes that exist
- val fakeClass = classLoader.loadClass("ReplFakeClass2").newInstance()
- val fakeClassVersion = fakeClass.toString
- assert(fakeClassVersion === "1")
- // Try to perform a full GC now, since GC during the test might mask resource leaks
- System.gc()
- // When the original bug occurs, the test thread becomes blocked in a classloading call
- // and does not respond to interrupts. Therefore, use a custom ScalaTest interruptor to
- // shut down the HTTP server when the test times out
- val interruptor: Interruptor = new Interruptor {
- override def apply(thread: Thread): Unit = {
- classServer.stop()
- classServer = null
- thread.interrupt()
- }
- }
- def tryAndFailToLoadABunchOfClasses(): Unit = {
- // The number of trials here should be much larger than Jetty's thread / connection limit
- // in order to expose thread or connection leaks
- for (i <- 1 to 1000) {
- if (Thread.currentThread().isInterrupted) {
- throw new InterruptedException()
- }
- // Incorporate the iteration number into the class name in order to avoid any response
- // caching that might be added in the future
- intercept[ClassNotFoundException] {
- classLoader.loadClass(s"ReplFakeClassDoesNotExist$i").newInstance()
- }
- }
- }
- failAfter(10 seconds)(tryAndFailToLoadABunchOfClasses())(interruptor)
- }
-
test("fetch classes using Spark's RpcEnv") {
val env = mock[SparkEnv]
val rpcEnv = mock[RpcEnv]