From dd681f502eafe39cfb8a5a62ea2d28016ac6013d Mon Sep 17 00:00:00 2001
From: Mridul Muralidharan
Date: Wed, 23 Apr 2014 23:20:55 -0700
Subject: SPARK-1587 Fix thread leak

mvn test fails (intermittently) due to thread leak - since scalatest runs all tests in same vm.

Author: Mridul Muralidharan

Closes #504 from mridulm/resource_leak_fixes and squashes the following commits:

a5d10d0 [Mridul Muralidharan] Prevent thread leaks while running tests : cleanup all threads when SparkContext.stop is invoked. Causes tests to fail
7b5e19c [Mridul Muralidharan] Prevent NPE while running tests
---
 .../org/apache/spark/metrics/MetricsSystem.scala    | 22 +++++++++--------
 .../apache/spark/scheduler/TaskSchedulerImpl.scala  |  1 +
 .../org/apache/spark/storage/BlockManager.scala     |  2 ++
 .../apache/spark/storage/DiskBlockManager.scala     | 28 +++++++++++++---------
 .../apache/spark/storage/ShuffleBlockManager.scala  |  4 ++++
 .../scala/org/apache/spark/ui/JettyUtils.scala      |  1 +
 .../spark/storage/DiskBlockManagerSuite.scala       |  5 ++++
 7 files changed, 42 insertions(+), 21 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index c5bda2078f..651511da1b 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -129,17 +129,19 @@ private[spark] class MetricsSystem private (val instance: String,
 
     sinkConfigs.foreach { kv =>
       val classPath = kv._2.getProperty("class")
-      try {
-        val sink = Class.forName(classPath)
-          .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
-          .newInstance(kv._2, registry, securityMgr)
-        if (kv._1 == "servlet") {
-          metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
-        } else {
-          sinks += sink.asInstanceOf[Sink]
+      if (null != classPath) {
+        try {
+          val sink = Class.forName(classPath)
+            .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
+            .newInstance(kv._2, registry, securityMgr)
+          if (kv._1 == "servlet") {
+            metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
+          } else {
+            sinks += sink.asInstanceOf[Sink]
+          }
+        } catch {
+          case e: Exception => logError("Sink class " + classPath + " cannot be instantialized", e)
         }
-      } catch {
-        case e: Exception => logError("Sink class " + classPath + " cannot be instantialized", e)
       }
     }
   }

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index be19d9b885..5a68f38bc5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -356,6 +356,7 @@ private[spark] class TaskSchedulerImpl(
     if (taskResultGetter != null) {
       taskResultGetter.stop()
     }
+    starvationTimer.cancel()
 
     // sleeping for an arbitrary 1 seconds to ensure that messages are sent out.
     Thread.sleep(1000L)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index f15fa4dd7f..ccd5c5320a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1021,6 +1021,8 @@ private[spark] class BlockManager(
       heartBeatTask.cancel()
     }
     connectionManager.stop()
+    shuffleBlockManager.stop()
+    diskBlockManager.stop()
     actorSystem.stop(slaveActor)
     blockInfo.clear()
     memoryStore.clear()

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 7a24c8f57f..054f66a8b7 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -150,20 +150,26 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
       override def run() {
         logDebug("Shutdown hook called")
-        localDirs.foreach { localDir =>
-          try {
-            if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
-          } catch {
-            case t: Throwable =>
-              logError("Exception while deleting local spark dir: " + localDir, t)
-          }
-        }
+        stop()
+      }
+    })
+  }
 
-        if (shuffleSender != null) {
-          shuffleSender.stop()
+  private[spark] def stop() {
+    localDirs.foreach { localDir =>
+      if (localDir.isDirectory() && localDir.exists()) {
+        try {
+          if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
+        } catch {
+          case t: Throwable =>
+            logError("Exception while deleting local spark dir: " + localDir, t)
         }
       }
-    })
+    }
+
+    if (shuffleSender != null) {
+      shuffleSender.stop()
+    }
   }
 
   private[storage] def startShuffleBlockSender(port: Int): Int = {

diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 4cd4cdbd99..35910e552f 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -207,6 +207,10 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   private def cleanup(cleanupTime: Long) {
     shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId))
   }
+
+  def stop() {
+    metadataCleaner.cancel()
+  }
 }
 
 private[spark]

diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 750f5a501c..fdeb15b5d0 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -195,6 +195,7 @@ private[spark] object JettyUtils extends Logging {
           (server, server.getConnectors.head.getLocalPort)
         case f: Failure[_] =>
           server.stop()
+          pool.stop()
           logInfo("Failed to create UI at port, %s. Trying again.".format(currentPort))
           logInfo("Error was: " + f.toString)
           connect((currentPort + 1) % 65536)

diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 9b29e2a8a5..42bfbf1bdf 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -53,6 +53,11 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
     shuffleBlockManager.idToSegmentMap.clear()
   }
 
+  override def afterEach() {
+    diskBlockManager.stop()
+    shuffleBlockManager.idToSegmentMap.clear()
+  }
+
   test("basic block creation") {
     val blockId = new TestBlockId("test")
     assertSegmentEquals(blockId, blockId.name, 0, 0)
-- 
cgit v1.2.3