author    Shixiong Zhu <shixiong@databricks.com>    2016-10-06 12:51:12 -0700
committer Michael Armbrust <michael@databricks.com> 2016-10-06 12:51:12 -0700
commit    9a48e60e6319d85f2c3be3a3c608dab135e18a73 (patch)
tree      2f6081f4b8a0eab9696d7816932bc286832fa4ae
parent    79accf45ace5549caa0cbab02f94fc87bedb5587 (diff)
[SPARK-17780][SQL] Report Throwable to user in StreamExecution
## What changes were proposed in this pull request?

When using an incompatible source for structured streaming, it may throw a NoClassDefFoundError. It's better to just catch Throwable and report it to the user, since the streaming thread is dying anyway.

## How was this patch tested?

`test("NoClassDefFoundError from an incompatible source")`

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15352 from zsxwing/SPARK-17780.
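The rethrow added below points users at `Thread.UncaughtExceptionHandler` for fatal errors that escape the streaming thread. A minimal sketch of such a handler on the driver side, assuming a JVM-wide default handler is acceptable (the handler body is illustrative, not part of this patch):

```scala
// Minimal sketch, not part of this patch: fatal errors are now rethrown on the
// streaming thread after being logged, so a handler installed by the user can
// observe them (e.g. a NoClassDefFoundError from an incompatible source).
Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
  override def uncaughtException(t: Thread, e: Throwable): Unit = {
    System.err.println(s"Thread ${t.getName} died with fatal error: $e")
    // Decide how to react here, e.g. alert or shut down the driver.
  }
})
```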
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala  7
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala                31
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala                 3
3 files changed, 37 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index b3a0d6ad0b..333239f875 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -207,13 +207,18 @@ class StreamExecution(
})
} catch {
case _: InterruptedException if state == TERMINATED => // interrupted by stop()
- case NonFatal(e) =>
+ case e: Throwable =>
streamDeathCause = new StreamingQueryException(
this,
s"Query $name terminated with exception: ${e.getMessage}",
e,
Some(committedOffsets.toCompositeOffset(sources)))
logError(s"Query $name terminated with error", e)
+ // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
+ // handle them
+ if (!NonFatal(e)) {
+ throw e
+ }
} finally {
state = TERMINATED
sparkSession.streams.notifyQueryTermination(StreamExecution.this)
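The change above hinges on Scala's `NonFatal` extractor: it matches ordinary exceptions but deliberately excludes `VirtualMachineError`, `ThreadDeath`, `InterruptedException`, `LinkageError` and `ControlThrowable`, so `!NonFatal(e)` selects exactly the errors that should be rethrown after being reported. A standalone sketch of that distinction (names are illustrative):

```scala
import scala.util.control.NonFatal

object NonFatalSketch {
  // NonFatal(e) returns false for fatal errors such as LinkageError
  // (NoClassDefFoundError is a subclass), which is why the patch rethrows them.
  def isFatal(e: Throwable): Boolean = !NonFatal(e)

  def main(args: Array[String]): Unit = {
    assert(!isFatal(new RuntimeException("ordinary failure")))   // non-fatal: reported only
    assert(isFatal(new NoClassDefFoundError("missing source")))  // fatal: reported, then rethrown
  }
}
```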
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 1caafb9d74..cdbad901db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -17,10 +17,12 @@
package org.apache.spark.sql.streaming
+import scala.reflect.ClassTag
+import scala.util.control.ControlThrowable
+
import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.sources.StreamSourceProvider
-import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.util.ManualClock
@@ -236,6 +238,33 @@ class StreamSuite extends StreamTest {
}
}
+ testQuietly("fatal errors from a source should be sent to the user") {
+ for (e <- Seq(
+ new VirtualMachineError {},
+ new ThreadDeath,
+ new LinkageError,
+ new ControlThrowable {}
+ )) {
+ val source = new Source {
+ override def getOffset: Option[Offset] = {
+ throw e
+ }
+
+ override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+ throw e
+ }
+
+ override def schema: StructType = StructType(Array(StructField("value", IntegerType)))
+
+ override def stop(): Unit = {}
+ }
+ val df = Dataset[Int](sqlContext.sparkSession, StreamingExecutionRelation(source))
+ testStream(df)(
+ ExpectFailure()(ClassTag(e.getClass))
+ )
+ }
+ }
+
test("output mode API in Scala") {
val o1 = OutputMode.Append
assert(o1 === InternalOutputModes.Append)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 09140a1d6e..fa13d385cc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -167,7 +167,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
/** Signals that a failure is expected and should not kill the test. */
case class ExpectFailure[T <: Throwable : ClassTag]() extends StreamAction {
val causeClass: Class[T] = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
- override def toString(): String = s"ExpectFailure[${causeClass.getCanonicalName}]"
+ override def toString(): String = s"ExpectFailure[${causeClass.getName}]"
}
/** Assert that a body is true */
@@ -322,7 +322,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
new UncaughtExceptionHandler {
override def uncaughtException(t: Thread, e: Throwable): Unit = {
streamDeathCause = e
- testThread.interrupt()
}
})
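The new test in StreamSuite passes the expected failure class to `ExpectFailure` at runtime via `ExpectFailure()(ClassTag(e.getClass))` rather than as a static type parameter. A standalone sketch of that `ClassTag` trick, using a hypothetical `ExpectOf` case class modeled on `ExpectFailure`:

```scala
import scala.reflect.ClassTag

object ClassTagSketch {
  // Hypothetical stand-in for ExpectFailure: captures the runtime class of the
  // expected Throwable through an implicit ClassTag.
  case class ExpectOf[T <: Throwable : ClassTag]() {
    val causeClass: Class[T] = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
  }

  def main(args: Array[String]): Unit = {
    val e: Throwable = new LinkageError
    // Supplying the ClassTag explicitly works when the class is only known at runtime;
    // it is equivalent to ExpectOf[LinkageError]() when the type is statically known.
    val action = ExpectOf()(ClassTag(e.getClass))
    assert(action.causeClass == classOf[LinkageError])
  }
}
```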