From 95f4fbae52d26ede94c3ba8248394749f3d95dcc Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Mon, 16 May 2016 12:59:55 -0700 Subject: [SPARK-14942][SQL][STREAMING] Reduce delay between batch construction and execution ## Problem Currently in `StreamExecution`, [we first run the batch, then construct the next](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L165): ```scala if (dataAvailable) runBatch() constructNextBatch() ``` This is good when we run batches ASAP, where data would get processed in the **very next batch**: ![1](https://cloud.githubusercontent.com/assets/15843379/14779964/2786e698-0b0d-11e6-9d2c-bb41513488b2.png) However, when we run batches at trigger like `ProcessTime("1 minute")`, data - such as _y_ below - may not get processed in the very next batch i.e. _batch 1_, but in _batch 2_: ![2](https://cloud.githubusercontent.com/assets/15843379/14779818/6f3bb064-0b0c-11e6-9f16-c1ce4897186b.png) ## What changes were proposed in this pull request? This patch reverses the order of `constructNextBatch()` and `runBatch()`. After this patch, data would get processed in the **very next batch**, i.e. _batch 1_: ![3](https://cloud.githubusercontent.com/assets/15843379/14779816/6f36ee62-0b0c-11e6-9e53-bc8397fade18.png) In addition, this patch alters when we do `currentBatchId += 1`: let's do that when the processing of the current batch's data is completed, so we won't bother passing `currentBatchId + 1` or `currentBatchId - 1` to states or sinks. ## How was this patch tested? New added test case. Also this should be covered by existing test suits, e.g. stress tests and others. Author: Liwei Lin Closes #12725 from lw-lin/construct-before-run-3. --- .../execution/streaming/IncrementalExecution.scala | 6 +- .../sql/execution/streaming/StreamExecution.scala | 24 ++++--- .../spark/sql/execution/streaming/memory.scala | 4 ++ .../apache/spark/sql/streaming/StreamSuite.scala | 84 +++++++++++++++++++--- 4 files changed, 99 insertions(+), 19 deletions(-) (limited to 'sql/core') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index e9052a3095..8b96f65bc3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -27,12 +27,12 @@ import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, * A variant of [[QueryExecution]] that allows the execution of the given [[LogicalPlan]] * plan incrementally. Possibly preserving state in between each execution. */ -class IncrementalExecution( +class IncrementalExecution private[sql]( sparkSession: SparkSession, logicalPlan: LogicalPlan, outputMode: OutputMode, checkpointLocation: String, - currentBatchId: Long) + val currentBatchId: Long) extends QueryExecution(sparkSession, logicalPlan) { // TODO: make this always part of planning. @@ -57,7 +57,7 @@ class IncrementalExecution( case StateStoreSaveExec(keys, None, UnaryExecNode(agg, StateStoreRestoreExec(keys2, None, child))) => - val stateId = OperatorStateId(checkpointLocation, operatorId, currentBatchId - 1) + val stateId = OperatorStateId(checkpointLocation, operatorId, currentBatchId) operatorId += 1 StateStoreSaveExec( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index ea367b699f..df6304d85f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -122,7 +122,7 @@ class StreamExecution( * processing is done. Thus, the Nth record in this log indicated data that is currently being * processed and the N-1th entry indicates which offsets have been durably committed to the sink. */ - private val offsetLog = + private[sql] val offsetLog = new HDFSMetadataLog[CompositeOffset](sparkSession, checkpointFile("offsets")) /** Whether the query is currently active or not */ @@ -174,12 +174,21 @@ class StreamExecution( // While active, repeatedly attempt to run batches. SQLContext.setActive(sparkSession.wrapped) - populateStartOffsets() - logDebug(s"Stream running from $committedOffsets to $availableOffsets") + triggerExecutor.execute(() => { if (isActive) { - if (dataAvailable) runBatch() - constructNextBatch() + if (currentBatchId < 0) { + // We'll do this initialization only once + populateStartOffsets() + logDebug(s"Stream running from $committedOffsets to $availableOffsets") + } else { + constructNextBatch() + } + if (dataAvailable) { + runBatch() + // We'll increase currentBatchId after we complete processing current batch's data + currentBatchId += 1 + } true } else { false @@ -214,7 +223,7 @@ class StreamExecution( offsetLog.getLatest() match { case Some((batchId, nextOffsets)) => logInfo(s"Resuming continuous query, starting with batch $batchId") - currentBatchId = batchId + 1 + currentBatchId = batchId availableOffsets = nextOffsets.toStreamProgress(sources) logDebug(s"Found possibly uncommitted offsets $availableOffsets") @@ -285,7 +294,6 @@ class StreamExecution( offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)), s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId") } - currentBatchId += 1 logInfo(s"Committed offsets for batch $currentBatchId.") } else { awaitBatchLock.lock() @@ -352,7 +360,7 @@ class StreamExecution( val nextBatch = new Dataset(sparkSession, lastExecution, RowEncoder(lastExecution.analyzed.schema)) - sink.addBatch(currentBatchId - 1, nextBatch) + sink.addBatch(currentBatchId, nextBatch) awaitBatchLock.lock() try { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index a34927ff99..bcc33ae8c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -124,6 +124,10 @@ class MemorySink(val schema: StructType) extends Sink with Logging { batches.flatten } + def latestBatchId: Option[Int] = synchronized { + if (batches.size == 0) None else Some(batches.size - 1) + } + def lastBatch: Seq[Row] = synchronized { batches.last } def toDebugString: String = synchronized { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 6a8b280174..013b731693 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -137,20 +137,88 @@ class StreamSuite extends StreamTest with SharedSQLContext { } } - // This would fail for now -- error is "Timed out waiting for stream" - // Root cause is that data generated in batch 0 may not get processed in batch 1 - // Let's enable this after SPARK-14942: Reduce delay between batch construction and execution - ignore("minimize delay between batch construction and execution") { + test("minimize delay between batch construction and execution") { + + // For each batch, we would retrieve new data's offsets and log them before we run the execution + // This checks whether the key of the offset log is the expected batch id + def CheckOffsetLogLatestBatchId(expectedId: Int): AssertOnQuery = + AssertOnQuery(_.offsetLog.getLatest().get._1 == expectedId, + s"offsetLog's latest should be $expectedId") + + // For each batch, we would log the state change during the execution + // This checks whether the key of the state change log is the expected batch id + def CheckIncrementalExecutionCurrentBatchId(expectedId: Int): AssertOnQuery = + AssertOnQuery(_.lastExecution.asInstanceOf[IncrementalExecution].currentBatchId == expectedId, + s"lastExecution's currentBatchId should be $expectedId") + + // For each batch, we would log the sink change after the execution + // This checks whether the key of the sink change log is the expected batch id + def CheckSinkLatestBatchId(expectedId: Int): AssertOnQuery = + AssertOnQuery(_.sink.asInstanceOf[MemorySink].latestBatchId.get == expectedId, + s"sink's lastBatchId should be $expectedId") + val inputData = MemoryStream[Int] testStream(inputData.toDS())( StartStream(ProcessingTime("10 seconds"), new ManualClock), + /* -- batch 0 ----------------------- */ - AddData(inputData, 1), - AddData(inputData, 2), - AddData(inputData, 3), + // Add some data in batch 0 + AddData(inputData, 1, 2, 3), AdvanceManualClock(10 * 1000), // 10 seconds + /* -- batch 1 ----------------------- */ - CheckAnswer(1, 2, 3)) + // Check the results of batch 0 + CheckAnswer(1, 2, 3), + CheckIncrementalExecutionCurrentBatchId(0), + CheckOffsetLogLatestBatchId(0), + CheckSinkLatestBatchId(0), + // Add some data in batch 1 + AddData(inputData, 4, 5, 6), + AdvanceManualClock(10 * 1000), + + /* -- batch _ ----------------------- */ + // Check the results of batch 1 + CheckAnswer(1, 2, 3, 4, 5, 6), + CheckIncrementalExecutionCurrentBatchId(1), + CheckOffsetLogLatestBatchId(1), + CheckSinkLatestBatchId(1), + + AdvanceManualClock(10 * 1000), + AdvanceManualClock(10 * 1000), + AdvanceManualClock(10 * 1000), + + /* -- batch __ ---------------------- */ + // Check the results of batch 1 again; this is to make sure that, when there's no new data, + // the currentId does not get logged (e.g. as 2) even if the clock has advanced many times + CheckAnswer(1, 2, 3, 4, 5, 6), + CheckIncrementalExecutionCurrentBatchId(1), + CheckOffsetLogLatestBatchId(1), + CheckSinkLatestBatchId(1), + + /* Stop then restart the Stream */ + StopStream, + StartStream(ProcessingTime("10 seconds"), new ManualClock), + + /* -- batch 1 rerun ----------------- */ + // this batch 1 would re-run because the latest batch id logged in offset log is 1 + AdvanceManualClock(10 * 1000), + + /* -- batch 2 ----------------------- */ + // Check the results of batch 1 + CheckAnswer(1, 2, 3, 4, 5, 6), + CheckIncrementalExecutionCurrentBatchId(1), + CheckOffsetLogLatestBatchId(1), + CheckSinkLatestBatchId(1), + // Add some data in batch 2 + AddData(inputData, 7, 8, 9), + AdvanceManualClock(10 * 1000), + + /* -- batch 3 ----------------------- */ + // Check the results of batch 2 + CheckAnswer(1, 2, 3, 4, 5, 6, 7, 8, 9), + CheckIncrementalExecutionCurrentBatchId(2), + CheckOffsetLogLatestBatchId(2), + CheckSinkLatestBatchId(2)) } } -- cgit v1.2.3