From 3e53de4bdd6d7b6de1fe3e5bfbdc53180aa9a737 Mon Sep 17 00:00:00 2001 From: Terence Yim Date: Tue, 12 Apr 2016 13:46:39 -0700 Subject: [SPARK-14513][CORE] Fix threads left behind after stopping SparkContext ## What changes were proposed in this pull request? Shutting down `QueuedThreadPool` used by Jetty `Server` to avoid threads leakage after SparkContext is stopped. Note: If this fix is going to apply to the `branch-1.6`, one more patch on the `NettyRpcEnv` class is needed so that the `NettyRpcEnv._fileServer.shutdown` is called in the `NettyRpcEnv.cleanup` method. This is due to the removal of `_fileServer` field in the `NettyRpcEnv` class in the master branch. Please advice if a second PR is necessary for bring this fix back to `branch-1.6` ## How was this patch tested? Ran the ./dev/run-tests locally Author: Terence Yim Closes #12318 from chtyim/fixes/SPARK-14513-thread-leak. --- core/src/main/scala/org/apache/spark/HttpServer.scala | 7 +++++++ core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 14 +++++++++++++- core/src/main/scala/org/apache/spark/ui/WebUI.scala | 2 +- 3 files changed, 21 insertions(+), 2 deletions(-) (limited to 'core/src/main/scala/org/apache') diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index 9fad1f6786..982b6d6b61 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -25,6 +25,7 @@ import org.eclipse.jetty.server.Server import org.eclipse.jetty.server.bio.SocketConnector import org.eclipse.jetty.server.ssl.SslSocketConnector import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler, ServletHolder} +import org.eclipse.jetty.util.component.LifeCycle import org.eclipse.jetty.util.security.{Constraint, Password} import org.eclipse.jetty.util.thread.QueuedThreadPool @@ -155,6 +156,12 @@ private[spark] class HttpServer( throw new ServerStateException("Server is already stopped") } else { server.stop() + // Stop the ThreadPool if it supports stop() method (through LifeCycle). + // It is needed because stopping the Server won't stop the ThreadPool it uses. + val threadPool = server.getThreadPool + if (threadPool != null && threadPool.isInstanceOf[LifeCycle]) { + threadPool.asInstanceOf[LifeCycle].stop + } port = -1 server = null } diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index c3c59f857d..119165f724 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -30,6 +30,7 @@ import org.eclipse.jetty.server.handler._ import org.eclipse.jetty.server.nio.SelectChannelConnector import org.eclipse.jetty.server.ssl.SslSelectChannelConnector import org.eclipse.jetty.servlet._ +import org.eclipse.jetty.util.component.LifeCycle import org.eclipse.jetty.util.thread.QueuedThreadPool import org.json4s.JValue import org.json4s.jackson.JsonMethods.{pretty, render} @@ -350,4 +351,15 @@ private[spark] object JettyUtils extends Logging { private[spark] case class ServerInfo( server: Server, boundPort: Int, - rootHandler: ContextHandlerCollection) + rootHandler: ContextHandlerCollection) { + + def stop(): Unit = { + server.stop() + // Stop the ThreadPool if it supports stop() method (through LifeCycle). + // It is needed because stopping the Server won't stop the ThreadPool it uses. + val threadPool = server.getThreadPool + if (threadPool != null && threadPool.isInstanceOf[LifeCycle]) { + threadPool.asInstanceOf[LifeCycle].stop + } + } +} diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 250b7f2e5f..3939b111b5 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -153,7 +153,7 @@ private[spark] abstract class WebUI( def stop() { assert(serverInfo.isDefined, "Attempted to stop %s before binding to a server!".format(className)) - serverInfo.get.server.stop() + serverInfo.get.stop() } } -- cgit v1.2.3