diff options
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 169 |
1 files changed, 65 insertions, 104 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 60e00d203c..87dd27a2b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.streaming import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.atomic.AtomicInteger -import javax.annotation.concurrent.GuardedBy import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal @@ -28,12 +27,14 @@ import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.util.ContinuousQueryListener import org.apache.spark.sql.util.ContinuousQueryListener._ +import org.apache.spark.util.UninterruptibleThread /** * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread. @@ -42,30 +43,30 @@ import org.apache.spark.sql.util.ContinuousQueryListener._ * and the results are committed transactionally to the given [[Sink]]. */ class StreamExecution( - val sqlContext: SQLContext, + override val sqlContext: SQLContext, override val name: String, - val checkpointRoot: String, + checkpointRoot: String, private[sql] val logicalPlan: LogicalPlan, - val sink: Sink) extends ContinuousQuery with Logging { + val sink: Sink, + val trigger: Trigger) extends ContinuousQuery with Logging { /** An monitor used to wait/notify when batches complete. */ private val awaitBatchLock = new Object private val startLatch = new CountDownLatch(1) private val terminationLatch = new CountDownLatch(1) - /** Minimum amount of time in between the start of each batch. */ - private val minBatchTime = 10 - /** * Tracks how much data we have processed and committed to the sink or state store from each * input source. */ + @volatile private[sql] var committedOffsets = new StreamProgress /** * Tracks the offsets that are available to be processed, but have not yet be committed to the * sink. */ + @volatile private var availableOffsets = new StreamProgress /** The current batchId or -1 if execution has not yet been initialized. */ @@ -73,11 +74,15 @@ class StreamExecution( /** All stream sources present the query plan. */ private val sources = - logicalPlan.collect { case s: StreamingRelation => s.source } + logicalPlan.collect { case s: StreamingExecutionRelation => s.source } /** A list of unique sources in the query plan. */ private val uniqueSources = sources.distinct + private val triggerExecutor = trigger match { + case t: ProcessingTime => ProcessingTimeExecutor(t) + } + /** Defines the internal state of execution */ @volatile private var state: State = INITIALIZED @@ -89,9 +94,10 @@ class StreamExecution( private[sql] var streamDeathCause: ContinuousQueryException = null /** The thread that runs the micro-batches of this stream. */ - private[sql] val microBatchThread = new Thread(s"stream execution thread for $name") { - override def run(): Unit = { runBatches() } - } + private[sql] val microBatchThread = + new UninterruptibleThread(s"stream execution thread for $name") { + override def run(): Unit = { runBatches() } + } /** * A write-ahead-log that records the offsets that are present in each batch. In order to ensure @@ -102,71 +108,13 @@ class StreamExecution( private val offsetLog = new HDFSMetadataLog[CompositeOffset](sqlContext, checkpointFile("offsets")) - /** A monitor to protect "uninterruptible" and "interrupted" */ - private val uninterruptibleLock = new Object - - /** - * Indicates if "microBatchThread" are in the uninterruptible status. If so, interrupting - * "microBatchThread" will be deferred until "microBatchThread" enters into the interruptible - * status. - */ - @GuardedBy("uninterruptibleLock") - private var uninterruptible = false - - /** - * Indicates if we should interrupt "microBatchThread" when we are leaving the uninterruptible - * zone. - */ - @GuardedBy("uninterruptibleLock") - private var shouldInterruptThread = false - - /** - * Interrupt "microBatchThread" if possible. If "microBatchThread" is in the uninterruptible - * status, "microBatchThread" won't be interrupted until it enters into the interruptible status. - */ - private def interruptMicroBatchThreadSafely(): Unit = { - uninterruptibleLock.synchronized { - if (uninterruptible) { - shouldInterruptThread = true - } else { - microBatchThread.interrupt() - } - } - } - - /** - * Run `f` uninterruptibly in "microBatchThread". "microBatchThread" won't be interrupted before - * returning from `f`. - */ - private def runUninterruptiblyInMicroBatchThread[T](f: => T): T = { - assert(Thread.currentThread() == microBatchThread) - uninterruptibleLock.synchronized { - uninterruptible = true - // Clear the interrupted status if it's set. - if (Thread.interrupted()) { - shouldInterruptThread = true - } - } - try { - f - } finally { - uninterruptibleLock.synchronized { - uninterruptible = false - if (shouldInterruptThread) { - // Recover the interrupted status - microBatchThread.interrupt() - shouldInterruptThread = false - } - } - } - } - /** Whether the query is currently active or not */ override def isActive: Boolean = state == ACTIVE /** Returns current status of all the sources. */ override def sourceStatuses: Array[SourceStatus] = { - sources.map(s => new SourceStatus(s.toString, availableOffsets.get(s))).toArray + val localAvailableOffsets = availableOffsets + sources.map(s => new SourceStatus(s.toString, localAvailableOffsets.get(s))).toArray } /** Returns current status of the sink. */ @@ -211,11 +159,15 @@ class StreamExecution( SQLContext.setActive(sqlContext) populateStartOffsets() logDebug(s"Stream running from $committedOffsets to $availableOffsets") - while (isActive) { - if (dataAvailable) runBatch() - commitAndConstructNextBatch() - Thread.sleep(minBatchTime) // TODO: Could be tighter - } + triggerExecutor.execute(() => { + if (isActive) { + if (dataAvailable) runBatch() + constructNextBatch() + true + } else { + false + } + }) } catch { case _: InterruptedException if state == TERMINATED => // interrupted by stop() case NonFatal(e) => @@ -258,7 +210,7 @@ class StreamExecution( case None => // We are starting this stream for the first time. logInfo(s"Starting new continuous query.") currentBatchId = 0 - commitAndConstructNextBatch() + constructNextBatch() } } @@ -278,15 +230,8 @@ class StreamExecution( /** * Queries all of the sources to see if any new data is available. When there is new data the * batchId counter is incremented and a new log entry is written with the newest offsets. - * - * Note that committing the offsets for a new batch implicitly marks the previous batch as - * finished and thus this method should only be called when all currently available data - * has been written to the sink. */ - private def commitAndConstructNextBatch(): Boolean = { - // Update committed offsets. - committedOffsets ++= availableOffsets - + private def constructNextBatch(): Unit = { // There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). // If we interrupt some thread running Shell.runCommand, we may hit this issue. // As "FileStreamSource.getOffset" will create a file using HDFS API and call "Shell.runCommand" @@ -294,33 +239,37 @@ class StreamExecution( // method. See SPARK-14131. // // Check to see what new data is available. - val newData = runUninterruptiblyInMicroBatchThread { + val newData = microBatchThread.runUninterruptibly { uniqueSources.flatMap(s => s.getOffset.map(o => s -> o)) } availableOffsets ++= newData - if (dataAvailable) { + val hasNewData = awaitBatchLock.synchronized { + if (dataAvailable) { + true + } else { + noNewData = true + false + } + } + if (hasNewData) { // There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). // If we interrupt some thread running Shell.runCommand, we may hit this issue. // As "offsetLog.add" will create a file using HDFS API and call "Shell.runCommand" to set // the file permission, we should not interrupt "microBatchThread" when running this method. // See SPARK-14131. - runUninterruptiblyInMicroBatchThread { + microBatchThread.runUninterruptibly { assert( offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)), s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId") } currentBatchId += 1 logInfo(s"Committed offsets for batch $currentBatchId.") - true } else { - noNewData = true awaitBatchLock.synchronized { // Wake up any threads that are waiting for the stream to progress. awaitBatchLock.notifyAll() } - - false } } @@ -330,6 +279,8 @@ class StreamExecution( private def runBatch(): Unit = { val startTime = System.nanoTime() + // TODO: Move this to IncrementalExecution. + // Request unprocessed data from all sources. val newData = availableOffsets.flatMap { case (source, available) if committedOffsets.get(source).map(_ < available).getOrElse(true) => @@ -344,7 +295,7 @@ class StreamExecution( var replacements = new ArrayBuffer[(Attribute, Attribute)] // Replace sources in the logical plan with data that has arrived since the last batch. val withNewSources = logicalPlan transform { - case StreamingRelation(source, output) => + case StreamingExecutionRelation(source, output) => newData.get(source).map { data => val newPlan = data.logicalPlan assert(output.size == newPlan.output.size, @@ -363,13 +314,14 @@ class StreamExecution( } val optimizerStart = System.nanoTime() - - lastExecution = new QueryExecution(sqlContext, newPlan) - val executedPlan = lastExecution.executedPlan + lastExecution = + new IncrementalExecution(sqlContext, newPlan, checkpointFile("state"), currentBatchId) + lastExecution.executedPlan val optimizerTime = (System.nanoTime() - optimizerStart).toDouble / 1000000 logDebug(s"Optimized batch in ${optimizerTime}ms") - val nextBatch = Dataset.ofRows(sqlContext, newPlan) + val nextBatch = + new Dataset(sqlContext, lastExecution, RowEncoder(lastExecution.analyzed.schema)) sink.addBatch(currentBatchId - 1, nextBatch) awaitBatchLock.synchronized { @@ -379,6 +331,8 @@ class StreamExecution( val batchTime = (System.nanoTime() - startTime).toDouble / 1000000 logInfo(s"Completed up to $availableOffsets in ${batchTime}ms") + // Update committed offsets. + committedOffsets ++= availableOffsets postEvent(new QueryProgress(this)) } @@ -395,7 +349,7 @@ class StreamExecution( // intentionally state = TERMINATED if (microBatchThread.isAlive) { - interruptMicroBatchThreadSafely() + microBatchThread.interrupt() microBatchThread.join() } logInfo(s"Query $name was stopped") @@ -406,7 +360,10 @@ class StreamExecution( * least the given `Offset`. This method is indented for use primarily when writing tests. */ def awaitOffset(source: Source, newOffset: Offset): Unit = { - def notDone = !committedOffsets.contains(source) || committedOffsets(source) < newOffset + def notDone = { + val localCommittedOffsets = committedOffsets + !localCommittedOffsets.contains(source) || localCommittedOffsets(source) < newOffset + } while (notDone) { logInfo(s"Waiting until $newOffset at $source") @@ -418,13 +375,17 @@ class StreamExecution( /** A flag to indicate that a batch has completed with no new data available. */ @volatile private var noNewData = false - override def processAllAvailable(): Unit = { + override def processAllAvailable(): Unit = awaitBatchLock.synchronized { noNewData = false - while (!noNewData) { - awaitBatchLock.synchronized { awaitBatchLock.wait(10000) } - if (streamDeathCause != null) { throw streamDeathCause } + while (true) { + awaitBatchLock.wait(10000) + if (streamDeathCause != null) { + throw streamDeathCause + } + if (noNewData) { + return + } } - if (streamDeathCause != null) { throw streamDeathCause } } override def awaitTermination(): Unit = { |