Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala  169
1 file changed, 65 insertions(+), 104 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 60e00d203c..87dd27a2b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.streaming
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
-import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
@@ -28,12 +27,14 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.ContinuousQueryListener
import org.apache.spark.sql.util.ContinuousQueryListener._
+import org.apache.spark.util.UninterruptibleThread
/**
* Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
@@ -42,30 +43,30 @@ import org.apache.spark.sql.util.ContinuousQueryListener._
* and the results are committed transactionally to the given [[Sink]].
*/
class StreamExecution(
- val sqlContext: SQLContext,
+ override val sqlContext: SQLContext,
override val name: String,
- val checkpointRoot: String,
+ checkpointRoot: String,
private[sql] val logicalPlan: LogicalPlan,
- val sink: Sink) extends ContinuousQuery with Logging {
+ val sink: Sink,
+ val trigger: Trigger) extends ContinuousQuery with Logging {
/** A monitor used to wait/notify when batches complete. */
private val awaitBatchLock = new Object
private val startLatch = new CountDownLatch(1)
private val terminationLatch = new CountDownLatch(1)
- /** Minimum amount of time in between the start of each batch. */
- private val minBatchTime = 10
-
/**
* Tracks how much data we have processed and committed to the sink or state store from each
* input source.
*/
+ @volatile
private[sql] var committedOffsets = new StreamProgress
/**
* Tracks the offsets that are available to be processed, but have not yet been committed to the
* sink.
*/
+ @volatile
private var availableOffsets = new StreamProgress
/** The current batchId or -1 if execution has not yet been initialized. */
@@ -73,11 +74,15 @@ class StreamExecution(
/** All stream sources present in the query plan. */
private val sources =
- logicalPlan.collect { case s: StreamingRelation => s.source }
+ logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
/** A list of unique sources in the query plan. */
private val uniqueSources = sources.distinct
+ private val triggerExecutor = trigger match {
+ case t: ProcessingTime => ProcessingTimeExecutor(t)
+ }
+
/** Defines the internal state of execution */
@volatile
private var state: State = INITIALIZED
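Note on the new trigger plumbing: the match above is the only place the Trigger is consumed; the run loop further down just hands the executor a callback that returns true while the query should keep going. A minimal, self-contained sketch of that contract (illustrative only; Spark's actual ProcessingTimeExecutor is defined outside this diff and the parameter names here are assumptions):

    // Illustrative stand-in for the trigger/executor contract used in this diff.
    object TriggerExecutorSketch {
      trait TriggerExecutor {
        // Calls batchRunner repeatedly; stops once it returns false.
        def execute(batchRunner: () => Boolean): Unit
      }

      case class ProcessingTime(intervalMs: Long)

      case class ProcessingTimeExecutor(processingTime: ProcessingTime) extends TriggerExecutor {
        override def execute(batchRunner: () => Boolean): Unit = {
          var continue = true
          while (continue) {
            val start = System.currentTimeMillis()
            continue = batchRunner()
            // Sleep out the remainder of the processing-time interval, if any.
            val remaining = processingTime.intervalMs - (System.currentTimeMillis() - start)
            if (continue && remaining > 0) Thread.sleep(remaining)
          }
        }
      }

      def main(args: Array[String]): Unit = {
        var batches = 0
        ProcessingTimeExecutor(ProcessingTime(100)).execute { () =>
          batches += 1
          batches < 3 // pretend the query is stopped after three triggers
        }
        println(s"ran $batches batches")
      }
    }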
@@ -89,9 +94,10 @@ class StreamExecution(
private[sql] var streamDeathCause: ContinuousQueryException = null
/** The thread that runs the micro-batches of this stream. */
- private[sql] val microBatchThread = new Thread(s"stream execution thread for $name") {
- override def run(): Unit = { runBatches() }
- }
+ private[sql] val microBatchThread =
+ new UninterruptibleThread(s"stream execution thread for $name") {
+ override def run(): Unit = { runBatches() }
+ }
/**
* A write-ahead-log that records the offsets that are present in each batch. In order to ensure
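The hand-rolled interrupt-deferral machinery deleted in the next hunk is exactly what org.apache.spark.util.UninterruptibleThread (imported above) now provides. A simplified model of the semantics the stream thread relies on, mirroring the removed code; this is a sketch, not Spark's implementation:

    // Simplified model: interrupt() is deferred while the thread is inside
    // runUninterruptibly and re-delivered once the block finishes.
    class UninterruptibleThreadSketch(name: String) extends Thread(name) {
      private val lock = new Object
      private var uninterruptible = false
      private var pendingInterrupt = false

      def runUninterruptibly[T](f: => T): T = {
        require(Thread.currentThread() eq this, "must be called from this thread")
        lock.synchronized {
          uninterruptible = true
          if (Thread.interrupted()) pendingInterrupt = true // clear and remember
        }
        try f finally {
          lock.synchronized {
            uninterruptible = false
            if (pendingInterrupt) { super.interrupt(); pendingInterrupt = false }
          }
        }
      }

      override def interrupt(): Unit = lock.synchronized {
        if (uninterruptible) pendingInterrupt = true else super.interrupt()
      }
    }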
@@ -102,71 +108,13 @@ class StreamExecution(
private val offsetLog =
new HDFSMetadataLog[CompositeOffset](sqlContext, checkpointFile("offsets"))
- /** A monitor to protect "uninterruptible" and "interrupted" */
- private val uninterruptibleLock = new Object
-
- /**
- * Indicates if "microBatchThread" are in the uninterruptible status. If so, interrupting
- * "microBatchThread" will be deferred until "microBatchThread" enters into the interruptible
- * status.
- */
- @GuardedBy("uninterruptibleLock")
- private var uninterruptible = false
-
- /**
- * Indicates if we should interrupt "microBatchThread" when we are leaving the uninterruptible
- * zone.
- */
- @GuardedBy("uninterruptibleLock")
- private var shouldInterruptThread = false
-
- /**
- * Interrupt "microBatchThread" if possible. If "microBatchThread" is in the uninterruptible
- * status, "microBatchThread" won't be interrupted until it enters into the interruptible status.
- */
- private def interruptMicroBatchThreadSafely(): Unit = {
- uninterruptibleLock.synchronized {
- if (uninterruptible) {
- shouldInterruptThread = true
- } else {
- microBatchThread.interrupt()
- }
- }
- }
-
- /**
- * Run `f` uninterruptibly in "microBatchThread". "microBatchThread" won't be interrupted before
- * returning from `f`.
- */
- private def runUninterruptiblyInMicroBatchThread[T](f: => T): T = {
- assert(Thread.currentThread() == microBatchThread)
- uninterruptibleLock.synchronized {
- uninterruptible = true
- // Clear the interrupted status if it's set.
- if (Thread.interrupted()) {
- shouldInterruptThread = true
- }
- }
- try {
- f
- } finally {
- uninterruptibleLock.synchronized {
- uninterruptible = false
- if (shouldInterruptThread) {
- // Recover the interrupted status
- microBatchThread.interrupt()
- shouldInterruptThread = false
- }
- }
- }
- }
-
/** Whether the query is currently active or not */
override def isActive: Boolean = state == ACTIVE
/** Returns current status of all the sources. */
override def sourceStatuses: Array[SourceStatus] = {
- sources.map(s => new SourceStatus(s.toString, availableOffsets.get(s))).toArray
+ val localAvailableOffsets = availableOffsets
+ sources.map(s => new SourceStatus(s.toString, localAvailableOffsets.get(s))).toArray
}
/** Returns current status of the sink. */
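Why sourceStatuses copies availableOffsets into a local: the two StreamProgress fields are now @volatile vars that the stream thread replaces wholesale, so readers take one snapshot of the reference and work off that (awaitOffset below does the same). Roughly, with illustrative types standing in for StreamProgress:

    // One volatile read gives all sources a status from the same snapshot;
    // re-reading the var per source could mix two different progress maps.
    class SnapshotSketch {
      @volatile private var offsets: Map[String, Long] = Map.empty

      def sourceStatuses(sources: Seq[String]): Seq[(String, Option[Long])] = {
        val snapshot = offsets                   // single volatile read
        sources.map(s => s -> snapshot.get(s))   // consistent across all sources
      }

      def advance(source: String, offset: Long): Unit = {
        offsets = offsets + (source -> offset)   // replace the map, never mutate in place
      }
    }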
@@ -211,11 +159,15 @@ class StreamExecution(
SQLContext.setActive(sqlContext)
populateStartOffsets()
logDebug(s"Stream running from $committedOffsets to $availableOffsets")
- while (isActive) {
- if (dataAvailable) runBatch()
- commitAndConstructNextBatch()
- Thread.sleep(minBatchTime) // TODO: Could be tighter
- }
+ triggerExecutor.execute(() => {
+ if (isActive) {
+ if (dataAvailable) runBatch()
+ constructNextBatch()
+ true
+ } else {
+ false
+ }
+ })
} catch {
case _: InterruptedException if state == TERMINATED => // interrupted by stop()
case NonFatal(e) =>
@@ -258,7 +210,7 @@ class StreamExecution(
case None => // We are starting this stream for the first time.
logInfo(s"Starting new continuous query.")
currentBatchId = 0
- commitAndConstructNextBatch()
+ constructNextBatch()
}
}
@@ -278,15 +230,8 @@ class StreamExecution(
/**
* Queries all of the sources to see if any new data is available. When there is new data the
* batchId counter is incremented and a new log entry is written with the newest offsets.
- *
- * Note that committing the offsets for a new batch implicitly marks the previous batch as
- * finished and thus this method should only be called when all currently available data
- * has been written to the sink.
*/
- private def commitAndConstructNextBatch(): Boolean = {
- // Update committed offsets.
- committedOffsets ++= availableOffsets
-
+ private def constructNextBatch(): Unit = {
// There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622).
// If we interrupt some thread running Shell.runCommand, we may hit this issue.
// As "FileStreamSource.getOffset" will create a file using HDFS API and call "Shell.runCommand"
@@ -294,33 +239,37 @@ class StreamExecution(
// method. See SPARK-14131.
//
// Check to see what new data is available.
- val newData = runUninterruptiblyInMicroBatchThread {
+ val newData = microBatchThread.runUninterruptibly {
uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
}
availableOffsets ++= newData
- if (dataAvailable) {
+ val hasNewData = awaitBatchLock.synchronized {
+ if (dataAvailable) {
+ true
+ } else {
+ noNewData = true
+ false
+ }
+ }
+ if (hasNewData) {
// There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622).
// If we interrupt some thread running Shell.runCommand, we may hit this issue.
// As "offsetLog.add" will create a file using HDFS API and call "Shell.runCommand" to set
// the file permission, we should not interrupt "microBatchThread" when running this method.
// See SPARK-14131.
- runUninterruptiblyInMicroBatchThread {
+ microBatchThread.runUninterruptibly {
assert(
offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
}
currentBatchId += 1
logInfo(s"Committed offsets for batch $currentBatchId.")
- true
} else {
- noNewData = true
awaitBatchLock.synchronized {
// Wake up any threads that are waiting for the stream to progress.
awaitBatchLock.notifyAll()
}
-
- false
}
}
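The noNewData flag is only set while holding awaitBatchLock, which is what makes the processAllAvailable rewrite at the bottom of this diff work. The producer side of that handshake, reduced to its essentials (illustrative; the real method also writes the offset log inside runUninterruptibly and bumps currentBatchId):

    // Producer side: decide under the lock whether a batch can be constructed; if
    // not, record that fact and wake any thread blocked in processAllAvailable().
    class HandshakeSketch {
      private[this] val awaitBatchLock = new Object
      @volatile private[this] var noNewData = false

      def constructNextBatch(dataAvailable: => Boolean): Unit = {
        val hasNewData = awaitBatchLock.synchronized {
          if (dataAvailable) true
          else { noNewData = true; false }
        }
        if (hasNewData) {
          // ... write this batch's offsets to the write-ahead log, advance the batch id ...
        } else {
          awaitBatchLock.synchronized { awaitBatchLock.notifyAll() }
        }
      }
    }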
@@ -330,6 +279,8 @@ class StreamExecution(
private def runBatch(): Unit = {
val startTime = System.nanoTime()
+ // TODO: Move this to IncrementalExecution.
+
// Request unprocessed data from all sources.
val newData = availableOffsets.flatMap {
case (source, available) if committedOffsets.get(source).map(_ < available).getOrElse(true) =>
@@ -344,7 +295,7 @@ class StreamExecution(
var replacements = new ArrayBuffer[(Attribute, Attribute)]
// Replace sources in the logical plan with data that has arrived since the last batch.
val withNewSources = logicalPlan transform {
- case StreamingRelation(source, output) =>
+ case StreamingExecutionRelation(source, output) =>
newData.get(source).map { data =>
val newPlan = data.logicalPlan
assert(output.size == newPlan.output.size,
@@ -363,13 +314,14 @@ class StreamExecution(
}
val optimizerStart = System.nanoTime()
-
- lastExecution = new QueryExecution(sqlContext, newPlan)
- val executedPlan = lastExecution.executedPlan
+ lastExecution =
+ new IncrementalExecution(sqlContext, newPlan, checkpointFile("state"), currentBatchId)
+ lastExecution.executedPlan
val optimizerTime = (System.nanoTime() - optimizerStart).toDouble / 1000000
logDebug(s"Optimized batch in ${optimizerTime}ms")
- val nextBatch = Dataset.ofRows(sqlContext, newPlan)
+ val nextBatch =
+ new Dataset(sqlContext, lastExecution, RowEncoder(lastExecution.analyzed.schema))
sink.addBatch(currentBatchId - 1, nextBatch)
awaitBatchLock.synchronized {
@@ -379,6 +331,8 @@ class StreamExecution(
val batchTime = (System.nanoTime() - startTime).toDouble / 1000000
logInfo(s"Completed up to $availableOffsets in ${batchTime}ms")
+ // Update committed offsets.
+ committedOffsets ++= availableOffsets
postEvent(new QueryProgress(this))
}
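Note where the committedOffsets update moved: it is now the last step of runBatch, after sink.addBatch has returned, rather than the first step of the old commitAndConstructNextBatch. In effect the in-memory committed offsets only advance once the sink call for that batch has succeeded; schematically (illustrative types only):

    // A failure inside addBatch leaves committedOffsets untouched, so the offsets
    // remain "available" for a retry instead of silently counting as committed.
    class CommitOrderingSketch(sinkAddBatch: (Long, Map[String, Long]) => Unit) {
      @volatile var committedOffsets: Map[String, Long] = Map.empty
      @volatile var availableOffsets: Map[String, Long] = Map.empty
      var currentBatchId: Long = 0L

      def runBatch(): Unit = {
        val batch = availableOffsets                  // snapshot what this batch covers
        sinkAddBatch(currentBatchId, batch)           // may throw; nothing committed yet
        committedOffsets = committedOffsets ++ batch  // commit only after the sink succeeded
      }
    }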
@@ -395,7 +349,7 @@ class StreamExecution(
// intentionally
state = TERMINATED
if (microBatchThread.isAlive) {
- interruptMicroBatchThreadSafely()
+ microBatchThread.interrupt()
microBatchThread.join()
}
logInfo(s"Query $name was stopped")
@@ -406,7 +360,10 @@ class StreamExecution(
* least the given `Offset`. This method is intended for use primarily when writing tests.
*/
def awaitOffset(source: Source, newOffset: Offset): Unit = {
- def notDone = !committedOffsets.contains(source) || committedOffsets(source) < newOffset
+ def notDone = {
+ val localCommittedOffsets = committedOffsets
+ !localCommittedOffsets.contains(source) || localCommittedOffsets(source) < newOffset
+ }
while (notDone) {
logInfo(s"Waiting until $newOffset at $source")
@@ -418,13 +375,17 @@ class StreamExecution(
/** A flag to indicate that a batch has completed with no new data available. */
@volatile private var noNewData = false
- override def processAllAvailable(): Unit = {
+ override def processAllAvailable(): Unit = awaitBatchLock.synchronized {
noNewData = false
- while (!noNewData) {
- awaitBatchLock.synchronized { awaitBatchLock.wait(10000) }
- if (streamDeathCause != null) { throw streamDeathCause }
+ while (true) {
+ awaitBatchLock.wait(10000)
+ if (streamDeathCause != null) {
+ throw streamDeathCause
+ }
+ if (noNewData) {
+ return
+ }
}
- if (streamDeathCause != null) { throw streamDeathCause }
}
override def awaitTermination(): Unit = {