commit    23e047f4609bf39f50ea4c65f704cac15408a821
tree      52810e3e06add116cd37b37191d34a65000573db
parent    37bed97de5f81a9127f1ff12db695aab6e5c4c47
author    Mikael Ståldal <mikael.staldal@magine.com>  2016-07-24 11:16:24 +0100
committer Sean Owen <sowen@cloudera.com>              2016-07-24 11:16:24 +0100
[SPARK-16416][CORE] force eager creation of loggers to avoid shutdown hook conflicts
## What changes were proposed in this pull request?

Force eager creation of loggers to avoid shutdown hook conflicts.

## How was this patch tested?

Manually tested with a project using Log4j 2; verified that the shutdown hook conflict issue was solved.

Author: Mikael Ståldal <mikael.staldal@magine.com>

Closes #14320 from mikaelstaldal/shutdown-hook-logging.
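To make the mechanism concrete, here is a minimal sketch of the failure mode and of the fix this patch applies. `LazyLogging` and `ShutdownAwareService` below are hypothetical stand-ins, not Spark's actual `Logging` trait; what they share with Spark's trait is that the SLF4J `Logger` is only created on first use, so if the first log call happens inside a JVM shutdown hook, Log4j 2 may already be stopping its logger context via its own shutdown hook.

```scala
import org.slf4j.{Logger, LoggerFactory}

// Hypothetical sketch of the lazy-initialization pattern behind the bug:
// the Logger is only materialized on first use.
trait LazyLogging {
  // If the first access happens inside a JVM shutdown hook, the logging
  // backend (e.g. Log4j 2) may already be tearing itself down via its
  // own shutdown hook, and logger creation can fail or race with it.
  @transient private lazy val log: Logger = LoggerFactory.getLogger(getClass.getName)

  protected def logDebug(msg: => String): Unit = if (log.isDebugEnabled) log.debug(msg)
  protected def logInfo(msg: => String): Unit = if (log.isInfoEnabled) log.info(msg)
}

// The pattern this patch applies in each affected class: one log call in
// the constructor body materializes the Logger long before shutdown.
class ShutdownAwareService extends LazyLogging {
  logDebug("init") // force eager creation of logger

  Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
    override def run(): Unit = {
      // Safe: `log` already exists, so nothing is created during shutdown.
      logInfo("Shutdown hook called")
    }
  }))
}
```

Note that `logDebug("init")` works as an eager initializer even when debug logging is disabled: the level check (`log.isDebugEnabled`) has to touch `log` before deciding whether to emit anything, and that access alone forces the logger into existence.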
-rw-r--r--  core/src/main/scala/org/apache/spark/MapOutputTracker.scala                    | 2 ++
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                        | 1 +
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala       | 1 +
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala | 1 +
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala   | 2 ++
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala            | 1 +
-rw-r--r--  core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala            | 1 +
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala     | 1 +
8 files changed, 10 insertions(+), 0 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 6bd950205f..486d535da0 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -46,6 +46,8 @@ private[spark] class MapOutputTrackerMasterEndpoint(
     override val rpcEnv: RpcEnv, tracker: MapOutputTrackerMaster, conf: SparkConf)
   extends RpcEndpoint with Logging {
 
+  logDebug("init") // force eager creation of logger
+
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case GetMapOutputStatuses(shuffleId: Int) =>
       val hostPort = context.senderAddress.hostPort
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 57d1f09f6b..6d7f05d217 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -556,6 +556,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // Make sure the context is stopped if the user forgets about it. This avoids leaving
     // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
     // is killed, though.
+    logDebug("Adding shutdown hook") // force eager creation of logger
     _shutdownHookRef = ShutdownHookManager.addShutdownHook(
       ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
       logInfo("Invoking stop() from shutdown hook")
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 37a19a495b..13eadbe44f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -128,6 +128,7 @@ object ExternalShuffleService extends Logging {
     server = newShuffleService(sparkConf, securityManager)
     server.start()
 
+    logDebug("Adding shutdown hook") // force eager creation of logger
     ShutdownHookManager.addShutdownHook { () =>
       logInfo("Shutting down shuffle service.")
       server.stop()
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
index a057977eb0..73b6ca3844 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
@@ -104,6 +104,7 @@ private[mesos] object MesosClusterDispatcher extends Logging {
     }
     val dispatcher = new MesosClusterDispatcher(dispatcherArgs, conf)
     dispatcher.start()
+    logDebug("Adding shutdown hook") // force eager creation of logger
     ShutdownHookManager.addShutdownHook { () =>
       logInfo("Shutdown hook is shutting down dispatcher")
       dispatcher.stop()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 2dd453cd63..7bed6851d0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -184,6 +184,8 @@ private[spark] object OutputCommitCoordinator {
       override val rpcEnv: RpcEnv, outputCommitCoordinator: OutputCommitCoordinator)
     extends RpcEndpoint with Logging {
 
+    logDebug("init") // force eager creation of logger
+
     override def receive: PartialFunction[Any, Unit] = {
       case StopCoordinator =>
         logInfo("OutputCommitCoordinator stopped!")
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 0666be2dcb..3d43e3c367 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -141,6 +141,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
   }
 
   private def addShutdownHook(): AnyRef = {
+    logDebug("Adding shutdown hook") // force eager creation of logger
     ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
       logInfo("Shutdown hook called")
       DiskBlockManager.this.doStop()
diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index 93ac67e5db..4001fac3c3 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -54,6 +54,7 @@ private[spark] object ShutdownHookManager extends Logging {
   private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
 
   // Add a shutdown hook to delete the temp dirs when the JVM exits
+  logDebug("Adding shutdown hook") // force eager creation of logger
   addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
     logInfo("Shutdown hook called")
     // we need to materialize the paths to delete because deleteRecursively removes items from
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 6046426fdf..4808d0fcbc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -592,6 +592,7 @@ class StreamingContext private[streaming] (
           }
           StreamingContext.setActiveContext(this)
         }
+        logDebug("Adding shutdown hook") // force eager creation of logger
         shutdownHookRef = ShutdownHookManager.addShutdownHook(
           StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
         // Registering Streaming Metrics at the start of the StreamingContext