author      CodingCat <zhunansjtu@gmail.com>        2015-03-18 23:48:45 -0700
committer   Aaron Davidson <aaron@databricks.com>   2015-03-18 23:48:45 -0700
commit      2c3f83c34bb8d2c1bf13b33633d8c5a8089545d1 (patch)
tree        b6f5e312b1b3b4c71c9016dc807272d784721c0a
parent      645cf3fcc21987417b2946bdeeeb60af3edf667e (diff)
download    spark-2c3f83c34bb8d2c1bf13b33633d8c5a8089545d1.tar.gz
            spark-2c3f83c34bb8d2c1bf13b33633d8c5a8089545d1.tar.bz2
            spark-2c3f83c34bb8d2c1bf13b33633d8c5a8089545d1.zip
[SPARK-4012] stop SparkContext when the exception is thrown from an infinite loop
https://issues.apache.org/jira/browse/SPARK-4012

This patch is a resubmission of https://github.com/apache/spark/pull/2864. What it proposes is that ***when an exception is thrown from an infinite loop, we should stop the SparkContext instead of letting the JVM throw the exception forever***. So, in the infinite loops that were originally wrapped with `logUncaughtExceptions`, the wrapper is changed to `tryOrStopSparkContext`, so that the Spark component is stopped.

Stopping the JVM process early helps with HA scheme design. For example, a user may run a script that checks for the pid of the Spark Streaming driver to monitor availability; with the code before this patch, the JVM process stays alive but is no longer functional once the exception is thrown.

andrewor14, srowen, mind taking a further look at the change?

Author: CodingCat <zhunansjtu@gmail.com>

Closes #5004 from CodingCat/SPARK-4012-1 and squashes the following commits:

589276a [CodingCat] throw fatal error again
3c72cd8 [CodingCat] address the comments
6087864 [CodingCat] revise comments
6ad3eb0 [CodingCat] stop SparkContext instead of quit the JVM process
6322959 [CodingCat] exit JVM process when the exception is thrown from an infinite loop
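To make the change concrete, here is a minimal sketch of the pattern applied to the driver-side daemon loops below (the names `keepPolling` and `pollNextEvent` are illustrative, not from the patch):

    // Before this patch: an uncaught exception was only logged, leaving a live
    // but non-functional driver JVM behind.
    //   private def keepPolling(): Unit = Utils.logUncaughtExceptions {
    //     while (!stopped) { pollNextEvent() }
    //   }

    // After this patch: the same loop stops the SparkContext when an exception
    // escapes, so external monitoring (e.g. a pid or liveness check) can react.
    private def keepPolling(sc: SparkContext): Unit = Utils.tryOrStopSparkContext(sc) {
      while (!stopped) {
        pollNextEvent()
      }
    }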
-rw-r--r--  core/src/main/scala/org/apache/spark/ContextCleaner.scala                          |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                            |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala        |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala             |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala            | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                              | 28
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala     |  9
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala            | 10
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala   |  2
9 files changed, 51 insertions, 16 deletions
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 0c59a61e81..9b05c9623b 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -145,7 +145,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}
/** Keep cleaning RDD, shuffle, and broadcast state. */
- private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
+ private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
while (!stopped) {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4457f40286..228ff715fe 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1736,7 +1736,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}
- listenerBus.start()
+ listenerBus.start(this)
}
/** Post the application start event */
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 16d88c17d1..7fde020409 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -93,7 +93,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
*/
private def getRunner(operateFun: () => Unit): Runnable = {
new Runnable() {
- override def run() = Utils.logUncaughtExceptions {
+ override def run() = Utils.tryOrExit {
operateFun()
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 7a9cf1c2e7..f33fd4450b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -145,7 +145,7 @@ private[spark] class TaskSchedulerImpl(
import sc.env.actorSystem.dispatcher
sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
SPECULATION_INTERVAL milliseconds) {
- Utils.tryOrExit { checkSpeculatableTasks() }
+ Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() }
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
index 18c627e8c7..ce7887b76f 100644
--- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
@@ -21,6 +21,7 @@ import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean
import com.google.common.annotations.VisibleForTesting
+import org.apache.spark.SparkContext
/**
* Asynchronously passes events to registered listeners.
@@ -38,6 +39,8 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
self =>
+ private var sparkContext: SparkContext = null
+
/* Cap the capacity of the event queue so we get an explicit error (rather than
* an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
private val EVENT_QUEUE_CAPACITY = 10000
@@ -57,7 +60,7 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
private val listenerThread = new Thread(name) {
setDaemon(true)
- override def run(): Unit = Utils.logUncaughtExceptions {
+ override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
while (true) {
eventLock.acquire()
self.synchronized {
@@ -89,9 +92,12 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
* This first sends out all buffered events posted before this listener bus has started, then
* listens for any additional events asynchronously while the listener bus is still running.
* This should only be called once.
+ *
+ * @param sc Used to stop the SparkContext in case the listener thread dies.
*/
- def start() {
+ def start(sc: SparkContext) {
if (started.compareAndSet(false, true)) {
+ sparkContext = sc
listenerThread.start()
} else {
throw new IllegalStateException(s"$name already started!")
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index af8a24553a..91aa70870a 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1146,6 +1146,8 @@ private[spark] object Utils extends Logging {
/**
* Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
* default UncaughtExceptionHandler
+ *
+ * NOTE: This method is to be called by the spark-started JVM process.
*/
def tryOrExit(block: => Unit) {
try {
@@ -1157,6 +1159,32 @@ private[spark] object Utils extends Logging {
}
/**
+ * Execute a block of code that evaluates to Unit, stopping the SparkContext if there is any
+ * uncaught exception
+ *
+ * NOTE: This method is to be called by the driver-side components to avoid stopping the
+ * user-started JVM process completely; in contrast, tryOrExit is to be called in the
+ * spark-started JVM process.
+ */
+ def tryOrStopSparkContext(sc: SparkContext)(block: => Unit) {
+ try {
+ block
+ } catch {
+ case e: ControlThrowable => throw e
+ case t: Throwable =>
+ val currentThreadName = Thread.currentThread().getName
+ if (sc != null) {
+ logError(s"uncaught error in thread $currentThreadName, stopping SparkContext", t)
+ sc.stop()
+ }
+ if (!NonFatal(t)) {
+ logError(s"throw uncaught fatal error in thread $currentThreadName", t)
+ throw t
+ }
+ }
+ }
+
+ /**
* Execute a block of code that evaluates to Unit, re-throwing any non-fatal uncaught
* exceptions as IOException. This is used when implementing Externalizable and Serializable's
* read and write methods, since Java's serializer will not report non-IOExceptions properly;
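For reference, a hedged sketch of how the new `tryOrStopSparkContext` helper above handles the two classes of exceptions (the thrown errors are illustrative only):

    // Non-fatal exception: the SparkContext (when non-null) is stopped and the
    // calling thread exits without rethrowing.
    Utils.tryOrStopSparkContext(sc) {
      throw new RuntimeException("bug in a driver-side loop")
    }

    // Fatal error (not matched by NonFatal): the SparkContext is stopped and the
    // error is rethrown, so the default uncaught-exception handler still sees it.
    Utils.tryOrStopSparkContext(sc) {
      throw new OutOfMemoryError("simulated fatal error")
    }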
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 992dde66f9..448258a754 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -25,9 +25,9 @@ import scala.io.Source
import org.apache.hadoop.fs.Path
import org.json4s.jackson.JsonMethods._
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.{FunSuiteLike, BeforeAndAfter, FunSuite}
-import org.apache.spark.{Logging, SparkConf, SparkContext, SPARK_VERSION}
+import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io._
import org.apache.spark.util.{JsonProtocol, Utils}
@@ -39,7 +39,8 @@ import org.apache.spark.util.{JsonProtocol, Utils}
* logging events, whether the parsing of the file names is correct, and whether the logged events
* can be read and deserialized into actual SparkListenerEvents.
*/
-class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Logging {
+class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with BeforeAndAfter
+ with Logging {
import EventLoggingListenerSuite._
private val fileSystem = Utils.getHadoopFileSystem("/",
@@ -144,7 +145,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
// A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
eventLogger.start()
- listenerBus.start()
+ listenerBus.start(sc)
listenerBus.addListener(eventLogger)
listenerBus.postToAll(applicationStart)
listenerBus.postToAll(applicationEnd)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 3a41ee8d4a..627c9a4ddf 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -46,7 +46,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
assert(counter.count === 0)
// Starting listener bus should flush all buffered events
- bus.start()
+ bus.start(sc)
assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(counter.count === 5)
@@ -58,8 +58,8 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
// Listener bus must not be started twice
intercept[IllegalStateException] {
val bus = new LiveListenerBus
- bus.start()
- bus.start()
+ bus.start(sc)
+ bus.start(sc)
}
// ... or stopped before starting
@@ -96,7 +96,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
val blockingListener = new BlockingListener
bus.addListener(blockingListener)
- bus.start()
+ bus.start(sc)
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
listenerStarted.acquire()
@@ -347,7 +347,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
bus.addListener(badListener)
bus.addListener(jobCounter1)
bus.addListener(jobCounter2)
- bus.start()
+ bus.start(sc)
// Post events to all listeners, and wait until the queue is drained
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index b3ffc71904..60bc099b27 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -61,7 +61,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}
}), "JobScheduler")
- listenerBus.start()
+ listenerBus.start(ssc.sparkContext)
receiverTracker = new ReceiverTracker(ssc)
receiverTracker.start()
jobGenerator.start()