about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
author	Mridul Muralidharan <mridulm80@apache.org>	2014-04-23 23:20:55 -0700
committer	Aaron Davidson <aaron@databricks.com>	2014-04-23 23:20:55 -0700
commit	dd681f502eafe39cfb8a5a62ea2d28016ac6013d (patch)
tree	a463411676bdda7058d38dd86218cf60de3657ec /core
parent	bb68f47745eec2954814d3da277a672d5cf89980 (diff)
download	spark-dd681f502eafe39cfb8a5a62ea2d28016ac6013d.tar.gz
	spark-dd681f502eafe39cfb8a5a62ea2d28016ac6013d.tar.bz2
	spark-dd681f502eafe39cfb8a5a62ea2d28016ac6013d.zip
SPARK-1587 Fix thread leak
mvn test fails (intermittently) due to thread leak - since scalatest runs all tests in same vm. Author: Mridul Muralidharan <mridulm80@apache.org> Closes #504 from mridulm/resource_leak_fixes and squashes the following commits: a5d10d0 [Mridul Muralidharan] Prevent thread leaks while running tests : cleanup all threads when SparkContext.stop is invoked. Causes tests to fail 7b5e19c [Mridul Muralidharan] Prevent NPE while running tests
Diffstat (limited to 'core')
-rw-r--r--	core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala	| 22
-rw-r--r--	core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala	| 1
-rw-r--r--	core/src/main/scala/org/apache/spark/storage/BlockManager.scala	| 2
-rw-r--r--	core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala	| 28
-rw-r--r--	core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala	| 4
-rw-r--r--	core/src/main/scala/org/apache/spark/ui/JettyUtils.scala	| 1
-rw-r--r--	core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala	| 5
7 files changed, 42 insertions, 21 deletions
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index c5bda2078f..651511da1b 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -129,17 +129,19 @@ private[spark] class MetricsSystem private (val instance: String,
sinkConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
- try {
- val sink = Class.forName(classPath)
- .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
- .newInstance(kv._2, registry, securityMgr)
- if (kv._1 == "servlet") {
- metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
- } else {
- sinks += sink.asInstanceOf[Sink]
+ if (null != classPath) {
+ try {
+ val sink = Class.forName(classPath)
+ .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
+ .newInstance(kv._2, registry, securityMgr)
+ if (kv._1 == "servlet") {
+ metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
+ } else {
+ sinks += sink.asInstanceOf[Sink]
+ }
+ } catch {
+ case e: Exception => logError("Sink class " + classPath + " cannot be instantialized", e)
}
- } catch {
- case e: Exception => logError("Sink class " + classPath + " cannot be instantialized", e)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index be19d9b885..5a68f38bc5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -356,6 +356,7 @@ private[spark] class TaskSchedulerImpl(
if (taskResultGetter != null) {
taskResultGetter.stop()
}
+ starvationTimer.cancel()
// sleeping for an arbitrary 1 seconds to ensure that messages are sent out.
Thread.sleep(1000L)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index f15fa4dd7f..ccd5c5320a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1021,6 +1021,8 @@ private[spark] class BlockManager(
heartBeatTask.cancel()
}
connectionManager.stop()
+ shuffleBlockManager.stop()
+ diskBlockManager.stop()
actorSystem.stop(slaveActor)
blockInfo.clear()
memoryStore.clear()
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 7a24c8f57f..054f66a8b7 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -150,20 +150,26 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
override def run() {
logDebug("Shutdown hook called")
- localDirs.foreach { localDir =>
- try {
- if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
- } catch {
- case t: Throwable =>
- logError("Exception while deleting local spark dir: " + localDir, t)
- }
- }
+ stop()
+ }
+ })
+ }
- if (shuffleSender != null) {
- shuffleSender.stop()
+ private[spark] def stop() {
+ localDirs.foreach { localDir =>
+ if (localDir.isDirectory() && localDir.exists()) {
+ try {
+ if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
+ } catch {
+ case t: Throwable =>
+ logError("Exception while deleting local spark dir: " + localDir, t)
}
}
- })
+ }
+
+ if (shuffleSender != null) {
+ shuffleSender.stop()
+ }
}
private[storage] def startShuffleBlockSender(port: Int): Int = {
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 4cd4cdbd99..35910e552f 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -207,6 +207,10 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
private def cleanup(cleanupTime: Long) {
shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId))
}
+
+ def stop() {
+ metadataCleaner.cancel()
+ }
}
private[spark]
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 750f5a501c..fdeb15b5d0 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -195,6 +195,7 @@ private[spark] object JettyUtils extends Logging {
(server, server.getConnectors.head.getLocalPort)
case f: Failure[_] =>
server.stop()
+ pool.stop()
logInfo("Failed to create UI at port, %s. Trying again.".format(currentPort))
logInfo("Error was: " + f.toString)
connect((currentPort + 1) % 65536)
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 9b29e2a8a5..42bfbf1bdf 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -53,6 +53,11 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
shuffleBlockManager.idToSegmentMap.clear()
}
+ override def afterEach() {
+ diskBlockManager.stop()
+ shuffleBlockManager.idToSegmentMap.clear()
+ }
+
test("basic block creation") {
val blockId = new TestBlockId("test")
assertSegmentEquals(blockId, blockId.name, 0, 0)