diff options
author | tedyu <yuzhihong@gmail.com> | 2015-12-16 19:02:12 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-12-16 19:02:12 -0800 |
commit | f590178d7a06221a93286757c68b23919bee9f03 (patch) | |
tree | 94e096dc45037353b602d29f1b2df7d471bcab2d | |
parent | 38d9795a4fa07086d65ff705ce86648345618736 (diff) | |
download | spark-f590178d7a06221a93286757c68b23919bee9f03.tar.gz spark-f590178d7a06221a93286757c68b23919bee9f03.tar.bz2 spark-f590178d7a06221a93286757c68b23919bee9f03.zip |
[SPARK-12365][CORE] Use ShutdownHookManager where Runtime.getRuntime.addShutdownHook() is called
SPARK-9886 fixed ExternalBlockStore.scala
This PR fixes the remaining references to Runtime.getRuntime.addShutdownHook()
Author: tedyu <yuzhihong@gmail.com>
Closes #10325 from ted-yu/master.
5 files changed, 38 insertions, 33 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala index e8a1e35c3f..7fc96e4f76 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala @@ -28,7 +28,7 @@ import org.apache.spark.network.sasl.SaslServerBootstrap import org.apache.spark.network.server.{TransportServerBootstrap, TransportServer} import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler import org.apache.spark.network.util.TransportConf -import org.apache.spark.util.Utils +import org.apache.spark.util.{ShutdownHookManager, Utils} /** * Provides a server from which Executors can read shuffle files (rather than reading directly from @@ -118,19 +118,13 @@ object ExternalShuffleService extends Logging { server = newShuffleService(sparkConf, securityManager) server.start() - installShutdownHook() + ShutdownHookManager.addShutdownHook { () => + logInfo("Shutting down shuffle service.") + server.stop() + barrier.countDown() + } // keep running until the process is terminated barrier.await() } - - private def installShutdownHook(): Unit = { - Runtime.getRuntime.addShutdownHook(new Thread("External Shuffle Service shutdown thread") { - override def run() { - logInfo("Shutting down shuffle service.") - server.stop() - barrier.countDown() - } - }) - } } diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala index 5d4e5b899d..389eff5e06 100644 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala +++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala @@ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch import org.apache.spark.deploy.mesos.ui.MesosClusterUI import org.apache.spark.deploy.rest.mesos.MesosRestServer import org.apache.spark.scheduler.cluster.mesos._ -import org.apache.spark.util.SignalLogger +import org.apache.spark.util.{ShutdownHookManager, SignalLogger} import org.apache.spark.{Logging, SecurityManager, SparkConf} /* @@ -103,14 +103,11 @@ private[mesos] object MesosClusterDispatcher extends Logging { } val dispatcher = new MesosClusterDispatcher(dispatcherArgs, conf) dispatcher.start() - val shutdownHook = new Thread() { - override def run() { - logInfo("Shutdown hook is shutting down dispatcher") - dispatcher.stop() - dispatcher.awaitShutdown() - } + ShutdownHookManager.addShutdownHook { () => + logInfo("Shutdown hook is shutting down dispatcher") + dispatcher.stop() + dispatcher.awaitShutdown() } - Runtime.getRuntime.addShutdownHook(shutdownHook) dispatcher.awaitShutdown() } } diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala index 620f226a23..1a0f3b477b 100644 --- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala +++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala @@ -162,7 +162,9 @@ private[spark] object ShutdownHookManager extends Logging { val hook = new Thread { override def run() {} } + // scalastyle:off runtimeaddshutdownhook Runtime.getRuntime.addShutdownHook(hook) + // scalastyle:on runtimeaddshutdownhook Runtime.getRuntime.removeShutdownHook(hook) } catch { case ise: IllegalStateException => return true @@ -228,7 +230,9 @@ private [util] class SparkShutdownHookManager { .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30)) case Failure(_) => + // scalastyle:off runtimeaddshutdownhook Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook")); + // scalastyle:on runtimeaddshutdownhook } } diff --git a/scalastyle-config.xml b/scalastyle-config.xml index dab1ebddc6..6925e18737 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -157,6 +157,18 @@ This file is divided into 3 sections: ]]></customMessage> </check> + <check customId="runtimeaddshutdownhook" level="error" class="org.scalastyle.file.RegexChecker" enabled="true"> + <parameters><parameter name="regex">Runtime\.getRuntime\.addShutdownHook</parameter></parameters> + <customMessage><![CDATA[ + Are you sure that you want to use Runtime.getRuntime.addShutdownHook? In most cases, you should use + ShutdownHookManager.addShutdownHook instead. + If you must use Runtime.getRuntime.addShutdownHook, wrap the code block with + // scalastyle:off runtimeaddshutdownhook + Runtime.getRuntime.addShutdownHook(...) + // scalastyle:on runtimeaddshutdownhook + ]]></customMessage> + </check> + <check customId="classforname" level="error" class="org.scalastyle.file.RegexChecker" enabled="true"> <parameters><parameter name="regex">Class\.forName</parameter></parameters> <customMessage><![CDATA[ diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 03bb2c2225..8e7aa75bc3 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -195,20 +195,18 @@ private[hive] object SparkSQLCLIDriver extends Logging { } // add shutdown hook to flush the history to history file - Runtime.getRuntime.addShutdownHook(new Thread(new Runnable() { - override def run() = { - reader.getHistory match { - case h: FileHistory => - try { - h.flush() - } catch { - case e: IOException => - logWarning("WARNING: Failed to write command history file: " + e.getMessage) - } - case _ => - } + ShutdownHookManager.addShutdownHook { () => + reader.getHistory match { + case h: FileHistory => + try { + h.flush() + } catch { + case e: IOException => + logWarning("WARNING: Failed to write command history file: " + e.getMessage) + } + case _ => } - })) + } // TODO: missing /* |