aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala20
1 files changed, 15 insertions, 5 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 6e77f354b5..70912d13ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.streaming
+import java.io.{InterruptedIOException, IOException}
import java.util.UUID
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
@@ -37,6 +38,12 @@ import org.apache.spark.sql.execution.command.StreamingExplainCommand
import org.apache.spark.sql.streaming._
import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
+/** States for [[StreamExecution]]'s lifecycle. */
+trait State
+case object INITIALIZING extends State
+case object ACTIVE extends State
+case object TERMINATED extends State
+
/**
* Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
* Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any
@@ -298,7 +305,14 @@ class StreamExecution(
// `stop()` is already called. Let `finally` finish the cleanup.
}
} catch {
- case _: InterruptedException if state.get == TERMINATED => // interrupted by stop()
+ case _: InterruptedException | _: InterruptedIOException if state.get == TERMINATED =>
+ // interrupted by stop()
+ updateStatusMessage("Stopped")
+ case e: IOException if e.getMessage != null
+ && e.getMessage.startsWith(classOf[InterruptedException].getName)
+ && state.get == TERMINATED =>
+ // This is a workaround for HADOOP-12074: `Shell.runCommand` converts `InterruptedException`
+ // to `new IOException(ie.toString())` before Hadoop 2.8.
updateStatusMessage("Stopped")
case e: Throwable =>
streamDeathCause = new StreamingQueryException(
@@ -721,10 +735,6 @@ class StreamExecution(
}
}
- trait State
- case object INITIALIZING extends State
- case object ACTIVE extends State
- case object TERMINATED extends State
}