path: root/sql/core
author      Shixiong Zhu <shixiong@databricks.com>      2017-01-10 14:24:45 +0000
committer   Sean Owen <sowen@cloudera.com>               2017-01-10 14:24:45 +0000
commit      3ef183a941d45b2f7ad167ea5133a93de0da5176 (patch)
tree        2128d911d0e0787e8b1c78f229540298abb1037e /sql/core
parent      a2c6adcc5d2702d2f0e9b239517353335e5f911e (diff)
download    spark-3ef183a941d45b2f7ad167ea5133a93de0da5176.tar.gz
            spark-3ef183a941d45b2f7ad167ea5133a93de0da5176.tar.bz2
            spark-3ef183a941d45b2f7ad167ea5133a93de0da5176.zip
[SPARK-19113][SS][TESTS] Set UncaughtExceptionHandler in onQueryStarted to ensure catching fatal errors during query initialization
## What changes were proposed in this pull request?

StreamTest currently sets the `UncaughtExceptionHandler` only after starting the query, so it may miss fatal errors thrown during query initialization. This PR installs the handler from the `onQueryStarted` callback instead.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16492 from zsxwing/SPARK-19113.
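For reference, the core of the change is a `StreamingQueryListener` whose `onQueryStarted` callback installs an `UncaughtExceptionHandler` on the calling thread; the patch relies on `QueryStartedEvent` being delivered synchronously on the stream execution thread before any batch runs, so the handler is already in place for errors thrown during initialization. The sketch below condenses that pattern from the diff; `FatalErrorCapture`, `install`, and `uninstall` are illustrative names, not part of the patch.

import java.lang.Thread.UncaughtExceptionHandler

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Illustrative helper (not part of the patch): records a fatal error thrown by the stream thread.
object FatalErrorCapture {
  // Written by the stream thread, read by the test thread.
  @volatile var streamThreadDeathCause: Throwable = null

  private val listener = new StreamingQueryListener {
    override def onQueryStarted(event: QueryStartedEvent): Unit = {
      // Runs on the stream execution thread, so the handler also covers query initialization.
      // Assumes only one query is active at a time.
      Thread.currentThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
        override def uncaughtException(t: Thread, e: Throwable): Unit = {
          streamThreadDeathCause = e
        }
      })
    }

    override def onQueryProgress(event: QueryProgressEvent): Unit = {}
    override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
  }

  def install(spark: SparkSession): Unit = spark.streams.addListener(listener)
  def uninstall(spark: SparkSession): Unit = spark.streams.removeListener(listener)
}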
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala    7
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala    28
2 files changed, 26 insertions(+), 9 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 34b0ee8064..e964e646d2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -238,7 +238,7 @@ class StreamSuite extends StreamTest {
     }
   }
 
-  testQuietly("fatal errors from a source should be sent to the user") {
+  testQuietly("handle fatal errors thrown from the stream thread") {
     for (e <- Seq(
       new VirtualMachineError {},
       new ThreadDeath,
@@ -259,8 +259,11 @@ class StreamSuite extends StreamTest {
         override def stop(): Unit = {}
       }
       val df = Dataset[Int](sqlContext.sparkSession, StreamingExecutionRelation(source))
-      // These error are fatal errors and should be ignored in `testStream` to not fail the test.
       testStream(df)(
+        // `ExpectFailure(isFatalError = true)` verifies two things:
+        // - Fatal errors can be propagated to `StreamingQuery.exception` and
+        //   `StreamingQuery.awaitTermination` like non fatal errors.
+        // - Fatal errors can be caught by UncaughtExceptionHandler.
         ExpectFailure(isFatalError = true)(ClassTag(e.getClass))
       )
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 709050d29b..4aa4100522 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -235,7 +235,10 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
    */
   def testStream(
       _stream: Dataset[_],
-      outputMode: OutputMode = OutputMode.Append)(actions: StreamAction*): Unit = {
+      outputMode: OutputMode = OutputMode.Append)(actions: StreamAction*): Unit = synchronized {
+    // `synchronized` is added to prevent the user from calling multiple `testStream`s concurrently
+    // because this method assumes there is only one active query in its `StreamingQueryListener`
+    // and it may not work correctly when multiple `testStream`s run concurrently.
 
     val stream = _stream.toDF()
     val sparkSession = stream.sparkSession // use the session in DF, not the default session
@@ -248,6 +251,22 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
     @volatile
     var streamThreadDeathCause: Throwable = null
+    // Set UncaughtExceptionHandler in `onQueryStarted` so that we can ensure catching fatal errors
+    // during query initialization.
+    val listener = new StreamingQueryListener {
+      override def onQueryStarted(event: QueryStartedEvent): Unit = {
+        // Note: this assumes there is only one query active in the `testStream` method.
+        Thread.currentThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
+          override def uncaughtException(t: Thread, e: Throwable): Unit = {
+            streamThreadDeathCause = e
+          }
+        })
+      }
+
+      override def onQueryProgress(event: QueryProgressEvent): Unit = {}
+      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
+    }
+    sparkSession.streams.addListener(listener)
 
     // If the test doesn't manually start the stream, we do it automatically at the beginning.
     val startedManually =
@@ -364,12 +383,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
                   triggerClock = triggerClock)
                 .asInstanceOf[StreamingQueryWrapper]
                 .streamingQuery
-            currentStream.microBatchThread.setUncaughtExceptionHandler(
-              new UncaughtExceptionHandler {
-                override def uncaughtException(t: Thread, e: Throwable): Unit = {
-                  streamThreadDeathCause = e
-                }
-              })
             // Wait until the initialization finishes, because some tests need to use `logicalPlan`
             // after starting the query.
             currentStream.awaitInitialization(streamingTimeout.toMillis)
@@ -545,6 +558,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
         case (key, Some(value)) => sparkSession.conf.set(key, value)
         case (key, None) => sparkSession.conf.unset(key)
       }
+      sparkSession.streams.removeListener(listener)
     }
   }
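As a usage note, the patched `testStream` follows a register/run/clean-up shape: the whole method is `synchronized` because the listener assumes a single active query, the listener is added before any action can start a query, and it is removed in the cleanup path. A minimal sketch of that shape, reusing the hypothetical `FatalErrorCapture` helper from above (the harness object and method name are illustrative, not part of the patch):

import org.apache.spark.sql.SparkSession

object StreamTestHarness {
  // `synchronized` mirrors the patch: concurrent invocations must not interleave because the
  // listener assumes only one active query at a time.
  def withFatalErrorCapture[T](spark: SparkSession)(body: => T): T = synchronized {
    FatalErrorCapture.install(spark)
    try {
      body
    } finally {
      // Always detach the listener so later tests start with a clean StreamingQueryManager.
      FatalErrorCapture.uninstall(spark)
    }
  }
}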