aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatt Whelan <mwhelan@perka.com>2015-02-16 22:54:32 +0000
committerSean Owen <sowen@cloudera.com>2015-02-16 22:54:32 +0000
commitbb05982dd25e008fb01684dff1f95d03e7271721 (patch)
treef91c56ad7b04fd7250eee50239501a063145cb2a /core
parentc51ab37faddf4ede23243058dfb388e74a192552 (diff)
downloadspark-bb05982dd25e008fb01684dff1f95d03e7271721.tar.gz
spark-bb05982dd25e008fb01684dff1f95d03e7271721.tar.bz2
spark-bb05982dd25e008fb01684dff1f95d03e7271721.zip
SPARK-5841: remove DiskBlockManager shutdown hook on stop
After a call to stop, the shutdown hook is redundant, and causes a memory leak. Author: Matt Whelan <mwhelan@perka.com> Closes #4627 from MattWhelan/SPARK-5841 and squashes the following commits: d5f5c7f [Matt Whelan] SPARK-5841: remove DiskBlockManager shutdown hook on stop
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala13
1 files changed, 9 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 53eaedacbf..ae9df8cbe9 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -49,7 +49,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
}
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
- addShutdownHook()
+ private val shutdownHook = addShutdownHook()
/** Looks up a file by hashing it into one of our local subdirectories. */
// This method should be kept in sync with
@@ -134,17 +134,22 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
}
}
- private def addShutdownHook() {
- Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
+ private def addShutdownHook(): Thread = {
+ val shutdownHook = new Thread("delete Spark local dirs") {
override def run(): Unit = Utils.logUncaughtExceptions {
logDebug("Shutdown hook called")
DiskBlockManager.this.stop()
}
- })
+ }
+ Runtime.getRuntime.addShutdownHook(shutdownHook)
+ shutdownHook
}
/** Cleanup local dirs and stop shuffle sender. */
private[spark] def stop() {
+ // Remove the shutdown hook. It causes memory leaks if we leave it around.
+ Runtime.getRuntime.removeShutdownHook(shutdownHook)
+
// Only perform cleanup if an external service is not serving our shuffle files.
if (!blockManager.externalShuffleServiceEnabled || blockManager.blockManagerId.isDriver) {
localDirs.foreach { localDir =>