diff options
author | Liwei Lin <lwlin7@gmail.com> | 2016-05-04 10:25:14 -0700 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-05-04 10:25:14 -0700 |
commit | e597ec6f1c8ba1f9c10de06534bda1862b0c59aa (patch) | |
tree | 9091d4d2244dde01cb54bd146dbb3fa99d706c70 /sql | |
parent | a45647746d1efb90cb8bc142c2ef110a0db9bc9f (diff) | |
download | spark-e597ec6f1c8ba1f9c10de06534bda1862b0c59aa.tar.gz spark-e597ec6f1c8ba1f9c10de06534bda1862b0c59aa.tar.bz2 spark-e597ec6f1c8ba1f9c10de06534bda1862b0c59aa.zip |
[SPARK-15022][SPARK-15023][SQL][STREAMING] Add support for testing against the `ProcessingTime(intervalMS > 0)` trigger and `ManualClock`
## What changes were proposed in this pull request?
Currently in `StreamTest`, we have a `StartStream` which will start a streaming query against trigger `ProcessTime(intervalMS = 0)` and `SystemClock`.
We also need to test cases against `ProcessTime(intervalMS > 0)`, which often requires `ManualClock`.
This patch:
- fixes an issue of `ProcessingTimeExecutor`, where for a batch it should run `batchRunner` only once but might run multiple times under certain conditions;
- adds support for testing against the `ProcessingTime(intervalMS > 0)` trigger and `AdvanceManualClock`, by specifying them as fields for `StartStream`, and by adding an `AdvanceClock` action;
- adds a test, which takes advantage of the new `StartStream` and `AdvanceManualClock`, to test against [PR#[SPARK-14942] Reduce delay between batch construction and execution ](https://github.com/apache/spark/pull/12725).
## How was this patch tested?
N/A
Author: Liwei Lin <lwlin7@gmail.com>
Closes #12797 from lw-lin/add-trigger-test-support.
Diffstat (limited to 'sql')
10 files changed, 89 insertions, 34 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala index f82130cfa8..eab557443d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.util.ContinuousQueryListener +import org.apache.spark.util.{Clock, SystemClock} /** * :: Experimental :: @@ -175,6 +176,7 @@ class ContinuousQueryManager(sparkSession: SparkSession) { df: DataFrame, sink: Sink, trigger: Trigger = ProcessingTime(0), + triggerClock: Clock = new SystemClock(), outputMode: OutputMode = Append): ContinuousQuery = { activeQueriesLock.synchronized { if (activeQueries.contains(name)) { @@ -206,8 +208,9 @@ class ContinuousQueryManager(sparkSession: SparkSession) { checkpointLocation, logicalPlan, sink, - outputMode, - trigger) + trigger, + triggerClock, + outputMode) query.start() activeQueries.put(name, query) query diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 3c5ced2af7..ea367b699f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.util.ContinuousQueryListener import org.apache.spark.sql.util.ContinuousQueryListener._ -import org.apache.spark.util.{UninterruptibleThread, Utils} +import org.apache.spark.util.{Clock, UninterruptibleThread, Utils} /** * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread. @@ -50,8 +50,9 @@ class StreamExecution( checkpointRoot: String, private[sql] val logicalPlan: LogicalPlan, val sink: Sink, - val outputMode: OutputMode, - val trigger: Trigger) + val trigger: Trigger, + private[sql] val triggerClock: Clock, + val outputMode: OutputMode) extends ContinuousQuery with Logging { /** @@ -88,7 +89,7 @@ class StreamExecution( private val uniqueSources = sources.distinct private val triggerExecutor = trigger match { - case t: ProcessingTime => ProcessingTimeExecutor(t) + case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock) } /** Defines the internal state of execution */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala index a1132d5106..569907b369 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala @@ -65,8 +65,13 @@ case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock = s"${intervalMs} milliseconds, but spent ${realElapsedTimeMs} milliseconds") } - /** Return the next multiple of intervalMs */ + /** + * Returns the start time in milliseconds for the next batch interval, given the current time. + * Note that a batch interval is inclusive with respect to its start time, and thus calling + * `nextBatchTime` with the result of a previous call should return the next interval. (i.e. given + * an interval of `100 ms`, `nextBatchTime(nextBatchTime(0)) = 200` rather than `0`). + */ def nextBatchTime(now: Long): Long = { - (now - 1) / intervalMs * intervalMs + intervalMs + now / intervalMs * intervalMs + intervalMs } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala index dff6acc94b..6fb1aca769 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, Ro import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.util.Utils +import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils} /** * A framework for implementing tests for streaming queries and sources. @@ -138,11 +138,17 @@ trait StreamTest extends QueryTest with Timeouts { private def operatorName = if (lastOnly) "CheckLastBatch" else "CheckAnswer" } - /** Stops the stream. It must currently be running. */ + /** Stops the stream. It must currently be running. */ case object StopStream extends StreamAction with StreamMustBeRunning - /** Starts the stream, resuming if data has already been processed. It must not be running. */ - case object StartStream extends StreamAction + /** Starts the stream, resuming if data has already been processed. It must not be running. */ + case class StartStream( + trigger: Trigger = ProcessingTime(0), + triggerClock: Clock = new SystemClock) + extends StreamAction + + /** Advance the trigger clock's time manually. */ + case class AdvanceManualClock(timeToAdd: Long) extends StreamAction /** Signals that a failure is expected and should not kill the test. */ case class ExpectFailure[T <: Throwable : ClassTag]() extends StreamAction { @@ -199,8 +205,8 @@ trait StreamTest extends QueryTest with Timeouts { // If the test doesn't manually start the stream, we do it automatically at the beginning. val startedManually = - actions.takeWhile(!_.isInstanceOf[StreamMustBeRunning]).contains(StartStream) - val startedTest = if (startedManually) actions else StartStream +: actions + actions.takeWhile(!_.isInstanceOf[StreamMustBeRunning]).exists(_.isInstanceOf[StartStream]) + val startedTest = if (startedManually) actions else StartStream() +: actions def testActions = actions.zipWithIndex.map { case (a, i) => @@ -280,7 +286,7 @@ trait StreamTest extends QueryTest with Timeouts { try { startedTest.foreach { action => action match { - case StartStream => + case StartStream(trigger, triggerClock) => verify(currentStream == null, "stream already running") lastStream = currentStream currentStream = @@ -291,6 +297,8 @@ trait StreamTest extends QueryTest with Timeouts { metadataRoot, stream, sink, + trigger, + triggerClock, outputMode = outputMode) .asInstanceOf[StreamExecution] currentStream.microBatchThread.setUncaughtExceptionHandler( @@ -301,6 +309,13 @@ trait StreamTest extends QueryTest with Timeouts { } }) + case AdvanceManualClock(timeToAdd) => + verify(currentStream != null, + "can not advance manual clock when a stream is not running") + verify(currentStream.triggerClock.isInstanceOf[ManualClock], + s"can not advance clock of type ${currentStream.triggerClock.getClass}") + currentStream.triggerClock.asInstanceOf[ManualClock].advance(timeToAdd) + case StopStream => verify(currentStream != null, "can not stop a stream that is not running") try failAfter(streamingTimeout) { @@ -470,7 +485,7 @@ trait StreamTest extends QueryTest with Timeouts { addRandomData() case _ => // StartStream - actions += StartStream + actions += StartStream() running = true } } else { @@ -488,7 +503,7 @@ trait StreamTest extends QueryTest with Timeouts { } } } - if(!running) { actions += StartStream } + if(!running) { actions += StartStream() } addCheck() testStream(ds)(actions: _*) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala index dd5f92248b..7f99d303ba 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala @@ -21,19 +21,34 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} import org.apache.spark.SparkFunSuite import org.apache.spark.sql.ProcessingTime -import org.apache.spark.util.ManualClock +import org.apache.spark.util.{Clock, ManualClock, SystemClock} class ProcessingTimeExecutorSuite extends SparkFunSuite { test("nextBatchTime") { val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(100)) + assert(processingTimeExecutor.nextBatchTime(0) === 100) assert(processingTimeExecutor.nextBatchTime(1) === 100) assert(processingTimeExecutor.nextBatchTime(99) === 100) - assert(processingTimeExecutor.nextBatchTime(100) === 100) + assert(processingTimeExecutor.nextBatchTime(100) === 200) assert(processingTimeExecutor.nextBatchTime(101) === 200) assert(processingTimeExecutor.nextBatchTime(150) === 200) } + test("calling nextBatchTime with the result of a previous call should return the next interval") { + val intervalMS = 100 + val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMS)) + + val ITERATION = 10 + var nextBatchTime: Long = 0 + for (it <- 1 to ITERATION) { + nextBatchTime = processingTimeExecutor.nextBatchTime(nextBatchTime) + } + + // nextBatchTime should be 1000 + assert(nextBatchTime === intervalMS * ITERATION) + } + private def testBatchTermination(intervalMs: Long): Unit = { var batchCounts = 0 val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMs)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala index 3be0ea481d..f469cde6be 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala @@ -45,7 +45,7 @@ class ContinuousQuerySuite extends StreamTest with SharedSQLContext { TestAwaitTermination(ExpectNotBlocked), TestAwaitTermination(ExpectNotBlocked, timeoutMs = 2000, expectedReturnValue = true), TestAwaitTermination(ExpectNotBlocked, timeoutMs = 10, expectedReturnValue = true), - StartStream, + StartStream(), AssertOnQuery(_.isActive === true), AddData(inputData, 0), ExpectFailure[SparkException], diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 6b1ecd08c1..bc5c0c1f69 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -268,7 +268,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { CheckAnswer("keep2", "keep3"), StopStream, AddTextFileData("drop4\nkeep5\nkeep6", src, tmp), - StartStream, + StartStream(), CheckAnswer("keep2", "keep3", "keep5", "keep6"), AddTextFileData("drop7\nkeep8\nkeep9", src, tmp), CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9") @@ -292,7 +292,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { "{'value': 'drop4'}\n{'value': 'keep5'}\n{'value': 'keep6'}", src, tmp), - StartStream, + StartStream(), CheckAnswer("keep2", "keep3", "keep5", "keep6"), AddTextFileData( "{'value': 'drop7'}\n{'value': 'keep8'}\n{'value': 'keep9'}", @@ -385,7 +385,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { CheckAnswer("keep2", "keep3"), StopStream, AddParquetFileData(Seq("drop4", "keep5", "keep6"), src, tmp), - StartStream, + StartStream(), CheckAnswer("keep2", "keep3", "keep5", "keep6"), AddParquetFileData(Seq("drop7", "keep8", "keep9"), src, tmp), CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9") @@ -449,7 +449,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { CheckAnswer("keep2", "keep3"), StopStream, AddTextFileData("drop4\nkeep5\nkeep6", src, tmp), - StartStream, + StartStream(), CheckAnswer("keep2", "keep3", "keep5", "keep6"), AddTextFileData("drop7\nkeep8\nkeep9", src, tmp), CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 6f3149dbc5..bcd3cba55a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -19,10 +19,10 @@ package org.apache.spark.sql.streaming import org.apache.spark.sql._ import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.functions._ import org.apache.spark.sql.sources.StreamSourceProvider import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{IntegerType, StructField, StructType} +import org.apache.spark.util.ManualClock class StreamSuite extends StreamTest with SharedSQLContext { @@ -34,11 +34,11 @@ class StreamSuite extends StreamTest with SharedSQLContext { testStream(mapped)( AddData(inputData, 1, 2, 3), - StartStream, + StartStream(), CheckAnswer(2, 3, 4), StopStream, AddData(inputData, 4, 5, 6), - StartStream, + StartStream(), CheckAnswer(2, 3, 4, 5, 6, 7)) } @@ -70,7 +70,7 @@ class StreamSuite extends StreamTest with SharedSQLContext { CheckAnswer(1, 2, 3, 4, 5, 6), StopStream, AddData(inputData1, 7), - StartStream, + StartStream(), AddData(inputData2, 8), CheckAnswer(1, 2, 3, 4, 5, 6, 7, 8)) } @@ -136,6 +136,22 @@ class StreamSuite extends StreamTest with SharedSQLContext { testStream(ds)() } } + + // This would fail for now -- error is "Timed out waiting for stream" + // Root cause is that data generated in batch 0 may not get processed in batch 1 + // Let's enable this after SPARK-14942: Reduce delay between batch construction and execution + ignore("minimize delay between batch construction and execution") { + val inputData = MemoryStream[Int] + testStream(inputData.toDS())( + StartStream(ProcessingTime("10 seconds"), new ManualClock), + /* -- batch 0 ----------------------- */ + AddData(inputData, 1), + AddData(inputData, 2), + AddData(inputData, 3), + AdvanceManualClock(10 * 1000), // 10 seconds + /* -- batch 1 ----------------------- */ + CheckAnswer(1, 2, 3)) + } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index fa3b122f6d..bdf40f5cd4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -50,7 +50,7 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext { AddData(inputData, 3, 2), CheckLastBatch((3, 2), (2, 1)), StopStream, - StartStream, + StartStream(), AddData(inputData, 3, 2, 1), CheckLastBatch((3, 3), (2, 2), (1, 1)), // By default we run in new tuple mode. @@ -113,10 +113,10 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext { .as[(Int, Long)] testStream(aggregated)( - StartStream, + StartStream(), AddData(inputData, 1, 2, 3, 4), ExpectFailure[SparkException](), - StartStream, + StartStream(), CheckLastBatch((1, 1), (2, 1), (3, 1), (4, 1)) ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala index 2596231a12..54acd4db3c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala @@ -48,7 +48,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with val input = MemoryStream[Int] withListenerAdded(listener) { testStream(input.toDS)( - StartStream, + StartStream(), Assert("Incorrect query status in onQueryStarted") { val status = listener.startStatus assert(status != null) @@ -102,7 +102,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with def isListenerActive(listener: QueryStatusCollector): Boolean = { listener.reset() testStream(MemoryStream[Int].toDS)( - StartStream, + StartStream(), StopStream ) listener.startStatus != null @@ -133,7 +133,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with listener.reset() require(listener.startStatus === null) testStream(MemoryStream[Int].toDS)( - StartStream, + StartStream(), Assert(listener.startStatus !== null, "onQueryStarted not called before query returned"), StopStream, Assert { listener.checkAsyncErrors() } |