aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-03-28 16:29:11 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2016-03-28 16:29:11 -0700
commit2f98ee67dff0be38a4c92d7d29c8cc8ea8b6576e (patch)
treedd5b2176d1334c6d5e73d3ec5f85018a51802d1e /sql
parentb7836492bb0b5b430539d2bfa20bcc32e3fe3504 (diff)
downloadspark-2f98ee67dff0be38a4c92d7d29c8cc8ea8b6576e.tar.gz
spark-2f98ee67dff0be38a4c92d7d29c8cc8ea8b6576e.tar.bz2
spark-2f98ee67dff0be38a4c92d7d29c8cc8ea8b6576e.zip
[SPARK-14169][CORE] Add UninterruptibleThread
## What changes were proposed in this pull request? Extract the workaround for HADOOP-10622 introduced by #11940 into UninterruptibleThread so that we can test and reuse it. ## How was this patch tested? Unit tests Author: Shixiong Zhu <shixiong@databricks.com> Closes #11971 from zsxwing/uninterrupt.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 74
1 file changed, 8 insertions(+), 66 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 60e00d203c..c4e410d92c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.streaming
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
-import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
@@ -34,6 +33,7 @@ import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.ContinuousQueryListener
import org.apache.spark.sql.util.ContinuousQueryListener._
+import org.apache.spark.util.UninterruptibleThread
/**
* Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
@@ -89,9 +89,10 @@ class StreamExecution(
private[sql] var streamDeathCause: ContinuousQueryException = null
/** The thread that runs the micro-batches of this stream. */
- private[sql] val microBatchThread = new Thread(s"stream execution thread for $name") {
- override def run(): Unit = { runBatches() }
- }
+ private[sql] val microBatchThread =
+ new UninterruptibleThread(s"stream execution thread for $name") {
+ override def run(): Unit = { runBatches() }
+ }
/**
* A write-ahead-log that records the offsets that are present in each batch. In order to ensure
@@ -102,65 +103,6 @@ class StreamExecution(
private val offsetLog =
new HDFSMetadataLog[CompositeOffset](sqlContext, checkpointFile("offsets"))
- /** A monitor to protect "uninterruptible" and "interrupted" */
- private val uninterruptibleLock = new Object
-
- /**
- * Indicates if "microBatchThread" are in the uninterruptible status. If so, interrupting
- * "microBatchThread" will be deferred until "microBatchThread" enters into the interruptible
- * status.
- */
- @GuardedBy("uninterruptibleLock")
- private var uninterruptible = false
-
- /**
- * Indicates if we should interrupt "microBatchThread" when we are leaving the uninterruptible
- * zone.
- */
- @GuardedBy("uninterruptibleLock")
- private var shouldInterruptThread = false
-
- /**
- * Interrupt "microBatchThread" if possible. If "microBatchThread" is in the uninterruptible
- * status, "microBatchThread" won't be interrupted until it enters into the interruptible status.
- */
- private def interruptMicroBatchThreadSafely(): Unit = {
- uninterruptibleLock.synchronized {
- if (uninterruptible) {
- shouldInterruptThread = true
- } else {
- microBatchThread.interrupt()
- }
- }
- }
-
- /**
- * Run `f` uninterruptibly in "microBatchThread". "microBatchThread" won't be interrupted before
- * returning from `f`.
- */
- private def runUninterruptiblyInMicroBatchThread[T](f: => T): T = {
- assert(Thread.currentThread() == microBatchThread)
- uninterruptibleLock.synchronized {
- uninterruptible = true
- // Clear the interrupted status if it's set.
- if (Thread.interrupted()) {
- shouldInterruptThread = true
- }
- }
- try {
- f
- } finally {
- uninterruptibleLock.synchronized {
- uninterruptible = false
- if (shouldInterruptThread) {
- // Recover the interrupted status
- microBatchThread.interrupt()
- shouldInterruptThread = false
- }
- }
- }
- }
-
/** Whether the query is currently active or not */
override def isActive: Boolean = state == ACTIVE
@@ -294,7 +236,7 @@ class StreamExecution(
// method. See SPARK-14131.
//
// Check to see what new data is available.
- val newData = runUninterruptiblyInMicroBatchThread {
+ val newData = microBatchThread.runUninterruptibly {
uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
}
availableOffsets ++= newData
@@ -305,7 +247,7 @@ class StreamExecution(
// As "offsetLog.add" will create a file using HDFS API and call "Shell.runCommand" to set
// the file permission, we should not interrupt "microBatchThread" when running this method.
// See SPARK-14131.
- runUninterruptiblyInMicroBatchThread {
+ microBatchThread.runUninterruptibly {
assert(
offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
@@ -395,7 +337,7 @@ class StreamExecution(
// intentionally
state = TERMINATED
if (microBatchThread.isAlive) {
- interruptMicroBatchThreadSafely()
+ microBatchThread.interrupt()
microBatchThread.join()
}
logInfo(s"Query $name was stopped")