about summary refs log tree commit diff
diff options
context:
space:
mode:
authortedyu <yuzhihong@gmail.com>2015-12-16 19:02:12 -0800
committerAndrew Or <andrew@databricks.com>2015-12-16 19:02:12 -0800
commitf590178d7a06221a93286757c68b23919bee9f03 (patch)
tree94e096dc45037353b602d29f1b2df7d471bcab2d
parent38d9795a4fa07086d65ff705ce86648345618736 (diff)
downloadspark-f590178d7a06221a93286757c68b23919bee9f03.tar.gz
spark-f590178d7a06221a93286757c68b23919bee9f03.tar.bz2
spark-f590178d7a06221a93286757c68b23919bee9f03.zip
[SPARK-12365][CORE] Use ShutdownHookManager where Runtime.getRuntime.addShutdownHook() is called
SPARK-9886 fixed ExternalBlockStore.scala. This PR fixes the remaining references to Runtime.getRuntime.addShutdownHook(). Author: tedyu <yuzhihong@gmail.com> Closes #10325 from ted-yu/master.
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala | 18
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala | 13
-rw-r--r-- core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala | 4
-rw-r--r-- scalastyle-config.xml | 12
-rw-r--r-- sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala | 24
5 files changed, 38 insertions, 33 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index e8a1e35c3f..7fc96e4f76 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -28,7 +28,7 @@ import org.apache.spark.network.sasl.SaslServerBootstrap
import org.apache.spark.network.server.{TransportServerBootstrap, TransportServer}
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
import org.apache.spark.network.util.TransportConf
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
/**
* Provides a server from which Executors can read shuffle files (rather than reading directly from
@@ -118,19 +118,13 @@ object ExternalShuffleService extends Logging {
server = newShuffleService(sparkConf, securityManager)
server.start()
- installShutdownHook()
+ ShutdownHookManager.addShutdownHook { () =>
+ logInfo("Shutting down shuffle service.")
+ server.stop()
+ barrier.countDown()
+ }
// keep running until the process is terminated
barrier.await()
}
-
- private def installShutdownHook(): Unit = {
- Runtime.getRuntime.addShutdownHook(new Thread("External Shuffle Service shutdown thread") {
- override def run() {
- logInfo("Shutting down shuffle service.")
- server.stop()
- barrier.countDown()
- }
- })
- }
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
index 5d4e5b899d..389eff5e06 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch
import org.apache.spark.deploy.mesos.ui.MesosClusterUI
import org.apache.spark.deploy.rest.mesos.MesosRestServer
import org.apache.spark.scheduler.cluster.mesos._
-import org.apache.spark.util.SignalLogger
+import org.apache.spark.util.{ShutdownHookManager, SignalLogger}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
/*
@@ -103,14 +103,11 @@ private[mesos] object MesosClusterDispatcher extends Logging {
}
val dispatcher = new MesosClusterDispatcher(dispatcherArgs, conf)
dispatcher.start()
- val shutdownHook = new Thread() {
- override def run() {
- logInfo("Shutdown hook is shutting down dispatcher")
- dispatcher.stop()
- dispatcher.awaitShutdown()
- }
+ ShutdownHookManager.addShutdownHook { () =>
+ logInfo("Shutdown hook is shutting down dispatcher")
+ dispatcher.stop()
+ dispatcher.awaitShutdown()
}
- Runtime.getRuntime.addShutdownHook(shutdownHook)
dispatcher.awaitShutdown()
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index 620f226a23..1a0f3b477b 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -162,7 +162,9 @@ private[spark] object ShutdownHookManager extends Logging {
val hook = new Thread {
override def run() {}
}
+ // scalastyle:off runtimeaddshutdownhook
Runtime.getRuntime.addShutdownHook(hook)
+ // scalastyle:on runtimeaddshutdownhook
Runtime.getRuntime.removeShutdownHook(hook)
} catch {
case ise: IllegalStateException => return true
@@ -228,7 +230,9 @@ private [util] class SparkShutdownHookManager {
.invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
case Failure(_) =>
+ // scalastyle:off runtimeaddshutdownhook
Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"));
+ // scalastyle:on runtimeaddshutdownhook
}
}
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index dab1ebddc6..6925e18737 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -157,6 +157,18 @@ This file is divided into 3 sections:
]]></customMessage>
</check>
+ <check customId="runtimeaddshutdownhook" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+ <parameters><parameter name="regex">Runtime\.getRuntime\.addShutdownHook</parameter></parameters>
+ <customMessage><![CDATA[
+ Are you sure that you want to use Runtime.getRuntime.addShutdownHook? In most cases, you should use
+ ShutdownHookManager.addShutdownHook instead.
+ If you must use Runtime.getRuntime.addShutdownHook, wrap the code block with
+ // scalastyle:off runtimeaddshutdownhook
+ Runtime.getRuntime.addShutdownHook(...)
+ // scalastyle:on runtimeaddshutdownhook
+ ]]></customMessage>
+ </check>
+
<check customId="classforname" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
<parameters><parameter name="regex">Class\.forName</parameter></parameters>
<customMessage><![CDATA[
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 03bb2c2225..8e7aa75bc3 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -195,20 +195,18 @@ private[hive] object SparkSQLCLIDriver extends Logging {
}
// add shutdown hook to flush the history to history file
- Runtime.getRuntime.addShutdownHook(new Thread(new Runnable() {
- override def run() = {
- reader.getHistory match {
- case h: FileHistory =>
- try {
- h.flush()
- } catch {
- case e: IOException =>
- logWarning("WARNING: Failed to write command history file: " + e.getMessage)
- }
- case _ =>
- }
+ ShutdownHookManager.addShutdownHook { () =>
+ reader.getHistory match {
+ case h: FileHistory =>
+ try {
+ h.flush()
+ } catch {
+ case e: IOException =>
+ logWarning("WARNING: Failed to write command history file: " + e.getMessage)
+ }
+ case _ =>
}
- }))
+ }
// TODO: missing
/*