path: root/core
author    Michel Lemay <mlemay@gmail.com>        2015-08-12 16:17:58 -0700
committer Marcelo Vanzin <vanzin@cloudera.com>   2015-08-12 16:41:35 -0700
commit    ab7e721cfec63155641e81e72b4ad43cf6a7d4c7 (patch)
tree      2721b5b0b563fdbb3f6f6a2d6bca6ff838620e8d /core
parent    738f353988dbf02704bd63f5e35d94402c59ed79 (diff)
download  spark-ab7e721cfec63155641e81e72b4ad43cf6a7d4c7.tar.gz
          spark-ab7e721cfec63155641e81e72b4ad43cf6a7d4c7.tar.bz2
          spark-ab7e721cfec63155641e81e72b4ad43cf6a7d4c7.zip
[SPARK-9826] [CORE] Fix cannot use custom classes in log4j.properties
Refactor Utils class and create ShutdownHookManager.

NOTE: Wasn't able to run /dev/run-tests on windows machine. Manual tests were
conducted locally using custom log4j.properties file with Redis appender and
logstash formatter (bundled in the fat-jar submitted to spark) ex:

log4j.rootCategory=WARN,console,redis

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.spark.graphx.Pregel=INFO

log4j.appender.redis=com.ryantenney.log4j.FailoverRedisAppender
log4j.appender.redis.endpoints=hostname:port
log4j.appender.redis.key=mykey
log4j.appender.redis.alwaysBatch=false
log4j.appender.redis.layout=net.logstash.log4j.JSONEventLayoutV1

Author: michellemay <mlemay@gmail.com>

Closes #8109 from michellemay/SPARK-9826.
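For context, the "custom classes" the commit title refers to are user-supplied log4j 1.x classes (appenders, layouts) shipped inside the application's fat jar and named in log4j.properties. The FailoverRedisAppender used in the manual test above is a third-party library; as a purely hypothetical sketch (not part of this patch, class and package names invented), such a bundled appender could look like:

// Hypothetical custom appender bundled in the application jar; illustrates the
// kind of class that log4j.properties must be able to resolve after this fix.
package com.example.logging

import org.apache.log4j.AppenderSkeleton
import org.apache.log4j.spi.LoggingEvent

class StdoutJsonAppender extends AppenderSkeleton {
  // Called by log4j for every event routed to this appender.
  override def append(event: LoggingEvent): Unit = {
    val rendered = if (layout != null) layout.format(event) else event.getRenderedMessage
    Console.out.print(rendered)
  }

  // Nothing to release in this toy example.
  override def close(): Unit = ()

  // Formatting is delegated to the configured layout, so require one.
  override def requiresLayout(): Boolean = true
}

It would then be referenced from log4j.properties with a line such as log4j.appender.stdoutJson=com.example.logging.StdoutJsonAppender (again, an illustrative name, not one used in this patch).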
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                        |   5
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala        |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala        |   7
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala                       |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala                    |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala                 |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala            |  10
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala         |   6
-rw-r--r--  core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala            | 266
-rw-r--r--  core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala  |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                          | 222
11 files changed, 294 insertions, 240 deletions
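Most of the patch below is a mechanical move of the shutdown-hook helpers out of Utils and into the new org.apache.spark.util.ShutdownHookManager object, with call sites renamed accordingly. A minimal sketch of how a caller inside Spark uses the relocated API (the hook body and usage site are illustrative, not taken from the patch):

import org.apache.spark.util.ShutdownHookManager

// Register a hook; the returned handle can later be used to unregister it.
val hookRef = ShutdownHookManager.addShutdownHook(
    ShutdownHookManager.DEFAULT_SHUTDOWN_PRIORITY) { () =>
  // placeholder cleanup work to run when the JVM shuts down
  println("shutting down")
}

// Remove the hook explicitly once the owning component stops, to avoid leaks.
ShutdownHookManager.removeShutdownHook(hookRef)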
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 207a0c1bff..2e01a9a18c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -563,7 +563,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Make sure the context is stopped if the user forgets about it. This avoids leaving
// unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
// is killed, though.
- _shutdownHookRef = Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
+ _shutdownHookRef = ShutdownHookManager.addShutdownHook(
+ ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
logInfo("Invoking stop() from shutdown hook")
stop()
}
@@ -1671,7 +1672,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
return
}
if (_shutdownHookRef != null) {
- Utils.removeShutdownHook(_shutdownHookRef)
+ ShutdownHookManager.removeShutdownHook(_shutdownHookRef)
}
Utils.tryLogNonFatalError {
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index a076a9c3f9..d4f327cc58 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -30,7 +30,7 @@ import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, Applica
UIRoot}
import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.{SignalLogger, Utils}
+import org.apache.spark.util.{ShutdownHookManager, SignalLogger, Utils}
/**
* A web server that renders SparkUIs of completed applications.
@@ -238,7 +238,7 @@ object HistoryServer extends Logging {
val server = new HistoryServer(conf, provider, securityManager, port)
server.bind()
- Utils.addShutdownHook { () => server.stop() }
+ ShutdownHookManager.addShutdownHook { () => server.stop() }
// Wait until the end of the world... or if the HistoryServer process is manually stopped
while(true) { Thread.sleep(Int.MaxValue) }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 29a5042285..ab3fea475c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -28,7 +28,7 @@ import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.{SecurityManager, SparkConf, Logging}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
import org.apache.spark.util.logging.FileAppender
/**
@@ -70,7 +70,8 @@ private[deploy] class ExecutorRunner(
}
workerThread.start()
// Shutdown hook that kills actors on shutdown.
- shutdownHook = Utils.addShutdownHook { () => killProcess(Some("Worker shutting down")) }
+ shutdownHook = ShutdownHookManager.addShutdownHook { () =>
+ killProcess(Some("Worker shutting down")) }
}
/**
@@ -102,7 +103,7 @@ private[deploy] class ExecutorRunner(
workerThread = null
state = ExecutorState.KILLED
try {
- Utils.removeShutdownHook(shutdownHook)
+ ShutdownHookManager.removeShutdownHook(shutdownHook)
} catch {
case e: IllegalStateException => None
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index f1c17369cb..e1f8719eea 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -44,7 +44,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
-import org.apache.spark.util.{SerializableConfiguration, NextIterator, Utils}
+import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager, NextIterator, Utils}
import org.apache.spark.scheduler.{HostTaskLocation, HDFSCacheTaskLocation}
import org.apache.spark.storage.StorageLevel
@@ -274,7 +274,7 @@ class HadoopRDD[K, V](
}
} catch {
case e: Exception => {
- if (!Utils.inShutdown()) {
+ if (!ShutdownHookManager.inShutdown()) {
logWarning("Exception in RecordReader.close()", e)
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index f83a051f5d..6a9c004d65 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -33,7 +33,7 @@ import org.apache.spark._
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
-import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager, Utils}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.storage.StorageLevel
@@ -186,7 +186,7 @@ class NewHadoopRDD[K, V](
}
} catch {
case e: Exception => {
- if (!Utils.inShutdown()) {
+ if (!ShutdownHookManager.inShutdown()) {
logWarning("Exception in RecordReader.close()", e)
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
index 6a95e44c57..fa3fecc80c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
@@ -33,7 +33,7 @@ import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.{Partition => SparkPartition, _}
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager, Utils}
private[spark] class SqlNewHadoopPartition(
@@ -212,7 +212,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
}
} catch {
case e: Exception =>
- if (!Utils.inShutdown()) {
+ if (!ShutdownHookManager.inShutdown()) {
logWarning("Exception in RecordReader.close()", e)
}
}
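All three Hadoop RDD variants above apply the same guard when closing a RecordReader: the warning is suppressed if the JVM is already shutting down, since close() failures are expected in that case. A condensed sketch of the pattern (the reader value is a placeholder; logWarning comes from the surrounding Logging class):

// Pattern used in the HadoopRDD / NewHadoopRDD / SqlNewHadoopRDD close() paths:
// only log reader-close failures when we are not already inside JVM shutdown.
try {
  reader.close()  // placeholder for the RecordReader being released
} catch {
  case e: Exception =>
    if (!ShutdownHookManager.inShutdown()) {
      logWarning("Exception in RecordReader.close()", e)
    }
}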
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 56a33d5ca7..3f8d26e1d4 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -22,7 +22,7 @@ import java.io.{IOException, File}
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.executor.ExecutorExitCode
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
/**
* Creates and maintains the logical mapping between logical blocks and physical on-disk
@@ -144,7 +144,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
}
private def addShutdownHook(): AnyRef = {
- Utils.addShutdownHook(Utils.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
+ ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
logInfo("Shutdown hook called")
DiskBlockManager.this.doStop()
}
@@ -154,7 +154,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
private[spark] def stop() {
// Remove the shutdown hook. It causes memory leaks if we leave it around.
try {
- Utils.removeShutdownHook(shutdownHook)
+ ShutdownHookManager.removeShutdownHook(shutdownHook)
} catch {
case e: Exception =>
logError(s"Exception while removing shutdown hook.", e)
@@ -168,7 +168,9 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
localDirs.foreach { localDir =>
if (localDir.isDirectory() && localDir.exists()) {
try {
- if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
+ if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(localDir)) {
+ Utils.deleteRecursively(localDir)
+ }
} catch {
case e: Exception =>
logError(s"Exception while deleting local spark dir: $localDir", e)
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
index ebad5bc5ab..22878783fc 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -32,7 +32,7 @@ import tachyon.TachyonURI
import org.apache.spark.Logging
import org.apache.spark.executor.ExecutorExitCode
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
/**
@@ -80,7 +80,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
// in order to avoid having really large inodes at the top level in Tachyon.
tachyonDirs = createTachyonDirs()
subDirs = Array.fill(tachyonDirs.length)(new Array[TachyonFile](subDirsPerTachyonDir))
- tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir))
+ tachyonDirs.foreach(tachyonDir => ShutdownHookManager.registerShutdownDeleteDir(tachyonDir))
}
override def toString: String = {"ExternalBlockStore-Tachyon"}
@@ -240,7 +240,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
logDebug("Shutdown hook called")
tachyonDirs.foreach { tachyonDir =>
try {
- if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
+ if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(tachyonDir)) {
Utils.deleteRecursively(tachyonDir, client)
}
} catch {
diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
new file mode 100644
index 0000000000..61ff9b89ec
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.File
+import java.util.PriorityQueue
+
+import scala.util.{Failure, Success, Try}
+import tachyon.client.TachyonFile
+
+import org.apache.hadoop.fs.FileSystem
+import org.apache.spark.Logging
+
+/**
+ * Various utility methods used by Spark.
+ */
+private[spark] object ShutdownHookManager extends Logging {
+ val DEFAULT_SHUTDOWN_PRIORITY = 100
+
+ /**
+ * The shutdown priority of the SparkContext instance. This is lower than the default
+ * priority, so that by default hooks are run before the context is shut down.
+ */
+ val SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50
+
+ /**
+ * The shutdown priority of temp directory must be lower than the SparkContext shutdown
+ * priority. Otherwise cleaning the temp directories while Spark jobs are running can
+ * throw undesirable errors at the time of shutdown.
+ */
+ val TEMP_DIR_SHUTDOWN_PRIORITY = 25
+
+ private lazy val shutdownHooks = {
+ val manager = new SparkShutdownHookManager()
+ manager.install()
+ manager
+ }
+
+ private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
+ private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()
+
+ // Add a shutdown hook to delete the temp dirs when the JVM exits
+ addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
+ logInfo("Shutdown hook called")
+ shutdownDeletePaths.foreach { dirPath =>
+ try {
+ logInfo("Deleting directory " + dirPath)
+ Utils.deleteRecursively(new File(dirPath))
+ } catch {
+ case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
+ }
+ }
+ }
+
+ // Register the path to be deleted via shutdown hook
+ def registerShutdownDeleteDir(file: File) {
+ val absolutePath = file.getAbsolutePath()
+ shutdownDeletePaths.synchronized {
+ shutdownDeletePaths += absolutePath
+ }
+ }
+
+ // Register the tachyon path to be deleted via shutdown hook
+ def registerShutdownDeleteDir(tachyonfile: TachyonFile) {
+ val absolutePath = tachyonfile.getPath()
+ shutdownDeleteTachyonPaths.synchronized {
+ shutdownDeleteTachyonPaths += absolutePath
+ }
+ }
+
+ // Remove the path to be deleted via shutdown hook
+ def removeShutdownDeleteDir(file: File) {
+ val absolutePath = file.getAbsolutePath()
+ shutdownDeletePaths.synchronized {
+ shutdownDeletePaths.remove(absolutePath)
+ }
+ }
+
+ // Remove the tachyon path to be deleted via shutdown hook
+ def removeShutdownDeleteDir(tachyonfile: TachyonFile) {
+ val absolutePath = tachyonfile.getPath()
+ shutdownDeleteTachyonPaths.synchronized {
+ shutdownDeleteTachyonPaths.remove(absolutePath)
+ }
+ }
+
+ // Is the path already registered to be deleted via a shutdown hook ?
+ def hasShutdownDeleteDir(file: File): Boolean = {
+ val absolutePath = file.getAbsolutePath()
+ shutdownDeletePaths.synchronized {
+ shutdownDeletePaths.contains(absolutePath)
+ }
+ }
+
+ // Is the path already registered to be deleted via a shutdown hook ?
+ def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = {
+ val absolutePath = file.getPath()
+ shutdownDeleteTachyonPaths.synchronized {
+ shutdownDeleteTachyonPaths.contains(absolutePath)
+ }
+ }
+
+ // Note: if file is child of some registered path, while not equal to it, then return true;
+ // else false. This is to ensure that two shutdown hooks do not try to delete each others
+ // paths - resulting in IOException and incomplete cleanup.
+ def hasRootAsShutdownDeleteDir(file: File): Boolean = {
+ val absolutePath = file.getAbsolutePath()
+ val retval = shutdownDeletePaths.synchronized {
+ shutdownDeletePaths.exists { path =>
+ !absolutePath.equals(path) && absolutePath.startsWith(path)
+ }
+ }
+ if (retval) {
+ logInfo("path = " + file + ", already present as root for deletion.")
+ }
+ retval
+ }
+
+ // Note: if file is child of some registered path, while not equal to it, then return true;
+ // else false. This is to ensure that two shutdown hooks do not try to delete each others
+ // paths - resulting in Exception and incomplete cleanup.
+ def hasRootAsShutdownDeleteDir(file: TachyonFile): Boolean = {
+ val absolutePath = file.getPath()
+ val retval = shutdownDeleteTachyonPaths.synchronized {
+ shutdownDeleteTachyonPaths.exists { path =>
+ !absolutePath.equals(path) && absolutePath.startsWith(path)
+ }
+ }
+ if (retval) {
+ logInfo("path = " + file + ", already present as root for deletion.")
+ }
+ retval
+ }
+
+ /**
+ * Detect whether this thread might be executing a shutdown hook. Will always return true if
+ * the current thread is running a shutdown hook but may spuriously return true otherwise (e.g.
+ * if System.exit was just called by a concurrent thread).
+ *
+ * Currently, this detects whether the JVM is shutting down by Runtime#addShutdownHook throwing
+ * an IllegalStateException.
+ */
+ def inShutdown(): Boolean = {
+ try {
+ val hook = new Thread {
+ override def run() {}
+ }
+ Runtime.getRuntime.addShutdownHook(hook)
+ Runtime.getRuntime.removeShutdownHook(hook)
+ } catch {
+ case ise: IllegalStateException => return true
+ }
+ false
+ }
+
+ /**
+ * Adds a shutdown hook with default priority.
+ *
+ * @param hook The code to run during shutdown.
+ * @return A handle that can be used to unregister the shutdown hook.
+ */
+ def addShutdownHook(hook: () => Unit): AnyRef = {
+ addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY)(hook)
+ }
+
+ /**
+ * Adds a shutdown hook with the given priority. Hooks with higher priority values run
+ * first.
+ *
+ * @param hook The code to run during shutdown.
+ * @return A handle that can be used to unregister the shutdown hook.
+ */
+ def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
+ shutdownHooks.add(priority, hook)
+ }
+
+ /**
+ * Remove a previously installed shutdown hook.
+ *
+ * @param ref A handle returned by `addShutdownHook`.
+ * @return Whether the hook was removed.
+ */
+ def removeShutdownHook(ref: AnyRef): Boolean = {
+ shutdownHooks.remove(ref)
+ }
+
+}
+
+private [util] class SparkShutdownHookManager {
+
+ private val hooks = new PriorityQueue[SparkShutdownHook]()
+ private var shuttingDown = false
+
+ /**
+ * Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not
+ * have `ShutdownHookManager`, so in that case we just use the JVM's `Runtime` object and hope for
+ * the best.
+ */
+ def install(): Unit = {
+ val hookTask = new Runnable() {
+ override def run(): Unit = runAll()
+ }
+ Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) match {
+ case Success(shmClass) =>
+ val fsPriority = classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY").get()
+ .asInstanceOf[Int]
+ val shm = shmClass.getMethod("get").invoke(null)
+ shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
+ .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
+
+ case Failure(_) =>
+ Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"));
+ }
+ }
+
+ def runAll(): Unit = synchronized {
+ shuttingDown = true
+ while (!hooks.isEmpty()) {
+ Try(Utils.logUncaughtExceptions(hooks.poll().run()))
+ }
+ }
+
+ def add(priority: Int, hook: () => Unit): AnyRef = synchronized {
+ checkState()
+ val hookRef = new SparkShutdownHook(priority, hook)
+ hooks.add(hookRef)
+ hookRef
+ }
+
+ def remove(ref: AnyRef): Boolean = synchronized {
+ hooks.remove(ref)
+ }
+
+ private def checkState(): Unit = {
+ if (shuttingDown) {
+ throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
+ }
+ }
+
+}
+
+private class SparkShutdownHook(private val priority: Int, hook: () => Unit)
+ extends Comparable[SparkShutdownHook] {
+
+ override def compareTo(other: SparkShutdownHook): Int = {
+ other.priority - priority
+ }
+
+ def run(): Unit = hook()
+
+}
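To make the priority semantics of the new manager concrete: SparkShutdownHook.compareTo orders the queue so that hooks with higher priority values are polled, and therefore run, first. That is why default-priority (100) hooks run before the SparkContext hook (SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50), and the temp-dir cleanup hook (TEMP_DIR_SHUTDOWN_PRIORITY = 25) runs last. A small illustrative sketch for code inside Spark (hook bodies and the directory path are placeholders):

// Runs first at shutdown: the single-argument form uses DEFAULT_SHUTDOWN_PRIORITY (100).
ShutdownHookManager.addShutdownHook { () => println("flush application state") }

// Runs afterwards: the SparkContext hook is registered at priority 50.
ShutdownHookManager.addShutdownHook(
    ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
  println("stop SparkContext")
}

// Directories registered here are deleted by the priority-25 temp-dir hook, last of all.
ShutdownHookManager.registerShutdownDeleteDir(new java.io.File("/tmp/spark-scratch"))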
diff --git a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
index ad3db1fbb5..7248187247 100644
--- a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
+++ b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
@@ -33,7 +33,7 @@ private[spark] object SparkUncaughtExceptionHandler
// We may have been called from a shutdown hook. If so, we must not call System.exit().
// (If we do, we will deadlock.)
- if (!Utils.inShutdown()) {
+ if (!ShutdownHookManager.inShutdown()) {
if (exception.isInstanceOf[OutOfMemoryError]) {
System.exit(SparkExitCode.OOM)
} else {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a90d854136..f2abf227dc 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -21,7 +21,7 @@ import java.io._
import java.lang.management.ManagementFactory
import java.net._
import java.nio.ByteBuffer
-import java.util.{PriorityQueue, Properties, Locale, Random, UUID}
+import java.util.{Properties, Locale, Random, UUID}
import java.util.concurrent._
import javax.net.ssl.HttpsURLConnection
@@ -65,21 +65,6 @@ private[spark] object CallSite {
private[spark] object Utils extends Logging {
val random = new Random()
- val DEFAULT_SHUTDOWN_PRIORITY = 100
-
- /**
- * The shutdown priority of the SparkContext instance. This is lower than the default
- * priority, so that by default hooks are run before the context is shut down.
- */
- val SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50
-
- /**
- * The shutdown priority of temp directory must be lower than the SparkContext shutdown
- * priority. Otherwise cleaning the temp directories while Spark jobs are running can
- * throw undesirable errors at the time of shutdown.
- */
- val TEMP_DIR_SHUTDOWN_PRIORITY = 25
-
/**
* Define a default value for driver memory here since this value is referenced across the code
* base and nearly all files already use Utils.scala
@@ -90,9 +75,6 @@ private[spark] object Utils extends Logging {
@volatile private var localRootDirs: Array[String] = null
- private val shutdownHooks = new SparkShutdownHookManager()
- shutdownHooks.install()
-
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
@@ -205,86 +187,6 @@ private[spark] object Utils extends Logging {
}
}
- private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
- private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()
-
- // Add a shutdown hook to delete the temp dirs when the JVM exits
- addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
- logInfo("Shutdown hook called")
- shutdownDeletePaths.foreach { dirPath =>
- try {
- logInfo("Deleting directory " + dirPath)
- Utils.deleteRecursively(new File(dirPath))
- } catch {
- case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
- }
- }
- }
-
- // Register the path to be deleted via shutdown hook
- def registerShutdownDeleteDir(file: File) {
- val absolutePath = file.getAbsolutePath()
- shutdownDeletePaths.synchronized {
- shutdownDeletePaths += absolutePath
- }
- }
-
- // Register the tachyon path to be deleted via shutdown hook
- def registerShutdownDeleteDir(tachyonfile: TachyonFile) {
- val absolutePath = tachyonfile.getPath()
- shutdownDeleteTachyonPaths.synchronized {
- shutdownDeleteTachyonPaths += absolutePath
- }
- }
-
- // Is the path already registered to be deleted via a shutdown hook ?
- def hasShutdownDeleteDir(file: File): Boolean = {
- val absolutePath = file.getAbsolutePath()
- shutdownDeletePaths.synchronized {
- shutdownDeletePaths.contains(absolutePath)
- }
- }
-
- // Is the path already registered to be deleted via a shutdown hook ?
- def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = {
- val absolutePath = file.getPath()
- shutdownDeleteTachyonPaths.synchronized {
- shutdownDeleteTachyonPaths.contains(absolutePath)
- }
- }
-
- // Note: if file is child of some registered path, while not equal to it, then return true;
- // else false. This is to ensure that two shutdown hooks do not try to delete each others
- // paths - resulting in IOException and incomplete cleanup.
- def hasRootAsShutdownDeleteDir(file: File): Boolean = {
- val absolutePath = file.getAbsolutePath()
- val retval = shutdownDeletePaths.synchronized {
- shutdownDeletePaths.exists { path =>
- !absolutePath.equals(path) && absolutePath.startsWith(path)
- }
- }
- if (retval) {
- logInfo("path = " + file + ", already present as root for deletion.")
- }
- retval
- }
-
- // Note: if file is child of some registered path, while not equal to it, then return true;
- // else false. This is to ensure that two shutdown hooks do not try to delete each others
- // paths - resulting in Exception and incomplete cleanup.
- def hasRootAsShutdownDeleteDir(file: TachyonFile): Boolean = {
- val absolutePath = file.getPath()
- val retval = shutdownDeleteTachyonPaths.synchronized {
- shutdownDeleteTachyonPaths.exists { path =>
- !absolutePath.equals(path) && absolutePath.startsWith(path)
- }
- }
- if (retval) {
- logInfo("path = " + file + ", already present as root for deletion.")
- }
- retval
- }
-
/**
* JDK equivalent of `chmod 700 file`.
*
@@ -333,7 +235,7 @@ private[spark] object Utils extends Logging {
root: String = System.getProperty("java.io.tmpdir"),
namePrefix: String = "spark"): File = {
val dir = createDirectory(root, namePrefix)
- registerShutdownDeleteDir(dir)
+ ShutdownHookManager.registerShutdownDeleteDir(dir)
dir
}
@@ -973,9 +875,7 @@ private[spark] object Utils extends Logging {
if (savedIOException != null) {
throw savedIOException
}
- shutdownDeletePaths.synchronized {
- shutdownDeletePaths.remove(file.getAbsolutePath)
- }
+ ShutdownHookManager.removeShutdownDeleteDir(file)
}
} finally {
if (!file.delete()) {
@@ -1478,27 +1378,6 @@ private[spark] object Utils extends Logging {
serializer.deserialize[T](serializer.serialize(value))
}
- /**
- * Detect whether this thread might be executing a shutdown hook. Will always return true if
- * the current thread is a running a shutdown hook but may spuriously return true otherwise (e.g.
- * if System.exit was just called by a concurrent thread).
- *
- * Currently, this detects whether the JVM is shutting down by Runtime#addShutdownHook throwing
- * an IllegalStateException.
- */
- def inShutdown(): Boolean = {
- try {
- val hook = new Thread {
- override def run() {}
- }
- Runtime.getRuntime.addShutdownHook(hook)
- Runtime.getRuntime.removeShutdownHook(hook)
- } catch {
- case ise: IllegalStateException => return true
- }
- false
- }
-
private def isSpace(c: Char): Boolean = {
" \t\r\n".indexOf(c) != -1
}
@@ -2222,37 +2101,6 @@ private[spark] object Utils extends Logging {
}
/**
- * Adds a shutdown hook with default priority.
- *
- * @param hook The code to run during shutdown.
- * @return A handle that can be used to unregister the shutdown hook.
- */
- def addShutdownHook(hook: () => Unit): AnyRef = {
- addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY)(hook)
- }
-
- /**
- * Adds a shutdown hook with the given priority. Hooks with lower priority values run
- * first.
- *
- * @param hook The code to run during shutdown.
- * @return A handle that can be used to unregister the shutdown hook.
- */
- def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
- shutdownHooks.add(priority, hook)
- }
-
- /**
- * Remove a previously installed shutdown hook.
- *
- * @param ref A handle returned by `addShutdownHook`.
- * @return Whether the hook was removed.
- */
- def removeShutdownHook(ref: AnyRef): Boolean = {
- shutdownHooks.remove(ref)
- }
-
- /**
* To avoid calling `Utils.getCallSite` for every single RDD we create in the body,
* set a dummy call site that RDDs use instead. This is for performance optimization.
*/
@@ -2299,70 +2147,6 @@ private[spark] object Utils extends Logging {
}
-private [util] class SparkShutdownHookManager {
-
- private val hooks = new PriorityQueue[SparkShutdownHook]()
- private var shuttingDown = false
-
- /**
- * Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not
- * have `ShutdownHookManager`, so in that case we just use the JVM's `Runtime` object and hope for
- * the best.
- */
- def install(): Unit = {
- val hookTask = new Runnable() {
- override def run(): Unit = runAll()
- }
- Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) match {
- case Success(shmClass) =>
- val fsPriority = classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY").get()
- .asInstanceOf[Int]
- val shm = shmClass.getMethod("get").invoke(null)
- shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
- .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
-
- case Failure(_) =>
- Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"));
- }
- }
-
- def runAll(): Unit = synchronized {
- shuttingDown = true
- while (!hooks.isEmpty()) {
- Try(Utils.logUncaughtExceptions(hooks.poll().run()))
- }
- }
-
- def add(priority: Int, hook: () => Unit): AnyRef = synchronized {
- checkState()
- val hookRef = new SparkShutdownHook(priority, hook)
- hooks.add(hookRef)
- hookRef
- }
-
- def remove(ref: AnyRef): Boolean = synchronized {
- hooks.remove(ref)
- }
-
- private def checkState(): Unit = {
- if (shuttingDown) {
- throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
- }
- }
-
-}
-
-private class SparkShutdownHook(private val priority: Int, hook: () => Unit)
- extends Comparable[SparkShutdownHook] {
-
- override def compareTo(other: SparkShutdownHook): Int = {
- other.priority - priority
- }
-
- def run(): Unit = hook()
-
-}
-
/**
* A utility class to redirect the child process's stdout or stderr.
*/