diff options
author | jerryshao <sshao@hortonworks.com> | 2016-04-20 10:48:11 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-04-20 10:48:11 -0700 |
commit | 90cbc82fd4114219a5a0f180b1908a18985fda3e (patch) | |
tree | d71e6cd6caa0c06ec5ef79e6da1958c491da352d | |
parent | b4e76a9a3b58822fcbe5a8b137618a32c4033755 (diff) | |
download | spark-90cbc82fd4114219a5a0f180b1908a18985fda3e.tar.gz spark-90cbc82fd4114219a5a0f180b1908a18985fda3e.tar.bz2 spark-90cbc82fd4114219a5a0f180b1908a18985fda3e.zip |
[SPARK-14725][CORE] Remove HttpServer class
## What changes were proposed in this pull request?
This pull request removes the `HttpServer` class. Since internal file/jar/class transmission has been moved to the RPC layer, no code uses `HttpServer` any longer, so this change proposes removing it.
## How was this patch tested?
Unit tests were run and verified locally.
Author: jerryshao <sshao@hortonworks.com>
Closes #12526 from jerryshao/SPARK-14725.
-rw-r--r-- | core/src/main/scala/org/apache/spark/HttpServer.scala | 181 | ||||
-rw-r--r-- | repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala | 53 |
2 files changed, 0 insertions, 234 deletions
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala deleted file mode 100644 index 982b6d6b61..0000000000 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark - -import java.io.File - -import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService} -import org.eclipse.jetty.security.authentication.DigestAuthenticator -import org.eclipse.jetty.server.Server -import org.eclipse.jetty.server.bio.SocketConnector -import org.eclipse.jetty.server.ssl.SslSocketConnector -import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler, ServletHolder} -import org.eclipse.jetty.util.component.LifeCycle -import org.eclipse.jetty.util.security.{Constraint, Password} -import org.eclipse.jetty.util.thread.QueuedThreadPool - -import org.apache.spark.internal.Logging -import org.apache.spark.util.Utils - -/** - * Exception type thrown by HttpServer when it is in the wrong state for an operation. 
- */ -private[spark] class ServerStateException(message: String) extends Exception(message) - -/** - * An HTTP server for static content used to allow worker nodes to access JARs added to SparkContext - * as well as classes created by the interpreter when the user types in code. This is just a wrapper - * around a Jetty server. - */ -private[spark] class HttpServer( - conf: SparkConf, - resourceBase: File, - securityManager: SecurityManager, - requestedPort: Int = 0, - serverName: String = "HTTP server") - extends Logging { - - private var server: Server = null - private var port: Int = requestedPort - private val servlets = { - val handler = new ServletContextHandler() - handler.setContextPath("/") - handler - } - - def start() { - if (server != null) { - throw new ServerStateException("Server is already started") - } else { - logInfo("Starting HTTP Server") - val (actualServer, actualPort) = - Utils.startServiceOnPort[Server](requestedPort, doStart, conf, serverName) - server = actualServer - port = actualPort - } - } - - def addDirectory(contextPath: String, resourceBase: String): Unit = { - val holder = new ServletHolder() - holder.setInitParameter("resourceBase", resourceBase) - holder.setInitParameter("pathInfoOnly", "true") - holder.setServlet(new DefaultServlet()) - servlets.addServlet(holder, contextPath.stripSuffix("/") + "/*") - } - - /** - * Actually start the HTTP server on the given port. - * - * Note that this is only best effort in the sense that we may end up binding to a nearby port - * in the event of port collision. Return the bound server and the actual port used. 
- */ - private def doStart(startPort: Int): (Server, Int) = { - val server = new Server() - - val connector = securityManager.fileServerSSLOptions.createJettySslContextFactory() - .map(new SslSocketConnector(_)).getOrElse(new SocketConnector) - - connector.setMaxIdleTime(60 * 1000) - connector.setSoLingerTime(-1) - connector.setPort(startPort) - server.addConnector(connector) - - val threadPool = new QueuedThreadPool - threadPool.setDaemon(true) - server.setThreadPool(threadPool) - addDirectory("/", resourceBase.getAbsolutePath) - - if (securityManager.isAuthenticationEnabled()) { - logDebug("HttpServer is using security") - val sh = setupSecurityHandler(securityManager) - // make sure we go through security handler to get resources - sh.setHandler(servlets) - server.setHandler(sh) - } else { - logDebug("HttpServer is not using security") - server.setHandler(servlets) - } - - server.start() - val actualPort = server.getConnectors()(0).getLocalPort - - (server, actualPort) - } - - /** - * Setup Jetty to the HashLoginService using a single user with our - * shared secret. Configure it to use DIGEST-MD5 authentication so that the password - * isn't passed in plaintext. - */ - private def setupSecurityHandler(securityMgr: SecurityManager): ConstraintSecurityHandler = { - val constraint = new Constraint() - // use DIGEST-MD5 as the authentication mechanism - constraint.setName(Constraint.__DIGEST_AUTH) - constraint.setRoles(Array("user")) - constraint.setAuthenticate(true) - constraint.setDataConstraint(Constraint.DC_NONE) - - val cm = new ConstraintMapping() - cm.setConstraint(constraint) - cm.setPathSpec("/*") - val sh = new ConstraintSecurityHandler() - - // the hashLoginService lets us do a single user and - // secret right now. This could be changed to use the - // JAASLoginService for other options. 
- val hashLogin = new HashLoginService() - - val userCred = new Password(securityMgr.getSecretKey()) - if (userCred == null) { - throw new Exception("Error: secret key is null with authentication on") - } - hashLogin.putUser(securityMgr.getHttpUser(), userCred, Array("user")) - sh.setLoginService(hashLogin) - sh.setAuthenticator(new DigestAuthenticator()); - sh.setConstraintMappings(Array(cm)) - sh - } - - def stop() { - if (server == null) { - throw new ServerStateException("Server is already stopped") - } else { - server.stop() - // Stop the ThreadPool if it supports stop() method (through LifeCycle). - // It is needed because stopping the Server won't stop the ThreadPool it uses. - val threadPool = server.getThreadPool - if (threadPool != null && threadPool.isInstanceOf[LifeCycle]) { - threadPool.asInstanceOf[LifeCycle].stop - } - port = -1 - server = null - } - } - - /** - * Get the URI of this HTTP server (http://host:port or https://host:port) - */ - def uri: String = { - if (server == null) { - throw new ServerStateException("Server is not started") - } else { - val scheme = if (securityManager.fileServerSSLOptions.enabled) "https" else "http" - s"$scheme://${Utils.localHostNameForURI()}:$port" - } - } -} diff --git a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala index 9a143ee36f..12e98565dc 100644 --- a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala +++ b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala @@ -57,7 +57,6 @@ class ExecutorClassLoaderSuite var tempDir2: File = _ var url1: String = _ var urls2: Array[URL] = _ - var classServer: HttpServer = _ override def beforeAll() { super.beforeAll() @@ -74,9 +73,6 @@ class ExecutorClassLoaderSuite override def afterAll() { try { - if (classServer != null) { - classServer.stop() - } Utils.deleteRecursively(tempDir1) Utils.deleteRecursively(tempDir2) 
SparkEnv.set(null) @@ -137,55 +133,6 @@ class ExecutorClassLoaderSuite assert(fileReader.readLine().contains("resource"), "File doesn't contain 'resource'") } - test("failing to fetch classes from HTTP server should not leak resources (SPARK-6209)") { - // This is a regression test for SPARK-6209, a bug where each failed attempt to load a class - // from the driver's class server would leak a HTTP connection, causing the class server's - // thread / connection pool to be exhausted. - val conf = new SparkConf() - val securityManager = new SecurityManager(conf) - classServer = new HttpServer(conf, tempDir1, securityManager) - classServer.start() - // ExecutorClassLoader uses SparkEnv's SecurityManager, so we need to mock this - val mockEnv = mock[SparkEnv] - when(mockEnv.securityManager).thenReturn(securityManager) - SparkEnv.set(mockEnv) - // Create an ExecutorClassLoader that's configured to load classes from the HTTP server - val parentLoader = new URLClassLoader(Array.empty, null) - val classLoader = new ExecutorClassLoader(conf, null, classServer.uri, parentLoader, false) - classLoader.httpUrlConnectionTimeoutMillis = 500 - // Check that this class loader can actually load classes that exist - val fakeClass = classLoader.loadClass("ReplFakeClass2").newInstance() - val fakeClassVersion = fakeClass.toString - assert(fakeClassVersion === "1") - // Try to perform a full GC now, since GC during the test might mask resource leaks - System.gc() - // When the original bug occurs, the test thread becomes blocked in a classloading call - // and does not respond to interrupts. 
Therefore, use a custom ScalaTest interruptor to - // shut down the HTTP server when the test times out - val interruptor: Interruptor = new Interruptor { - override def apply(thread: Thread): Unit = { - classServer.stop() - classServer = null - thread.interrupt() - } - } - def tryAndFailToLoadABunchOfClasses(): Unit = { - // The number of trials here should be much larger than Jetty's thread / connection limit - // in order to expose thread or connection leaks - for (i <- 1 to 1000) { - if (Thread.currentThread().isInterrupted) { - throw new InterruptedException() - } - // Incorporate the iteration number into the class name in order to avoid any response - // caching that might be added in the future - intercept[ClassNotFoundException] { - classLoader.loadClass(s"ReplFakeClassDoesNotExist$i").newInstance() - } - } - } - failAfter(10 seconds)(tryAndFailToLoadABunchOfClasses())(interruptor) - } - test("fetch classes using Spark's RpcEnv") { val env = mock[SparkEnv] val rpcEnv = mock[RpcEnv] |