diff options
Diffstat (limited to 'sql/core/src/main')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 20 |
1 files changed, 15 insertions, 5 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 6e77f354b5..70912d13ae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.streaming +import java.io.{InterruptedIOException, IOException} import java.util.UUID import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.atomic.AtomicReference @@ -37,6 +38,12 @@ import org.apache.spark.sql.execution.command.StreamingExplainCommand import org.apache.spark.sql.streaming._ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils} +/** States for [[StreamExecution]]'s lifecycle. */ +trait State +case object INITIALIZING extends State +case object ACTIVE extends State +case object TERMINATED extends State + /** * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread. * Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any @@ -298,7 +305,14 @@ class StreamExecution( // `stop()` is already called. Let `finally` finish the cleanup. } } catch { - case _: InterruptedException if state.get == TERMINATED => // interrupted by stop() + case _: InterruptedException | _: InterruptedIOException if state.get == TERMINATED => + // interrupted by stop() + updateStatusMessage("Stopped") + case e: IOException if e.getMessage != null + && e.getMessage.startsWith(classOf[InterruptedException].getName) + && state.get == TERMINATED => + // This is a workaround for HADOOP-12074: `Shell.runCommand` converts `InterruptedException` + // to `new IOException(ie.toString())` before Hadoop 2.8. updateStatusMessage("Stopped") case e: Throwable => streamDeathCause = new StreamingQueryException( @@ -721,10 +735,6 @@ class StreamExecution( } } - trait State - case object INITIALIZING extends State - case object ACTIVE extends State - case object TERMINATED extends State } |