author     Liwei Lin <lwlin7@gmail.com>              2016-05-04 10:25:14 -0700
committer  Shixiong Zhu <shixiong@databricks.com>    2016-05-04 10:25:14 -0700
commit     e597ec6f1c8ba1f9c10de06534bda1862b0c59aa (patch)
tree       9091d4d2244dde01cb54bd146dbb3fa99d706c70 /sql
parent     a45647746d1efb90cb8bc142c2ef110a0db9bc9f (diff)
[SPARK-15022][SPARK-15023][SQL][STREAMING] Add support for testing against the `ProcessingTime(intervalMS > 0)` trigger and `ManualClock`
## What changes were proposed in this pull request?

Currently in `StreamTest`, we have a `StartStream` which will start a streaming query against trigger `ProcessingTime(intervalMS = 0)` and `SystemClock`. We also need to test cases against `ProcessingTime(intervalMS > 0)`, which often requires `ManualClock`.

This patch:
- fixes an issue of `ProcessingTimeExecutor`, where for a batch it should run `batchRunner` only once but might run multiple times under certain conditions;
- adds support for testing against the `ProcessingTime(intervalMS > 0)` trigger and `ManualClock`, by specifying them as fields of `StartStream`, and by adding an `AdvanceManualClock` action;
- adds a test, which takes advantage of the new `StartStream` and `AdvanceManualClock`, to test against [[SPARK-14942] Reduce delay between batch construction and execution](https://github.com/apache/spark/pull/12725).

## How was this patch tested?

N/A

Author: Liwei Lin <lwlin7@gmail.com>

Closes #12797 from lw-lin/add-trigger-test-support.
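For context, a minimal sketch (not part of the commit) of the testing pattern this patch enables, using the `StartStream` and `AdvanceManualClock` actions introduced in the `StreamTest` diff below; the concrete test added to `StreamSuite` follows the same shape:

```scala
// Sketch only: drives a query with ProcessingTime("10 seconds") and a ManualClock,
// then advances the clock by hand so the next batch fires deterministically.
val inputData = MemoryStream[Int]
testStream(inputData.toDS())(
  StartStream(ProcessingTime("10 seconds"), triggerClock = new ManualClock),
  AddData(inputData, 1, 2, 3),
  AdvanceManualClock(10 * 1000), // advance 10 seconds -> triggers the next batch
  CheckAnswer(1, 2, 3))
```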
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala                          |  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala             |  9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala             |  9
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala                                       | 33
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala | 19
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala                  |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala                 |  8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala                           | 24
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala             |  6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala               |  6
10 files changed, 89 insertions, 34 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
index f82130cfa8..eab557443d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.ContinuousQueryListener
+import org.apache.spark.util.{Clock, SystemClock}
/**
* :: Experimental ::
@@ -175,6 +176,7 @@ class ContinuousQueryManager(sparkSession: SparkSession) {
df: DataFrame,
sink: Sink,
trigger: Trigger = ProcessingTime(0),
+ triggerClock: Clock = new SystemClock(),
outputMode: OutputMode = Append): ContinuousQuery = {
activeQueriesLock.synchronized {
if (activeQueries.contains(name)) {
@@ -206,8 +208,9 @@ class ContinuousQueryManager(sparkSession: SparkSession) {
checkpointLocation,
logicalPlan,
sink,
- outputMode,
- trigger)
+ trigger,
+ triggerClock,
+ outputMode)
query.start()
activeQueries.put(name, query)
query
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 3c5ced2af7..ea367b699f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.ContinuousQueryListener
import org.apache.spark.sql.util.ContinuousQueryListener._
-import org.apache.spark.util.{UninterruptibleThread, Utils}
+import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
/**
* Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
@@ -50,8 +50,9 @@ class StreamExecution(
checkpointRoot: String,
private[sql] val logicalPlan: LogicalPlan,
val sink: Sink,
- val outputMode: OutputMode,
- val trigger: Trigger)
+ val trigger: Trigger,
+ private[sql] val triggerClock: Clock,
+ val outputMode: OutputMode)
extends ContinuousQuery with Logging {
/**
@@ -88,7 +89,7 @@ class StreamExecution(
private val uniqueSources = sources.distinct
private val triggerExecutor = trigger match {
- case t: ProcessingTime => ProcessingTimeExecutor(t)
+ case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
}
/** Defines the internal state of execution */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
index a1132d5106..569907b369 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
@@ -65,8 +65,13 @@ case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock =
s"${intervalMs} milliseconds, but spent ${realElapsedTimeMs} milliseconds")
}
- /** Return the next multiple of intervalMs */
+ /**
+ * Returns the start time in milliseconds for the next batch interval, given the current time.
+ * Note that a batch interval is inclusive with respect to its start time, and thus calling
+ * `nextBatchTime` with the result of a previous call should return the next interval. (i.e. given
+ * an interval of `100 ms`, `nextBatchTime(nextBatchTime(0)) = 200` rather than `0`).
+ */
def nextBatchTime(now: Long): Long = {
- (now - 1) / intervalMs * intervalMs + intervalMs
+ now / intervalMs * intervalMs + intervalMs
}
}
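As a worked example of the off-by-one that this hunk fixes (a sketch, not part of the patch): with an interval of 100 ms, the old formula maps a `now` that falls exactly on a batch boundary back onto that same boundary, so `batchRunner` could fire again for a batch it had already run, while the new formula always advances to the next boundary.

```scala
// Sketch comparing the old and new arithmetic for intervalMs = 100.
val intervalMs = 100L

def oldNextBatchTime(now: Long): Long = (now - 1) / intervalMs * intervalMs + intervalMs
def newNextBatchTime(now: Long): Long = now / intervalMs * intervalMs + intervalMs

oldNextBatchTime(100) // 100 -- a boundary maps to itself, allowing an extra run
newNextBatchTime(100) // 200 -- a boundary advances to the next interval
newNextBatchTime(0)   // 100
newNextBatchTime(newNextBatchTime(0)) // 200, matching the doc comment above
```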
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
index dff6acc94b..6fb1aca769 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, Ro
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
/**
* A framework for implementing tests for streaming queries and sources.
@@ -138,11 +138,17 @@ trait StreamTest extends QueryTest with Timeouts {
private def operatorName = if (lastOnly) "CheckLastBatch" else "CheckAnswer"
}
- /** Stops the stream. It must currently be running. */
+ /** Stops the stream. It must currently be running. */
case object StopStream extends StreamAction with StreamMustBeRunning
- /** Starts the stream, resuming if data has already been processed. It must not be running. */
- case object StartStream extends StreamAction
+ /** Starts the stream, resuming if data has already been processed. It must not be running. */
+ case class StartStream(
+ trigger: Trigger = ProcessingTime(0),
+ triggerClock: Clock = new SystemClock)
+ extends StreamAction
+
+ /** Advance the trigger clock's time manually. */
+ case class AdvanceManualClock(timeToAdd: Long) extends StreamAction
/** Signals that a failure is expected and should not kill the test. */
case class ExpectFailure[T <: Throwable : ClassTag]() extends StreamAction {
@@ -199,8 +205,8 @@ trait StreamTest extends QueryTest with Timeouts {
// If the test doesn't manually start the stream, we do it automatically at the beginning.
val startedManually =
- actions.takeWhile(!_.isInstanceOf[StreamMustBeRunning]).contains(StartStream)
- val startedTest = if (startedManually) actions else StartStream +: actions
+ actions.takeWhile(!_.isInstanceOf[StreamMustBeRunning]).exists(_.isInstanceOf[StartStream])
+ val startedTest = if (startedManually) actions else StartStream() +: actions
def testActions = actions.zipWithIndex.map {
case (a, i) =>
@@ -280,7 +286,7 @@ trait StreamTest extends QueryTest with Timeouts {
try {
startedTest.foreach { action =>
action match {
- case StartStream =>
+ case StartStream(trigger, triggerClock) =>
verify(currentStream == null, "stream already running")
lastStream = currentStream
currentStream =
@@ -291,6 +297,8 @@ trait StreamTest extends QueryTest with Timeouts {
metadataRoot,
stream,
sink,
+ trigger,
+ triggerClock,
outputMode = outputMode)
.asInstanceOf[StreamExecution]
currentStream.microBatchThread.setUncaughtExceptionHandler(
@@ -301,6 +309,13 @@ trait StreamTest extends QueryTest with Timeouts {
}
})
+ case AdvanceManualClock(timeToAdd) =>
+ verify(currentStream != null,
+ "can not advance manual clock when a stream is not running")
+ verify(currentStream.triggerClock.isInstanceOf[ManualClock],
+ s"can not advance clock of type ${currentStream.triggerClock.getClass}")
+ currentStream.triggerClock.asInstanceOf[ManualClock].advance(timeToAdd)
+
case StopStream =>
verify(currentStream != null, "can not stop a stream that is not running")
try failAfter(streamingTimeout) {
@@ -470,7 +485,7 @@ trait StreamTest extends QueryTest with Timeouts {
addRandomData()
case _ => // StartStream
- actions += StartStream
+ actions += StartStream()
running = true
}
} else {
@@ -488,7 +503,7 @@ trait StreamTest extends QueryTest with Timeouts {
}
}
}
- if(!running) { actions += StartStream }
+ if(!running) { actions += StartStream() }
addCheck()
testStream(ds)(actions: _*)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
index dd5f92248b..7f99d303ba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
@@ -21,19 +21,34 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.ProcessingTime
-import org.apache.spark.util.ManualClock
+import org.apache.spark.util.{Clock, ManualClock, SystemClock}
class ProcessingTimeExecutorSuite extends SparkFunSuite {
test("nextBatchTime") {
val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(100))
+ assert(processingTimeExecutor.nextBatchTime(0) === 100)
assert(processingTimeExecutor.nextBatchTime(1) === 100)
assert(processingTimeExecutor.nextBatchTime(99) === 100)
- assert(processingTimeExecutor.nextBatchTime(100) === 100)
+ assert(processingTimeExecutor.nextBatchTime(100) === 200)
assert(processingTimeExecutor.nextBatchTime(101) === 200)
assert(processingTimeExecutor.nextBatchTime(150) === 200)
}
+ test("calling nextBatchTime with the result of a previous call should return the next interval") {
+ val intervalMS = 100
+ val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMS))
+
+ val ITERATION = 10
+ var nextBatchTime: Long = 0
+ for (it <- 1 to ITERATION) {
+ nextBatchTime = processingTimeExecutor.nextBatchTime(nextBatchTime)
+ }
+
+ // nextBatchTime should be 1000
+ assert(nextBatchTime === intervalMS * ITERATION)
+ }
+
private def testBatchTermination(intervalMs: Long): Unit = {
var batchCounts = 0
val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMs))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
index 3be0ea481d..f469cde6be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
@@ -45,7 +45,7 @@ class ContinuousQuerySuite extends StreamTest with SharedSQLContext {
TestAwaitTermination(ExpectNotBlocked),
TestAwaitTermination(ExpectNotBlocked, timeoutMs = 2000, expectedReturnValue = true),
TestAwaitTermination(ExpectNotBlocked, timeoutMs = 10, expectedReturnValue = true),
- StartStream,
+ StartStream(),
AssertOnQuery(_.isActive === true),
AddData(inputData, 0),
ExpectFailure[SparkException],
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 6b1ecd08c1..bc5c0c1f69 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -268,7 +268,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
CheckAnswer("keep2", "keep3"),
StopStream,
AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
- StartStream,
+ StartStream(),
CheckAnswer("keep2", "keep3", "keep5", "keep6"),
AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
@@ -292,7 +292,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
"{'value': 'drop4'}\n{'value': 'keep5'}\n{'value': 'keep6'}",
src,
tmp),
- StartStream,
+ StartStream(),
CheckAnswer("keep2", "keep3", "keep5", "keep6"),
AddTextFileData(
"{'value': 'drop7'}\n{'value': 'keep8'}\n{'value': 'keep9'}",
@@ -385,7 +385,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
CheckAnswer("keep2", "keep3"),
StopStream,
AddParquetFileData(Seq("drop4", "keep5", "keep6"), src, tmp),
- StartStream,
+ StartStream(),
CheckAnswer("keep2", "keep3", "keep5", "keep6"),
AddParquetFileData(Seq("drop7", "keep8", "keep9"), src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
@@ -449,7 +449,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
CheckAnswer("keep2", "keep3"),
StopStream,
AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
- StartStream,
+ StartStream(),
CheckAnswer("keep2", "keep3", "keep5", "keep6"),
AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 6f3149dbc5..bcd3cba55a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -19,10 +19,10 @@ package org.apache.spark.sql.streaming
import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.functions._
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.util.ManualClock
class StreamSuite extends StreamTest with SharedSQLContext {
@@ -34,11 +34,11 @@ class StreamSuite extends StreamTest with SharedSQLContext {
testStream(mapped)(
AddData(inputData, 1, 2, 3),
- StartStream,
+ StartStream(),
CheckAnswer(2, 3, 4),
StopStream,
AddData(inputData, 4, 5, 6),
- StartStream,
+ StartStream(),
CheckAnswer(2, 3, 4, 5, 6, 7))
}
@@ -70,7 +70,7 @@ class StreamSuite extends StreamTest with SharedSQLContext {
CheckAnswer(1, 2, 3, 4, 5, 6),
StopStream,
AddData(inputData1, 7),
- StartStream,
+ StartStream(),
AddData(inputData2, 8),
CheckAnswer(1, 2, 3, 4, 5, 6, 7, 8))
}
@@ -136,6 +136,22 @@ class StreamSuite extends StreamTest with SharedSQLContext {
testStream(ds)()
}
}
+
+ // This would fail for now -- error is "Timed out waiting for stream"
+ // Root cause is that data generated in batch 0 may not get processed in batch 1
+ // Let's enable this after SPARK-14942: Reduce delay between batch construction and execution
+ ignore("minimize delay between batch construction and execution") {
+ val inputData = MemoryStream[Int]
+ testStream(inputData.toDS())(
+ StartStream(ProcessingTime("10 seconds"), new ManualClock),
+ /* -- batch 0 ----------------------- */
+ AddData(inputData, 1),
+ AddData(inputData, 2),
+ AddData(inputData, 3),
+ AdvanceManualClock(10 * 1000), // 10 seconds
+ /* -- batch 1 ----------------------- */
+ CheckAnswer(1, 2, 3))
+ }
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index fa3b122f6d..bdf40f5cd4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -50,7 +50,7 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext {
AddData(inputData, 3, 2),
CheckLastBatch((3, 2), (2, 1)),
StopStream,
- StartStream,
+ StartStream(),
AddData(inputData, 3, 2, 1),
CheckLastBatch((3, 3), (2, 2), (1, 1)),
// By default we run in new tuple mode.
@@ -113,10 +113,10 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext {
.as[(Int, Long)]
testStream(aggregated)(
- StartStream,
+ StartStream(),
AddData(inputData, 1, 2, 3, 4),
ExpectFailure[SparkException](),
- StartStream,
+ StartStream(),
CheckLastBatch((1, 1), (2, 1), (3, 1), (4, 1))
)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
index 2596231a12..54acd4db3c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
@@ -48,7 +48,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
val input = MemoryStream[Int]
withListenerAdded(listener) {
testStream(input.toDS)(
- StartStream,
+ StartStream(),
Assert("Incorrect query status in onQueryStarted") {
val status = listener.startStatus
assert(status != null)
@@ -102,7 +102,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
def isListenerActive(listener: QueryStatusCollector): Boolean = {
listener.reset()
testStream(MemoryStream[Int].toDS)(
- StartStream,
+ StartStream(),
StopStream
)
listener.startStatus != null
@@ -133,7 +133,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
listener.reset()
require(listener.startStatus === null)
testStream(MemoryStream[Int].toDS)(
- StartStream,
+ StartStream(),
Assert(listener.startStatus !== null, "onQueryStarted not called before query returned"),
StopStream,
Assert { listener.checkAsyncErrors() }