author    Shixiong Zhu <shixiong@databricks.com>  2016-03-25 13:28:26 -0700
committer Shixiong Zhu <shixiong@databricks.com>  2016-03-25 13:28:26 -0700
commit    b554b3c46b0019a6caf0f9a975b460dc2570c3b2 (patch)
tree      7de1a82630527694e73a9967b0f9b1def389ec0e
parent    afd0debe075e9ea8466e384932a513ef0188273c (diff)
[SPARK-14131][SQL] Add a workaround for HADOOP-10622 to fix DataFrameReaderWriterSuite
## What changes were proposed in this pull request?

There is a potential deadlock in Hadoop's Shell.runCommand before 2.5.0 ([HADOOP-10622](https://issues.apache.org/jira/browse/HADOOP-10622)). If we interrupt a thread that is running Shell.runCommand, we may hit this issue. This PR adds protection to prevent microBatchThread from being interrupted while it may run into Shell.runCommand. Two places call Shell.runCommand now:

- offsetLog.add
- FileStreamSource.getOffset

Both create a file using the HDFS API and call Shell.runCommand to set the file permission.

## How was this patch tested?

Existing unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #11940 from zsxwing/workaround-for-HADOOP-10622.
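To make the mechanism concrete, here is a minimal, self-contained sketch of the deferred-interrupt pattern this patch applies. All names in it (`UninterruptibleWorker`, `interruptSafely`, `runUninterruptibly`, `pendingInterrupt`) are hypothetical illustrations, not part of the patch; the real implementation lives in StreamExecution and appears in the diff below.

```scala
import javax.annotation.concurrent.GuardedBy

// Hypothetical sketch of the deferred-interrupt pattern; names are illustrative.
class UninterruptibleWorker(worker: Thread) {
  /** A monitor protecting `uninterruptible` and `pendingInterrupt`. */
  private val lock = new Object

  /** True while `worker` must not be interrupted. */
  @GuardedBy("lock")
  private var uninterruptible = false

  /** True if an interrupt arrived while `worker` was uninterruptible. */
  @GuardedBy("lock")
  private var pendingInterrupt = false

  /** Interrupt `worker`, deferring delivery while it is uninterruptible. */
  def interruptSafely(): Unit = lock.synchronized {
    if (uninterruptible) pendingInterrupt = true else worker.interrupt()
  }

  /** Run `f` on `worker`, deferring any interrupt until `f` returns. */
  def runUninterruptibly[T](f: => T): T = {
    require(Thread.currentThread() eq worker)
    lock.synchronized {
      uninterruptible = true
      // An interrupt that was already pending is cleared here and re-delivered
      // after `f` returns, instead of firing inside `f`.
      if (Thread.interrupted()) pendingInterrupt = true
    }
    try f finally lock.synchronized {
      uninterruptible = false
      if (pendingInterrupt) {
        worker.interrupt() // deliver the deferred interrupt
        pendingInterrupt = false
      }
    }
  }
}
```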
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala  85
1 file changed, 80 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 5abd7eca2c..60e00d203c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
+import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
@@ -101,6 +102,65 @@ class StreamExecution(
private val offsetLog =
new HDFSMetadataLog[CompositeOffset](sqlContext, checkpointFile("offsets"))
+ /** A monitor to protect "uninterruptible" and "shouldInterruptThread" */
+ private val uninterruptibleLock = new Object
+
+ /**
+ * Indicates whether "microBatchThread" is in the uninterruptible status. If so, interrupting
+ * "microBatchThread" will be deferred until "microBatchThread" enters the interruptible
+ * status.
+ */
+ @GuardedBy("uninterruptibleLock")
+ private var uninterruptible = false
+
+ /**
+ * Indicates whether we should interrupt "microBatchThread" when it leaves the uninterruptible
+ * zone.
+ */
+ @GuardedBy("uninterruptibleLock")
+ private var shouldInterruptThread = false
+
+ /**
+ * Interrupt "microBatchThread" if possible. If "microBatchThread" is in the uninterruptible
+ * status, it won't be interrupted until it enters the interruptible status.
+ */
+ private def interruptMicroBatchThreadSafely(): Unit = {
+ uninterruptibleLock.synchronized {
+ if (uninterruptible) {
+ shouldInterruptThread = true
+ } else {
+ microBatchThread.interrupt()
+ }
+ }
+ }
+
+ /**
+ * Run `f` uninterruptibly in "microBatchThread". "microBatchThread" won't be interrupted until
+ * `f` returns.
+ */
+ private def runUninterruptiblyInMicroBatchThread[T](f: => T): T = {
+ assert(Thread.currentThread() == microBatchThread)
+ uninterruptibleLock.synchronized {
+ uninterruptible = true
+ // Clear the interrupted status if it's set, and remember it so it can be restored later.
+ if (Thread.interrupted()) {
+ shouldInterruptThread = true
+ }
+ }
+ try {
+ f
+ } finally {
+ uninterruptibleLock.synchronized {
+ uninterruptible = false
+ if (shouldInterruptThread) {
+ // Restore the interrupted status
+ microBatchThread.interrupt()
+ shouldInterruptThread = false
+ }
+ }
+ }
+ }
+
/** Whether the query is currently active or not */
override def isActive: Boolean = state == ACTIVE
@@ -227,14 +287,29 @@ class StreamExecution(
// Update committed offsets.
committedOffsets ++= availableOffsets
+ // There is a potential deadlock in Hadoop's "Shell.runCommand" before 2.5.0 (HADOOP-10622).
+ // If we interrupt a thread that is running "Shell.runCommand", we may hit this issue.
+ // As "FileStreamSource.getOffset" will create a file using the HDFS API and call
+ // "Shell.runCommand" to set the file permission, we should not interrupt "microBatchThread"
+ // while it runs this method. See SPARK-14131.
+ //
// Check to see what new data is available.
- val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
+ val newData = runUninterruptiblyInMicroBatchThread {
+ uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
+ }
availableOffsets ++= newData
if (dataAvailable) {
- assert(
- offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
- s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
+ // There is a potential deadlock in Hadoop's "Shell.runCommand" before 2.5.0 (HADOOP-10622).
+ // If we interrupt a thread that is running "Shell.runCommand", we may hit this issue.
+ // As "offsetLog.add" will create a file using the HDFS API and call "Shell.runCommand" to
+ // set the file permission, we should not interrupt "microBatchThread" while it runs this
+ // method. See SPARK-14131.
+ runUninterruptiblyInMicroBatchThread {
+ assert(
+ offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
+ s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
+ }
currentBatchId += 1
logInfo(s"Committed offsets for batch $currentBatchId.")
true
@@ -320,7 +395,7 @@ class StreamExecution(
// intentionally
state = TERMINATED
if (microBatchThread.isAlive) {
- microBatchThread.interrupt()
+ interruptMicroBatchThreadSafely()
microBatchThread.join()
}
logInfo(s"Query $name was stopped")