diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2017-02-17 19:04:45 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2017-02-17 19:04:45 -0800 |
commit | 15b144d2bf4555981a51276277c08a9c11a402f6 (patch) | |
tree | bf4ea06ed8854d5b9c871ee1411670bd0c6b281d /sql/core/src/main/scala/org/apache | |
parent | 988f6d7ee8017756645f2af9993ee020332442cb (diff) | |
download | spark-15b144d2bf4555981a51276277c08a9c11a402f6.tar.gz spark-15b144d2bf4555981a51276277c08a9c11a402f6.tar.bz2 spark-15b144d2bf4555981a51276277c08a9c11a402f6.zip |
[SPARK-19617][SS] Fix the race condition when starting and stopping a query quickly
## What changes were proposed in this pull request?
The streaming thread in StreamExecution uses the following ways to check if it should exit:
- Catch an InterruptException.
- `StreamExecution.state` is TERMINATED.
When starting and stopping a query quickly, the above two checks may both fail:
- Hit [HADOOP-14084](https://issues.apache.org/jira/browse/HADOOP-14084) and swallow InterruptException
- StreamExecution.stop is called before `state` becomes `ACTIVE`. Then [runBatches](https://github.com/apache/spark/blob/dcc2d540a53f0bd04baead43fdee1c170ef2b9f3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L252) changes the state from `TERMINATED` to `ACTIVE`.
If the above cases both happen, the query will hang forever.
This PR changes `state` to `AtomicReference` and uses`compareAndSet` to make sure we only change the state from `INITIALIZING` to `ACTIVE`. It also removes the `runUninterruptibly` hack from ``HDFSMetadata`, because HADOOP-14084 won't cause any problem after we fix the race condition.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #16947 from zsxwing/SPARK-19617.
Diffstat (limited to 'sql/core/src/main/scala/org/apache')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala | 49 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 99 |
2 files changed, 53 insertions, 95 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index 3155ce04a1..f9e1f7de9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -32,7 +32,6 @@ import org.json4s.jackson.Serialization import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession -import org.apache.spark.util.UninterruptibleThread /** @@ -109,39 +108,12 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: override def add(batchId: Long, metadata: T): Boolean = { get(batchId).map(_ => false).getOrElse { // Only write metadata when the batch has not yet been written - if (fileManager.isLocalFileSystem) { - Thread.currentThread match { - case ut: UninterruptibleThread => - // When using a local file system, "writeBatch" must be called on a - // [[org.apache.spark.util.UninterruptibleThread]] so that interrupts can be disabled - // while writing the batch file. - // - // This is because Hadoop "Shell.runCommand" swallows InterruptException (HADOOP-14084). - // If the user tries to stop a query, and the thread running "Shell.runCommand" is - // interrupted, then InterruptException will be dropped and the query will be still - // running. (Note: `writeBatch` creates a file using HDFS APIs and will call - // "Shell.runCommand" to set the file permission if using the local file system) - // - // Hence, we make sure that "writeBatch" is called on [[UninterruptibleThread]] which - // allows us to disable interrupts here, in order to propagate the interrupt state - // correctly. Also see SPARK-19599. - ut.runUninterruptibly { writeBatch(batchId, metadata) } - case _ => - throw new IllegalStateException( - "HDFSMetadataLog.add() on a local file system must be executed on " + - "a o.a.spark.util.UninterruptibleThread") - } - } else { - // For a distributed file system, such as HDFS or S3, if the network is broken, write - // operations may just hang until timeout. We should enable interrupts to allow stopping - // the query fast. - writeBatch(batchId, metadata) - } + writeBatch(batchId, metadata) true } } - def writeTempBatch(metadata: T): Option[Path] = { + private def writeTempBatch(metadata: T): Option[Path] = { while (true) { val tempPath = new Path(metadataPath, s".${UUID.randomUUID.toString}.tmp") try { @@ -327,9 +299,6 @@ object HDFSMetadataLog { /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */ def delete(path: Path): Unit - - /** Whether the file systme is a local FS. */ - def isLocalFileSystem: Boolean } /** @@ -374,13 +343,6 @@ object HDFSMetadataLog { // ignore if file has already been deleted } } - - override def isLocalFileSystem: Boolean = fc.getDefaultFileSystem match { - case _: local.LocalFs | _: local.RawLocalFs => - // LocalFs = RawLocalFs + ChecksumFs - true - case _ => false - } } /** @@ -437,12 +399,5 @@ object HDFSMetadataLog { // ignore if file has already been deleted } } - - override def isLocalFileSystem: Boolean = fs match { - case _: LocalFileSystem | _: RawLocalFileSystem => - // LocalFileSystem = RawLocalFileSystem + ChecksumFileSystem - true - case _ => false - } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index e1af420a69..4bd6431cbe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming import java.util.UUID import java.util.concurrent.{CountDownLatch, TimeUnit} +import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.locks.ReentrantLock import scala.collection.mutable.ArrayBuffer @@ -157,8 +158,7 @@ class StreamExecution( } /** Defines the internal state of execution */ - @volatile - private var state: State = INITIALIZING + private val state = new AtomicReference[State](INITIALIZING) @volatile var lastExecution: IncrementalExecution = _ @@ -178,8 +178,8 @@ class StreamExecution( /** * The thread that runs the micro-batches of this stream. Note that this thread must be - * [[org.apache.spark.util.UninterruptibleThread]] to avoid swallowing `InterruptException` when - * using [[HDFSMetadataLog]]. See SPARK-19599 for more details. + * [[org.apache.spark.util.UninterruptibleThread]] to workaround KAFKA-1894: interrupting a + * running `KafkaConsumer` may cause endless loop. */ val microBatchThread = new StreamExecutionThread(s"stream execution thread for $prettyIdString") { @@ -200,10 +200,10 @@ class StreamExecution( val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets")) /** Whether all fields of the query have been initialized */ - private def isInitialized: Boolean = state != INITIALIZING + private def isInitialized: Boolean = state.get != INITIALIZING /** Whether the query is currently active or not */ - override def isActive: Boolean = state != TERMINATED + override def isActive: Boolean = state.get != TERMINATED /** Returns the [[StreamingQueryException]] if the query was terminated by an exception. */ override def exception: Option[StreamingQueryException] = Option(streamDeathCause) @@ -249,53 +249,56 @@ class StreamExecution( updateStatusMessage("Initializing sources") // force initialization of the logical plan so that the sources can be created logicalPlan - state = ACTIVE - // Unblock `awaitInitialization` - initializationLatch.countDown() - - triggerExecutor.execute(() => { - startTrigger() - - val isTerminated = - if (isActive) { - reportTimeTaken("triggerExecution") { - if (currentBatchId < 0) { - // We'll do this initialization only once - populateStartOffsets() - logDebug(s"Stream running from $committedOffsets to $availableOffsets") - } else { - constructNextBatch() + if (state.compareAndSet(INITIALIZING, ACTIVE)) { + // Unblock `awaitInitialization` + initializationLatch.countDown() + + triggerExecutor.execute(() => { + startTrigger() + + val continueToRun = + if (isActive) { + reportTimeTaken("triggerExecution") { + if (currentBatchId < 0) { + // We'll do this initialization only once + populateStartOffsets() + logDebug(s"Stream running from $committedOffsets to $availableOffsets") + } else { + constructNextBatch() + } + if (dataAvailable) { + currentStatus = currentStatus.copy(isDataAvailable = true) + updateStatusMessage("Processing new data") + runBatch() + } } + + // Report trigger as finished and construct progress object. + finishTrigger(dataAvailable) if (dataAvailable) { - currentStatus = currentStatus.copy(isDataAvailable = true) - updateStatusMessage("Processing new data") - runBatch() + // We'll increase currentBatchId after we complete processing current batch's data + currentBatchId += 1 + } else { + currentStatus = currentStatus.copy(isDataAvailable = false) + updateStatusMessage("Waiting for data to arrive") + Thread.sleep(pollingDelayMs) } - } - - // Report trigger as finished and construct progress object. - finishTrigger(dataAvailable) - if (dataAvailable) { - // We'll increase currentBatchId after we complete processing current batch's data - currentBatchId += 1 + true } else { - currentStatus = currentStatus.copy(isDataAvailable = false) - updateStatusMessage("Waiting for data to arrive") - Thread.sleep(pollingDelayMs) + false } - true - } else { - false - } - // Update committed offsets. - committedOffsets ++= availableOffsets - updateStatusMessage("Waiting for next trigger") - isTerminated - }) - updateStatusMessage("Stopped") + // Update committed offsets. + committedOffsets ++= availableOffsets + updateStatusMessage("Waiting for next trigger") + continueToRun + }) + updateStatusMessage("Stopped") + } else { + // `stop()` is already called. Let `finally` finish the cleanup. + } } catch { - case _: InterruptedException if state == TERMINATED => // interrupted by stop() + case _: InterruptedException if state.get == TERMINATED => // interrupted by stop() updateStatusMessage("Stopped") case e: Throwable => streamDeathCause = new StreamingQueryException( @@ -318,7 +321,7 @@ class StreamExecution( initializationLatch.countDown() try { - state = TERMINATED + state.set(TERMINATED) currentStatus = status.copy(isTriggerActive = false, isDataAvailable = false) // Update metrics and status @@ -562,7 +565,7 @@ class StreamExecution( override def stop(): Unit = { // Set the state to TERMINATED so that the batching thread knows that it was interrupted // intentionally - state = TERMINATED + state.set(TERMINATED) if (microBatchThread.isAlive) { microBatchThread.interrupt() microBatchThread.join() |