From a6a7a95e2f3482d84fcd744713e43f80ea90e33a Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Fri, 3 Mar 2017 17:10:11 -0800
Subject: [SPARK-19718][SS] Handle more interrupt cases properly for Hadoop

## What changes were proposed in this pull request?

[SPARK-19617](https://issues.apache.org/jira/browse/SPARK-19617) changed `HDFSMetadataLog` to enable interrupts when using the local file system. However, now we hit [HADOOP-12074](https://issues.apache.org/jira/browse/HADOOP-12074): `Shell.runCommand` converts `InterruptedException` to `new IOException(ie.toString())` before Hadoop 2.8. This is the Hadoop patch that fixes HADOOP-12074: https://github.com/apache/hadoop/commit/95c73d49b1bb459b626a9ac52acadb8f5fa724de

This PR adds new logic to handle the following cases related to `InterruptedException`:

- If the message of an `IOException` starts with `java.lang.InterruptedException`, treat it as an `InterruptedException`. This covers versions before Hadoop 2.8.
- Treat `InterruptedIOException` as an `InterruptedException`. This covers Hadoop 2.8+ and any other code that may throw `InterruptedIOException` when the thread is interrupted.
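
For illustration, here is a minimal standalone sketch of those two rules. The `InterruptDetection` object and the `isStreamInterrupt` helper are hypothetical names that do not appear in this patch; the actual change inlines these checks into `StreamExecution`'s `catch` block, as the diff below shows.

```scala
import java.io.{InterruptedIOException, IOException}

// Hypothetical helper (not part of this patch) mirroring the two detection
// rules described above.
object InterruptDetection {
  /** Returns true if `e` indicates the thread was interrupted, on any Hadoop version. */
  def isStreamInterrupt(e: Throwable): Boolean = e match {
    // Hadoop 2.8+ (and other IO code) surfaces interrupts as
    // InterruptedIOException; InterruptedException is the JVM's own signal.
    case _: InterruptedException | _: InterruptedIOException => true
    // Pre-Hadoop 2.8 (HADOOP-12074): Shell.runCommand wraps the interrupt as
    // `new IOException(ie.toString())`, so the message starts with the class
    // name "java.lang.InterruptedException".
    case ioe: IOException =>
      ioe.getMessage != null &&
        ioe.getMessage.startsWith(classOf[InterruptedException].getName)
    case _ => false
  }
}
```

Note that the `InterruptedIOException` alternative must be matched before the plain `IOException` case, because `InterruptedIOException` extends `IOException`; the `catch` cases added to `StreamExecution` below rely on the same ordering.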
updateStatusMessage("Stopped") case e: Throwable => streamDeathCause = new StreamingQueryException( @@ -721,10 +735,6 @@ class StreamExecution( } } - trait State - case object INITIALIZING extends State - case object ACTIVE extends State - case object TERMINATED extends State } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index f44cfada29..6dfcd8baba 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.streaming +import java.io.{InterruptedIOException, IOException} +import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit} + import scala.reflect.ClassTag import scala.util.control.ControlThrowable @@ -350,13 +353,45 @@ class StreamSuite extends StreamTest { } } } -} -/** - * A fake StreamSourceProvider thats creates a fake Source that cannot be reused. - */ -class FakeDefaultSource extends StreamSourceProvider { + test("handle IOException when the streaming thread is interrupted (pre Hadoop 2.8)") { + // This test uses a fake source to throw the same IOException as pre Hadoop 2.8 when the + // streaming thread is interrupted. We should handle it properly by not failing the query. + ThrowingIOExceptionLikeHadoop12074.createSourceLatch = new CountDownLatch(1) + val query = spark + .readStream + .format(classOf[ThrowingIOExceptionLikeHadoop12074].getName) + .load() + .writeStream + .format("console") + .start() + assert(ThrowingIOExceptionLikeHadoop12074.createSourceLatch + .await(streamingTimeout.toMillis, TimeUnit.MILLISECONDS), + "ThrowingIOExceptionLikeHadoop12074.createSource wasn't called before timeout") + query.stop() + assert(query.exception.isEmpty) + } + test("handle InterruptedIOException when the streaming thread is interrupted (Hadoop 2.8+)") { + // This test uses a fake source to throw the same InterruptedIOException as Hadoop 2.8+ when the + // streaming thread is interrupted. We should handle it properly by not failing the query. + ThrowingInterruptedIOException.createSourceLatch = new CountDownLatch(1) + val query = spark + .readStream + .format(classOf[ThrowingInterruptedIOException].getName) + .load() + .writeStream + .format("console") + .start() + assert(ThrowingInterruptedIOException.createSourceLatch + .await(streamingTimeout.toMillis, TimeUnit.MILLISECONDS), + "ThrowingInterruptedIOException.createSource wasn't called before timeout") + query.stop() + assert(query.exception.isEmpty) + } +} + +abstract class FakeSource extends StreamSourceProvider { private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil) override def sourceSchema( @@ -364,6 +399,10 @@ class FakeDefaultSource extends StreamSourceProvider { schema: Option[StructType], providerName: String, parameters: Map[String, String]): (String, StructType) = ("fakeSource", fakeSchema) +} + +/** A fake StreamSourceProvider that creates a fake Source that cannot be reused. */ +class FakeDefaultSource extends FakeSource { override def createSource( spark: SQLContext, @@ -395,3 +434,63 @@ class FakeDefaultSource extends StreamSourceProvider { } } } + +/** A fake source that throws the same IOException like pre Hadoop 2.8 when it's interrupted. 
+class ThrowingIOExceptionLikeHadoop12074 extends FakeSource {
+  import ThrowingIOExceptionLikeHadoop12074._
+
+  override def createSource(
+      spark: SQLContext,
+      metadataPath: String,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): Source = {
+    createSourceLatch.countDown()
+    try {
+      Thread.sleep(30000)
+      throw new TimeoutException("sleep was not interrupted in 30 seconds")
+    } catch {
+      case ie: InterruptedException =>
+        throw new IOException(ie.toString)
+    }
+  }
+}
+
+object ThrowingIOExceptionLikeHadoop12074 {
+  /**
+   * A latch to allow the user to wait until [[ThrowingIOExceptionLikeHadoop12074.createSource]] is
+   * called.
+   */
+  @volatile var createSourceLatch: CountDownLatch = null
+}
+
+/** A fake source that throws InterruptedIOException as Hadoop 2.8+ does when it's interrupted. */
+class ThrowingInterruptedIOException extends FakeSource {
+  import ThrowingInterruptedIOException._
+
+  override def createSource(
+      spark: SQLContext,
+      metadataPath: String,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): Source = {
+    createSourceLatch.countDown()
+    try {
+      Thread.sleep(30000)
+      throw new TimeoutException("sleep was not interrupted in 30 seconds")
+    } catch {
+      case ie: InterruptedException =>
+        val iie = new InterruptedIOException(ie.toString)
+        iie.initCause(ie)
+        throw iie
+    }
+  }
+}
+
+object ThrowingInterruptedIOException {
+  /**
+   * A latch to allow the user to wait until [[ThrowingInterruptedIOException.createSource]] is
+   * called.
+   */
+  @volatile var createSourceLatch: CountDownLatch = null
+}
-- 
cgit v1.2.3