-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala  24
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala  84
4 files changed, 99 insertions(+), 19 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index e9052a3095..8b96f65bc3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -27,12 +27,12 @@ import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner,
* A variant of [[QueryExecution]] that allows the execution of the given [[LogicalPlan]]
* incrementally, possibly preserving state in between each execution.
*/
-class IncrementalExecution(
+class IncrementalExecution private[sql](
sparkSession: SparkSession,
logicalPlan: LogicalPlan,
outputMode: OutputMode,
checkpointLocation: String,
- currentBatchId: Long)
+ val currentBatchId: Long)
extends QueryExecution(sparkSession, logicalPlan) {
// TODO: make this always part of planning.
@@ -57,7 +57,7 @@ class IncrementalExecution(
case StateStoreSaveExec(keys, None,
UnaryExecNode(agg,
StateStoreRestoreExec(keys2, None, child))) =>
- val stateId = OperatorStateId(checkpointLocation, operatorId, currentBatchId - 1)
+ val stateId = OperatorStateId(checkpointLocation, operatorId, currentBatchId)
operatorId += 1
StateStoreSaveExec(
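
A minimal, self-contained sketch (not Spark code; all names and values below are illustrative) of the batch-id convention this hunk adopts: since StreamExecution now increments currentBatchId only after a batch's data has been processed, the operator state can be keyed by currentBatchId directly instead of currentBatchId - 1.

object BatchIdConventionSketch {
  final case class OperatorStateId(checkpointLocation: String, operatorId: Long, batchId: Long)

  // Old convention: currentBatchId was bumped when the next batch was constructed,
  // so the batch being executed had to subtract one to find its state version.
  def stateIdBefore(checkpointLocation: String, operatorId: Long, currentBatchId: Long): OperatorStateId =
    OperatorStateId(checkpointLocation, operatorId, currentBatchId - 1)

  // New convention: currentBatchId is bumped only after the batch's data is processed,
  // so the executing batch uses the id as-is; offset log, state store and sink agree.
  def stateIdAfter(checkpointLocation: String, operatorId: Long, currentBatchId: Long): OperatorStateId =
    OperatorStateId(checkpointLocation, operatorId, currentBatchId)

  def main(args: Array[String]): Unit = {
    // Both conventions address the same state version for the first executed batch.
    println(stateIdBefore("/tmp/chk", 0L, currentBatchId = 1L)) // OperatorStateId(/tmp/chk,0,0)
    println(stateIdAfter("/tmp/chk", 0L, currentBatchId = 0L))  // OperatorStateId(/tmp/chk,0,0)
  }
}
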
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index ea367b699f..df6304d85f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -122,7 +122,7 @@ class StreamExecution(
* processing is done. Thus, the Nth record in this log indicates data that is currently being
* processed and the N-1th entry indicates which offsets have been durably committed to the sink.
*/
- private val offsetLog =
+ private[sql] val offsetLog =
new HDFSMetadataLog[CompositeOffset](sparkSession, checkpointFile("offsets"))
/** Whether the query is currently active or not */
@@ -174,12 +174,21 @@ class StreamExecution(
// While active, repeatedly attempt to run batches.
SQLContext.setActive(sparkSession.wrapped)
- populateStartOffsets()
- logDebug(s"Stream running from $committedOffsets to $availableOffsets")
+
triggerExecutor.execute(() => {
if (isActive) {
- if (dataAvailable) runBatch()
- constructNextBatch()
+ if (currentBatchId < 0) {
+ // We'll do this initialization only once
+ populateStartOffsets()
+ logDebug(s"Stream running from $committedOffsets to $availableOffsets")
+ } else {
+ constructNextBatch()
+ }
+ if (dataAvailable) {
+ runBatch()
+ // We'll increase currentBatchId after we complete processing the current batch's data
+ currentBatchId += 1
+ }
true
} else {
false
@@ -214,7 +223,7 @@ class StreamExecution(
offsetLog.getLatest() match {
case Some((batchId, nextOffsets)) =>
logInfo(s"Resuming continuous query, starting with batch $batchId")
- currentBatchId = batchId + 1
+ currentBatchId = batchId
availableOffsets = nextOffsets.toStreamProgress(sources)
logDebug(s"Found possibly uncommitted offsets $availableOffsets")
@@ -285,7 +294,6 @@ class StreamExecution(
offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
}
- currentBatchId += 1
logInfo(s"Committed offsets for batch $currentBatchId.")
} else {
awaitBatchLock.lock()
@@ -352,7 +360,7 @@ class StreamExecution(
val nextBatch =
new Dataset(sparkSession, lastExecution, RowEncoder(lastExecution.analyzed.schema))
- sink.addBatch(currentBatchId - 1, nextBatch)
+ sink.addBatch(currentBatchId, nextBatch)
awaitBatchLock.lock()
try {
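
A simplified, standalone sketch of the reworked trigger loop, using a fake source, offset log and sink rather than Spark's classes; the first-start branch of populateStartOffsets is an assumption, since that code is not shown in this hunk. It illustrates the ordering the patch introduces: initialize or construct the batch, run it if data is available, and only then increment currentBatchId, so that the offset log, the state store and the sink all see the same batch id.

object TriggerLoopSketch {
  var currentBatchId: Long = -1L
  var committedOffset: Long = 0L   // offsets already processed and written to the sink
  var availableOffset: Long = 0L   // offsets chosen for the current batch
  var sourceOffset: Long = 0L      // latest offset available in the (fake) source
  val offsetLog = scala.collection.mutable.Map.empty[Long, Long] // batchId -> end offset
  val sink = scala.collection.mutable.Map.empty[Long, Seq[Long]] // batchId -> rows

  private def dataAvailable: Boolean = availableOffset > committedOffset

  // First start vs. recovery; the recovery branch mirrors the patched populateStartOffsets:
  // re-run the latest batch recorded in the offset log under the *same* batch id.
  private def populateStartOffsets(): Unit = offsetLog.keys.reduceOption(_ max _) match {
    case Some(batchId) =>
      currentBatchId = batchId
      availableOffset = offsetLog(batchId)
    case None =>
      currentBatchId = 0L
      constructNextBatch()
  }

  // Record the offsets for the batch about to run, keyed by the current (not next) batch id.
  private def constructNextBatch(): Unit = {
    availableOffset = sourceOffset
    if (dataAvailable) offsetLog(currentBatchId) = availableOffset
  }

  // The sink receives the batch under the same id that was logged for it.
  private def runBatch(): Unit = {
    sink(currentBatchId) = (committedOffset until availableOffset).toSeq
    committedOffset = availableOffset
  }

  // One trigger: initialize once, otherwise construct; run only if there is data,
  // and bump currentBatchId only after the batch's data has been processed.
  def trigger(): Unit = {
    if (currentBatchId < 0) populateStartOffsets() else constructNextBatch()
    if (dataAvailable) { runBatch(); currentBatchId += 1 }
  }

  def main(args: Array[String]): Unit = {
    sourceOffset = 3; trigger()      // batch 0 processes offsets 0, 1, 2
    sourceOffset = 6; trigger()      // batch 1 processes offsets 3, 4, 5
    trigger(); trigger()             // no new data: nothing is logged for batch 2
    println(s"offsetLog=$offsetLog sink=$sink currentBatchId=$currentBatchId")
  }
}
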
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index a34927ff99..bcc33ae8c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -124,6 +124,10 @@ class MemorySink(val schema: StructType) extends Sink with Logging {
batches.flatten
}
+ def latestBatchId: Option[Int] = synchronized {
+ if (batches.size == 0) None else Some(batches.size - 1)
+ }
+
def lastBatch: Seq[Row] = synchronized { batches.last }
def toDebugString: String = synchronized {
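
A toy standalone sink (not the real MemorySink; the replay-skipping behavior in addBatch is an assumption) showing why latestBatchId can be derived from the number of batches committed so far.

import scala.collection.mutable.ArrayBuffer

// Toy sink: batches are appended in batch-id order, so the latest batch id is size - 1.
class ToySink[T] {
  private val batches = new ArrayBuffer[Seq[T]]()

  def addBatch(batchId: Int, data: Seq[T]): Unit = synchronized {
    // Ignore a replay of a batch that has already been committed (e.g. after a restart).
    if (batchId == batches.size) batches += data
  }

  def latestBatchId: Option[Int] = synchronized {
    if (batches.isEmpty) None else Some(batches.size - 1)
  }
}

object ToySinkDemo extends App {
  val s = new ToySink[Int]
  assert(s.latestBatchId.isEmpty)
  s.addBatch(0, Seq(1, 2, 3))
  s.addBatch(0, Seq(1, 2, 3))   // replayed batch 0 is ignored
  s.addBatch(1, Seq(4, 5, 6))
  assert(s.latestBatchId.contains(1))
}
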
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 6a8b280174..013b731693 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -137,20 +137,88 @@ class StreamSuite extends StreamTest with SharedSQLContext {
}
}
- // This would fail for now -- error is "Timed out waiting for stream"
- // Root cause is that data generated in batch 0 may not get processed in batch 1
- // Let's enable this after SPARK-14942: Reduce delay between batch construction and execution
- ignore("minimize delay between batch construction and execution") {
+ test("minimize delay between batch construction and execution") {
+
+ // For each batch, we retrieve the new data's offsets and log them before we run the execution.
+ // This checks whether the latest entry in the offset log is keyed by the expected batch id.
+ def CheckOffsetLogLatestBatchId(expectedId: Int): AssertOnQuery =
+ AssertOnQuery(_.offsetLog.getLatest().get._1 == expectedId,
+ s"offsetLog's latest should be $expectedId")
+
+ // For each batch, state changes made during the execution are keyed by the batch id.
+ // This checks whether the last execution's currentBatchId is the expected batch id.
+ def CheckIncrementalExecutionCurrentBatchId(expectedId: Int): AssertOnQuery =
+ AssertOnQuery(_.lastExecution.asInstanceOf[IncrementalExecution].currentBatchId == expectedId,
+ s"lastExecution's currentBatchId should be $expectedId")
+
+ // For each batch, the results are added to the sink under the batch id after the execution.
+ // This checks whether the sink's latest batch id is the expected one.
+ def CheckSinkLatestBatchId(expectedId: Int): AssertOnQuery =
+ AssertOnQuery(_.sink.asInstanceOf[MemorySink].latestBatchId.get == expectedId,
+ s"sink's lastBatchId should be $expectedId")
+
val inputData = MemoryStream[Int]
testStream(inputData.toDS())(
StartStream(ProcessingTime("10 seconds"), new ManualClock),
+
/* -- batch 0 ----------------------- */
- AddData(inputData, 1),
- AddData(inputData, 2),
- AddData(inputData, 3),
+ // Add some data in batch 0
+ AddData(inputData, 1, 2, 3),
AdvanceManualClock(10 * 1000), // 10 seconds
+
/* -- batch 1 ----------------------- */
- CheckAnswer(1, 2, 3))
+ // Check the results of batch 0
+ CheckAnswer(1, 2, 3),
+ CheckIncrementalExecutionCurrentBatchId(0),
+ CheckOffsetLogLatestBatchId(0),
+ CheckSinkLatestBatchId(0),
+ // Add some data in batch 1
+ AddData(inputData, 4, 5, 6),
+ AdvanceManualClock(10 * 1000),
+
+ /* -- batch _ ----------------------- */
+ // Check the results of batch 1
+ CheckAnswer(1, 2, 3, 4, 5, 6),
+ CheckIncrementalExecutionCurrentBatchId(1),
+ CheckOffsetLogLatestBatchId(1),
+ CheckSinkLatestBatchId(1),
+
+ AdvanceManualClock(10 * 1000),
+ AdvanceManualClock(10 * 1000),
+ AdvanceManualClock(10 * 1000),
+
+ /* -- batch __ ---------------------- */
+ // Check the results of batch 1 again; this is to make sure that, when there's no new data,
+ // the currentBatchId does not get logged (e.g. as 2) even if the clock has advanced many times
+ CheckAnswer(1, 2, 3, 4, 5, 6),
+ CheckIncrementalExecutionCurrentBatchId(1),
+ CheckOffsetLogLatestBatchId(1),
+ CheckSinkLatestBatchId(1),
+
+ /* Stop then restart the Stream */
+ StopStream,
+ StartStream(ProcessingTime("10 seconds"), new ManualClock),
+
+ /* -- batch 1 rerun ----------------- */
+ // batch 1 is re-run because the latest batch id logged in the offset log is 1
+ AdvanceManualClock(10 * 1000),
+
+ /* -- batch 2 ----------------------- */
+ // Check the results of batch 1
+ CheckAnswer(1, 2, 3, 4, 5, 6),
+ CheckIncrementalExecutionCurrentBatchId(1),
+ CheckOffsetLogLatestBatchId(1),
+ CheckSinkLatestBatchId(1),
+ // Add some data in batch 2
+ AddData(inputData, 7, 8, 9),
+ AdvanceManualClock(10 * 1000),
+
+ /* -- batch 3 ----------------------- */
+ // Check the results of batch 2
+ CheckAnswer(1, 2, 3, 4, 5, 6, 7, 8, 9),
+ CheckIncrementalExecutionCurrentBatchId(2),
+ CheckOffsetLogLatestBatchId(2),
+ CheckSinkLatestBatchId(2))
}
}