path: root/sql/core/src/test/scala
author     Tathagata Das <tathagata.das1565@gmail.com>  2016-11-29 17:24:17 -0800
committer  Michael Armbrust <michael@databricks.com>    2016-11-29 17:24:17 -0800
commit     c3d08e2f29baeebe09bf4c059ace4336af9116b5 (patch)
tree       bc677be4760fdd9dfabcb5bb990c576fd5c65e12 /sql/core/src/test/scala
parent     9a02f6821265ff67ba3f7b095cd1afaebd25a898 (diff)
download   spark-c3d08e2f29baeebe09bf4c059ace4336af9116b5.tar.gz
           spark-c3d08e2f29baeebe09bf4c059ace4336af9116b5.tar.bz2
           spark-c3d08e2f29baeebe09bf4c059ace4336af9116b5.zip
[SPARK-18516][SQL] Split state and progress in streaming
This PR separates the status of a `StreamingQuery` into two separate APIs:

- `status` - describes the status of a `StreamingQuery` at this moment, including what phase of processing is currently happening and whether data is available.
- `recentProgress` - an array of statistics about the most recent microbatches that have executed.

A recent progress contains the following information:

```
{
  "id" : "2be8670a-fce1-4859-a530-748f29553bb6",
  "name" : "query-29",
  "timestamp" : 1479705392724,
  "inputRowsPerSecond" : 230.76923076923077,
  "processedRowsPerSecond" : 10.869565217391303,
  "durationMs" : {
    "triggerExecution" : 276,
    "queryPlanning" : 3,
    "getBatch" : 5,
    "getOffset" : 3,
    "addBatch" : 234,
    "walCommit" : 30
  },
  "currentWatermark" : 0,
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-14]]",
    "startOffset" : {
      "topic-14" : {
        "2" : 0,
        "4" : 1,
        "1" : 0,
        "3" : 0,
        "0" : 0
      }
    },
    "endOffset" : {
      "topic-14" : {
        "2" : 1,
        "4" : 2,
        "1" : 0,
        "3" : 0,
        "0" : 1
      }
    },
    "numRecords" : 3,
    "inputRowsPerSecond" : 230.76923076923077,
    "processedRowsPerSecond" : 10.869565217391303
  } ]
}
```

Additionally, in order to make it possible to correlate progress updates across restarts, we change the `id` field from an integer that is unique within the JVM to a `UUID` that is globally unique.

Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Michael Armbrust <michael@databricks.com>

Closes #15954 from marmbrus/queryProgress.
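For orientation, the following is a minimal, hypothetical driver-side sketch of the two new APIs; it is not part of this commit. It assumes a local `SparkSession` and uses the test-only `MemoryStream` source that the suites below rely on. Note that in this version of the code the progress array is exposed on `StreamingQuery` as `recentProgresses` (the tests call `query.recentProgresses`), while the commit message refers to it as `recentProgress`.

```scala
// Hypothetical usage sketch of the split status/progress APIs (not part of this commit).
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

val spark = SparkSession.builder().master("local[2]").appName("progress-demo").getOrCreate()
import spark.implicits._
implicit val sqlContext = spark.sqlContext

val input = MemoryStream[Int]
val query = input.toDS().map(_ * 2)
  .writeStream
  .format("memory")
  .queryName("demo")
  .outputMode("append")
  .start()

input.addData(1, 2, 3)
query.processAllAvailable()

// `status`: what the query is doing right now
val status = query.status
println(s"data available: ${status.isDataAvailable}, trigger active: ${status.isTriggerActive}")
println(s"message: ${status.message}")

// `recentProgresses`: statistics for the most recent micro-batches;
// `lastProgress` is the latest entry and can be rendered as JSON
query.recentProgresses.filter(_.numInputRows > 0).foreach { p =>
  println(s"batch ${p.batchId}: ${p.numInputRows} rows at ${p.processedRowsPerSecond} rows/sec")
}
Option(query.lastProgress).foreach(p => println(p.prettyJson))

query.stop()
spark.stop()
```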
Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala   213
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala           10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala                      73
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala    267
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala       2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala     98
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala      123
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala            260
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala                  16
9 files changed, 412 insertions, 650 deletions
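The listener API changes the same way: events now carry the UUID-based `id` and a `StreamingQueryProgress` instead of a `StreamingQueryStatus`. The following hypothetical listener (not part of this commit) mirrors the `EventCollector` added to `StreamingQueryListenerSuite` in the diff below.

```scala
// Hypothetical listener sketch modeled on the EventCollector in the tests below.
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

class ProgressLogger extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {
    // ids are now globally unique UUIDs rather than per-JVM integers
    println(s"started '${event.name}' with id ${event.id}")
  }

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // each progress event carries one StreamingQueryProgress, serializable as JSON
    println(event.progress.json)
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
    // exception is an Option[String] describing a failure; None on a clean stop
    println(s"terminated ${event.id}, exception = ${event.exception}")
  }
}

// Registration, as done in the suites below:
// spark.streams.addListener(new ProgressLogger())
```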
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
deleted file mode 100644
index 38c4ece439..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming
-
-import org.scalactic.TolerantNumerics
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.util.ManualClock
-
-class StreamMetricsSuite extends SparkFunSuite {
- import StreamMetrics._
-
- // To make === between double tolerate inexact values
- implicit val doubleEquality = TolerantNumerics.tolerantDoubleEquality(0.01)
-
- test("rates, latencies, trigger details - basic life cycle") {
- val sm = newStreamMetrics(source)
- assert(sm.currentInputRate() === 0.0)
- assert(sm.currentProcessingRate() === 0.0)
- assert(sm.currentSourceInputRate(source) === 0.0)
- assert(sm.currentSourceProcessingRate(source) === 0.0)
- assert(sm.currentLatency() === None)
- assert(sm.currentTriggerDetails().isEmpty)
-
- // When trigger started, the rates should not change, but should return
- // reported trigger details
- sm.reportTriggerStarted(1)
- sm.reportTriggerDetail("key", "value")
- sm.reportSourceTriggerDetail(source, "key2", "value2")
- assert(sm.currentInputRate() === 0.0)
- assert(sm.currentProcessingRate() === 0.0)
- assert(sm.currentSourceInputRate(source) === 0.0)
- assert(sm.currentSourceProcessingRate(source) === 0.0)
- assert(sm.currentLatency() === None)
- assert(sm.currentTriggerDetails() ===
- Map(BATCH_ID -> "1", IS_TRIGGER_ACTIVE -> "true",
- START_TIMESTAMP -> "0", "key" -> "value"))
- assert(sm.currentSourceTriggerDetails(source) ===
- Map(BATCH_ID -> "1", "key2" -> "value2"))
-
- // Finishing the trigger should calculate the rates, except input rate which needs
- // to have another trigger interval
- sm.reportNumInputRows(Map(source -> 100L)) // 100 input rows, 10 output rows
- clock.advance(1000)
- sm.reportTriggerFinished()
- assert(sm.currentInputRate() === 0.0)
- assert(sm.currentProcessingRate() === 100.0) // 100 input rows processed in 1 sec
- assert(sm.currentSourceInputRate(source) === 0.0)
- assert(sm.currentSourceProcessingRate(source) === 100.0)
- assert(sm.currentLatency() === None)
- assert(sm.currentTriggerDetails() ===
- Map(BATCH_ID -> "1", IS_TRIGGER_ACTIVE -> "false",
- START_TIMESTAMP -> "0", FINISH_TIMESTAMP -> "1000",
- NUM_INPUT_ROWS -> "100", "key" -> "value"))
- assert(sm.currentSourceTriggerDetails(source) ===
- Map(BATCH_ID -> "1", NUM_SOURCE_INPUT_ROWS -> "100", "key2" -> "value2"))
-
- // After another trigger starts, the rates and latencies should not change until
- // new rows are reported
- clock.advance(1000)
- sm.reportTriggerStarted(2)
- assert(sm.currentInputRate() === 0.0)
- assert(sm.currentProcessingRate() === 100.0)
- assert(sm.currentSourceInputRate(source) === 0.0)
- assert(sm.currentSourceProcessingRate(source) === 100.0)
- assert(sm.currentLatency() === None)
-
- // Reporting new rows should update the rates and latencies
- sm.reportNumInputRows(Map(source -> 200L)) // 200 input rows
- clock.advance(500)
- sm.reportTriggerFinished()
- assert(sm.currentInputRate() === 100.0) // 200 input rows generated in 2 seconds b/w starts
- assert(sm.currentProcessingRate() === 400.0) // 200 output rows processed in 0.5 sec
- assert(sm.currentSourceInputRate(source) === 100.0)
- assert(sm.currentSourceProcessingRate(source) === 400.0)
- assert(sm.currentLatency().get === 1500.0) // 2000 ms / 2 + 500 ms
-
- // Rates should be set to 0 after stop
- sm.stop()
- assert(sm.currentInputRate() === 0.0)
- assert(sm.currentProcessingRate() === 0.0)
- assert(sm.currentSourceInputRate(source) === 0.0)
- assert(sm.currentSourceProcessingRate(source) === 0.0)
- assert(sm.currentLatency() === None)
- assert(sm.currentTriggerDetails().isEmpty)
- }
-
- test("rates and latencies - after trigger with no data") {
- val sm = newStreamMetrics(source)
- // Trigger 1 with data
- sm.reportTriggerStarted(1)
- sm.reportNumInputRows(Map(source -> 100L)) // 100 input rows
- clock.advance(1000)
- sm.reportTriggerFinished()
-
- // Trigger 2 with data
- clock.advance(1000)
- sm.reportTriggerStarted(2)
- sm.reportNumInputRows(Map(source -> 200L)) // 200 input rows
- clock.advance(500)
- sm.reportTriggerFinished()
-
- // Make sure that all rates are set
- require(sm.currentInputRate() === 100.0) // 200 input rows generated in 2 seconds b/w starts
- require(sm.currentProcessingRate() === 400.0) // 200 output rows processed in 0.5 sec
- require(sm.currentSourceInputRate(source) === 100.0)
- require(sm.currentSourceProcessingRate(source) === 400.0)
- require(sm.currentLatency().get === 1500.0) // 2000 ms / 2 + 500 ms
-
- // Trigger 3 with data
- clock.advance(500)
- sm.reportTriggerStarted(3)
- clock.advance(500)
- sm.reportTriggerFinished()
-
- // Rates are set to zero and latency is set to None
- assert(sm.currentInputRate() === 0.0)
- assert(sm.currentProcessingRate() === 0.0)
- assert(sm.currentSourceInputRate(source) === 0.0)
- assert(sm.currentSourceProcessingRate(source) === 0.0)
- assert(sm.currentLatency() === None)
- sm.stop()
- }
-
- test("rates - after trigger with multiple sources, and one source having no info") {
- val source1 = TestSource(1)
- val source2 = TestSource(2)
- val sm = newStreamMetrics(source1, source2)
- // Trigger 1 with data
- sm.reportTriggerStarted(1)
- sm.reportNumInputRows(Map(source1 -> 100L, source2 -> 100L))
- clock.advance(1000)
- sm.reportTriggerFinished()
-
- // Trigger 2 with data
- clock.advance(1000)
- sm.reportTriggerStarted(2)
- sm.reportNumInputRows(Map(source1 -> 200L, source2 -> 200L))
- clock.advance(500)
- sm.reportTriggerFinished()
-
- // Make sure that all rates are set
- assert(sm.currentInputRate() === 200.0) // 200*2 input rows generated in 2 seconds b/w starts
- assert(sm.currentProcessingRate() === 800.0) // 200*2 output rows processed in 0.5 sec
- assert(sm.currentSourceInputRate(source1) === 100.0)
- assert(sm.currentSourceInputRate(source2) === 100.0)
- assert(sm.currentSourceProcessingRate(source1) === 400.0)
- assert(sm.currentSourceProcessingRate(source2) === 400.0)
-
- // Trigger 3 with data
- clock.advance(500)
- sm.reportTriggerStarted(3)
- clock.advance(500)
- sm.reportNumInputRows(Map(source1 -> 200L))
- sm.reportTriggerFinished()
-
- // Rates are set to zero and latency is set to None
- assert(sm.currentInputRate() === 200.0)
- assert(sm.currentProcessingRate() === 400.0)
- assert(sm.currentSourceInputRate(source1) === 200.0)
- assert(sm.currentSourceInputRate(source2) === 0.0)
- assert(sm.currentSourceProcessingRate(source1) === 400.0)
- assert(sm.currentSourceProcessingRate(source2) === 0.0)
- sm.stop()
- }
-
- test("registered Codahale metrics") {
- import scala.collection.JavaConverters._
- val sm = newStreamMetrics(source)
- val gaugeNames = sm.metricRegistry.getGauges().keySet().asScala
-
- // so that all metrics are considered as a single metric group in Ganglia
- assert(!gaugeNames.exists(_.contains(".")))
- assert(gaugeNames === Set(
- "inputRate-total",
- "inputRate-source0",
- "processingRate-total",
- "processingRate-source0",
- "latency"))
- }
-
- private def newStreamMetrics(sources: Source*): StreamMetrics = {
- new StreamMetrics(sources.toSet, clock, "test")
- }
-
- private val clock = new ManualClock()
- private val source = TestSource(0)
-
- case class TestSource(id: Int) extends Source {
- override def schema: StructType = StructType(Array.empty[StructField])
- override def getOffset: Option[Offset] = Some(new LongOffset(0))
- override def getBatch(start: Option[Offset], end: Offset): DataFrame = { null }
- override def stop() {}
- override def toString(): String = s"source$id"
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index bad6642ea4..8256c63d87 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1006,9 +1006,13 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
testStream(input)(
AddTextFileData("100", src, tmp),
CheckAnswer("100"),
- AssertOnLastQueryStatus { status =>
- assert(status.triggerDetails.get("numRows.input.total") === "1")
- assert(status.sourceStatuses(0).processingRate > 0.0)
+ AssertOnQuery { query =>
+ val actualProgress = query.recentProgresses
+ .find(_.numInputRows > 0)
+ .getOrElse(sys.error("Could not find records with data."))
+ assert(actualProgress.numInputRows === 1)
+ assert(actualProgress.sources(0).processedRowsPerSecond > 0.0)
+ true
}
)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index a6b2d4b9ab..a2629f7f68 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -28,7 +28,6 @@ import scala.util.control.NonFatal
import org.scalatest.Assertions
import org.scalatest.concurrent.{Eventually, Timeouts}
-import org.scalatest.concurrent.AsyncAssertions.Waiter
import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.exceptions.TestFailedDueToTimeoutException
@@ -202,10 +201,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
}
}
- case class AssertOnLastQueryStatus(condition: StreamingQueryStatus => Unit)
- extends StreamAction
-
- class StreamManualClock(time: Long = 0L) extends ManualClock(time) {
+ class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable {
private var waitStartTime: Option[Long] = None
override def waitTillTime(targetTime: Long): Long = synchronized {
@@ -325,10 +321,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
val testThread = Thread.currentThread()
val metadataRoot = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
- val statusCollector = new QueryStatusCollector
var manualClockExpectedTime = -1L
try {
- spark.streams.addListener(statusCollector)
startedTest.foreach { action =>
logInfo(s"Processing test stream action: $action")
action match {
@@ -375,10 +369,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
s"can not advance clock of type ${currentStream.triggerClock.getClass}")
val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock]
assert(manualClockExpectedTime >= 0)
+
// Make sure we don't advance ManualClock too early. See SPARK-16002.
eventually("StreamManualClock has not yet entered the waiting state") {
assert(clock.isStreamWaitingAt(manualClockExpectedTime))
}
+
clock.advance(timeToAdd)
manualClockExpectedTime += timeToAdd
verify(clock.getTimeMillis() === manualClockExpectedTime,
@@ -447,13 +443,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
val streamToAssert = Option(currentStream).getOrElse(lastStream)
verify({ a.run(); true }, s"Assert failed: ${a.message}")
- case a: AssertOnLastQueryStatus =>
- Eventually.eventually(timeout(streamingTimeout)) {
- require(statusCollector.lastTriggerStatus.nonEmpty)
- }
- val status = statusCollector.lastTriggerStatus.get
- verify({ a.condition(status); true }, "Assert on last query status failed")
-
case a: AddData =>
try {
// Add data and get the source where it was added, and the expected offset of the
@@ -528,7 +517,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
if (currentStream != null && currentStream.microBatchThread.isAlive) {
currentStream.stop()
}
- spark.streams.removeListener(statusCollector)
// Rollback prev configuration values
resetConfValues.foreach {
@@ -614,7 +602,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
testStream(ds)(actions: _*)
}
-
object AwaitTerminationTester {
trait ExpectedBehavior
@@ -668,58 +655,4 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
}
}
}
-
-
- class QueryStatusCollector extends StreamingQueryListener {
- // to catch errors in the async listener events
- @volatile private var asyncTestWaiter = new Waiter
-
- @volatile var startStatus: StreamingQueryStatus = null
- @volatile var terminationStatus: StreamingQueryStatus = null
- @volatile var terminationException: Option[String] = null
-
- private val progressStatuses = new mutable.ArrayBuffer[StreamingQueryStatus]
-
- /** Get the info of the last trigger that processed data */
- def lastTriggerStatus: Option[StreamingQueryStatus] = synchronized {
- progressStatuses.filter { i =>
- i.triggerDetails.get("isTriggerActive").toBoolean == false &&
- i.triggerDetails.get("isDataPresentInTrigger").toBoolean == true
- }.lastOption
- }
-
- def reset(): Unit = {
- startStatus = null
- terminationStatus = null
- progressStatuses.clear()
- asyncTestWaiter = new Waiter
- }
-
- def checkAsyncErrors(): Unit = {
- asyncTestWaiter.await(timeout(10 seconds))
- }
-
-
- override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
- asyncTestWaiter {
- startStatus = queryStarted.queryStatus
- }
- }
-
- override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
- asyncTestWaiter {
- assert(startStatus != null, "onQueryProgress called before onQueryStarted")
- synchronized { progressStatuses += queryProgress.queryStatus }
- }
- }
-
- override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
- asyncTestWaiter {
- assert(startStatus != null, "onQueryTerminated called before onQueryStarted")
- terminationStatus = queryTerminated.queryStatus
- terminationException = queryTerminated.exception
- }
- asyncTestWaiter.dismiss()
- }
- }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 98f3bec708..c68f953b10 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -17,24 +17,26 @@
package org.apache.spark.sql.streaming
+import java.util.UUID
+
import scala.collection.mutable
import org.scalactic.TolerantNumerics
+import org.scalatest.concurrent.AsyncAssertions.Waiter
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.BeforeAndAfter
import org.scalatest.PrivateMethodTester._
import org.apache.spark.SparkException
import org.apache.spark.scheduler._
-import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.functions._
-import org.apache.spark.util.{JsonProtocol, ManualClock}
-
+import org.apache.spark.sql.streaming.StreamingQueryListener._
+import org.apache.spark.util.JsonProtocol
class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
import testImplicits._
- import StreamingQueryListenerSuite._
// To make === between double tolerate inexact values
implicit val doubleEquality = TolerantNumerics.tolerantDoubleEquality(0.01)
@@ -46,86 +48,86 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
// Make sure we don't leak any events to the next test
}
- test("single listener, check trigger statuses") {
- import StreamingQueryListenerSuite._
- clock = new StreamManualClock
-
- /** Custom MemoryStream that waits for manual clock to reach a time */
- val inputData = new MemoryStream[Int](0, sqlContext) {
- // Wait for manual clock to be 100 first time there is data
- override def getOffset: Option[Offset] = {
- val offset = super.getOffset
- if (offset.nonEmpty) {
- clock.waitTillTime(100)
+ testQuietly("single listener, check trigger events are generated correctly") {
+ val clock = new StreamManualClock
+ val inputData = new MemoryStream[Int](0, sqlContext)
+ val df = inputData.toDS().as[Long].map { 10 / _ }
+ val listener = new EventCollector
+ try {
+ // No events until started
+ spark.streams.addListener(listener)
+ assert(listener.startEvent === null)
+ assert(listener.progressEvents.isEmpty)
+ assert(listener.terminationEvent === null)
+
+ testStream(df, OutputMode.Append)(
+
+ // Start event generated when query started
+ StartStream(ProcessingTime(100), triggerClock = clock),
+ AssertOnQuery { query =>
+ assert(listener.startEvent !== null)
+ assert(listener.startEvent.id === query.id)
+ assert(listener.startEvent.name === query.name)
+ assert(listener.progressEvents.isEmpty)
+ assert(listener.terminationEvent === null)
+ true
+ },
+
+ // Progress event generated when data processed
+ AddData(inputData, 1, 2),
+ AdvanceManualClock(100),
+ CheckAnswer(10, 5),
+ AssertOnQuery { query =>
+ assert(listener.progressEvents.nonEmpty)
+ assert(listener.progressEvents.last.json === query.lastProgress.json)
+ assert(listener.terminationEvent === null)
+ true
+ },
+
+ // Termination event generated when stopped cleanly
+ StopStream,
+ AssertOnQuery { query =>
+ eventually(Timeout(streamingTimeout)) {
+ assert(listener.terminationEvent !== null)
+ assert(listener.terminationEvent.id === query.id)
+ assert(listener.terminationEvent.exception === None)
+ }
+ listener.checkAsyncErrors()
+ listener.reset()
+ true
+ },
+
+ // Termination event generated with exception message when stopped with error
+ StartStream(ProcessingTime(100), triggerClock = clock),
+ AddData(inputData, 0),
+ AdvanceManualClock(100),
+ ExpectFailure[SparkException],
+ AssertOnQuery { query =>
+ assert(listener.terminationEvent !== null)
+ assert(listener.terminationEvent.id === query.id)
+ assert(listener.terminationEvent.exception.nonEmpty)
+ listener.checkAsyncErrors()
+ true
}
- offset
- }
-
- // Wait for manual clock to be 300 first time there is data
- override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
- clock.waitTillTime(300)
- super.getBatch(start, end)
- }
- }
-
- // This is to make sure thatquery waits for manual clock to be 600 first time there is data
- val mapped = inputData.toDS().agg(count("*")).as[Long].coalesce(1).map { x =>
- clock.waitTillTime(600)
- x
+ )
+ } finally {
+ spark.streams.removeListener(listener)
}
-
- testStream(mapped, OutputMode.Complete)(
- StartStream(triggerClock = clock),
- AddData(inputData, 1, 2),
- AdvanceManualClock(100), // unblock getOffset, will block on getBatch
- AdvanceManualClock(200), // unblock getBatch, will block on computation
- AdvanceManualClock(300), // unblock computation
- AssertOnQuery { _ => clock.getTimeMillis() === 600 },
- AssertOnLastQueryStatus { status: StreamingQueryStatus =>
- // Check the correctness of the trigger info of the last completed batch reported by
- // onQueryProgress
- assert(status.triggerDetails.containsKey("batchId"))
- assert(status.triggerDetails.get("isTriggerActive") === "false")
- assert(status.triggerDetails.get("isDataPresentInTrigger") === "true")
-
- assert(status.triggerDetails.get("timestamp.triggerStart") === "0")
- assert(status.triggerDetails.get("timestamp.afterGetOffset") === "100")
- assert(status.triggerDetails.get("timestamp.afterGetBatch") === "300")
- assert(status.triggerDetails.get("timestamp.triggerFinish") === "600")
-
- assert(status.triggerDetails.get("latency.getOffset.total") === "100")
- assert(status.triggerDetails.get("latency.getBatch.total") === "200")
- assert(status.triggerDetails.get("latency.optimizer") === "0")
- assert(status.triggerDetails.get("latency.offsetLogWrite") === "0")
- assert(status.triggerDetails.get("latency.fullTrigger") === "600")
-
- assert(status.triggerDetails.get("numRows.input.total") === "2")
- assert(status.triggerDetails.get("numRows.state.aggregation1.total") === "1")
- assert(status.triggerDetails.get("numRows.state.aggregation1.updated") === "1")
-
- assert(status.sourceStatuses.length === 1)
- assert(status.sourceStatuses(0).triggerDetails.containsKey("batchId"))
- assert(status.sourceStatuses(0).triggerDetails.get("latency.getOffset.source") === "100")
- assert(status.sourceStatuses(0).triggerDetails.get("latency.getBatch.source") === "200")
- assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "2")
- },
- CheckAnswer(2)
- )
}
test("adding and removing listener") {
- def isListenerActive(listener: QueryStatusCollector): Boolean = {
+ def isListenerActive(listener: EventCollector): Boolean = {
listener.reset()
testStream(MemoryStream[Int].toDS)(
StartStream(),
StopStream
)
- listener.startStatus != null
+ listener.startEvent != null
}
try {
- val listener1 = new QueryStatusCollector
- val listener2 = new QueryStatusCollector
+ val listener1 = new EventCollector
+ val listener2 = new EventCollector
spark.streams.addListener(listener1)
assert(isListenerActive(listener1) === true)
@@ -142,14 +144,14 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
test("event ordering") {
- val listener = new QueryStatusCollector
+ val listener = new EventCollector
withListenerAdded(listener) {
for (i <- 1 to 100) {
listener.reset()
- require(listener.startStatus === null)
+ require(listener.startEvent === null)
testStream(MemoryStream[Int].toDS)(
StartStream(),
- Assert(listener.startStatus !== null, "onQueryStarted not called before query returned"),
+ Assert(listener.startEvent !== null, "onQueryStarted not called before query returned"),
StopStream,
Assert { listener.checkAsyncErrors() }
)
@@ -158,7 +160,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
testQuietly("exception should be reported in QueryTerminated") {
- val listener = new QueryStatusCollector
+ val listener = new EventCollector
withListenerAdded(listener) {
val input = MemoryStream[Int]
testStream(input.toDS.map(_ / 0))(
@@ -167,49 +169,46 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
ExpectFailure[SparkException](),
Assert {
spark.sparkContext.listenerBus.waitUntilEmpty(10000)
- assert(listener.terminationStatus !== null)
- assert(listener.terminationException.isDefined)
+ assert(listener.terminationEvent !== null)
+ assert(listener.terminationEvent.exception.nonEmpty)
// Make sure that the exception message reported through listener
// contains the actual exception and relevant stack trace
- assert(!listener.terminationException.get.contains("StreamingQueryException"))
- assert(listener.terminationException.get.contains("java.lang.ArithmeticException"))
- assert(listener.terminationException.get.contains("StreamingQueryListenerSuite"))
+ assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
+ assert(listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
+ assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
}
)
}
}
- test("QueryStarted serialization") {
- val queryStarted = new StreamingQueryListener.QueryStartedEvent(StreamingQueryStatus.testStatus)
+ test("QueryStartedEvent serialization") {
+ val queryStarted = new StreamingQueryListener.QueryStartedEvent(UUID.randomUUID(), "name")
val json = JsonProtocol.sparkEventToJson(queryStarted)
val newQueryStarted = JsonProtocol.sparkEventFromJson(json)
.asInstanceOf[StreamingQueryListener.QueryStartedEvent]
- assertStreamingQueryInfoEquals(queryStarted.queryStatus, newQueryStarted.queryStatus)
}
- test("QueryProgress serialization") {
- val queryProcess = new StreamingQueryListener.QueryProgressEvent(
- StreamingQueryStatus.testStatus)
- val json = JsonProtocol.sparkEventToJson(queryProcess)
- val newQueryProcess = JsonProtocol.sparkEventFromJson(json)
+ test("QueryProgressEvent serialization") {
+ val event = new StreamingQueryListener.QueryProgressEvent(
+ StreamingQueryProgressSuite.testProgress)
+ val json = JsonProtocol.sparkEventToJson(event)
+ val newEvent = JsonProtocol.sparkEventFromJson(json)
.asInstanceOf[StreamingQueryListener.QueryProgressEvent]
- assertStreamingQueryInfoEquals(queryProcess.queryStatus, newQueryProcess.queryStatus)
+ assert(event.progress.json === newEvent.progress.json)
}
- test("QueryTerminated serialization") {
+ test("QueryTerminatedEvent serialization") {
val exception = new RuntimeException("exception")
val queryQueryTerminated = new StreamingQueryListener.QueryTerminatedEvent(
- StreamingQueryStatus.testStatus,
- Some(exception.getMessage))
- val json =
- JsonProtocol.sparkEventToJson(queryQueryTerminated)
+ UUID.randomUUID, Some(exception.getMessage))
+ val json = JsonProtocol.sparkEventToJson(queryQueryTerminated)
val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
.asInstanceOf[StreamingQueryListener.QueryTerminatedEvent]
- assertStreamingQueryInfoEquals(queryQueryTerminated.queryStatus, newQueryTerminated.queryStatus)
+ assert(queryQueryTerminated.id === newQueryTerminated.id)
assert(queryQueryTerminated.exception === newQueryTerminated.exception)
}
- test("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") {
+ testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") {
// query-event-logs-version-2.0.0.txt has all types of events generated by
// Structured Streaming in Spark 2.0.0.
// SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
@@ -217,7 +216,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt")
}
- test("ReplayListenerBus should ignore broken event jsons generated in 2.0.1") {
+ testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.1") {
// query-event-logs-version-2.0.1.txt has all types of events generated by
// Structured Streaming in Spark 2.0.1.
// SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
@@ -248,28 +247,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
}
- private def assertStreamingQueryInfoEquals(
- expected: StreamingQueryStatus,
- actual: StreamingQueryStatus): Unit = {
- assert(expected.name === actual.name)
- assert(expected.sourceStatuses.size === actual.sourceStatuses.size)
- expected.sourceStatuses.zip(actual.sourceStatuses).foreach {
- case (expectedSource, actualSource) =>
- assertSourceStatus(expectedSource, actualSource)
- }
- assertSinkStatus(expected.sinkStatus, actual.sinkStatus)
- }
-
- private def assertSourceStatus(expected: SourceStatus, actual: SourceStatus): Unit = {
- assert(expected.description === actual.description)
- assert(expected.offsetDesc === actual.offsetDesc)
- }
-
- private def assertSinkStatus(expected: SinkStatus, actual: SinkStatus): Unit = {
- assert(expected.description === actual.description)
- assert(expected.offsetDesc === actual.offsetDesc)
- }
-
private def withListenerAdded(listener: StreamingQueryListener)(body: => Unit): Unit = {
try {
failAfter(streamingTimeout) {
@@ -287,9 +264,51 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
val listenerBus = spark.streams invokePrivate listenerBusMethod()
listenerBus.listeners.toArray.map(_.asInstanceOf[StreamingQueryListener])
}
-}
-object StreamingQueryListenerSuite {
- // Singleton reference to clock that does not get serialized in task closures
- @volatile var clock: ManualClock = null
+ /** Collects events from the StreamingQueryListener for testing */
+ class EventCollector extends StreamingQueryListener {
+ // to catch errors in the async listener events
+ @volatile private var asyncTestWaiter = new Waiter
+
+ @volatile var startEvent: QueryStartedEvent = null
+ @volatile var terminationEvent: QueryTerminatedEvent = null
+
+ private val _progressEvents = new mutable.Queue[StreamingQueryProgress]
+
+ def progressEvents: Seq[StreamingQueryProgress] = _progressEvents.synchronized {
+ _progressEvents.filter(_.numInputRows > 0)
+ }
+
+ def reset(): Unit = {
+ startEvent = null
+ terminationEvent = null
+ _progressEvents.clear()
+ asyncTestWaiter = new Waiter
+ }
+
+ def checkAsyncErrors(): Unit = {
+ asyncTestWaiter.await(timeout(streamingTimeout))
+ }
+
+ override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
+ asyncTestWaiter {
+ startEvent = queryStarted
+ }
+ }
+
+ override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
+ asyncTestWaiter {
+ assert(startEvent != null, "onQueryProgress called before onQueryStarted")
+ _progressEvents.synchronized { _progressEvents += queryProgress.progress }
+ }
+ }
+
+ override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
+ asyncTestWaiter {
+ assert(startEvent != null, "onQueryTerminated called before onQueryStarted")
+ terminationEvent = queryTerminated
+ }
+ asyncTestWaiter.dismiss()
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 41ffd56cf1..268b8ff7b4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -62,7 +62,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
assert(spark.streams.get(q1.id).eq(q1))
assert(spark.streams.get(q2.id).eq(q2))
assert(spark.streams.get(q3.id).eq(q3))
- assert(spark.streams.get(-1) === null) // non-existent id
+ assert(spark.streams.get(java.util.UUID.randomUUID()) === null) // non-existent id
q1.stop()
assert(spark.streams.active.toSet === Set(q2, q3))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
new file mode 100644
index 0000000000..45d29f6b35
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.streaming.StreamingQueryProgressSuite._
+
+
+class StreamingQueryProgressSuite extends SparkFunSuite {
+
+ test("prettyJson") {
+ val json = testProgress.prettyJson
+ assert(json ===
+ s"""
+ |{
+ | "id" : "${testProgress.id.toString}",
+ | "name" : "name",
+ | "timestamp" : 1,
+ | "numInputRows" : 678,
+ | "inputRowsPerSecond" : 10.0,
+ | "durationMs" : {
+ | "total" : 0
+ | },
+ | "currentWatermark" : 3,
+ | "stateOperators" : [ {
+ | "numRowsTotal" : 0,
+ | "numRowsUpdated" : 1
+ | } ],
+ | "sources" : [ {
+ | "description" : "source",
+ | "startOffset" : 123,
+ | "endOffset" : 456,
+ | "numInputRows" : 678,
+ | "inputRowsPerSecond" : 10.0
+ | } ],
+ | "sink" : {
+ | "description" : "sink"
+ | }
+ |}
+ """.stripMargin.trim)
+ assert(compact(parse(json)) === testProgress.json)
+
+ }
+
+ test("json") {
+ assert(compact(parse(testProgress.json)) === testProgress.json)
+ }
+
+ test("toString") {
+ assert(testProgress.toString === testProgress.prettyJson)
+ }
+}
+
+object StreamingQueryProgressSuite {
+ val testProgress = new StreamingQueryProgress(
+ id = UUID.randomUUID(),
+ name = "name",
+ timestamp = 1L,
+ batchId = 2L,
+ durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
+ currentWatermark = 3L,
+ stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
+ sources = Array(
+ new SourceProgress(
+ description = "source",
+ startOffset = "123",
+ endOffset = "456",
+ numInputRows = 678,
+ inputRowsPerSecond = 10.0,
+ processedRowsPerSecond = Double.PositiveInfinity // should not be present in the json
+ )
+ ),
+ sink = new SinkProgress("sink")
+ )
+}
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
deleted file mode 100644
index 50a7d92ede..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import org.apache.spark.SparkFunSuite
-
-class StreamingQueryStatusSuite extends SparkFunSuite {
- test("toString") {
- assert(StreamingQueryStatus.testStatus.sourceStatuses(0).toString ===
- """
- |Status of source MySource1
- | Available offset: 0
- | Input rate: 15.5 rows/sec
- | Processing rate: 23.5 rows/sec
- | Trigger details:
- | numRows.input.source: 100
- | latency.getOffset.source: 10
- | latency.getBatch.source: 20
- """.stripMargin.trim, "SourceStatus.toString does not match")
-
- assert(StreamingQueryStatus.testStatus.sinkStatus.toString ===
- """
- |Status of sink MySink
- | Committed offsets: [1, -]
- """.stripMargin.trim, "SinkStatus.toString does not match")
-
- assert(StreamingQueryStatus.testStatus.toString ===
- """
- |Status of query 'query'
- | Query id: 1
- | Status timestamp: 123
- | Input rate: 15.5 rows/sec
- | Processing rate 23.5 rows/sec
- | Latency: 345.0 ms
- | Trigger details:
- | batchId: 5
- | isDataPresentInTrigger: true
- | isTriggerActive: true
- | latency.getBatch.total: 20
- | latency.getOffset.total: 10
- | numRows.input.total: 100
- | Source statuses [1 source]:
- | Source 1 - MySource1
- | Available offset: 0
- | Input rate: 15.5 rows/sec
- | Processing rate: 23.5 rows/sec
- | Trigger details:
- | numRows.input.source: 100
- | latency.getOffset.source: 10
- | latency.getBatch.source: 20
- | Sink status - MySink
- | Committed offsets: [1, -]
- """.stripMargin.trim, "StreamingQueryStatus.toString does not match")
-
- }
-
- test("json") {
- assert(StreamingQueryStatus.testStatus.json ===
- """
- |{"name":"query","id":1,"timestamp":123,"inputRate":15.5,"processingRate":23.5,
- |"latency":345.0,"triggerDetails":{"latency.getBatch.total":"20",
- |"numRows.input.total":"100","isTriggerActive":"true","batchId":"5",
- |"latency.getOffset.total":"10","isDataPresentInTrigger":"true"},
- |"sourceStatuses":[{"description":"MySource1","offsetDesc":"0","inputRate":15.5,
- |"processingRate":23.5,"triggerDetails":{"numRows.input.source":"100",
- |"latency.getOffset.source":"10","latency.getBatch.source":"20"}}],
- |"sinkStatus":{"description":"MySink","offsetDesc":"[1, -]"}}
- """.stripMargin.replace("\n", "").trim)
- }
-
- test("prettyJson") {
- assert(
- StreamingQueryStatus.testStatus.prettyJson ===
- """
- |{
- | "name" : "query",
- | "id" : 1,
- | "timestamp" : 123,
- | "inputRate" : 15.5,
- | "processingRate" : 23.5,
- | "latency" : 345.0,
- | "triggerDetails" : {
- | "latency.getBatch.total" : "20",
- | "numRows.input.total" : "100",
- | "isTriggerActive" : "true",
- | "batchId" : "5",
- | "latency.getOffset.total" : "10",
- | "isDataPresentInTrigger" : "true"
- | },
- | "sourceStatuses" : [ {
- | "description" : "MySource1",
- | "offsetDesc" : "0",
- | "inputRate" : 15.5,
- | "processingRate" : 23.5,
- | "triggerDetails" : {
- | "numRows.input.source" : "100",
- | "latency.getOffset.source" : "10",
- | "latency.getBatch.source" : "20"
- | }
- | } ],
- | "sinkStatus" : {
- | "description" : "MySink",
- | "offsetDesc" : "[1, -]"
- | }
- |}
- """.stripMargin.trim)
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 8ecb33cf9d..4f3b4a2d75 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -20,14 +20,15 @@ package org.apache.spark.sql.streaming
import org.scalactic.TolerantNumerics
import org.scalatest.concurrent.Eventually._
import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.apache.spark.internal.Logging
import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.apache.spark.sql.types.StructType
import org.apache.spark.SparkException
import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.util.Utils
+import org.apache.spark.sql.functions._
+import org.apache.spark.util.{ManualClock, Utils}
class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
@@ -109,85 +110,139 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
)
}
- testQuietly("query statuses") {
- val inputData = MemoryStream[Int]
- val mapped = inputData.toDS().map(6 / _)
- testStream(mapped)(
- AssertOnQuery(q => q.status.name === q.name),
- AssertOnQuery(q => q.status.id === q.id),
- AssertOnQuery(_.status.timestamp <= System.currentTimeMillis),
- AssertOnQuery(_.status.inputRate === 0.0),
- AssertOnQuery(_.status.processingRate === 0.0),
- AssertOnQuery(_.status.sourceStatuses.length === 1),
- AssertOnQuery(_.status.sourceStatuses(0).description.contains("Memory")),
- AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === "-"),
- AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0),
- AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0),
- AssertOnQuery(_.status.sinkStatus.description.contains("Memory")),
- AssertOnQuery(_.status.sinkStatus.offsetDesc === OffsetSeq(None :: Nil).toString),
- AssertOnQuery(_.sourceStatuses(0).description.contains("Memory")),
- AssertOnQuery(_.sourceStatuses(0).offsetDesc === "-"),
- AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0),
- AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0),
- AssertOnQuery(_.sinkStatus.description.contains("Memory")),
- AssertOnQuery(_.sinkStatus.offsetDesc === new OffsetSeq(None :: Nil).toString),
+ testQuietly("query statuses and progresses") {
+ import StreamingQuerySuite._
+ clock = new StreamManualClock
+
+ /** Custom MemoryStream that waits for manual clock to reach a time */
+ val inputData = new MemoryStream[Int](0, sqlContext) {
+ // Wait for manual clock to be 100 first time there is data
+ override def getOffset: Option[Offset] = {
+ val offset = super.getOffset
+ if (offset.nonEmpty) {
+ clock.waitTillTime(300)
+ }
+ offset
+ }
- AddData(inputData, 1, 2),
- CheckAnswer(6, 3),
- AssertOnQuery(_.status.timestamp <= System.currentTimeMillis),
- AssertOnQuery(_.status.inputRate >= 0.0),
- AssertOnQuery(_.status.processingRate >= 0.0),
- AssertOnQuery(_.status.sourceStatuses.length === 1),
- AssertOnQuery(_.status.sourceStatuses(0).description.contains("Memory")),
- AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(0).json),
- AssertOnQuery(_.status.sourceStatuses(0).inputRate >= 0.0),
- AssertOnQuery(_.status.sourceStatuses(0).processingRate >= 0.0),
- AssertOnQuery(_.status.sinkStatus.description.contains("Memory")),
- AssertOnQuery(_.status.sinkStatus.offsetDesc ===
- OffsetSeq.fill(LongOffset(0)).toString),
- AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(0).json),
- AssertOnQuery(_.sourceStatuses(0).inputRate >= 0.0),
- AssertOnQuery(_.sourceStatuses(0).processingRate >= 0.0),
- AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(0)).toString),
+ // Wait for manual clock to be 300 first time there is data
+ override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+ clock.waitTillTime(600)
+ super.getBatch(start, end)
+ }
+ }
- AddData(inputData, 1, 2),
- CheckAnswer(6, 3, 6, 3),
- AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(1).json),
- AssertOnQuery(_.status.sinkStatus.offsetDesc ===
- OffsetSeq.fill(LongOffset(1)).toString),
- AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(1).json),
- AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(1)).toString),
+ // This is to make sure that query waits for manual clock to be 600 first time there is data
+ val mapped = inputData.toDS().agg(count("*")).as[Long].coalesce(1).map { x =>
+ clock.waitTillTime(1100)
+ x
+ }
- StopStream,
- AssertOnQuery(_.status.inputRate === 0.0),
- AssertOnQuery(_.status.processingRate === 0.0),
- AssertOnQuery(_.status.sourceStatuses.length === 1),
- AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(1).json),
- AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0),
- AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0),
- AssertOnQuery(_.status.sinkStatus.offsetDesc ===
- OffsetSeq.fill(LongOffset(1)).toString),
- AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(1).json),
- AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0),
- AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0),
- AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(1)).toString),
- AssertOnQuery(_.status.triggerDetails.isEmpty),
+ case class AssertStreamExecThreadToWaitForClock()
+ extends AssertOnQuery(q => {
+ eventually(Timeout(streamingTimeout)) {
+ if (q.exception.isEmpty) {
+ assert(clock.asInstanceOf[StreamManualClock].isStreamWaitingAt(clock.getTimeMillis))
+ }
+ }
+ if (q.exception.isDefined) {
+ throw q.exception.get
+ }
+ true
+ }, "")
+
+ testStream(mapped, OutputMode.Complete)(
+ StartStream(ProcessingTime(100), triggerClock = clock),
+ AssertStreamExecThreadToWaitForClock(),
+ AssertOnQuery(_.status.isDataAvailable === false),
+ AssertOnQuery(_.status.isTriggerActive === false),
+ // TODO: test status.message before trigger has started
+ // AssertOnQuery(_.lastProgress === null) // there is an empty trigger as soon as started
+ AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+
+ // Test status while offset is being fetched
+ AddData(inputData, 1, 2),
+ AdvanceManualClock(100), // time = 100 to start new trigger, will block on getOffset
+ AssertStreamExecThreadToWaitForClock(),
+ AssertOnQuery(_.status.isDataAvailable === false),
+ AssertOnQuery(_.status.isTriggerActive === true),
+ AssertOnQuery(_.status.message.toLowerCase.contains("getting offsets from")),
+ AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+
+ // Test status while batch is being fetched
+ AdvanceManualClock(200), // time = 300 to unblock getOffset, will block on getBatch
+ AssertStreamExecThreadToWaitForClock(),
+ AssertOnQuery(_.status.isDataAvailable === true),
+ AssertOnQuery(_.status.isTriggerActive === true),
+ AssertOnQuery(_.status.message === "Processing new data"),
+ AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+
+ // Test status while batch is being processed
+ AdvanceManualClock(300), // time = 600 to unblock getBatch, will block in Spark job
+ AssertOnQuery(_.status.isDataAvailable === true),
+ AssertOnQuery(_.status.isTriggerActive === true),
+ AssertOnQuery(_.status.message === "Processing new data"),
+ AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+
+ // Test status while batch processing has completed
+ AdvanceManualClock(500), // time = 1100 to unblock job
+ AssertOnQuery { _ => clock.getTimeMillis() === 1100 },
+ CheckAnswer(2),
+ AssertOnQuery(_.status.isDataAvailable === true),
+ AssertOnQuery(_.status.isTriggerActive === false),
+ AssertOnQuery(_.status.message === "Waiting for next trigger"),
+ AssertOnQuery { query =>
+ assert(query.lastProgress != null)
+ assert(query.recentProgresses.exists(_.numInputRows > 0))
+ assert(query.recentProgresses.last.eq(query.lastProgress))
+
+ val progress = query.lastProgress
+ assert(progress.id === query.id)
+ assert(progress.name === query.name)
+ assert(progress.batchId === 0)
+ assert(progress.timestamp === 100)
+ assert(progress.numInputRows === 2)
+ assert(progress.processedRowsPerSecond === 2.0)
+
+ assert(progress.durationMs.get("getOffset") === 200)
+ assert(progress.durationMs.get("getBatch") === 300)
+ assert(progress.durationMs.get("queryPlanning") === 0)
+ assert(progress.durationMs.get("walCommit") === 0)
+ assert(progress.durationMs.get("triggerExecution") === 1000)
+
+ assert(progress.sources.length === 1)
+ assert(progress.sources(0).description contains "MemoryStream")
+ assert(progress.sources(0).startOffset === null)
+ assert(progress.sources(0).endOffset !== null)
+ assert(progress.sources(0).processedRowsPerSecond === 2.0)
+
+ assert(progress.stateOperators.length === 1)
+ assert(progress.stateOperators(0).numRowsUpdated === 1)
+ assert(progress.stateOperators(0).numRowsTotal === 1)
+
+ assert(progress.sink.description contains "MemorySink")
+ true
+ },
- StartStream(),
- AddData(inputData, 0),
- ExpectFailure[SparkException],
- AssertOnQuery(_.status.inputRate === 0.0),
- AssertOnQuery(_.status.processingRate === 0.0),
- AssertOnQuery(_.status.sourceStatuses.length === 1),
- AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(2).json),
- AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0),
- AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0),
- AssertOnQuery(_.status.sinkStatus.offsetDesc ===
- OffsetSeq.fill(LongOffset(1)).toString),
- AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(2).json),
- AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0),
- AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0),
- AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(1)).toString)
+ AddData(inputData, 1, 2),
+ AdvanceManualClock(100), // allow another trigger
+ CheckAnswer(4),
+ AssertOnQuery(_.status.isDataAvailable === true),
+ AssertOnQuery(_.status.isTriggerActive === false),
+ AssertOnQuery(_.status.message === "Waiting for next trigger"),
+ AssertOnQuery { query =>
+ assert(query.recentProgresses.last.eq(query.lastProgress))
+ assert(query.lastProgress.batchId === 1)
+ assert(query.lastProgress.sources(0).inputRowsPerSecond === 1.818)
+ true
+ },
+
+ // Test status after data is not available for a trigger
+ AdvanceManualClock(100), // allow another trigger
+ AssertStreamExecThreadToWaitForClock(),
+ AssertOnQuery(_.status.isDataAvailable === false),
+ AssertOnQuery(_.status.isTriggerActive === false),
+ AssertOnQuery(_.status.message === "Waiting for next trigger")
)
}
@@ -196,7 +251,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
/** Whether metrics of a query is registered for reporting */
def isMetricsRegistered(query: StreamingQuery): Boolean = {
- val sourceName = s"StructuredStreaming.${query.name}"
+ val sourceName = s"spark.streaming.${query.name}"
val sources = spark.sparkContext.env.metricsSystem.getSourcesByName(sourceName)
require(sources.size <= 1)
sources.nonEmpty
@@ -229,23 +284,23 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
// Trigger input has 10 rows, static input has 2 rows,
// therefore after the first trigger, the calculated input rows should be 10
- val status = getFirstTriggerStatus(streamingInputDF.join(staticInputDF, "value"))
- assert(status.triggerDetails.get("numRows.input.total") === "10")
- assert(status.sourceStatuses.size === 1)
- assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "10")
+ val progress = getFirstProgress(streamingInputDF.join(staticInputDF, "value"))
+ assert(progress.numInputRows === 10)
+ assert(progress.sources.size === 1)
+ assert(progress.sources(0).numInputRows === 10)
}
- test("input row calculation with trigger DF having multiple leaves") {
+ test("input row calculation with trigger input DF having multiple leaves") {
val streamingTriggerDF =
spark.createDataset(1 to 5).toDF.union(spark.createDataset(6 to 10).toDF)
require(streamingTriggerDF.logicalPlan.collectLeaves().size > 1)
val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF)
// After the first trigger, the calculated input rows should be 10
- val status = getFirstTriggerStatus(streamingInputDF)
- assert(status.triggerDetails.get("numRows.input.total") === "10")
- assert(status.sourceStatuses.size === 1)
- assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "10")
+ val progress = getFirstProgress(streamingInputDF)
+ assert(progress.numInputRows === 10)
+ assert(progress.sources.size === 1)
+ assert(progress.sources(0).numInputRows === 10)
}
testQuietly("StreamExecution metadata garbage collection") {
@@ -285,34 +340,14 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
StreamingExecutionRelation(source)
}
- /** Returns the query status at the end of the first trigger of streaming DF */
- private def getFirstTriggerStatus(streamingDF: DataFrame): StreamingQueryStatus = {
- // A StreamingQueryListener that gets the query status after the first completed trigger
- val listener = new StreamingQueryListener {
- @volatile var firstStatus: StreamingQueryStatus = null
- @volatile var queryStartedEvent = 0
- override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
- queryStartedEvent += 1
- }
- override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
- if (firstStatus == null) firstStatus = queryProgress.queryStatus
- }
- override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { }
- }
-
+ /** Returns the query progress at the end of the first trigger of streaming DF */
+ private def getFirstProgress(streamingDF: DataFrame): StreamingQueryProgress = {
try {
- spark.streams.addListener(listener)
val q = streamingDF.writeStream.format("memory").queryName("test").start()
q.processAllAvailable()
- eventually(timeout(streamingTimeout)) {
- assert(listener.firstStatus != null)
- // test if QueryStartedEvent callback is called for only once
- assert(listener.queryStartedEvent === 1)
- }
- listener.firstStatus
+ q.recentProgresses.head
} finally {
spark.streams.active.map(_.stop())
- spark.streams.removeListener(listener)
}
}
@@ -369,3 +404,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
}
}
}
+
+object StreamingQuerySuite {
+ // Singleton reference to clock that does not get serialized in task closures
+ var clock: ManualClock = null
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
index 3e9488c7dc..12f3c3e5ff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
@@ -51,6 +51,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
test("watermark metric") {
+
val inputData = MemoryStream[Int]
val windowedAggregation = inputData.toDF()
@@ -62,16 +63,19 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
testStream(windowedAggregation)(
AddData(inputData, 15),
- AssertOnLastQueryStatus { status =>
- status.triggerDetails.get(StreamMetrics.EVENT_TIME_WATERMARK) === "5000"
+ CheckAnswer(),
+ AssertOnQuery { query =>
+ query.lastProgress.currentWatermark === 5000
},
AddData(inputData, 15),
- AssertOnLastQueryStatus { status =>
- status.triggerDetails.get(StreamMetrics.EVENT_TIME_WATERMARK) === "5000"
+ CheckAnswer(),
+ AssertOnQuery { query =>
+ query.lastProgress.currentWatermark === 5000
},
AddData(inputData, 25),
- AssertOnLastQueryStatus { status =>
- status.triggerDetails.get(StreamMetrics.EVENT_TIME_WATERMARK) === "15000"
+ CheckAnswer(),
+ AssertOnQuery { query =>
+ query.lastProgress.currentWatermark === 15000
}
)
}