 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala |  20 ++++---
 sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala               | 109 +++++++++++++-
 2 files changed, 119 insertions(+), 10 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 6e77f354b5..70912d13ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.streaming
+import java.io.{InterruptedIOException, IOException}
import java.util.UUID
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
@@ -37,6 +38,12 @@ import org.apache.spark.sql.execution.command.StreamingExplainCommand
import org.apache.spark.sql.streaming._
import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
+/** States for [[StreamExecution]]'s lifecycle. */
+trait State
+case object INITIALIZING extends State
+case object ACTIVE extends State
+case object TERMINATED extends State
+
/**
* Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
* Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any
@@ -298,7 +305,14 @@ class StreamExecution(
// `stop()` is already called. Let `finally` finish the cleanup.
}
} catch {
- case _: InterruptedException if state.get == TERMINATED => // interrupted by stop()
+ case _: InterruptedException | _: InterruptedIOException if state.get == TERMINATED =>
+ // interrupted by stop()
+ updateStatusMessage("Stopped")
+ case e: IOException if e.getMessage != null
+ && e.getMessage.startsWith(classOf[InterruptedException].getName)
+ && state.get == TERMINATED =>
+ // This is a workaround for HADOOP-12074: `Shell.runCommand` converts `InterruptedException`
+ // to `new IOException(ie.toString())` before Hadoop 2.8.
updateStatusMessage("Stopped")
case e: Throwable =>
streamDeathCause = new StreamingQueryException(
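Out of context, the message check in the new IOException case can look fragile, but it is the only signal available: before HADOOP-12074 was fixed, Hadoop's `Shell.runCommand` caught the interrupt and rethrew it as `new IOException(ie.toString())`, erasing the exception type. A minimal sketch of that detection in isolation (the helper name `isHadoop12074Interrupt` is illustrative, not part of the patch):

    import java.io.IOException

    // `ie.toString` yields "java.lang.InterruptedException: <message>", so the
    // message of the wrapping IOException starts with that class name.
    def isHadoop12074Interrupt(e: IOException): Boolean =
      e.getMessage != null &&
        e.getMessage.startsWith(classOf[InterruptedException].getName)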
@@ -721,10 +735,6 @@ class StreamExecution(
}
}
- trait State
- case object INITIALIZING extends State
- case object ACTIVE extends State
- case object TERMINATED extends State
}
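The `state.get == TERMINATED` guards above are what make swallowing these exceptions safe: `stop()` first moves the state to TERMINATED and only then interrupts the stream thread, so an interrupt (plain, or wrapped by Hadoop) observed while in the TERMINATED state can only have come from `stop()`. A self-contained sketch of the pattern, reusing the lifecycle states defined in this patch (`StoppableLoop` is illustrative, not StreamExecution itself):

    import java.util.concurrent.atomic.AtomicReference

    class StoppableLoop {
      private val state = new AtomicReference[State](INITIALIZING)

      private val thread = new Thread("stream-thread") {
        override def run(): Unit = {
          state.set(ACTIVE)
          try {
            while (state.get == ACTIVE) {
              // run one micro-batch; may block in interruptible I/O
            }
          } catch {
            case _: InterruptedException if state.get == TERMINATED =>
              // interrupted by stop(): expected, exit cleanly
          }
        }
      }

      def start(): Unit = thread.start()

      def stop(): Unit = {
        state.set(TERMINATED) // must happen before the interrupt below
        thread.interrupt()
        thread.join()
      }
    }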
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index f44cfada29..6dfcd8baba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.streaming
+import java.io.{InterruptedIOException, IOException}
+import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
+
import scala.reflect.ClassTag
import scala.util.control.ControlThrowable
@@ -350,13 +353,45 @@ class StreamSuite extends StreamTest {
}
}
}
-}
-/**
- * A fake StreamSourceProvider thats creates a fake Source that cannot be reused.
- */
-class FakeDefaultSource extends StreamSourceProvider {
+ test("handle IOException when the streaming thread is interrupted (pre Hadoop 2.8)") {
+ // This test uses a fake source to throw the same IOException that Hadoop throws before 2.8 when
+ // the streaming thread is interrupted. The query should handle it and stop without failing.
+ ThrowingIOExceptionLikeHadoop12074.createSourceLatch = new CountDownLatch(1)
+ val query = spark
+ .readStream
+ .format(classOf[ThrowingIOExceptionLikeHadoop12074].getName)
+ .load()
+ .writeStream
+ .format("console")
+ .start()
+ assert(ThrowingIOExceptionLikeHadoop12074.createSourceLatch
+ .await(streamingTimeout.toMillis, TimeUnit.MILLISECONDS),
+ "ThrowingIOExceptionLikeHadoop12074.createSource wasn't called before timeout")
+ query.stop()
+ assert(query.exception.isEmpty)
+ }
+ test("handle InterruptedIOException when the streaming thread is interrupted (Hadoop 2.8+)") {
+ // This test uses a fake source to throw the same InterruptedIOException that Hadoop 2.8+ throws
+ // when the streaming thread is interrupted. The query should handle it and stop without failing.
+ ThrowingInterruptedIOException.createSourceLatch = new CountDownLatch(1)
+ val query = spark
+ .readStream
+ .format(classOf[ThrowingInterruptedIOException].getName)
+ .load()
+ .writeStream
+ .format("console")
+ .start()
+ assert(ThrowingInterruptedIOException.createSourceLatch
+ .await(streamingTimeout.toMillis, TimeUnit.MILLISECONDS),
+ "ThrowingInterruptedIOException.createSource wasn't called before timeout")
+ query.stop()
+ assert(query.exception.isEmpty)
+ }
+}
+
+abstract class FakeSource extends StreamSourceProvider {
private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
override def sourceSchema(
@@ -364,6 +399,10 @@ class FakeDefaultSource extends StreamSourceProvider {
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): (String, StructType) = ("fakeSource", fakeSchema)
+}
+
+/** A fake StreamSourceProvider that creates a fake Source that cannot be reused. */
+class FakeDefaultSource extends FakeSource {
override def createSource(
spark: SQLContext,
@@ -395,3 +434,63 @@ class FakeDefaultSource extends StreamSourceProvider {
}
}
}
+
+/** A fake source that throws the same IOException as pre-2.8 Hadoop does when it's interrupted. */
+class ThrowingIOExceptionLikeHadoop12074 extends FakeSource {
+ import ThrowingIOExceptionLikeHadoop12074._
+
+ override def createSource(
+ spark: SQLContext,
+ metadataPath: String,
+ schema: Option[StructType],
+ providerName: String,
+ parameters: Map[String, String]): Source = {
+ createSourceLatch.countDown()
+ try {
+ Thread.sleep(30000)
+ throw new TimeoutException("sleep was not interrupted in 30 seconds")
+ } catch {
+ case ie: InterruptedException =>
+ throw new IOException(ie.toString)
+ }
+ }
+}
+
+object ThrowingIOExceptionLikeHadoop12074 {
+ /**
+ * A latch to allow the user to wait until [[ThrowingIOExceptionLikeHadoop12074.createSource]] is
+ * called.
+ */
+ @volatile var createSourceLatch: CountDownLatch = null
+}
+
+/** A fake source that throws InterruptedIOException as Hadoop 2.8+ does when it's interrupted. */
+class ThrowingInterruptedIOException extends FakeSource {
+ import ThrowingInterruptedIOException._
+
+ override def createSource(
+ spark: SQLContext,
+ metadataPath: String,
+ schema: Option[StructType],
+ providerName: String,
+ parameters: Map[String, String]): Source = {
+ createSourceLatch.countDown()
+ try {
+ Thread.sleep(30000)
+ throw new TimeoutException("sleep was not interrupted in 30 seconds")
+ } catch {
+ case ie: InterruptedException =>
+ val iie = new InterruptedIOException(ie.toString)
+ iie.initCause(ie)
+ throw iie
+ }
+ }
+}
+
+object ThrowingInterruptedIOException {
+ /**
+ * A latch to allow the user to wait until [[ThrowingInterruptedIOException.createSource]] is
+ * called.
+ */
+ @volatile var createSourceLatch: CountDownLatch = null
+}
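One design note on the fakes: `.format(classOf[...].getName)` makes Spark resolve the provider by class name and instantiate it reflectively, which is why each fake is a top-level class with a no-arg constructor, and why the latch lives in a companion object. The test never holds a reference to the instance Spark creates, so coordination has to go through shared static state. Roughly (a sketch of the lookup, ignoring Spark's service-loader and alias resolution):

    // The test resets the companion-object latch, then Spark effectively does:
    val cls = Class.forName(classOf[ThrowingInterruptedIOException].getName)
    val provider = cls.getDeclaredConstructor().newInstance()
      .asInstanceOf[StreamSourceProvider]
    // createSource() then counts the shared latch down, unblocking the test.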