aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala40
1 files changed, 30 insertions, 10 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index aa6515bc7a..09140a1d6e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -50,11 +50,11 @@ import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
*
* {{{
* val inputData = MemoryStream[Int]
- val mapped = inputData.toDS().map(_ + 1)
-
- testStream(mapped)(
- AddData(inputData, 1, 2, 3),
- CheckAnswer(2, 3, 4))
+ * val mapped = inputData.toDS().map(_ + 1)
+ *
+ * testStream(mapped)(
+ * AddData(inputData, 1, 2, 3),
+ * CheckAnswer(2, 3, 4))
* }}}
*
* Note that while we do sleep to allow the other thread to progress without spinning,
@@ -477,21 +477,41 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
}
}
+
+ /**
+ * Creates a stress test that randomly starts/stops/adds data/checks the result.
+ *
+ * @param ds a dataframe that executes + 1 on a stream of integers, returning the result
+ * @param addData an add data action that adds the given numbers to the stream, encoding them
+ * as needed
+ * @param iterations the iteration number
+ */
+ def runStressTest(
+ ds: Dataset[Int],
+ addData: Seq[Int] => StreamAction,
+ iterations: Int = 100): Unit = {
+ runStressTest(ds, Seq.empty, (data, running) => addData(data), iterations)
+ }
+
/**
* Creates a stress test that randomly starts/stops/adds data/checks the result.
*
- * @param ds a dataframe that executes + 1 on a stream of integers, returning the result.
- * @param addData and add data action that adds the given numbers to the stream, encoding them
+ * @param ds a dataframe that executes + 1 on a stream of integers, returning the result
+ * @param prepareActions actions need to run before starting the stress test.
+ * @param addData an add data action that adds the given numbers to the stream, encoding them
* as needed
+ * @param iterations the iteration number
*/
def runStressTest(
ds: Dataset[Int],
- addData: Seq[Int] => StreamAction,
- iterations: Int = 100): Unit = {
+ prepareActions: Seq[StreamAction],
+ addData: (Seq[Int], Boolean) => StreamAction,
+ iterations: Int): Unit = {
implicit val intEncoder = ExpressionEncoder[Int]()
var dataPos = 0
var running = true
val actions = new ArrayBuffer[StreamAction]()
+ actions ++= prepareActions
def addCheck() = { actions += CheckAnswer(1 to dataPos: _*) }
@@ -499,7 +519,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
val numItems = Random.nextInt(10)
val data = dataPos until (dataPos + numItems)
dataPos += numItems
- actions += addData(data)
+ actions += addData(data, running)
}
(1 to iterations).foreach { i =>