-rw-r--r--  core/src/main/scala/org/apache/spark/util/SignalLogger.scala                              | 36
-rw-r--r--  core/src/main/scala/org/apache/spark/util/SignalUtils.scala (renamed from core/src/main/scala/org/apache/spark/util/Signaling.scala) | 91
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                                     |  2
-rw-r--r--  repl/src/main/scala/org/apache/spark/repl/Signaling.scala                                 |  4
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala                  |  2
5 files changed, 58 insertions(+), 77 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/SignalLogger.scala b/core/src/main/scala/org/apache/spark/util/SignalLogger.scala
deleted file mode 100644
index a793c9135e..0000000000
--- a/core/src/main/scala/org/apache/spark/util/SignalLogger.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import org.slf4j.Logger
-
-/**
- * Used to log signals received. This can be very useful in debugging crashes or kills.
- */
-private[spark] object SignalLogger {
-
- private var registered = false
-
- /** Register a signal handler to log signals on UNIX-like systems. */
- def register(log: Logger): Unit = Seq("TERM", "HUP", "INT").foreach{ sig =>
- Signaling.register(sig) {
- log.error("RECEIVED SIGNAL " + sig)
- false
- }
- }
-}
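
Note: SignalLogger is removed entirely; its TERM/HUP/INT logging moves into SignalUtils.registerLogger (added below). A rough, illustrative sketch of how a caller migrates — the package and object names here are made up, and the caller must live under org.apache.spark because SignalUtils is private[spark]:

    package org.apache.spark.mydaemon                      // hypothetical sub-package

    import org.slf4j.LoggerFactory
    import org.apache.spark.util.SignalUtils

    object MyDaemon {                                      // illustrative caller
      private val log = LoggerFactory.getLogger("MyDaemon")

      def main(args: Array[String]): Unit = {
        // old: SignalLogger.register(log)
        // new: SignalUtils.registerLogger(log)
        SignalUtils.registerLogger(log)
      }
    }
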
diff --git a/core/src/main/scala/org/apache/spark/util/Signaling.scala b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala
index 2075cc45a9..9479d8f74d 100644
--- a/core/src/main/scala/org/apache/spark/util/Signaling.scala
+++ b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala
@@ -17,28 +17,69 @@
package org.apache.spark.util
-import java.util.{Collections, LinkedList}
+import java.util.Collections
import scala.collection.JavaConverters._
-import scala.collection.mutable.HashMap
import org.apache.commons.lang3.SystemUtils
+import org.slf4j.Logger
import sun.misc.{Signal, SignalHandler}
import org.apache.spark.internal.Logging
-
/**
* Contains utilities for working with posix signals.
*/
-private[spark] object Signaling extends Logging {
+private[spark] object SignalUtils extends Logging {
+
+ /** A flag to make sure we only register the logger once. */
+ private var loggerRegistered = false
+
+ /** Register a signal handler to log signals on UNIX-like systems. */
+ def registerLogger(log: Logger): Unit = synchronized {
+ if (!loggerRegistered) {
+ Seq("TERM", "HUP", "INT").foreach { sig =>
+ SignalUtils.register(sig) {
+ log.error("RECEIVED SIGNAL " + sig)
+ false
+ }
+ }
+ loggerRegistered = true
+ }
+ }
+
+ /**
+ * Adds an action to be run when a given signal is received by this process.
+ *
+ * Note that signals are only supported on unix-like operating systems and work on a best-effort
+ * basis: if a signal is not available or cannot be intercepted, only a warning is emitted.
+ *
+ * All actions for a given signal are run in a separate thread.
+ */
+ def register(signal: String)(action: => Boolean): Unit = synchronized {
+ if (SystemUtils.IS_OS_UNIX) {
+ try {
+ val handler = handlers.getOrElseUpdate(signal, {
+ logInfo("Registered signal handler for " + signal)
+ new ActionHandler(new Signal(signal))
+ })
+ handler.register(action)
+ } catch {
+ case ex: Exception => logWarning(s"Failed to register signal handler for $signal", ex)
+ }
+ }
+ }
/**
* A handler for the given signal that runs a collection of actions.
*/
private class ActionHandler(signal: Signal) extends SignalHandler {
- private val actions = Collections.synchronizedList(new LinkedList[() => Boolean])
+ /**
+ * List of actions upon the signal; the callbacks should return true if the signal is "handled",
+ * i.e. should not escalate to the next callback.
+ */
+ private val actions = Collections.synchronizedList(new java.util.LinkedList[() => Boolean])
// original signal handler, before this handler was attached
private val prevHandler: SignalHandler = Signal.handle(signal, this)
@@ -51,11 +92,10 @@ private[spark] object Signaling extends Logging {
// register old handler, will receive incoming signals while this handler is running
Signal.handle(signal, prevHandler)
- val escalate = actions.asScala forall { action =>
- !action()
- }
-
- if(escalate) {
+ // run all actions, escalate to parent handler if no action catches the signal
+ // (i.e. all actions return false)
+ val escalate = actions.asScala.forall { action => !action() }
+ if (escalate) {
prevHandler.handle(sig)
}
@@ -64,36 +104,13 @@ private[spark] object Signaling extends Logging {
}
/**
- * Add an action to be run by this handler.
+ * Adds an action to be run by this handler.
* @param action An action to be run when a signal is received. Return true if the signal
- * should be stopped with this handler, false if it should be escalated.
+ * should be stopped with this handler, false if it should be escalated.
*/
def register(action: => Boolean): Unit = actions.add(() => action)
-
- }
-
- // contains association of signals to their respective handlers
- private val handlers = new HashMap[String, ActionHandler]
-
- /**
- * Adds an action to be run when a given signal is received by this process.
- *
- * Note that signals are only supported on unix-like operating systems and work on a best-effort
- * basis: if a signal is not available or cannot be intercepted, only a warning is emitted.
- *
- * All actions for a given signal are run in a separate thread.
- */
- def register(signal: String)(action: => Boolean): Unit = synchronized {
- if (SystemUtils.IS_OS_UNIX) try {
- val handler = handlers.getOrElseUpdate(signal, {
- val h = new ActionHandler(new Signal(signal))
- logInfo("Registered signal handler for " + signal)
- h
- })
- handler.register(action)
- } catch {
- case ex: Exception => logWarning(s"Failed to register signal handler for " + signal, ex)
- }
}
+ /** Mapping from signal to their respective handlers. */
+ private val handlers = new scala.collection.mutable.HashMap[String, ActionHandler]
}
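
For reference, register keeps the contract carried over from Signaling: all callbacks for a signal run in a separate thread, and the previous handler (typically the JVM default) only fires when every callback returns false. A minimal illustrative sketch built on that contract — object, package, and state names are made up:

    package org.apache.spark.example                       // hypothetical; SignalUtils is private[spark]

    import org.apache.spark.util.SignalUtils

    object InterruptOnce {
      private var seenInt = false                          // illustrative state, set from the handler thread

      def install(): Unit = SignalUtils.register("INT") {
        if (!seenInt) {
          seenInt = true
          true     // handled: the previous handler is not invoked
        } else {
          false    // not handled: escalate to the previous (default) handler
        }
      }
    }
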
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 848f7d7adb..ea49991493 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2284,7 +2284,7 @@ private[spark] object Utils extends Logging {
*/
def initDaemon(log: Logger): Unit = {
log.info(s"Started daemon with process name: ${Utils.getProcessName()}")
- SignalLogger.register(log)
+ SignalUtils.registerLogger(log)
}
}
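
Since registerLogger is guarded by the loggerRegistered flag, initDaemon can now be reached from multiple entry points without stacking duplicate TERM/HUP/INT handlers. An illustrative sketch of the now-safe repeated call (package, object, and logger names are made up):

    package org.apache.spark.example                       // hypothetical; SignalUtils is private[spark]

    import org.slf4j.LoggerFactory
    import org.apache.spark.util.SignalUtils

    object DaemonInit {
      def init(): Unit = {
        val log = LoggerFactory.getLogger("daemon")        // illustrative logger name
        SignalUtils.registerLogger(log)
        SignalUtils.registerLogger(log)                    // no-op: loggerRegistered prevents double registration
      }
    }
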
diff --git a/repl/src/main/scala/org/apache/spark/repl/Signaling.scala b/repl/src/main/scala/org/apache/spark/repl/Signaling.scala
index c305ed545c..202febf144 100644
--- a/repl/src/main/scala/org/apache/spark/repl/Signaling.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/Signaling.scala
@@ -19,7 +19,7 @@ package org.apache.spark.repl
import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
-import org.apache.spark.util.{Signaling => USignaling}
+import org.apache.spark.util.SignalUtils
private[repl] object Signaling extends Logging {
@@ -28,7 +28,7 @@ private[repl] object Signaling extends Logging {
* when no jobs are currently running.
* This makes it possible to interrupt a running shell job by pressing Ctrl+C.
*/
- def cancelOnInterrupt(ctx: SparkContext): Unit = USignaling.register("INT") {
+ def cancelOnInterrupt(ctx: SparkContext): Unit = SignalUtils.register("INT") {
if (!ctx.statusTracker.getActiveJobIds().isEmpty) {
logWarning("Cancelling all active jobs, this can take a while. " +
"Press Ctrl+C again to exit now.")
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 5bb63500c8..4df90d7b6b 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -716,7 +716,7 @@ object ApplicationMaster extends Logging {
private var master: ApplicationMaster = _
def main(args: Array[String]): Unit = {
- SignalLogger.register(log)
+ SignalUtils.registerLogger(log)
val amArgs = new ApplicationMasterArguments(args)
SparkHadoopUtil.get.runAsSparkUser { () =>
master = new ApplicationMaster(amArgs, new YarnRMClient)