4 files changed, 99 insertions, 19 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index e9052a3095..8b96f65bc3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -27,12 +27,12 @@ import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner,
  * A variant of [[QueryExecution]] that allows the execution of the given [[LogicalPlan]]
  * plan incrementally. Possibly preserving state in between each execution.
  */
-class IncrementalExecution(
+class IncrementalExecution private[sql](
     sparkSession: SparkSession,
     logicalPlan: LogicalPlan,
     outputMode: OutputMode,
     checkpointLocation: String,
-    currentBatchId: Long)
+    val currentBatchId: Long)
   extends QueryExecution(sparkSession, logicalPlan) {
 
   // TODO: make this always part of planning.
@@ -57,7 +57,7 @@ class IncrementalExecution(
       case StateStoreSaveExec(keys, None,
              UnaryExecNode(agg,
                StateStoreRestoreExec(keys2, None, child))) =>
-        val stateId = OperatorStateId(checkpointLocation, operatorId, currentBatchId - 1)
+        val stateId = OperatorStateId(checkpointLocation, operatorId, currentBatchId)
         operatorId += 1
 
         StateStoreSaveExec(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index ea367b699f..df6304d85f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -122,7 +122,7 @@ class StreamExecution(
    * processing is done. Thus, the Nth record in this log indicated data that is currently being
    * processed and the N-1th entry indicates which offsets have been durably committed to the sink.
    */
-  private val offsetLog =
+  private[sql] val offsetLog =
     new HDFSMetadataLog[CompositeOffset](sparkSession, checkpointFile("offsets"))
 
   /** Whether the query is currently active or not */
@@ -174,12 +174,21 @@ class StreamExecution(
       // While active, repeatedly attempt to run batches.
       SQLContext.setActive(sparkSession.wrapped)
-      populateStartOffsets()
-      logDebug(s"Stream running from $committedOffsets to $availableOffsets")
+
       triggerExecutor.execute(() => {
         if (isActive) {
-          if (dataAvailable) runBatch()
-          constructNextBatch()
+          if (currentBatchId < 0) {
+            // We'll do this initialization only once
+            populateStartOffsets()
+            logDebug(s"Stream running from $committedOffsets to $availableOffsets")
+          } else {
+            constructNextBatch()
+          }
+          if (dataAvailable) {
+            runBatch()
+            // We'll increase currentBatchId after we complete processing current batch's data
+            currentBatchId += 1
+          }
           true
         } else {
           false
@@ -214,7 +223,7 @@ class StreamExecution(
     offsetLog.getLatest() match {
       case Some((batchId, nextOffsets)) =>
         logInfo(s"Resuming continuous query, starting with batch $batchId")
-        currentBatchId = batchId + 1
+        currentBatchId = batchId
         availableOffsets = nextOffsets.toStreamProgress(sources)
         logDebug(s"Found possibly uncommitted offsets $availableOffsets")
@@ -285,7 +294,6 @@ class StreamExecution(
         offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
         s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
       }
-      currentBatchId += 1
       logInfo(s"Committed offsets for batch $currentBatchId.")
     } else {
       awaitBatchLock.lock()
@@ -352,7 +360,7 @@ class StreamExecution(
     val nextBatch =
       new Dataset(sparkSession, lastExecution, RowEncoder(lastExecution.analyzed.schema))
-    sink.addBatch(currentBatchId - 1, nextBatch)
+    sink.addBatch(currentBatchId, nextBatch)
 
     awaitBatchLock.lock()
     try {
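Taken together, the StreamExecution and IncrementalExecution hunks above change what currentBatchId means: it is now the id of the batch currently being constructed and executed (rather than the next batch), and it is incremented only after that batch's data has been fully processed, so the offset log, the state store, and the sink all key on the same id. The following standalone Scala sketch models that lifecycle; all names are illustrative rather than Spark's, and the None branch of populateStartOffsets (first start) is outside the hunks above, so its behavior here is an assumption.

import scala.collection.mutable

object BatchIdLifecycleSketch {
  var currentBatchId: Long = -1L                       // -1 until the first trigger
  val offsetLog = mutable.SortedMap.empty[Long, Long]  // batchId -> offset logged for it
  var availableOffset = 0L                             // stands in for availableOffsets
  var committedOffset = 0L                             // stands in for committedOffsets

  // Resume *at* the latest logged batch, not at batchId + 1: its offsets were
  // logged, but its results may never have reached the sink, so it re-runs.
  def populateStartOffsets(): Unit = offsetLog.lastOption match {
    case Some((batchId, offset)) =>
      currentBatchId = batchId
      availableOffset = offset
    case None =>
      // First start: assumed behavior (this branch is not shown in the diff above).
      currentBatchId = 0
      constructNextBatch()
  }

  def dataAvailable: Boolean = committedOffset < availableOffset

  // Offsets are logged under the id of the batch about to run, and only when
  // there is new data, which is why empty triggers leave the log untouched.
  def constructNextBatch(): Unit =
    if (dataAvailable) offsetLog(currentBatchId) = availableOffset

  // One trigger, mirroring the body passed to triggerExecutor.execute above.
  def trigger(): Unit = {
    if (currentBatchId < 0) populateStartOffsets() else constructNextBatch()
    if (dataAvailable) {
      committedOffset = availableOffset  // "runBatch()": the sink sees currentBatchId
      currentBatchId += 1                // incremented only after the batch completes
    }
  }

  def main(args: Array[String]): Unit = {
    availableOffset = 3                  // data arrives before the first trigger
    trigger()                            // batch 0 is constructed and run in one trigger
    assert(offsetLog.lastKey == 0 && currentBatchId == 1)
    trigger()                            // empty trigger: nothing logged, nothing run
    assert(offsetLog.lastKey == 0 && currentBatchId == 1)
  }
}

Before this change, the first trigger only populated start offsets and constructed a batch, so data present at start was not processed until the following trigger; the sketch's first trigger shows construction and execution now happening back to back, which is the delay this patch removes.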
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index a34927ff99..bcc33ae8c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -124,6 +124,10 @@ class MemorySink(val schema: StructType) extends Sink with Logging {
     batches.flatten
   }
 
+  def latestBatchId: Option[Int] = synchronized {
+    if (batches.size == 0) None else Some(batches.size - 1)
+  }
+
   def lastBatch: Seq[Row] = synchronized { batches.last }
 
   def toDebugString: String = synchronized {
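The new latestBatchId helper works because MemorySink stores exactly one entry per committed batch and batch ids are dense from 0, so the latest id is simply batches.size - 1. Below is a hedged standalone model of that bookkeeping; SinkModel is a hypothetical stand-in, not the class above, and the skip-if-already-stored guard is not part of this diff, though the restart scenario in the test below relies on the sink treating a re-run of an already-stored batch as a no-op.

import scala.collection.mutable

// Hypothetical stand-in for MemorySink, keeping only the batch bookkeeping.
class SinkModel[T] {
  private val batches = mutable.ArrayBuffer.empty[Seq[T]]

  def addBatch(batchId: Int, data: Seq[T]): Unit = synchronized {
    // Assumption: a batch id we have already stored is ignored, so replaying
    // the latest batch after a restart leaves latestBatchId unchanged.
    if (batchId == batches.size) batches += data
  }

  def latestBatchId: Option[Int] = synchronized {
    if (batches.isEmpty) None else Some(batches.size - 1)
  }
}

For example, addBatch(0, ...) then addBatch(1, ...) followed by a replayed addBatch(1, ...) yields latestBatchId == Some(1), which is exactly what CheckSinkLatestBatchId(1) asserts after the rerun in the test below.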
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 6a8b280174..013b731693 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -137,20 +137,88 @@ class StreamSuite extends StreamTest with SharedSQLContext {
     }
   }
 
-  // This would fail for now -- error is "Timed out waiting for stream"
-  // Root cause is that data generated in batch 0 may not get processed in batch 1
-  // Let's enable this after SPARK-14942: Reduce delay between batch construction and execution
-  ignore("minimize delay between batch construction and execution") {
+  test("minimize delay between batch construction and execution") {
+
+    // For each batch, we would retrieve new data's offsets and log them before we run the execution
+    // This checks whether the key of the offset log is the expected batch id
+    def CheckOffsetLogLatestBatchId(expectedId: Int): AssertOnQuery =
+      AssertOnQuery(_.offsetLog.getLatest().get._1 == expectedId,
+        s"offsetLog's latest should be $expectedId")
+
+    // For each batch, we would log the state change during the execution
+    // This checks whether the key of the state change log is the expected batch id
+    def CheckIncrementalExecutionCurrentBatchId(expectedId: Int): AssertOnQuery =
+      AssertOnQuery(_.lastExecution.asInstanceOf[IncrementalExecution].currentBatchId == expectedId,
+        s"lastExecution's currentBatchId should be $expectedId")
+
+    // For each batch, we would log the sink change after the execution
+    // This checks whether the key of the sink change log is the expected batch id
+    def CheckSinkLatestBatchId(expectedId: Int): AssertOnQuery =
+      AssertOnQuery(_.sink.asInstanceOf[MemorySink].latestBatchId.get == expectedId,
+        s"sink's lastBatchId should be $expectedId")
+
     val inputData = MemoryStream[Int]
     testStream(inputData.toDS())(
       StartStream(ProcessingTime("10 seconds"), new ManualClock),
+
       /* -- batch 0 ----------------------- */
-      AddData(inputData, 1),
-      AddData(inputData, 2),
-      AddData(inputData, 3),
+      // Add some data in batch 0
+      AddData(inputData, 1, 2, 3),
       AdvanceManualClock(10 * 1000), // 10 seconds
+
       /* -- batch 1 ----------------------- */
-      CheckAnswer(1, 2, 3))
+      // Check the results of batch 0
+      CheckAnswer(1, 2, 3),
+      CheckIncrementalExecutionCurrentBatchId(0),
+      CheckOffsetLogLatestBatchId(0),
+      CheckSinkLatestBatchId(0),
+
+      // Add some data in batch 1
+      AddData(inputData, 4, 5, 6),
+      AdvanceManualClock(10 * 1000),
+
+      /* -- batch _ ----------------------- */
+      // Check the results of batch 1
+      CheckAnswer(1, 2, 3, 4, 5, 6),
+      CheckIncrementalExecutionCurrentBatchId(1),
+      CheckOffsetLogLatestBatchId(1),
+      CheckSinkLatestBatchId(1),
+
+      AdvanceManualClock(10 * 1000),
+      AdvanceManualClock(10 * 1000),
+      AdvanceManualClock(10 * 1000),
+
+      /* -- batch __ ---------------------- */
+      // Check the results of batch 1 again; this is to make sure that, when there's no new data,
+      // the currentId does not get logged (e.g. as 2) even if the clock has advanced many times
+      CheckAnswer(1, 2, 3, 4, 5, 6),
+      CheckIncrementalExecutionCurrentBatchId(1),
+      CheckOffsetLogLatestBatchId(1),
+      CheckSinkLatestBatchId(1),
+
+      /* Stop then restart the Stream */
+      StopStream,
+      StartStream(ProcessingTime("10 seconds"), new ManualClock),
+
+      /* -- batch 1 rerun ----------------- */
+      // this batch 1 would re-run because the latest batch id logged in offset log is 1
+      AdvanceManualClock(10 * 1000),
+
+      /* -- batch 2 ----------------------- */
+      // Check the results of batch 1
+      CheckAnswer(1, 2, 3, 4, 5, 6),
+      CheckIncrementalExecutionCurrentBatchId(1),
+      CheckOffsetLogLatestBatchId(1),
+      CheckSinkLatestBatchId(1),
+
+      // Add some data in batch 2
+      AddData(inputData, 7, 8, 9),
+      AdvanceManualClock(10 * 1000),
+
+      /* -- batch 3 ----------------------- */
+      // Check the results of batch 2
+      CheckAnswer(1, 2, 3, 4, 5, 6, 7, 8, 9),
+      CheckIncrementalExecutionCurrentBatchId(2),
+      CheckOffsetLogLatestBatchId(2),
+      CheckSinkLatestBatchId(2))
   }
 }
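All three Check* helpers in this test follow the same pattern: an AssertOnQuery that probes one internal of the running StreamExecution and compares it with the expected batch id. If further internals ever need the same treatment, the pattern generalizes to a sketch like the following, where CheckBatchId is a hypothetical helper reusing the AssertOnQuery shape from the test above:

// Sketch: a generic variant of the test's Check* helpers. `probe` reads any
// internal of the running StreamExecution; the name CheckBatchId is illustrative.
def CheckBatchId(name: String, expectedId: Long)(
    probe: StreamExecution => Long): AssertOnQuery =
  AssertOnQuery(q => probe(q) == expectedId, s"$name should be $expectedId")

// e.g. CheckBatchId("offsetLog's latest", 1)(_.offsetLog.getLatest().get._1)

The point of the assertions is that the three ids move in lockstep: 0 after batch 0, 1 after batch 1, still 1 across empty triggers and across the stop/restart (where batch 1 re-runs against an idempotent sink), and 2 only once batch 2 has actually processed data.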