From b554b3c46b0019a6caf0f9a975b460dc2570c3b2 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 25 Mar 2016 13:28:26 -0700 Subject: [SPARK-14131][SQL] Add a workaround for HADOOP-10622 to fix DataFrameReaderWriterSuite ## What changes were proposed in this pull request? There is a potential dead-lock in Hadoop Shell.runCommand before 2.5.0 ([HADOOP-10622](https://issues.apache.org/jira/browse/HADOOP-10622)). If we interrupt some thread running Shell.runCommand, we may hit this issue. This PR adds some protecion to prevent from interrupting the microBatchThread when we may run into Shell.runCommand. There are two places will call Shell.runCommand now: - offsetLog.add - FileStreamSource.getOffset They will create a file using HDFS API and call Shell.runCommand to set the file permission. ## How was this patch tested? Existing unit tests. Author: Shixiong Zhu Closes #11940 from zsxwing/workaround-for-HADOOP-10622. --- .../sql/execution/streaming/StreamExecution.scala | 85 ++++++++++++++++++++-- 1 file changed, 80 insertions(+), 5 deletions(-) (limited to 'sql') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 5abd7eca2c..60e00d203c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.atomic.AtomicInteger +import javax.annotation.concurrent.GuardedBy import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal @@ -101,6 +102,65 @@ class StreamExecution( private val offsetLog = new HDFSMetadataLog[CompositeOffset](sqlContext, checkpointFile("offsets")) + /** A monitor to protect "uninterruptible" and "interrupted" */ + private val uninterruptibleLock = new Object + + /** + * Indicates if "microBatchThread" are in the uninterruptible status. If so, interrupting + * "microBatchThread" will be deferred until "microBatchThread" enters into the interruptible + * status. + */ + @GuardedBy("uninterruptibleLock") + private var uninterruptible = false + + /** + * Indicates if we should interrupt "microBatchThread" when we are leaving the uninterruptible + * zone. + */ + @GuardedBy("uninterruptibleLock") + private var shouldInterruptThread = false + + /** + * Interrupt "microBatchThread" if possible. If "microBatchThread" is in the uninterruptible + * status, "microBatchThread" won't be interrupted until it enters into the interruptible status. + */ + private def interruptMicroBatchThreadSafely(): Unit = { + uninterruptibleLock.synchronized { + if (uninterruptible) { + shouldInterruptThread = true + } else { + microBatchThread.interrupt() + } + } + } + + /** + * Run `f` uninterruptibly in "microBatchThread". "microBatchThread" won't be interrupted before + * returning from `f`. + */ + private def runUninterruptiblyInMicroBatchThread[T](f: => T): T = { + assert(Thread.currentThread() == microBatchThread) + uninterruptibleLock.synchronized { + uninterruptible = true + // Clear the interrupted status if it's set. + if (Thread.interrupted()) { + shouldInterruptThread = true + } + } + try { + f + } finally { + uninterruptibleLock.synchronized { + uninterruptible = false + if (shouldInterruptThread) { + // Recover the interrupted status + microBatchThread.interrupt() + shouldInterruptThread = false + } + } + } + } + /** Whether the query is currently active or not */ override def isActive: Boolean = state == ACTIVE @@ -227,14 +287,29 @@ class StreamExecution( // Update committed offsets. committedOffsets ++= availableOffsets + // There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). + // If we interrupt some thread running Shell.runCommand, we may hit this issue. + // As "FileStreamSource.getOffset" will create a file using HDFS API and call "Shell.runCommand" + // to set the file permission, we should not interrupt "microBatchThread" when running this + // method. See SPARK-14131. + // // Check to see what new data is available. - val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> o)) + val newData = runUninterruptiblyInMicroBatchThread { + uniqueSources.flatMap(s => s.getOffset.map(o => s -> o)) + } availableOffsets ++= newData if (dataAvailable) { - assert( - offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)), - s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId") + // There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). + // If we interrupt some thread running Shell.runCommand, we may hit this issue. + // As "offsetLog.add" will create a file using HDFS API and call "Shell.runCommand" to set + // the file permission, we should not interrupt "microBatchThread" when running this method. + // See SPARK-14131. + runUninterruptiblyInMicroBatchThread { + assert( + offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)), + s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId") + } currentBatchId += 1 logInfo(s"Committed offsets for batch $currentBatchId.") true @@ -320,7 +395,7 @@ class StreamExecution( // intentionally state = TERMINATED if (microBatchThread.isAlive) { - microBatchThread.interrupt() + interruptMicroBatchThreadSafely() microBatchThread.join() } logInfo(s"Query $name was stopped") -- cgit v1.2.3