From 2c3f83c34bb8d2c1bf13b33633d8c5a8089545d1 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Wed, 18 Mar 2015 23:48:45 -0700 Subject: [SPARK-4012] stop SparkContext when the exception is thrown from an infinite loop https://issues.apache.org/jira/browse/SPARK-4012 This patch is a resubmission for https://github.com/apache/spark/pull/2864 What I am proposing in this patch is that ***when the exception is thrown from an infinite loop, we should stop the SparkContext, instead of let JVM throws exception forever*** So, in the infinite loops where we originally wrapped with a ` logUncaughtExceptions`, I changed to `tryOrStopSparkContext`, so that the Spark component is stopped Early stopped JVM process is helpful for HA scheme design, for example, The user has a script checking the existence of the pid of the Spark Streaming driver for monitoring the availability; with the code before this patch, the JVM process is still available but not functional when the exceptions are thrown andrewor14, srowen , mind taking further consideration about the change? Author: CodingCat Closes #5004 from CodingCat/SPARK-4012-1 and squashes the following commits: 589276a [CodingCat] throw fatal error again 3c72cd8 [CodingCat] address the comments 6087864 [CodingCat] revise comments 6ad3eb0 [CodingCat] stop SparkContext instead of quit the JVM process 6322959 [CodingCat] exit JVM process when the exception is thrown from an infinite loop --- .../scala/org/apache/spark/ContextCleaner.scala | 2 +- .../main/scala/org/apache/spark/SparkContext.scala | 2 +- .../spark/deploy/history/FsHistoryProvider.scala | 2 +- .../apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- .../spark/util/AsynchronousListenerBus.scala | 10 ++++++-- .../main/scala/org/apache/spark/util/Utils.scala | 28 ++++++++++++++++++++++ .../scheduler/EventLoggingListenerSuite.scala | 9 +++---- .../spark/scheduler/SparkListenerSuite.scala | 10 ++++---- .../spark/streaming/scheduler/JobScheduler.scala | 2 +- 9 files changed, 51 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index 0c59a61e81..9b05c9623b 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -145,7 +145,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { } /** Keep cleaning RDD, shuffle, and broadcast state. */ - private def keepCleaning(): Unit = Utils.logUncaughtExceptions { + private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) { while (!stopped) { try { val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT)) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4457f40286..228ff715fe 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1736,7 +1736,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } } - listenerBus.start() + listenerBus.start(this) } /** Post the application start event */ diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 16d88c17d1..7fde020409 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -93,7 +93,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis */ private def getRunner(operateFun: () => Unit): Runnable = { new Runnable() { - override def run() = Utils.logUncaughtExceptions { + override def run() = Utils.tryOrExit { operateFun() } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 7a9cf1c2e7..f33fd4450b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -145,7 +145,7 @@ private[spark] class TaskSchedulerImpl( import sc.env.actorSystem.dispatcher sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds, SPECULATION_INTERVAL milliseconds) { - Utils.tryOrExit { checkSpeculatableTasks() } + Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() } } } } diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala index 18c627e8c7..ce7887b76f 100644 --- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala @@ -21,6 +21,7 @@ import java.util.concurrent._ import java.util.concurrent.atomic.AtomicBoolean import com.google.common.annotations.VisibleForTesting +import org.apache.spark.SparkContext /** * Asynchronously passes events to registered listeners. @@ -38,6 +39,8 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri self => + private var sparkContext: SparkContext = null + /* Cap the capacity of the event queue so we get an explicit error (rather than * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */ private val EVENT_QUEUE_CAPACITY = 10000 @@ -57,7 +60,7 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri private val listenerThread = new Thread(name) { setDaemon(true) - override def run(): Unit = Utils.logUncaughtExceptions { + override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) { while (true) { eventLock.acquire() self.synchronized { @@ -89,9 +92,12 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri * This first sends out all buffered events posted before this listener bus has started, then * listens for any additional events asynchronously while the listener bus is still running. * This should only be called once. + * + * @param sc Used to stop the SparkContext in case the listener thread dies. */ - def start() { + def start(sc: SparkContext) { if (started.compareAndSet(false, true)) { + sparkContext = sc listenerThread.start() } else { throw new IllegalStateException(s"$name already started!") diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index af8a24553a..91aa70870a 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1146,6 +1146,8 @@ private[spark] object Utils extends Logging { /** * Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the * default UncaughtExceptionHandler + * + * NOTE: This method is to be called by the spark-started JVM process. */ def tryOrExit(block: => Unit) { try { @@ -1156,6 +1158,32 @@ private[spark] object Utils extends Logging { } } + /** + * Execute a block of code that evaluates to Unit, stop SparkContext is there is any uncaught + * exception + * + * NOTE: This method is to be called by the driver-side components to avoid stopping the + * user-started JVM process completely; in contrast, tryOrExit is to be called in the + * spark-started JVM process . + */ + def tryOrStopSparkContext(sc: SparkContext)(block: => Unit) { + try { + block + } catch { + case e: ControlThrowable => throw e + case t: Throwable => + val currentThreadName = Thread.currentThread().getName + if (sc != null) { + logError(s"uncaught error in thread $currentThreadName, stopping SparkContext", t) + sc.stop() + } + if (!NonFatal(t)) { + logError(s"throw uncaught fatal error in thread $currentThreadName", t) + throw t + } + } + } + /** * Execute a block of code that evaluates to Unit, re-throwing any non-fatal uncaught * exceptions as IOException. This is used when implementing Externalizable and Serializable's diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 992dde66f9..448258a754 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -25,9 +25,9 @@ import scala.io.Source import org.apache.hadoop.fs.Path import org.json4s.jackson.JsonMethods._ -import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.{FunSuiteLike, BeforeAndAfter, FunSuite} -import org.apache.spark.{Logging, SparkConf, SparkContext, SPARK_VERSION} +import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.io._ import org.apache.spark.util.{JsonProtocol, Utils} @@ -39,7 +39,8 @@ import org.apache.spark.util.{JsonProtocol, Utils} * logging events, whether the parsing of the file names is correct, and whether the logged events * can be read and deserialized into actual SparkListenerEvents. */ -class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Logging { +class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with BeforeAndAfter + with Logging { import EventLoggingListenerSuite._ private val fileSystem = Utils.getHadoopFileSystem("/", @@ -144,7 +145,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite eventLogger.start() - listenerBus.start() + listenerBus.start(sc) listenerBus.addListener(eventLogger) listenerBus.postToAll(applicationStart) listenerBus.postToAll(applicationEnd) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 3a41ee8d4a..627c9a4ddf 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -46,7 +46,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers assert(counter.count === 0) // Starting listener bus should flush all buffered events - bus.start() + bus.start(sc) assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) assert(counter.count === 5) @@ -58,8 +58,8 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers // Listener bus must not be started twice intercept[IllegalStateException] { val bus = new LiveListenerBus - bus.start() - bus.start() + bus.start(sc) + bus.start(sc) } // ... or stopped before starting @@ -96,7 +96,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers val blockingListener = new BlockingListener bus.addListener(blockingListener) - bus.start() + bus.start(sc) bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) listenerStarted.acquire() @@ -347,7 +347,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers bus.addListener(badListener) bus.addListener(jobCounter1) bus.addListener(jobCounter2) - bus.start() + bus.start(sc) // Post events to all listeners, and wait until the queue is drained (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index b3ffc71904..60bc099b27 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -61,7 +61,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { } }), "JobScheduler") - listenerBus.start() + listenerBus.start(ssc.sparkContext) receiverTracker = new ReceiverTracker(ssc) receiverTracker.start() jobGenerator.start() -- cgit v1.2.3