 external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala | 7
 project/MimaExcludes.scala | 11
 python/pyspark/sql/streaming.py | 326
 python/pyspark/sql/tests.py | 22
 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala | 53
 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala | 234
 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 282
 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala | 243
 sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 8
 sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala | 66
 sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala | 95
 sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala | 33
 sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala | 2
 sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala | 24
 sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala | 27
 sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala | 151
 sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala | 193
 sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala | 213
 sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala | 10
 sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala | 73
 sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala | 267
 sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala | 2
 sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala | 98
 sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala | 123
 sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala | 260
 sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala | 16
 26 files changed, 1087 insertions(+), 1752 deletions(-)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index e1af14f95d..2d6ccb22dd 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -442,12 +442,13 @@ class KafkaSourceSuite extends KafkaSourceTest {
val mapped = kafka.map(kv => kv._2.toInt + 1)
testStream(mapped)(
+ StartStream(trigger = ProcessingTime(1)),
makeSureGetOffsetCalled,
AddKafkaData(Set(topic), 1, 2, 3),
CheckAnswer(2, 3, 4),
- AssertOnLastQueryStatus { status =>
- assert(status.triggerDetails.get("numRows.input.total").toInt > 0)
- assert(status.sourceStatuses(0).processingRate > 0.0)
+ AssertOnQuery { query =>
+ val recordsRead = query.recentProgresses.map(_.numInputRows).sum
+ recordsRead == 3
}
)
}
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 84014014f2..4995af034f 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -78,6 +78,17 @@ object MimaExcludes {
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated"),
+ // [SPARK-18516][SQL] Split state and progress in streaming
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.SourceStatus"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.SinkStatus"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.sinkStatus"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.sourceStatuses"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"),
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.lastProgress"),
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgresses"),
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.get"),
+
// [SPARK-17338][SQL] add global temp view
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.dropGlobalTempView"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.catalog.Catalog.dropTempView"),
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 9c3a237699..c420b0d016 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -16,6 +16,8 @@
#
import sys
+import json
+
if sys.version >= '3':
intlike = int
basestring = unicode = str
@@ -48,10 +50,9 @@ class StreamingQuery(object):
@property
@since(2.0)
def id(self):
- """The id of the streaming query. This id is unique across all queries that have been
- started in the current process.
+ """The id of the streaming query.
"""
- return self._jsq.id()
+ return self._jsq.id().toString()
@property
@since(2.0)
@@ -87,6 +88,24 @@ class StreamingQuery(object):
else:
return self._jsq.awaitTermination()
+ @property
+ @since(2.1)
+ def recentProgresses(self):
+        """Returns a list of the most recent progress updates for this query, each as a dict.
+        The number of progress updates retained for each stream is configured by the Spark
+        session configuration `spark.sql.streaming.numRecentProgresses`.
+ """
+ return [json.loads(p.json()) for p in self._jsq.recentProgresses()]
+
+ @property
+ @since(2.1)
+ def lastProgress(self):
+ """
+        Returns the most recent progress update of this streaming query, as a dict.
+        :return: a dict
+ """
+ return json.loads(self._jsq.lastProgress().json())
+
@since(2.0)
def processAllAvailable(self):
"""Blocks until all available data in the source has been processed and committed to the
@@ -149,8 +168,6 @@ class StreamingQueryManager(object):
True
>>> sq.stop()
"""
- if not isinstance(id, intlike):
- raise ValueError("The id for the query must be an integer. Got: %s" % id)
return StreamingQuery(self._jsqm.get(id))
@since(2.0)
@@ -191,303 +208,6 @@ class StreamingQueryManager(object):
self._jsqm.resetTerminated()
-class StreamingQueryStatus(object):
- """A class used to report information about the progress of a StreamingQuery.
-
- .. note:: Experimental
-
- .. versionadded:: 2.1
- """
-
- def __init__(self, jsqs):
- self._jsqs = jsqs
-
- def __str__(self):
- """
- Pretty string of this query status.
-
- >>> print(sqs)
- Status of query 'query'
- Query id: 1
- Status timestamp: 123
- Input rate: 15.5 rows/sec
- Processing rate 23.5 rows/sec
- Latency: 345.0 ms
- Trigger details:
- batchId: 5
- isDataPresentInTrigger: true
- isTriggerActive: true
- latency.getBatch.total: 20
- latency.getOffset.total: 10
- numRows.input.total: 100
- Source statuses [1 source]:
- Source 1 - MySource1
- Available offset: 0
- Input rate: 15.5 rows/sec
- Processing rate: 23.5 rows/sec
- Trigger details:
- numRows.input.source: 100
- latency.getOffset.source: 10
- latency.getBatch.source: 20
- Sink status - MySink
- Committed offsets: [1, -]
- """
- return self._jsqs.toString()
-
- @property
- @ignore_unicode_prefix
- @since(2.1)
- def name(self):
- """
- Name of the query. This name is unique across all active queries.
-
- >>> sqs.name
- u'query'
- """
- return self._jsqs.name()
-
- @property
- @since(2.1)
- def id(self):
- """
- Id of the query. This id is unique across all queries that have been started in
- the current process.
-
- >>> int(sqs.id)
- 1
- """
- return self._jsqs.id()
-
- @property
- @since(2.1)
- def timestamp(self):
- """
- Timestamp (ms) of when this query was generated.
-
- >>> int(sqs.timestamp)
- 123
- """
- return self._jsqs.timestamp()
-
- @property
- @since(2.1)
- def inputRate(self):
- """
- Current total rate (rows/sec) at which data is being generated by all the sources.
-
- >>> sqs.inputRate
- 15.5
- """
- return self._jsqs.inputRate()
-
- @property
- @since(2.1)
- def processingRate(self):
- """
- Current rate (rows/sec) at which the query is processing data from all the sources.
-
- >>> sqs.processingRate
- 23.5
- """
- return self._jsqs.processingRate()
-
- @property
- @since(2.1)
- def latency(self):
- """
- Current average latency between the data being available in source and the sink
- writing the corresponding output.
-
- >>> sqs.latency
- 345.0
- """
- if (self._jsqs.latency().nonEmpty()):
- return self._jsqs.latency().get()
- else:
- return None
-
- @property
- @ignore_unicode_prefix
- @since(2.1)
- def sourceStatuses(self):
- """
- Current statuses of the sources as a list.
-
- >>> len(sqs.sourceStatuses)
- 1
- >>> sqs.sourceStatuses[0].description
- u'MySource1'
- """
- return [SourceStatus(ss) for ss in self._jsqs.sourceStatuses()]
-
- @property
- @ignore_unicode_prefix
- @since(2.1)
- def sinkStatus(self):
- """
- Current status of the sink.
-
- >>> sqs.sinkStatus.description
- u'MySink'
- """
- return SinkStatus(self._jsqs.sinkStatus())
-
- @property
- @ignore_unicode_prefix
- @since(2.1)
- def triggerDetails(self):
- """
- Low-level details of the currently active trigger (e.g. number of rows processed
- in trigger, latency of intermediate steps, etc.).
-
- If no trigger is currently active, then it will have details of the last completed trigger.
-
- >>> sqs.triggerDetails
- {u'latency.getBatch.total': u'20', u'numRows.input.total': u'100',
- u'isTriggerActive': u'true', u'batchId': u'5', u'latency.getOffset.total': u'10',
- u'isDataPresentInTrigger': u'true'}
- """
- return self._jsqs.triggerDetails()
-
-
-class SourceStatus(object):
- """
- Status and metrics of a streaming Source.
-
- .. note:: Experimental
-
- .. versionadded:: 2.1
- """
-
- def __init__(self, jss):
- self._jss = jss
-
- def __str__(self):
- """
- Pretty string of this source status.
-
- >>> print(sqs.sourceStatuses[0])
- Status of source MySource1
- Available offset: 0
- Input rate: 15.5 rows/sec
- Processing rate: 23.5 rows/sec
- Trigger details:
- numRows.input.source: 100
- latency.getOffset.source: 10
- latency.getBatch.source: 20
- """
- return self._jss.toString()
-
- @property
- @ignore_unicode_prefix
- @since(2.1)
- def description(self):
- """
- Description of the source corresponding to this status.
-
- >>> sqs.sourceStatuses[0].description
- u'MySource1'
- """
- return self._jss.description()
-
- @property
- @ignore_unicode_prefix
- @since(2.1)
- def offsetDesc(self):
- """
- Description of the current offset if known.
-
- >>> sqs.sourceStatuses[0].offsetDesc
- u'0'
- """
- return self._jss.offsetDesc()
-
- @property
- @since(2.1)
- def inputRate(self):
- """
- Current rate (rows/sec) at which data is being generated by the source.
-
- >>> sqs.sourceStatuses[0].inputRate
- 15.5
- """
- return self._jss.inputRate()
-
- @property
- @since(2.1)
- def processingRate(self):
- """
- Current rate (rows/sec) at which the query is processing data from the source.
-
- >>> sqs.sourceStatuses[0].processingRate
- 23.5
- """
- return self._jss.processingRate()
-
- @property
- @ignore_unicode_prefix
- @since(2.1)
- def triggerDetails(self):
- """
- Low-level details of the currently active trigger (e.g. number of rows processed
- in trigger, latency of intermediate steps, etc.).
-
- If no trigger is currently active, then it will have details of the last completed trigger.
-
- >>> sqs.sourceStatuses[0].triggerDetails
- {u'numRows.input.source': u'100', u'latency.getOffset.source': u'10',
- u'latency.getBatch.source': u'20'}
- """
- return self._jss.triggerDetails()
-
-
-class SinkStatus(object):
- """
- Status and metrics of a streaming Sink.
-
- .. note:: Experimental
-
- .. versionadded:: 2.1
- """
-
- def __init__(self, jss):
- self._jss = jss
-
- def __str__(self):
- """
- Pretty string of this source status.
-
- >>> print(sqs.sinkStatus)
- Status of sink MySink
- Committed offsets: [1, -]
- """
- return self._jss.toString()
-
- @property
- @ignore_unicode_prefix
- @since(2.1)
- def description(self):
- """
- Description of the source corresponding to this status.
-
- >>> sqs.sinkStatus.description
- u'MySink'
- """
- return self._jss.description()
-
- @property
- @ignore_unicode_prefix
- @since(2.1)
- def offsetDesc(self):
- """
- Description of the current offsets up to which data has been written by the sink.
-
- >>> sqs.sinkStatus.offsetDesc
- u'[1, -]'
- """
- return self._jss.offsetDesc()
-
-
class Trigger(object):
"""Used to indicate how often results should be produced by a :class:`StreamingQuery`.
@@ -1053,8 +773,6 @@ def _test():
globs['sdf_schema'] = StructType([StructField("data", StringType(), False)])
globs['df'] = \
globs['spark'].readStream.format('text').load('python/test_support/sql/streaming')
- globs['sqs'] = StreamingQueryStatus(
- spark.sparkContext._jvm.org.apache.spark.sql.streaming.StreamingQueryStatus.testStatus())
(failure_count, test_count) = doctest.testmod(
pyspark.sql.streaming, globs=globs,
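
For reference, a short sketch of how the progress API surfaced above reads from the Scala side; the PySpark properties simply parse the JSON form of the same objects. The input path and query name below are hypothetical placeholders:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("progress-demo").getOrCreate()

    // Start a toy streaming query (hypothetical input directory, in-memory sink).
    val query = spark.readStream
      .format("text")
      .load("/tmp/streaming-input")
      .writeStream
      .format("memory")
      .queryName("progress_demo")
      .start()

    query.processAllAvailable()

    println(query.status)              // instantaneous state: message, isDataAvailable, isTriggerActive
    println(query.lastProgress.json)   // most recent StreamingQueryProgress as JSON
    query.recentProgresses.foreach(p => println(s"batch ${p.batchId}: ${p.numInputRows} rows"))
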
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 3d46b852c5..7151f95216 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1082,6 +1082,28 @@ class SQLTests(ReusedPySparkTestCase):
q.stop()
shutil.rmtree(tmpPath)
+ def test_stream_status_and_progress(self):
+ df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
+ for q in self.spark._wrapped.streams.active:
+ q.stop()
+ tmpPath = tempfile.mkdtemp()
+ shutil.rmtree(tmpPath)
+ self.assertTrue(df.isStreaming)
+ out = os.path.join(tmpPath, 'out')
+ chk = os.path.join(tmpPath, 'chk')
+ q = df.writeStream \
+ .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
+ try:
+ q.processAllAvailable()
+ lastProgress = q.lastProgress
+ recentProgresses = q.recentProgresses
+ self.assertEqual(lastProgress['name'], q.name)
+ self.assertEqual(lastProgress['id'], q.id)
+ self.assertTrue(any(p == lastProgress for p in recentProgresses))
+ finally:
+ q.stop()
+ shutil.rmtree(tmpPath)
+
def test_stream_await_termination(self):
df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
for q in self.spark._wrapped.streams.active:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
new file mode 100644
index 0000000000..5551d12fa8
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.{util => ju}
+
+import scala.collection.mutable
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.{Source => CodahaleSource}
+import org.apache.spark.util.Clock
+
+/**
+ * Serves metrics from a [[org.apache.spark.sql.streaming.StreamingQuery]] to
+ * Codahale/DropWizard metrics
+ */
+class MetricsReporter(
+ stream: StreamExecution,
+ override val sourceName: String) extends CodahaleSource with Logging {
+
+ override val metricRegistry: MetricRegistry = new MetricRegistry
+
+ // Metric names should not have . in them, so that all the metrics of a query are identified
+ // together in Ganglia as a single metric group
+ registerGauge("inputRate-total", () => stream.lastProgress.inputRowsPerSecond)
+  registerGauge("processingRate-total", () => stream.lastProgress.processedRowsPerSecond)
+ registerGauge("latency", () => stream.lastProgress.durationMs.get("triggerExecution").longValue())
+
+ private def registerGauge[T](name: String, f: () => T)(implicit num: Numeric[T]): Unit = {
+ synchronized {
+ metricRegistry.register(name, new Gauge[T] {
+ override def getValue: T = f()
+ })
+ }
+ }
+}
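
A short usage note, under the assumption that the existing toggle behind SQLConf.STREAMING_METRICS_ENABLED keeps its `spark.sql.streaming.metricsEnabled` key: the reporter above is only registered with the Spark metrics system when that flag is set, and its gauges then appear under the source name passed in by StreamExecution.

    // Assumed configuration key for SQLConf.STREAMING_METRICS_ENABLED.
    spark.conf.set("spark.sql.streaming.metricsEnabled", "true")

    // For a query named "progress_demo", the gauges registered above appear under the
    // Codahale source "spark.streaming.progress_demo":
    //   inputRate-total, processingRate-total, latency
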
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
new file mode 100644
index 0000000000..b7b6e1988e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.UUID
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.streaming._
+import org.apache.spark.util.Clock
+
+/**
+ * Responsible for continually reporting statistics about the amount of data processed as well
+ * as latency for a streaming query. This trait is designed to be mixed into the
+ * [[StreamExecution]], which is responsible for calling `startTrigger` and `finishTrigger`
+ * at the appropriate times. Additionally, the status can be updated with `updateStatusMessage` to
+ * allow reporting on the stream's current state (i.e. "Fetching more data").
+ */
+trait ProgressReporter extends Logging {
+
+ case class ExecutionStats(
+ inputRows: Map[Source, Long], stateOperators: Seq[StateOperatorProgress])
+
+ // Internal state of the stream, required for computing metrics.
+ protected def id: UUID
+ protected def name: String
+ protected def triggerClock: Clock
+ protected def logicalPlan: LogicalPlan
+ protected def lastExecution: QueryExecution
+ protected def newData: Map[Source, DataFrame]
+ protected def availableOffsets: StreamProgress
+ protected def committedOffsets: StreamProgress
+ protected def sources: Seq[Source]
+ protected def sink: Sink
+ protected def streamExecutionMetadata: StreamExecutionMetadata
+ protected def currentBatchId: Long
+ protected def sparkSession: SparkSession
+
+ // Local timestamps and counters.
+ private var currentTriggerStartTimestamp = -1L
+ private var currentTriggerEndTimestamp = -1L
+ // TODO: Restore this from the checkpoint when possible.
+ private var lastTriggerStartTimestamp = -1L
+ private val currentDurationsMs = new mutable.HashMap[String, Long]()
+
+  /** Flag that signals whether any error with input metrics has already been logged */
+ private var metricWarningLogged: Boolean = false
+
+ /** Holds the most recent query progress updates. Accesses must lock on the queue itself. */
+ private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
+
+ @volatile
+ protected var currentStatus: StreamingQueryStatus =
+ StreamingQueryStatus(
+ message = "Initializing StreamExecution",
+ isDataAvailable = false,
+ isTriggerActive = false)
+
+ /** Returns the current status of the query. */
+ def status: StreamingQueryStatus = currentStatus
+
+ /** Returns an array containing the most recent query progress updates. */
+ def recentProgresses: Array[StreamingQueryProgress] = progressBuffer.synchronized {
+ progressBuffer.toArray
+ }
+
+ /** Returns the most recent query progress update. */
+ def lastProgress: StreamingQueryProgress = progressBuffer.synchronized {
+ progressBuffer.last
+ }
+
+ /** Begins recording statistics about query progress for a given trigger. */
+ protected def startTrigger(): Unit = {
+ logDebug("Starting Trigger Calculation")
+ lastTriggerStartTimestamp = currentTriggerStartTimestamp
+ currentTriggerStartTimestamp = triggerClock.getTimeMillis()
+ currentStatus = currentStatus.copy(isTriggerActive = true)
+ currentDurationsMs.clear()
+ }
+
+  /** Finalizes the query progress and adds it to the list of recent progress updates. */
+ protected def finishTrigger(hasNewData: Boolean): Unit = {
+ currentTriggerEndTimestamp = triggerClock.getTimeMillis()
+
+ val executionStats: ExecutionStats = if (!hasNewData) {
+ ExecutionStats(Map.empty, Seq.empty)
+ } else {
+ extractExecutionStats
+ }
+
+ val processingTimeSec =
+ (currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / 1000
+
+ val inputTimeSec = if (lastTriggerStartTimestamp >= 0) {
+ (currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / 1000
+ } else {
+ Double.NaN
+ }
+ logDebug(s"Execution stats: $executionStats")
+
+ val sourceProgress = sources.map { source =>
+ val numRecords = executionStats.inputRows.getOrElse(source, 0L)
+ new SourceProgress(
+ description = source.toString,
+ startOffset = committedOffsets.get(source).map(_.json).orNull,
+ endOffset = availableOffsets.get(source).map(_.json).orNull,
+ numInputRows = numRecords,
+ inputRowsPerSecond = numRecords / inputTimeSec,
+ processedRowsPerSecond = numRecords / processingTimeSec
+ )
+ }
+ val sinkProgress = new SinkProgress(sink.toString)
+
+ val newProgress = new StreamingQueryProgress(
+ id = id,
+ name = name,
+ timestamp = currentTriggerStartTimestamp,
+ batchId = currentBatchId,
+ durationMs = currentDurationsMs.toMap.mapValues(long2Long).asJava,
+ currentWatermark = streamExecutionMetadata.batchWatermarkMs,
+ stateOperators = executionStats.stateOperators.toArray,
+ sources = sourceProgress.toArray,
+ sink = sinkProgress)
+
+ progressBuffer.synchronized {
+ progressBuffer += newProgress
+ while (progressBuffer.length >= sparkSession.sqlContext.conf.streamingProgressRetention) {
+ progressBuffer.dequeue()
+ }
+ }
+
+ logInfo(s"Streaming query made progress: $newProgress")
+ currentStatus = currentStatus.copy(isTriggerActive = false)
+ }
+
+ /** Extracts statistics from the most recent query execution. */
+ private def extractExecutionStats: ExecutionStats = {
+ // We want to associate execution plan leaves to sources that generate them, so that we match
+    // their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
+ // Consider the translation from the streaming logical plan to the final executed plan.
+ //
+ // streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
+ //
+ // 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan
+ // - Each logical plan leaf will be associated with a single streaming source.
+ // - There can be multiple logical plan leaves associated with a streaming source.
+ // - There can be leaves not associated with any streaming source, because they were
+ // generated from a batch source (e.g. stream-batch joins)
+ //
+    // 2. Assuming that the executed plan has the same number of leaves in the same order as that
+    //    of the trigger logical plan, we associate executed plan leaves with corresponding
+ // the trigger logical plan, we associate executed plan leaves with corresponding
+ // streaming sources.
+ //
+ // 3. For each source, we sum the metrics of the associated execution plan leaves.
+ //
+ val logicalPlanLeafToSource = newData.flatMap { case (source, df) =>
+ df.logicalPlan.collectLeaves().map { leaf => leaf -> source }
+ }
+ val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming
+ val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
+ val numInputRows: Map[Source, Long] =
+ if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) {
+ val execLeafToSource = allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap {
+ case (lp, ep) => logicalPlanLeafToSource.get(lp).map { source => ep -> source }
+ }
+ val sourceToNumInputRows = execLeafToSource.map { case (execLeaf, source) =>
+ val numRows = execLeaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
+ source -> numRows
+ }
+ sourceToNumInputRows.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source
+ } else {
+ if (!metricWarningLogged) {
+ def toString[T](seq: Seq[T]): String = s"(size = ${seq.size}), ${seq.mkString(", ")}"
+ logWarning(
+          "Could not report metrics as number of leaves in trigger logical plan did not match that" +
+ s" of the execution plan:\n" +
+ s"logical plan leaves: ${toString(allLogicalPlanLeaves)}\n" +
+ s"execution plan leaves: ${toString(allExecPlanLeaves)}\n")
+ metricWarningLogged = true
+ }
+ Map.empty
+ }
+
+ // Extract statistics about stateful operators in the query plan.
+ val stateNodes = lastExecution.executedPlan.collect {
+ case p if p.isInstanceOf[StateStoreSaveExec] => p
+ }
+ val stateOperators = stateNodes.map { node =>
+ new StateOperatorProgress(
+ numRowsTotal = node.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L),
+ numRowsUpdated = node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L))
+ }
+
+ ExecutionStats(numInputRows, stateOperators)
+ }
+
+ /** Records the duration of running `body` for the next query progress update. */
+ protected def reportTimeTaken[T](triggerDetailKey: String)(body: => T): T = {
+ val startTime = triggerClock.getTimeMillis()
+ val result = body
+ val endTime = triggerClock.getTimeMillis()
+ val timeTaken = math.max(endTime - startTime, 0)
+
+ val previousTime = currentDurationsMs.getOrElse(triggerDetailKey, 0L)
+ currentDurationsMs.put(triggerDetailKey, previousTime + timeTaken)
+ logDebug(s"$triggerDetailKey took $timeTaken ms")
+ result
+ }
+
+ /** Updates the message returned in `status`. */
+ protected def updateStatusMessage(message: String): Unit = {
+ currentStatus = currentStatus.copy(message = message)
+ }
+}
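
To make the rate arithmetic in finishTrigger concrete, a small worked example with illustrative timestamps (not taken from any real run):

    // Previous trigger started at t = 1000 ms; the current trigger started at t = 3000 ms,
    // finished at t = 3500 ms, and read 1000 rows.
    val numRecords = 1000L
    val inputTimeSec = (3000L - 1000L).toDouble / 1000       // 2.0 s between trigger starts
    val processingTimeSec = (3500L - 3000L).toDouble / 1000  // 0.5 s spent inside the trigger

    val inputRowsPerSecond = numRecords / inputTimeSec            // 1000 / 2.0 = 500.0
    val processedRowsPerSecond = numRecords / processingTimeSec   // 1000 / 0.5 = 2000.0
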
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 21664d7fd0..e4f31af35f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -17,8 +17,8 @@
package org.apache.spark.sql.execution.streaming
+import java.util.UUID
import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable.ArrayBuffer
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
+import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.streaming._
import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
@@ -47,7 +47,6 @@ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
*/
class StreamExecution(
override val sparkSession: SparkSession,
- override val id: Long,
override val name: String,
checkpointRoot: String,
val logicalPlan: LogicalPlan,
@@ -55,10 +54,12 @@ class StreamExecution(
val trigger: Trigger,
val triggerClock: Clock,
val outputMode: OutputMode)
- extends StreamingQuery with Logging {
+ extends StreamingQuery with ProgressReporter with Logging {
import org.apache.spark.sql.streaming.StreamingQueryListener._
- import StreamMetrics._
+
+ // TODO: restore this from the checkpoint directory.
+ override val id: UUID = UUID.randomUUID()
private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
@@ -89,16 +90,16 @@ class StreamExecution(
* once, since the field's value may change at any time.
*/
@volatile
- private var availableOffsets = new StreamProgress
+ protected var availableOffsets = new StreamProgress
/** The current batchId or -1 if execution has not yet been initialized. */
- private var currentBatchId: Long = -1
+ protected var currentBatchId: Long = -1
/** Stream execution metadata */
- private var streamExecutionMetadata = StreamExecutionMetadata()
+ protected var streamExecutionMetadata = StreamExecutionMetadata()
/** All stream sources present in the query plan. */
- private val sources =
+ protected val sources =
logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
/** A list of unique sources in the query plan. */
@@ -113,7 +114,10 @@ class StreamExecution(
private var state: State = INITIALIZED
@volatile
- var lastExecution: QueryExecution = null
+ var lastExecution: QueryExecution = _
+
+ /** Holds the most recent input data for each source. */
+ protected var newData: Map[Source, DataFrame] = _
@volatile
private var streamDeathCause: StreamingQueryException = null
@@ -121,16 +125,8 @@ class StreamExecution(
/* Get the call site in the caller thread; will pass this into the micro batch thread */
private val callSite = Utils.getCallSite()
- /** Metrics for this query */
- private val streamMetrics =
- new StreamMetrics(uniqueSources.toSet, triggerClock, s"StructuredStreaming.$name")
-
- @volatile
- private var currentStatus: StreamingQueryStatus = null
-
- /** Flag that signals whether any error with input metrics have already been logged */
- @volatile
- private var metricWarningLogged: Boolean = false
+ /** Used to report metrics to coda-hale. */
+ lazy val streamMetrics = new MetricsReporter(this, s"spark.streaming.$name")
/**
* The thread that runs the micro-batches of this stream. Note that this thread must be
@@ -158,15 +154,6 @@ class StreamExecution(
/** Whether the query is currently active or not */
override def isActive: Boolean = state == ACTIVE
- /** Returns the current status of the query. */
- override def status: StreamingQueryStatus = currentStatus
-
- /** Returns current status of all the sources. */
- override def sourceStatuses: Array[SourceStatus] = currentStatus.sourceStatuses.toArray
-
- /** Returns current status of the sink. */
- override def sinkStatus: SinkStatus = currentStatus.sinkStatus
-
/** Returns the [[StreamingQueryException]] if the query was terminated by an exception. */
override def exception: Option[StreamingQueryException] = Option(streamDeathCause)
@@ -200,8 +187,8 @@ class StreamExecution(
if (sparkSession.sessionState.conf.streamingMetricsEnabled) {
sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
}
- updateStatus()
- postEvent(new QueryStartedEvent(currentStatus)) // Assumption: Does not throw exception.
+
+ postEvent(new QueryStartedEvent(id, name)) // Assumption: Does not throw exception.
// Unblock starting thread
startLatch.countDown()
@@ -210,40 +197,45 @@ class StreamExecution(
SparkSession.setActiveSession(sparkSession)
triggerExecutor.execute(() => {
- streamMetrics.reportTriggerStarted(currentBatchId)
- streamMetrics.reportTriggerDetail(STATUS_MESSAGE, "Finding new data from sources")
- updateStatus()
- val isTerminated = reportTimeTaken(TRIGGER_LATENCY) {
+ startTrigger()
+
+ val isTerminated =
if (isActive) {
- if (currentBatchId < 0) {
- // We'll do this initialization only once
- populateStartOffsets()
- logDebug(s"Stream running from $committedOffsets to $availableOffsets")
- } else {
- constructNextBatch()
+ reportTimeTaken("triggerExecution") {
+ if (currentBatchId < 0) {
+ // We'll do this initialization only once
+ populateStartOffsets()
+ logDebug(s"Stream running from $committedOffsets to $availableOffsets")
+ } else {
+ constructNextBatch()
+ }
+ if (dataAvailable) {
+ currentStatus = currentStatus.copy(isDataAvailable = true)
+ updateStatusMessage("Processing new data")
+ runBatch()
+ }
}
+
+ // Report trigger as finished and construct progress object.
+ finishTrigger(dataAvailable)
+ postEvent(new QueryProgressEvent(lastProgress))
+
if (dataAvailable) {
- streamMetrics.reportTriggerDetail(IS_DATA_PRESENT_IN_TRIGGER, true)
- streamMetrics.reportTriggerDetail(STATUS_MESSAGE, "Processing new data")
- updateStatus()
- runBatch()
// We'll increase currentBatchId after we complete processing current batch's data
currentBatchId += 1
} else {
- streamMetrics.reportTriggerDetail(IS_DATA_PRESENT_IN_TRIGGER, false)
- streamMetrics.reportTriggerDetail(STATUS_MESSAGE, "No new data")
- updateStatus()
+ currentStatus = currentStatus.copy(isDataAvailable = false)
+ updateStatusMessage("Waiting for data to arrive")
Thread.sleep(pollingDelayMs)
}
true
} else {
false
}
- }
- // Update metrics and notify others
- streamMetrics.reportTriggerFinished()
- updateStatus()
- postEvent(new QueryProgressEvent(currentStatus))
+
+ // Update committed offsets.
+ committedOffsets ++= availableOffsets
+ updateStatusMessage("Waiting for next trigger")
isTerminated
})
} catch {
@@ -264,14 +256,12 @@ class StreamExecution(
state = TERMINATED
// Update metrics and status
- streamMetrics.stop()
sparkSession.sparkContext.env.metricsSystem.removeSource(streamMetrics)
- updateStatus()
// Notify others
sparkSession.streams.notifyQueryTermination(StreamExecution.this)
postEvent(
- new QueryTerminatedEvent(currentStatus, exception.map(_.cause).map(Utils.exceptionString)))
+ new QueryTerminatedEvent(id, exception.map(_.cause).map(Utils.exceptionString)))
terminationLatch.countDown()
}
}
@@ -328,14 +318,13 @@ class StreamExecution(
val hasNewData = {
awaitBatchLock.lock()
try {
- reportTimeTaken(GET_OFFSET_LATENCY) {
- val latestOffsets: Map[Source, Option[Offset]] = uniqueSources.map { s =>
- reportTimeTaken(s, SOURCE_GET_OFFSET_LATENCY) {
- (s, s.getOffset)
- }
- }.toMap
- availableOffsets ++= latestOffsets.filter { case (s, o) => o.nonEmpty }.mapValues(_.get)
- }
+ val latestOffsets: Map[Source, Option[Offset]] = uniqueSources.map { s =>
+ updateStatusMessage(s"Getting offsets from $s")
+ reportTimeTaken("getOffset") {
+ (s, s.getOffset)
+ }
+ }.toMap
+ availableOffsets ++= latestOffsets.filter { case (s, o) => o.nonEmpty }.mapValues(_.get)
if (dataAvailable) {
true
@@ -350,8 +339,10 @@ class StreamExecution(
if (hasNewData) {
// Current batch timestamp in milliseconds
streamExecutionMetadata.batchTimestampMs = triggerClock.getTimeMillis()
- reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) {
- assert(offsetLog.add(currentBatchId,
+ updateStatusMessage("Writing offsets to log")
+ reportTimeTaken("walCommit") {
+ assert(offsetLog.add(
+ currentBatchId,
availableOffsets.toOffsetSeq(sources, streamExecutionMetadata.json)),
s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
logInfo(s"Committed offsets for batch $currentBatchId. " +
@@ -384,30 +375,24 @@ class StreamExecution(
awaitBatchLock.unlock()
}
}
- reportTimestamp(GET_OFFSET_TIMESTAMP)
}
/**
* Processes any data available between `availableOffsets` and `committedOffsets`.
*/
private def runBatch(): Unit = {
- // TODO: Move this to IncrementalExecution.
-
// Request unprocessed data from all sources.
- val newData = reportTimeTaken(GET_BATCH_LATENCY) {
+ newData = reportTimeTaken("getBatch") {
availableOffsets.flatMap {
case (source, available)
if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
val current = committedOffsets.get(source)
- val batch = reportTimeTaken(source, SOURCE_GET_BATCH_LATENCY) {
- source.getBatch(current, available)
- }
+ val batch = source.getBatch(current, available)
logDebug(s"Retrieving data from $source: $current -> $available")
Some(source -> batch)
case _ => None
}
}
- reportTimestamp(GET_BATCH_TIMESTAMP)
// A list of attributes that will need to be updated.
var replacements = new ArrayBuffer[(Attribute, Attribute)]
@@ -438,7 +423,7 @@ class StreamExecution(
cd.dataType)
}
- val executedPlan = reportTimeTaken(OPTIMIZER_LATENCY) {
+ val executedPlan = reportTimeTaken("queryPlanning") {
lastExecution = new IncrementalExecution(
sparkSession,
triggerLogicalPlan,
@@ -451,11 +436,12 @@ class StreamExecution(
val nextBatch =
new Dataset(sparkSession, lastExecution, RowEncoder(lastExecution.analyzed.schema))
- sink.addBatch(currentBatchId, nextBatch)
- reportNumRows(executedPlan, triggerLogicalPlan, newData)
+
+ reportTimeTaken("addBatch") {
+ sink.addBatch(currentBatchId, nextBatch)
+ }
// Update the eventTime watermark if we find one in the plan.
- // TODO: Does this need to be an AttributeMap?
lastExecution.executedPlan.collect {
case e: EventTimeWatermarkExec =>
logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}")
@@ -468,10 +454,6 @@ class StreamExecution(
logTrace(s"Event time didn't move: $newWatermark < " +
s"$streamExecutionMetadata.currentEventTimeWatermark")
}
-
- if (newWatermark != 0) {
- streamMetrics.reportTriggerDetail(EVENT_TIME_WATERMARK, newWatermark)
- }
}
awaitBatchLock.lock()
@@ -481,9 +463,6 @@ class StreamExecution(
} finally {
awaitBatchLock.unlock()
}
-
- // Update committed offsets.
- committedOffsets ++= availableOffsets
}
private def postEvent(event: StreamingQueryListener.Event) {
@@ -616,145 +595,12 @@ class StreamExecution(
""".stripMargin
}
- /**
- * Report row metrics of the executed trigger
- * @param triggerExecutionPlan Execution plan of the trigger
- * @param triggerLogicalPlan Logical plan of the trigger, generated from the query logical plan
- * @param sourceToDF Source to DataFrame returned by the source.getBatch in this trigger
- */
- private def reportNumRows(
- triggerExecutionPlan: SparkPlan,
- triggerLogicalPlan: LogicalPlan,
- sourceToDF: Map[Source, DataFrame]): Unit = {
- // We want to associate execution plan leaves to sources that generate them, so that we match
- // the their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
- // Consider the translation from the streaming logical plan to the final executed plan.
- //
- // streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
- //
- // 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan
- // - Each logical plan leaf will be associated with a single streaming source.
- // - There can be multiple logical plan leaves associated with a streaming source.
- // - There can be leaves not associated with any streaming source, because they were
- // generated from a batch source (e.g. stream-batch joins)
- //
- // 2. Assuming that the executed plan has same number of leaves in the same order as that of
- // the trigger logical plan, we associate executed plan leaves with corresponding
- // streaming sources.
- //
- // 3. For each source, we sum the metrics of the associated execution plan leaves.
- //
- val logicalPlanLeafToSource = sourceToDF.flatMap { case (source, df) =>
- df.logicalPlan.collectLeaves().map { leaf => leaf -> source }
- }
- val allLogicalPlanLeaves = triggerLogicalPlan.collectLeaves() // includes non-streaming sources
- val allExecPlanLeaves = triggerExecutionPlan.collectLeaves()
- val sourceToNumInputRows: Map[Source, Long] =
- if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) {
- val execLeafToSource = allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap {
- case (lp, ep) => logicalPlanLeafToSource.get(lp).map { source => ep -> source }
- }
- val sourceToNumInputRows = execLeafToSource.map { case (execLeaf, source) =>
- val numRows = execLeaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
- source -> numRows
- }
- sourceToNumInputRows.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source
- } else {
- if (!metricWarningLogged) {
- def toString[T](seq: Seq[T]): String = s"(size = ${seq.size}), ${seq.mkString(", ")}"
- logWarning(
- "Could not report metrics as number leaves in trigger logical plan did not match that" +
- s" of the execution plan:\n" +
- s"logical plan leaves: ${toString(allLogicalPlanLeaves)}\n" +
- s"execution plan leaves: ${toString(allExecPlanLeaves)}\n")
- metricWarningLogged = true
- }
- Map.empty
- }
- val numOutputRows = triggerExecutionPlan.metrics.get("numOutputRows").map(_.value)
- val stateNodes = triggerExecutionPlan.collect {
- case p if p.isInstanceOf[StateStoreSaveExec] => p
- }
-
- streamMetrics.reportNumInputRows(sourceToNumInputRows)
- stateNodes.zipWithIndex.foreach { case (s, i) =>
- streamMetrics.reportTriggerDetail(
- NUM_TOTAL_STATE_ROWS(i + 1),
- s.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L))
- streamMetrics.reportTriggerDetail(
- NUM_UPDATED_STATE_ROWS(i + 1),
- s.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L))
- }
- updateStatus()
- }
-
- private def reportTimeTaken[T](triggerDetailKey: String)(body: => T): T = {
- val startTime = triggerClock.getTimeMillis()
- val result = body
- val endTime = triggerClock.getTimeMillis()
- val timeTaken = math.max(endTime - startTime, 0)
- streamMetrics.reportTriggerDetail(triggerDetailKey, timeTaken)
- updateStatus()
- if (triggerDetailKey == TRIGGER_LATENCY) {
- logInfo(s"Completed up to $availableOffsets in $timeTaken ms")
- }
- result
- }
-
- private def reportTimeTaken[T](source: Source, triggerDetailKey: String)(body: => T): T = {
- val startTime = triggerClock.getTimeMillis()
- val result = body
- val endTime = triggerClock.getTimeMillis()
- streamMetrics.reportSourceTriggerDetail(
- source, triggerDetailKey, math.max(endTime - startTime, 0))
- updateStatus()
- result
- }
-
- private def reportTimestamp(triggerDetailKey: String): Unit = {
- streamMetrics.reportTriggerDetail(triggerDetailKey, triggerClock.getTimeMillis)
- updateStatus()
- }
-
- private def updateStatus(): Unit = {
- val localAvailableOffsets = availableOffsets
- val sourceStatuses = sources.map { s =>
- SourceStatus(
- s.toString,
- localAvailableOffsets.get(s).map(_.json).getOrElse("-"),
- streamMetrics.currentSourceInputRate(s),
- streamMetrics.currentSourceProcessingRate(s),
- streamMetrics.currentSourceTriggerDetails(s))
- }.toArray
- val sinkStatus = SinkStatus(
- sink.toString,
- committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString)
-
- currentStatus =
- StreamingQueryStatus(
- name = name,
- id = id,
- timestamp = triggerClock.getTimeMillis(),
- inputRate = streamMetrics.currentInputRate(),
- processingRate = streamMetrics.currentProcessingRate(),
- latency = streamMetrics.currentLatency(),
- sourceStatuses = sourceStatuses,
- sinkStatus = sinkStatus,
- triggerDetails = streamMetrics.currentTriggerDetails())
- }
-
trait State
case object INITIALIZED extends State
case object ACTIVE extends State
case object TERMINATED extends State
}
-object StreamExecution {
- private val _nextId = new AtomicLong(0)
-
- def nextId: Long = _nextId.getAndIncrement()
-}
-
/**
* Contains metadata associated with a stream execution. This information is
* persisted to the offset log via the OffsetSeq metadata field. Current
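
The events posted by the run loop above changed shape with this patch: QueryStartedEvent now carries only the query id and name, QueryProgressEvent carries a full StreamingQueryProgress, and QueryTerminatedEvent carries the id plus an optional exception string. A minimal listener sketch, assuming the callback and field names follow the constructor calls shown above:

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit =
        println(s"started: id=${event.id} name=${event.name}")
      override def onQueryProgress(event: QueryProgressEvent): Unit =
        println(event.progress.json)   // one StreamingQueryProgress per trigger
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
        println(s"terminated: id=${event.id} exception=${event.exception}")
    })
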
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
deleted file mode 100644
index 942e6ed894..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming
-
-import java.{util => ju}
-
-import scala.collection.mutable
-
-import com.codahale.metrics.{Gauge, MetricRegistry}
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.metrics.source.{Source => CodahaleSource}
-import org.apache.spark.util.Clock
-
-/**
- * Class that manages all the metrics related to a StreamingQuery. It does the following.
- * - Calculates metrics (rates, latencies, etc.) based on information reported by StreamExecution.
- * - Allows the current metric values to be queried
- * - Serves some of the metrics through Codahale/DropWizard metrics
- *
- * @param sources Unique set of sources in a query
- * @param triggerClock Clock used for triggering in StreamExecution
- * @param codahaleSourceName Root name for all the Codahale metrics
- */
-class StreamMetrics(sources: Set[Source], triggerClock: Clock, codahaleSourceName: String)
- extends CodahaleSource with Logging {
-
- import StreamMetrics._
-
- // Trigger infos
- private val triggerDetails = new mutable.HashMap[String, String]
- private val sourceTriggerDetails = new mutable.HashMap[Source, mutable.HashMap[String, String]]
-
- // Rate estimators for sources and sinks
- private val inputRates = new mutable.HashMap[Source, RateCalculator]
- private val processingRates = new mutable.HashMap[Source, RateCalculator]
-
- // Number of input rows in the current trigger
- private val numInputRows = new mutable.HashMap[Source, Long]
- private var currentTriggerStartTimestamp: Long = -1
- private var previousTriggerStartTimestamp: Long = -1
- private var latency: Option[Double] = None
-
- override val sourceName: String = codahaleSourceName
- override val metricRegistry: MetricRegistry = new MetricRegistry
-
- // =========== Initialization ===========
-
- // Metric names should not have . in them, so that all the metrics of a query are identified
- // together in Ganglia as a single metric group
- registerGauge("inputRate-total", currentInputRate)
- registerGauge("processingRate-total", () => currentProcessingRate)
- registerGauge("latency", () => currentLatency().getOrElse(-1.0))
-
- sources.foreach { s =>
- inputRates.put(s, new RateCalculator)
- processingRates.put(s, new RateCalculator)
- sourceTriggerDetails.put(s, new mutable.HashMap[String, String])
-
- registerGauge(s"inputRate-${s.toString}", () => currentSourceInputRate(s))
- registerGauge(s"processingRate-${s.toString}", () => currentSourceProcessingRate(s))
- }
-
- // =========== Setter methods ===========
-
- def reportTriggerStarted(batchId: Long): Unit = synchronized {
- numInputRows.clear()
- triggerDetails.clear()
- sourceTriggerDetails.values.foreach(_.clear())
-
- reportTriggerDetail(BATCH_ID, batchId)
- sources.foreach(s => reportSourceTriggerDetail(s, BATCH_ID, batchId))
- reportTriggerDetail(IS_TRIGGER_ACTIVE, true)
- currentTriggerStartTimestamp = triggerClock.getTimeMillis()
- reportTriggerDetail(START_TIMESTAMP, currentTriggerStartTimestamp)
- }
-
- def reportTriggerDetail[T](key: String, value: T): Unit = synchronized {
- triggerDetails.put(key, value.toString)
- }
-
- def reportSourceTriggerDetail[T](source: Source, key: String, value: T): Unit = synchronized {
- sourceTriggerDetails(source).put(key, value.toString)
- }
-
- def reportNumInputRows(inputRows: Map[Source, Long]): Unit = synchronized {
- numInputRows ++= inputRows
- }
-
- def reportTriggerFinished(): Unit = synchronized {
- require(currentTriggerStartTimestamp >= 0)
- val currentTriggerFinishTimestamp = triggerClock.getTimeMillis()
- reportTriggerDetail(FINISH_TIMESTAMP, currentTriggerFinishTimestamp)
- triggerDetails.remove(STATUS_MESSAGE)
- reportTriggerDetail(IS_TRIGGER_ACTIVE, false)
-
- // Report number of rows
- val totalNumInputRows = numInputRows.values.sum
- reportTriggerDetail(NUM_INPUT_ROWS, totalNumInputRows)
- numInputRows.foreach { case (s, r) =>
- reportSourceTriggerDetail(s, NUM_SOURCE_INPUT_ROWS, r)
- }
-
- val currentTriggerDuration = currentTriggerFinishTimestamp - currentTriggerStartTimestamp
- val previousInputIntervalOption = if (previousTriggerStartTimestamp >= 0) {
- Some(currentTriggerStartTimestamp - previousTriggerStartTimestamp)
- } else None
-
- // Update input rate = num rows received by each source during the previous trigger interval
- // Interval is measures as interval between start times of previous and current trigger.
- //
- // TODO: Instead of trigger start, we should use time when getOffset was called on each source
- // as this may be different for each source if there are many sources in the query plan
- // and getOffset is called serially on them.
- if (previousInputIntervalOption.nonEmpty) {
- sources.foreach { s =>
- inputRates(s).update(numInputRows.getOrElse(s, 0), previousInputIntervalOption.get)
- }
- }
-
- // Update processing rate = num rows processed for each source in current trigger duration
- sources.foreach { s =>
- processingRates(s).update(numInputRows.getOrElse(s, 0), currentTriggerDuration)
- }
-
- // Update latency = if data present, 0.5 * previous trigger interval + current trigger duration
- if (previousInputIntervalOption.nonEmpty && totalNumInputRows > 0) {
- latency = Some((previousInputIntervalOption.get.toDouble / 2) + currentTriggerDuration)
- } else {
- latency = None
- }
-
- previousTriggerStartTimestamp = currentTriggerStartTimestamp
- currentTriggerStartTimestamp = -1
- }
-
- // =========== Getter methods ===========
-
- def currentInputRate(): Double = synchronized {
- // Since we are calculating source input rates using the same time interval for all sources
- // it is fine to calculate total input rate as the sum of per source input rate.
- inputRates.map(_._2.currentRate).sum
- }
-
- def currentSourceInputRate(source: Source): Double = synchronized {
- inputRates(source).currentRate
- }
-
- def currentProcessingRate(): Double = synchronized {
- // Since we are calculating source processing rates using the same time interval for all sources
- // it is fine to calculate total processing rate as the sum of per source processing rate.
- processingRates.map(_._2.currentRate).sum
- }
-
- def currentSourceProcessingRate(source: Source): Double = synchronized {
- processingRates(source).currentRate
- }
-
- def currentLatency(): Option[Double] = synchronized { latency }
-
- def currentTriggerDetails(): Map[String, String] = synchronized { triggerDetails.toMap }
-
- def currentSourceTriggerDetails(source: Source): Map[String, String] = synchronized {
- sourceTriggerDetails(source).toMap
- }
-
- // =========== Other methods ===========
-
- private def registerGauge[T](name: String, f: () => T)(implicit num: Numeric[T]): Unit = {
- synchronized {
- metricRegistry.register(name, new Gauge[T] {
- override def getValue: T = f()
- })
- }
- }
-
- def stop(): Unit = synchronized {
- triggerDetails.clear()
- inputRates.valuesIterator.foreach { _.stop() }
- processingRates.valuesIterator.foreach { _.stop() }
- latency = None
- }
-}
-
-object StreamMetrics extends Logging {
- /** Simple utility class to calculate rate while avoiding DivideByZero */
- class RateCalculator {
- @volatile private var rate: Option[Double] = None
-
- def update(numRows: Long, timeGapMs: Long): Unit = {
- if (timeGapMs > 0) {
- rate = Some(numRows.toDouble * 1000 / timeGapMs)
- } else {
- rate = None
- logDebug(s"Rate updates cannot with zero or negative time gap $timeGapMs")
- }
- }
-
- def currentRate: Double = rate.getOrElse(0.0)
-
- def stop(): Unit = { rate = None }
- }
-
-
- val BATCH_ID = "batchId"
- val IS_TRIGGER_ACTIVE = "isTriggerActive"
- val IS_DATA_PRESENT_IN_TRIGGER = "isDataPresentInTrigger"
- val STATUS_MESSAGE = "statusMessage"
- val EVENT_TIME_WATERMARK = "eventTimeWatermark"
-
- val START_TIMESTAMP = "timestamp.triggerStart"
- val GET_OFFSET_TIMESTAMP = "timestamp.afterGetOffset"
- val GET_BATCH_TIMESTAMP = "timestamp.afterGetBatch"
- val FINISH_TIMESTAMP = "timestamp.triggerFinish"
-
- val GET_OFFSET_LATENCY = "latency.getOffset.total"
- val GET_BATCH_LATENCY = "latency.getBatch.total"
- val OFFSET_WAL_WRITE_LATENCY = "latency.offsetLogWrite"
- val OPTIMIZER_LATENCY = "latency.optimizer"
- val TRIGGER_LATENCY = "latency.fullTrigger"
- val SOURCE_GET_OFFSET_LATENCY = "latency.getOffset.source"
- val SOURCE_GET_BATCH_LATENCY = "latency.getBatch.source"
-
- val NUM_INPUT_ROWS = "numRows.input.total"
- val NUM_SOURCE_INPUT_ROWS = "numRows.input.source"
- def NUM_TOTAL_STATE_ROWS(aggId: Int): String = s"numRows.state.aggregation$aggId.total"
- def NUM_UPDATED_STATE_ROWS(aggId: Int): String = s"numRows.state.aggregation$aggId.updated"
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 206c08b483..200f0603e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -609,6 +609,12 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val STREAMING_PROGRESS_RETENTION =
+ SQLConfigBuilder("spark.sql.streaming.numRecentProgresses")
+ .doc("The number of progress updates to retain for a streaming query")
+ .intConf
+ .createWithDefault(100)
+
val NDV_MAX_ERROR =
SQLConfigBuilder("spark.sql.statistics.ndv.maxError")
.internal()
@@ -680,6 +686,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def streamingMetricsEnabled: Boolean = getConf(STREAMING_METRICS_ENABLED)
+ def streamingProgressRetention: Int = getConf(STREAMING_PROGRESS_RETENTION)
+
def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES)
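
The buffer of recent progress updates is bounded by the new conf above; a one-line sketch of tuning it per session:

    // Retain roughly the last 50 progress updates per query instead of the default 100.
    spark.conf.set("spark.sql.streaming.numRecentProgresses", 50L)
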
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
deleted file mode 100644
index ab19602207..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import org.json4s._
-import org.json4s.JsonAST.JValue
-import org.json4s.JsonDSL._
-import org.json4s.jackson.JsonMethods._
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.streaming.StreamingQueryStatus.indent
-
-/**
- * :: Experimental ::
- * Status and metrics of a streaming sink.
- *
- * @param description Description of the source corresponding to this status.
- * @param offsetDesc Description of the current offsets up to which data has been written
- * by the sink.
- * @since 2.0.0
- */
-@Experimental
-class SinkStatus private(
- val description: String,
- val offsetDesc: String) {
-
- /** The compact JSON representation of this status. */
- def json: String = compact(render(jsonValue))
-
- /** The pretty (i.e. indented) JSON representation of this status. */
- def prettyJson: String = pretty(render(jsonValue))
-
- override def toString: String =
- "Status of sink " + indent(prettyString).trim
-
- private[sql] def jsonValue: JValue = {
- ("description" -> JString(description)) ~
- ("offsetDesc" -> JString(offsetDesc))
- }
-
- private[sql] def prettyString: String = {
- s"""$description
- |Committed offsets: $offsetDesc
- |""".stripMargin
- }
-}
-
-/** Companion object, primarily for creating SinkStatus instances internally */
-private[sql] object SinkStatus {
- def apply(desc: String, offsetDesc: String): SinkStatus = new SinkStatus(desc, offsetDesc)
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
deleted file mode 100644
index cfdf11370e..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import java.{util => ju}
-
-import scala.collection.JavaConverters._
-
-import org.json4s._
-import org.json4s.JsonAST.JValue
-import org.json4s.JsonDSL._
-import org.json4s.jackson.JsonMethods._
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.streaming.StreamingQueryStatus.indent
-import org.apache.spark.util.JsonProtocol
-
-/**
- * :: Experimental ::
- * Status and metrics of a streaming Source.
- *
- * @param description Description of the source corresponding to this status.
- * @param offsetDesc Description of the current offset if known.
- * @param inputRate Current rate (rows/sec) at which data is being generated by the source.
- * @param processingRate Current rate (rows/sec) at which the query is processing data from
- * the source.
- * @param triggerDetails Low-level details of the currently active trigger (e.g. number of
- * rows processed in trigger, latency of intermediate steps, etc.).
- * If no trigger is active, then it will have details of the last completed
- * trigger.
- * @since 2.0.0
- */
-@Experimental
-class SourceStatus private(
- val description: String,
- val offsetDesc: String,
- val inputRate: Double,
- val processingRate: Double,
- val triggerDetails: ju.Map[String, String]) {
-
- /** The compact JSON representation of this status. */
- def json: String = compact(render(jsonValue))
-
- /** The pretty (i.e. indented) JSON representation of this status. */
- def prettyJson: String = pretty(render(jsonValue))
-
- override def toString: String =
- "Status of source " + indent(prettyString).trim
-
- private[sql] def jsonValue: JValue = {
- ("description" -> JString(description)) ~
- ("offsetDesc" -> JString(offsetDesc)) ~
- ("inputRate" -> JDouble(inputRate)) ~
- ("processingRate" -> JDouble(processingRate)) ~
- ("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala))
- }
-
- private[sql] def prettyString: String = {
- val triggerDetailsLines =
- triggerDetails.asScala.map { case (k, v) => s"$k: $v" }
- s"""$description
- |Available offset: $offsetDesc
- |Input rate: $inputRate rows/sec
- |Processing rate: $processingRate rows/sec
- |Trigger details:
- |""".stripMargin + indent(triggerDetailsLines)
- }
-}
-
-/** Companion object, primarily for creating SourceStatus instances internally */
-private[sql] object SourceStatus {
- def apply(
- desc: String,
- offsetDesc: String,
- inputRate: Double,
- processingRate: Double,
- triggerDetails: Map[String, String]): SourceStatus = {
- new SourceStatus(desc, offsetDesc, inputRate, processingRate, triggerDetails.asJava)
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index 374313f2ca..8fc4e43b6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.streaming
+import java.util.UUID
+
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.SparkSession
@@ -33,25 +35,27 @@ trait StreamingQuery {
* Returns the name of the query. This name is unique across all active queries. This can be
* set in the `org.apache.spark.sql.streaming.DataStreamWriter` as
* `dataframe.writeStream.queryName("query").start()`.
+ *
* @since 2.0.0
*/
def name: String
/**
- * Returns the unique id of this query. This id is automatically generated and is unique across
- * all queries that have been started in the current process.
- * @since 2.0.0
+ * Returns the unique id of this query.
+ * @since 2.1.0
*/
- def id: Long
+ def id: UUID
/**
* Returns the `SparkSession` associated with `this`.
+ *
* @since 2.0.0
*/
def sparkSession: SparkSession
/**
- * Whether the query is currently active or not
+ * Returns `true` if this query is actively running.
+ *
* @since 2.0.0
*/
def isActive: Boolean
@@ -64,23 +68,26 @@ trait StreamingQuery {
/**
* Returns the current status of the query.
+ *
* @since 2.0.2
*/
def status: StreamingQueryStatus
/**
- * Returns current status of all the sources.
- * @since 2.0.0
+ * Returns an array of the most recent [[StreamingQueryProgress]] updates for this query.
+ * The number of progress updates retained for each stream is configured by Spark session
+ * configuration `spark.sql.streaming.numRecentProgresses`.
+ *
+ * @since 2.1.0
*/
- @deprecated("use status.sourceStatuses", "2.0.2")
- def sourceStatuses: Array[SourceStatus]
+ def recentProgresses: Array[StreamingQueryProgress]
/**
- * Returns current status of the sink.
- * @since 2.0.0
+ * Returns the most recent [[StreamingQueryProgress]] update of this streaming query.
+ *
+ * @since 2.1.0
*/
- @deprecated("use status.sinkStatus", "2.0.2")
- def sinkStatus: SinkStatus
+ def lastProgress: StreamingQueryProgress
/**
* Waits for the termination of `this` query, either by `query.stop()` or by an exception.
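
For context, a sketch of how the reworked StreamingQuery surface could be consumed; `query` is assumed to be an already-started query and the printing is illustrative only:

    import org.apache.spark.sql.streaming.StreamingQuery

    // Sketch: summarize a running query using the UUID id and the new progress API.
    def summarize(query: StreamingQuery): Unit = {
      println(s"id=${query.id} active=${query.isActive}")
      val last = query.lastProgress   // most recent trigger's progress; guard in case none exists yet
      if (last != null) println(last.prettyJson)
      val totalRows = query.recentProgresses.map(_.numInputRows).sum
      println(s"input rows across retained progress updates: $totalRows")
    }
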
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
index 0a58142e06..13f11ba1c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.streaming.{Offset, OffsetSeq, StreamExecut
* :: Experimental ::
 * Exception that stopped a [[StreamingQuery]]. Use `cause` to get the actual exception
* that caused the failure.
- * @param query Query that caused the exception
+ * @param query Query that caused the exception
* @param message Message of this exception
* @param cause Internal cause of this exception
* @param startOffset Starting offset (if known) of the range of data in which exception occurred
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index 9e311fae84..d9ee75c064 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.streaming
+import java.util.UUID
+
import org.apache.spark.annotation.Experimental
import org.apache.spark.scheduler.SparkListenerEvent
@@ -81,30 +83,28 @@ object StreamingQueryListener {
/**
* :: Experimental ::
* Event representing the start of a query
- * @since 2.0.0
+ * @since 2.1.0
*/
@Experimental
- class QueryStartedEvent private[sql](val queryStatus: StreamingQueryStatus) extends Event
+ class QueryStartedEvent private[sql](val id: UUID, val name: String) extends Event
/**
* :: Experimental ::
- * Event representing any progress updates in a query
- * @since 2.0.0
+ * Event representing any progress updates in a query.
+ * @since 2.1.0
*/
@Experimental
- class QueryProgressEvent private[sql](val queryStatus: StreamingQueryStatus) extends Event
+ class QueryProgressEvent private[sql](val progress: StreamingQueryProgress) extends Event
/**
* :: Experimental ::
- * Event representing that termination of a query
+ * Event representing the termination of a query.
*
- * @param queryStatus Information about the status of the query.
- * @param exception The exception message of the [[StreamingQuery]] if the query was terminated
+ * @param id The query id.
+ * @param exception The exception message of the query if the query was terminated
* with an exception. Otherwise, it will be `None`.
- * @since 2.0.0
+ * @since 2.1.0
*/
@Experimental
- class QueryTerminatedEvent private[sql](
- val queryStatus: StreamingQueryStatus,
- val exception: Option[String]) extends Event
+ class QueryTerminatedEvent private[sql](val id: UUID, val exception: Option[String]) extends Event
}
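
A minimal sketch of a listener written against the new event shapes; the class name LoggingListener and the logging bodies are assumptions:

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    // Sketch: events now carry a UUID id (and a full StreamingQueryProgress)
    // instead of a StreamingQueryStatus snapshot.
    class LoggingListener extends StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit =
        println(s"started '${event.name}' (${event.id})")
      override def onQueryProgress(event: QueryProgressEvent): Unit =
        println(s"progress: ${event.progress.json}")
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
        println(s"terminated ${event.id}, exception=${event.exception}")
    }
    // Registration, assuming an active SparkSession `spark`:
    // spark.streams.addListener(new LoggingListener)
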
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 53968a82d8..c448468bea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.streaming
+import java.util.UUID
+import java.util.concurrent.atomic.AtomicLong
+
import scala.collection.mutable
import org.apache.hadoop.fs.Path
@@ -41,7 +44,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
private[sql] val stateStoreCoordinator =
StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
private val listenerBus = new StreamingQueryListenerBus(sparkSession.sparkContext.listenerBus)
- private val activeQueries = new mutable.HashMap[Long, StreamingQuery]
+ private val activeQueries = new mutable.HashMap[UUID, StreamingQuery]
private val activeQueriesLock = new Object
private val awaitTerminationLock = new Object
@@ -59,13 +62,20 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
/**
* Returns the query if there is an active query with the given id, or null.
*
- * @since 2.0.0
+ * @since 2.1.0
*/
- def get(id: Long): StreamingQuery = activeQueriesLock.synchronized {
+ def get(id: UUID): StreamingQuery = activeQueriesLock.synchronized {
activeQueries.get(id).orNull
}
/**
+ * Returns the query if there is an active query with the given id, or null.
+ *
+ * @since 2.1.0
+ */
+ def get(id: String): StreamingQuery = get(UUID.fromString(id))
+
+ /**
* Wait until any of the queries on the associated SQLContext has terminated since the
* creation of the context, or since `resetTerminated()` was called. If any query was terminated
* with an exception, then the exception will be thrown.
@@ -197,8 +207,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
trigger: Trigger = ProcessingTime(0),
triggerClock: Clock = new SystemClock()): StreamingQuery = {
activeQueriesLock.synchronized {
- val id = StreamExecution.nextId
- val name = userSpecifiedName.getOrElse(s"query-$id")
+ val name = userSpecifiedName.getOrElse(s"query-${StreamingQueryManager.nextId}")
if (activeQueries.values.exists(_.name == name)) {
throw new IllegalArgumentException(
s"Cannot start query with name $name as a query with that name is already active")
@@ -252,7 +261,6 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
}
val query = new StreamExecution(
sparkSession,
- id,
name,
checkpointLocation,
logicalPlan,
@@ -261,7 +269,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
triggerClock,
outputMode)
query.start()
- activeQueries.put(id, query)
+ activeQueries.put(query.id, query)
query
}
}
@@ -279,3 +287,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
}
}
}
+
+private object StreamingQueryManager {
+ private val _nextId = new AtomicLong(0)
+ private def nextId: Long = _nextId.getAndIncrement()
+}
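
A REPL-style sketch of the two lookup overloads now exposed by StreamingQueryManager; `spark` and `query` are assumed to exist (an active session and a started query):

    // Both lookups resolve to the same active query; unknown ids return null.
    val byUuid   = spark.streams.get(query.id)           // UUID overload
    val byString = spark.streams.get(query.id.toString)  // String overload added above
    assert(byUuid eq byString)
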
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
index ba732ff7fc..4c1a7ce6a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
@@ -17,146 +17,17 @@
package org.apache.spark.sql.streaming
-import java.{util => ju}
-
-import scala.collection.JavaConverters._
-
-import org.json4s._
-import org.json4s.JsonAST.JValue
-import org.json4s.JsonDSL._
-import org.json4s.jackson.JsonMethods._
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.streaming.{LongOffset, OffsetSeq}
-import org.apache.spark.util.JsonProtocol
-
/**
- * :: Experimental ::
- * A class used to report information about the progress of a [[StreamingQuery]].
+ * Reports information about the instantaneous status of a streaming query.
*
- * @param name Name of the query. This name is unique across all active queries.
- * @param id Id of the query. This id is unique across
- * all queries that have been started in the current process.
- * @param timestamp Timestamp (ms) of when this query was generated.
- * @param inputRate Current rate (rows/sec) at which data is being generated by all the sources.
- * @param processingRate Current rate (rows/sec) at which the query is processing data from
- * all the sources.
- * @param latency Current average latency between the data being available in source and the sink
- * writing the corresponding output.
- * @param sourceStatuses Current statuses of the sources.
- * @param sinkStatus Current status of the sink.
- * @param triggerDetails Low-level details of the currently active trigger (e.g. number of
- * rows processed in trigger, latency of intermediate steps, etc.).
- * If no trigger is active, then it will have details of the last completed
- * trigger.
- * @since 2.0.0
+ * @param message A human readable description of what the stream is currently doing.
+ * @param isDataAvailable True when there is new data to be processed.
+ * @param isTriggerActive True when the trigger is actively firing, false when waiting for the
+ * next trigger time.
+ *
+ * @since 2.1.0
*/
-@Experimental
-class StreamingQueryStatus private(
- val name: String,
- val id: Long,
- val timestamp: Long,
- val inputRate: Double,
- val processingRate: Double,
- val latency: Option[Double],
- val sourceStatuses: Array[SourceStatus],
- val sinkStatus: SinkStatus,
- val triggerDetails: ju.Map[String, String]) {
-
- import StreamingQueryStatus._
-
- /** The compact JSON representation of this status. */
- def json: String = compact(render(jsonValue))
-
- /** The pretty (i.e. indented) JSON representation of this status. */
- def prettyJson: String = pretty(render(jsonValue))
-
- override def toString: String = {
- val sourceStatusLines = sourceStatuses.zipWithIndex.map { case (s, i) =>
- s"Source ${i + 1} - " + indent(s.prettyString).trim
- }
- val sinkStatusLines = sinkStatus.prettyString.trim
- val triggerDetailsLines = triggerDetails.asScala.map { case (k, v) => s"$k: $v" }.toSeq.sorted
- val numSources = sourceStatuses.length
- val numSourcesString = s"$numSources source" + { if (numSources > 1) "s" else "" }
-
- val allLines =
- s"""|Query id: $id
- |Status timestamp: $timestamp
- |Input rate: $inputRate rows/sec
- |Processing rate $processingRate rows/sec
- |Latency: ${latency.getOrElse("-")} ms
- |Trigger details:
- |${indent(triggerDetailsLines)}
- |Source statuses [$numSourcesString]:
- |${indent(sourceStatusLines)}
- |Sink status - ${indent(sinkStatusLines).trim}""".stripMargin
-
- s"Status of query '$name'\n${indent(allLines)}"
- }
-
- private[sql] def jsonValue: JValue = {
- ("name" -> JString(name)) ~
- ("id" -> JInt(id)) ~
- ("timestamp" -> JInt(timestamp)) ~
- ("inputRate" -> JDouble(inputRate)) ~
- ("processingRate" -> JDouble(processingRate)) ~
- ("latency" -> latency.map(JDouble).getOrElse(JNothing)) ~
- ("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala)) ~
- ("sourceStatuses" -> JArray(sourceStatuses.map(_.jsonValue).toList)) ~
- ("sinkStatus" -> sinkStatus.jsonValue)
- }
-}
-
-/** Companion object, primarily for creating StreamingQueryInfo instances internally */
-private[sql] object StreamingQueryStatus {
- def apply(
- name: String,
- id: Long,
- timestamp: Long,
- inputRate: Double,
- processingRate: Double,
- latency: Option[Double],
- sourceStatuses: Array[SourceStatus],
- sinkStatus: SinkStatus,
- triggerDetails: Map[String, String]): StreamingQueryStatus = {
- new StreamingQueryStatus(name, id, timestamp, inputRate, processingRate,
- latency, sourceStatuses, sinkStatus, triggerDetails.asJava)
- }
-
- def indent(strings: Iterable[String]): String = strings.map(indent).mkString("\n")
- def indent(string: String): String = string.split("\n").map(" " + _).mkString("\n")
-
- /** Create an instance of status for python testing */
- def testStatus(): StreamingQueryStatus = {
- import org.apache.spark.sql.execution.streaming.StreamMetrics._
- StreamingQueryStatus(
- name = "query",
- id = 1,
- timestamp = 123,
- inputRate = 15.5,
- processingRate = 23.5,
- latency = Some(345),
- sourceStatuses = Array(
- SourceStatus(
- desc = "MySource1",
- offsetDesc = LongOffset(0).json,
- inputRate = 15.5,
- processingRate = 23.5,
- triggerDetails = Map(
- NUM_SOURCE_INPUT_ROWS -> "100",
- SOURCE_GET_OFFSET_LATENCY -> "10",
- SOURCE_GET_BATCH_LATENCY -> "20"))),
- sinkStatus = SinkStatus(
- desc = "MySink",
- offsetDesc = OffsetSeq(Some(LongOffset(1)) :: None :: Nil).toString),
- triggerDetails = Map(
- BATCH_ID -> "5",
- IS_TRIGGER_ACTIVE -> "true",
- IS_DATA_PRESENT_IN_TRIGGER -> "true",
- GET_OFFSET_LATENCY -> "10",
- GET_BATCH_LATENCY -> "20",
- NUM_INPUT_ROWS -> "100"
- ))
- }
-}
+case class StreamingQueryStatus protected[sql](
+ message: String,
+ isDataAvailable: Boolean,
+ isTriggerActive: Boolean)
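
A REPL-style sketch of reading the simplified status; `query` is an assumed, already-started StreamingQuery:

    val status = query.status
    println(status.message)   // human-readable description of what the stream is currently doing
    if (status.isDataAvailable && !status.isTriggerActive) {
      println("data is waiting but no trigger is currently running")
    }
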
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
new file mode 100644
index 0000000000..7129fa4d15
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.{util => ju}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import org.apache.jute.compiler.JLong
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
+ */
+@Experimental
+class StateOperatorProgress private[sql](
+ val numRowsTotal: Long,
+ val numRowsUpdated: Long) {
+ private[sql] def jsonValue: JValue = {
+ ("numRowsTotal" -> JInt(numRowsTotal)) ~
+ ("numRowsUpdated" -> JInt(numRowsUpdated))
+ }
+}
+
+/**
+ * :: Experimental ::
+ * Information about progress made in the execution of a [[StreamingQuery]] during
+ * a trigger. Each event relates to processing done for a single trigger of the streaming
+ * query. Events are emitted even when no new data is available to be processed.
+ *
+ * @param id A unique id of the query.
+ * @param name Name of the query. This name is unique across all active queries.
+ * @param timestamp Timestamp (ms) of the beginning of the trigger.
+ * @param batchId A unique id for the current batch of data being processed. Note that in the
+ * case of retries after a failure, a given batchId may be executed more than once.
+ * Similarly, when there is no data to be processed, the batchId will not be
+ * incremented.
+ * @param durationMs The amount of time taken to perform various operations in milliseconds.
+ * @param currentWatermark The current event time watermark in milliseconds.
+ * @param stateOperators Information about operators in the query that store state.
+ * @param sources Detailed statistics on data being read from each of the streaming sources.
+ * @since 2.1.0
+ */
+@Experimental
+class StreamingQueryProgress private[sql](
+ val id: UUID,
+ val name: String,
+ val timestamp: Long,
+ val batchId: Long,
+ val durationMs: ju.Map[String, java.lang.Long],
+ val currentWatermark: Long,
+ val stateOperators: Array[StateOperatorProgress],
+ val sources: Array[SourceProgress],
+ val sink: SinkProgress) {
+
+ /** The aggregate (across all sources) number of records processed in a trigger. */
+ def numInputRows: Long = sources.map(_.numInputRows).sum
+
+ /** The aggregate (across all sources) rate of data arriving. */
+ def inputRowsPerSecond: Double = sources.map(_.inputRowsPerSecond).sum
+
+ /** The aggregate (across all sources) rate at which Spark is processing data. */
+ def processedRowsPerSecond: Double = sources.map(_.processedRowsPerSecond).sum
+
+ /** The compact JSON representation of this status. */
+ def json: String = compact(render(jsonValue))
+
+ /** The pretty (i.e. indented) JSON representation of this status. */
+ def prettyJson: String = pretty(render(jsonValue))
+
+ override def toString: String = prettyJson
+
+ private[sql] def jsonValue: JValue = {
+ def safeDoubleToJValue(value: Double): JValue = {
+ if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
+ }
+
+ ("id" -> JString(id.toString)) ~
+ ("name" -> JString(name)) ~
+ ("timestamp" -> JInt(timestamp)) ~
+ ("numInputRows" -> JInt(numInputRows)) ~
+ ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
+ ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
+ ("durationMs" -> durationMs
+ .asScala
+ .map { case (k, v) => k -> JInt(v.toLong): JObject }
+ .reduce(_ ~ _)) ~
+ ("currentWatermark" -> JInt(currentWatermark)) ~
+ ("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
+ ("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
+ ("sink" -> sink.jsonValue)
+
+ }
+}
+
+/**
+ * :: Experimental ::
+ * Information about progress made for a source in the execution of a [[StreamingQuery]]
+ * during a trigger. See [[StreamingQueryProgress]] for more information.
+ *
+ * @param description Description of the source.
+ * @param startOffset The starting offset for data being read.
+ * @param endOffset The ending offset for data being read.
+ * @param numInputRows The number of records read from this source.
+ * @param inputRowsPerSecond The rate at which data is arriving from this source.
+ * @param processedRowsPerSecond The rate at which data from this source is being processed by
+ * Spark.
+ * @since 2.1.0
+ */
+@Experimental
+class SourceProgress protected[sql](
+ val description: String,
+ val startOffset: String,
+ val endOffset: String,
+ val numInputRows: Long,
+ val inputRowsPerSecond: Double,
+ val processedRowsPerSecond: Double) {
+
+ /** The compact JSON representation of this progress. */
+ def json: String = compact(render(jsonValue))
+
+ /** The pretty (i.e. indented) JSON representation of this progress. */
+ def prettyJson: String = pretty(render(jsonValue))
+
+ override def toString: String = prettyJson
+
+ private[sql] def jsonValue: JValue = {
+ def safeDoubleToJValue(value: Double): JValue = {
+ if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
+ }
+
+ ("description" -> JString(description)) ~
+ ("startOffset" -> tryParse(startOffset)) ~
+ ("endOffset" -> tryParse(endOffset)) ~
+ ("numInputRows" -> JInt(numInputRows)) ~
+ ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
+ ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond))
+ }
+
+ private def tryParse(json: String) = try {
+ parse(json)
+ } catch {
+ case NonFatal(e) => JString(json)
+ }
+}
+
+/**
+ * :: Experimental ::
+ * Information about progress made for a sink in the execution of a [[StreamingQuery]]
+ * during a trigger. See [[StreamingQueryProgress]] for more information.
+ *
+ * @param description Description of the sink corresponding to this status.
+ * @since 2.1.0
+ */
+@Experimental
+class SinkProgress protected[sql](
+ val description: String) {
+
+ /** The compact JSON representation of this status. */
+ def json: String = compact(render(jsonValue))
+
+ /** The pretty (i.e. indented) JSON representation of this status. */
+ def prettyJson: String = pretty(render(jsonValue))
+
+ override def toString: String = prettyJson
+
+ private[sql] def jsonValue: JValue = {
+ ("description" -> JString(description))
+ }
+}
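
A sketch of consuming a StreamingQueryProgress instance, e.g. one obtained from `query.lastProgress` or a QueryProgressEvent; the variable `progress` is an assumption:

    // Per-source detail plus the aggregate helpers defined above.
    println(s"batch ${progress.batchId}: ${progress.numInputRows} rows total")
    progress.sources.foreach { s =>
      println(s"${s.description}: ${s.numInputRows} rows (${s.startOffset} -> ${s.endOffset})")
    }
    println(progress.json)   // compact JSON, as serialized for listeners
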
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
deleted file mode 100644
index 38c4ece439..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming
-
-import org.scalactic.TolerantNumerics
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.util.ManualClock
-
-class StreamMetricsSuite extends SparkFunSuite {
- import StreamMetrics._
-
- // To make === between double tolerate inexact values
- implicit val doubleEquality = TolerantNumerics.tolerantDoubleEquality(0.01)
-
- test("rates, latencies, trigger details - basic life cycle") {
- val sm = newStreamMetrics(source)
- assert(sm.currentInputRate() === 0.0)
- assert(sm.currentProcessingRate() === 0.0)
- assert(sm.currentSourceInputRate(source) === 0.0)
- assert(sm.currentSourceProcessingRate(source) === 0.0)
- assert(sm.currentLatency() === None)
- assert(sm.currentTriggerDetails().isEmpty)
-
- // When trigger started, the rates should not change, but should return
- // reported trigger details
- sm.reportTriggerStarted(1)
- sm.reportTriggerDetail("key", "value")
- sm.reportSourceTriggerDetail(source, "key2", "value2")
- assert(sm.currentInputRate() === 0.0)
- assert(sm.currentProcessingRate() === 0.0)
- assert(sm.currentSourceInputRate(source) === 0.0)
- assert(sm.currentSourceProcessingRate(source) === 0.0)
- assert(sm.currentLatency() === None)
- assert(sm.currentTriggerDetails() ===
- Map(BATCH_ID -> "1", IS_TRIGGER_ACTIVE -> "true",
- START_TIMESTAMP -> "0", "key" -> "value"))
- assert(sm.currentSourceTriggerDetails(source) ===
- Map(BATCH_ID -> "1", "key2" -> "value2"))
-
- // Finishing the trigger should calculate the rates, except input rate which needs
- // to have another trigger interval
- sm.reportNumInputRows(Map(source -> 100L)) // 100 input rows, 10 output rows
- clock.advance(1000)
- sm.reportTriggerFinished()
- assert(sm.currentInputRate() === 0.0)
- assert(sm.currentProcessingRate() === 100.0) // 100 input rows processed in 1 sec
- assert(sm.currentSourceInputRate(source) === 0.0)
- assert(sm.currentSourceProcessingRate(source) === 100.0)
- assert(sm.currentLatency() === None)
- assert(sm.currentTriggerDetails() ===
- Map(BATCH_ID -> "1", IS_TRIGGER_ACTIVE -> "false",
- START_TIMESTAMP -> "0", FINISH_TIMESTAMP -> "1000",
- NUM_INPUT_ROWS -> "100", "key" -> "value"))
- assert(sm.currentSourceTriggerDetails(source) ===
- Map(BATCH_ID -> "1", NUM_SOURCE_INPUT_ROWS -> "100", "key2" -> "value2"))
-
- // After another trigger starts, the rates and latencies should not change until
- // new rows are reported
- clock.advance(1000)
- sm.reportTriggerStarted(2)
- assert(sm.currentInputRate() === 0.0)
- assert(sm.currentProcessingRate() === 100.0)
- assert(sm.currentSourceInputRate(source) === 0.0)
- assert(sm.currentSourceProcessingRate(source) === 100.0)
- assert(sm.currentLatency() === None)
-
- // Reporting new rows should update the rates and latencies
- sm.reportNumInputRows(Map(source -> 200L)) // 200 input rows
- clock.advance(500)
- sm.reportTriggerFinished()
- assert(sm.currentInputRate() === 100.0) // 200 input rows generated in 2 seconds b/w starts
- assert(sm.currentProcessingRate() === 400.0) // 200 output rows processed in 0.5 sec
- assert(sm.currentSourceInputRate(source) === 100.0)
- assert(sm.currentSourceProcessingRate(source) === 400.0)
- assert(sm.currentLatency().get === 1500.0) // 2000 ms / 2 + 500 ms
-
- // Rates should be set to 0 after stop
- sm.stop()
- assert(sm.currentInputRate() === 0.0)
- assert(sm.currentProcessingRate() === 0.0)
- assert(sm.currentSourceInputRate(source) === 0.0)
- assert(sm.currentSourceProcessingRate(source) === 0.0)
- assert(sm.currentLatency() === None)
- assert(sm.currentTriggerDetails().isEmpty)
- }
-
- test("rates and latencies - after trigger with no data") {
- val sm = newStreamMetrics(source)
- // Trigger 1 with data
- sm.reportTriggerStarted(1)
- sm.reportNumInputRows(Map(source -> 100L)) // 100 input rows
- clock.advance(1000)
- sm.reportTriggerFinished()
-
- // Trigger 2 with data
- clock.advance(1000)
- sm.reportTriggerStarted(2)
- sm.reportNumInputRows(Map(source -> 200L)) // 200 input rows
- clock.advance(500)
- sm.reportTriggerFinished()
-
- // Make sure that all rates are set
- require(sm.currentInputRate() === 100.0) // 200 input rows generated in 2 seconds b/w starts
- require(sm.currentProcessingRate() === 400.0) // 200 output rows processed in 0.5 sec
- require(sm.currentSourceInputRate(source) === 100.0)
- require(sm.currentSourceProcessingRate(source) === 400.0)
- require(sm.currentLatency().get === 1500.0) // 2000 ms / 2 + 500 ms
-
- // Trigger 3 with data
- clock.advance(500)
- sm.reportTriggerStarted(3)
- clock.advance(500)
- sm.reportTriggerFinished()
-
- // Rates are set to zero and latency is set to None
- assert(sm.currentInputRate() === 0.0)
- assert(sm.currentProcessingRate() === 0.0)
- assert(sm.currentSourceInputRate(source) === 0.0)
- assert(sm.currentSourceProcessingRate(source) === 0.0)
- assert(sm.currentLatency() === None)
- sm.stop()
- }
-
- test("rates - after trigger with multiple sources, and one source having no info") {
- val source1 = TestSource(1)
- val source2 = TestSource(2)
- val sm = newStreamMetrics(source1, source2)
- // Trigger 1 with data
- sm.reportTriggerStarted(1)
- sm.reportNumInputRows(Map(source1 -> 100L, source2 -> 100L))
- clock.advance(1000)
- sm.reportTriggerFinished()
-
- // Trigger 2 with data
- clock.advance(1000)
- sm.reportTriggerStarted(2)
- sm.reportNumInputRows(Map(source1 -> 200L, source2 -> 200L))
- clock.advance(500)
- sm.reportTriggerFinished()
-
- // Make sure that all rates are set
- assert(sm.currentInputRate() === 200.0) // 200*2 input rows generated in 2 seconds b/w starts
- assert(sm.currentProcessingRate() === 800.0) // 200*2 output rows processed in 0.5 sec
- assert(sm.currentSourceInputRate(source1) === 100.0)
- assert(sm.currentSourceInputRate(source2) === 100.0)
- assert(sm.currentSourceProcessingRate(source1) === 400.0)
- assert(sm.currentSourceProcessingRate(source2) === 400.0)
-
- // Trigger 3 with data
- clock.advance(500)
- sm.reportTriggerStarted(3)
- clock.advance(500)
- sm.reportNumInputRows(Map(source1 -> 200L))
- sm.reportTriggerFinished()
-
- // Rates are set to zero and latency is set to None
- assert(sm.currentInputRate() === 200.0)
- assert(sm.currentProcessingRate() === 400.0)
- assert(sm.currentSourceInputRate(source1) === 200.0)
- assert(sm.currentSourceInputRate(source2) === 0.0)
- assert(sm.currentSourceProcessingRate(source1) === 400.0)
- assert(sm.currentSourceProcessingRate(source2) === 0.0)
- sm.stop()
- }
-
- test("registered Codahale metrics") {
- import scala.collection.JavaConverters._
- val sm = newStreamMetrics(source)
- val gaugeNames = sm.metricRegistry.getGauges().keySet().asScala
-
- // so that all metrics are considered as a single metric group in Ganglia
- assert(!gaugeNames.exists(_.contains(".")))
- assert(gaugeNames === Set(
- "inputRate-total",
- "inputRate-source0",
- "processingRate-total",
- "processingRate-source0",
- "latency"))
- }
-
- private def newStreamMetrics(sources: Source*): StreamMetrics = {
- new StreamMetrics(sources.toSet, clock, "test")
- }
-
- private val clock = new ManualClock()
- private val source = TestSource(0)
-
- case class TestSource(id: Int) extends Source {
- override def schema: StructType = StructType(Array.empty[StructField])
- override def getOffset: Option[Offset] = Some(new LongOffset(0))
- override def getBatch(start: Option[Offset], end: Offset): DataFrame = { null }
- override def stop() {}
- override def toString(): String = s"source$id"
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index bad6642ea4..8256c63d87 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1006,9 +1006,13 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
testStream(input)(
AddTextFileData("100", src, tmp),
CheckAnswer("100"),
- AssertOnLastQueryStatus { status =>
- assert(status.triggerDetails.get("numRows.input.total") === "1")
- assert(status.sourceStatuses(0).processingRate > 0.0)
+ AssertOnQuery { query =>
+ val actualProgress = query.recentProgresses
+ .find(_.numInputRows > 0)
+ .getOrElse(sys.error("Could not find records with data."))
+ assert(actualProgress.numInputRows === 1)
+ assert(actualProgress.sources(0).processedRowsPerSecond > 0.0)
+ true
}
)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index a6b2d4b9ab..a2629f7f68 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -28,7 +28,6 @@ import scala.util.control.NonFatal
import org.scalatest.Assertions
import org.scalatest.concurrent.{Eventually, Timeouts}
-import org.scalatest.concurrent.AsyncAssertions.Waiter
import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.exceptions.TestFailedDueToTimeoutException
@@ -202,10 +201,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
}
}
- case class AssertOnLastQueryStatus(condition: StreamingQueryStatus => Unit)
- extends StreamAction
-
- class StreamManualClock(time: Long = 0L) extends ManualClock(time) {
+ class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable {
private var waitStartTime: Option[Long] = None
override def waitTillTime(targetTime: Long): Long = synchronized {
@@ -325,10 +321,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
val testThread = Thread.currentThread()
val metadataRoot = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
- val statusCollector = new QueryStatusCollector
var manualClockExpectedTime = -1L
try {
- spark.streams.addListener(statusCollector)
startedTest.foreach { action =>
logInfo(s"Processing test stream action: $action")
action match {
@@ -375,10 +369,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
s"can not advance clock of type ${currentStream.triggerClock.getClass}")
val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock]
assert(manualClockExpectedTime >= 0)
+
// Make sure we don't advance ManualClock too early. See SPARK-16002.
eventually("StreamManualClock has not yet entered the waiting state") {
assert(clock.isStreamWaitingAt(manualClockExpectedTime))
}
+
clock.advance(timeToAdd)
manualClockExpectedTime += timeToAdd
verify(clock.getTimeMillis() === manualClockExpectedTime,
@@ -447,13 +443,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
val streamToAssert = Option(currentStream).getOrElse(lastStream)
verify({ a.run(); true }, s"Assert failed: ${a.message}")
- case a: AssertOnLastQueryStatus =>
- Eventually.eventually(timeout(streamingTimeout)) {
- require(statusCollector.lastTriggerStatus.nonEmpty)
- }
- val status = statusCollector.lastTriggerStatus.get
- verify({ a.condition(status); true }, "Assert on last query status failed")
-
case a: AddData =>
try {
// Add data and get the source where it was added, and the expected offset of the
@@ -528,7 +517,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
if (currentStream != null && currentStream.microBatchThread.isAlive) {
currentStream.stop()
}
- spark.streams.removeListener(statusCollector)
// Rollback prev configuration values
resetConfValues.foreach {
@@ -614,7 +602,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
testStream(ds)(actions: _*)
}
-
object AwaitTerminationTester {
trait ExpectedBehavior
@@ -668,58 +655,4 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
}
}
}
-
-
- class QueryStatusCollector extends StreamingQueryListener {
- // to catch errors in the async listener events
- @volatile private var asyncTestWaiter = new Waiter
-
- @volatile var startStatus: StreamingQueryStatus = null
- @volatile var terminationStatus: StreamingQueryStatus = null
- @volatile var terminationException: Option[String] = null
-
- private val progressStatuses = new mutable.ArrayBuffer[StreamingQueryStatus]
-
- /** Get the info of the last trigger that processed data */
- def lastTriggerStatus: Option[StreamingQueryStatus] = synchronized {
- progressStatuses.filter { i =>
- i.triggerDetails.get("isTriggerActive").toBoolean == false &&
- i.triggerDetails.get("isDataPresentInTrigger").toBoolean == true
- }.lastOption
- }
-
- def reset(): Unit = {
- startStatus = null
- terminationStatus = null
- progressStatuses.clear()
- asyncTestWaiter = new Waiter
- }
-
- def checkAsyncErrors(): Unit = {
- asyncTestWaiter.await(timeout(10 seconds))
- }
-
-
- override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
- asyncTestWaiter {
- startStatus = queryStarted.queryStatus
- }
- }
-
- override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
- asyncTestWaiter {
- assert(startStatus != null, "onQueryProgress called before onQueryStarted")
- synchronized { progressStatuses += queryProgress.queryStatus }
- }
- }
-
- override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
- asyncTestWaiter {
- assert(startStatus != null, "onQueryTerminated called before onQueryStarted")
- terminationStatus = queryTerminated.queryStatus
- terminationException = queryTerminated.exception
- }
- asyncTestWaiter.dismiss()
- }
- }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 98f3bec708..c68f953b10 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -17,24 +17,26 @@
package org.apache.spark.sql.streaming
+import java.util.UUID
+
import scala.collection.mutable
import org.scalactic.TolerantNumerics
+import org.scalatest.concurrent.AsyncAssertions.Waiter
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.BeforeAndAfter
import org.scalatest.PrivateMethodTester._
import org.apache.spark.SparkException
import org.apache.spark.scheduler._
-import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.functions._
-import org.apache.spark.util.{JsonProtocol, ManualClock}
-
+import org.apache.spark.sql.streaming.StreamingQueryListener._
+import org.apache.spark.util.JsonProtocol
class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
import testImplicits._
- import StreamingQueryListenerSuite._
// To make === between double tolerate inexact values
implicit val doubleEquality = TolerantNumerics.tolerantDoubleEquality(0.01)
@@ -46,86 +48,86 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
// Make sure we don't leak any events to the next test
}
- test("single listener, check trigger statuses") {
- import StreamingQueryListenerSuite._
- clock = new StreamManualClock
-
- /** Custom MemoryStream that waits for manual clock to reach a time */
- val inputData = new MemoryStream[Int](0, sqlContext) {
- // Wait for manual clock to be 100 first time there is data
- override def getOffset: Option[Offset] = {
- val offset = super.getOffset
- if (offset.nonEmpty) {
- clock.waitTillTime(100)
+ testQuietly("single listener, check trigger events are generated correctly") {
+ val clock = new StreamManualClock
+ val inputData = new MemoryStream[Int](0, sqlContext)
+ val df = inputData.toDS().as[Long].map { 10 / _ }
+ val listener = new EventCollector
+ try {
+ // No events until started
+ spark.streams.addListener(listener)
+ assert(listener.startEvent === null)
+ assert(listener.progressEvents.isEmpty)
+ assert(listener.terminationEvent === null)
+
+ testStream(df, OutputMode.Append)(
+
+ // Start event generated when query started
+ StartStream(ProcessingTime(100), triggerClock = clock),
+ AssertOnQuery { query =>
+ assert(listener.startEvent !== null)
+ assert(listener.startEvent.id === query.id)
+ assert(listener.startEvent.name === query.name)
+ assert(listener.progressEvents.isEmpty)
+ assert(listener.terminationEvent === null)
+ true
+ },
+
+ // Progress event generated when data processed
+ AddData(inputData, 1, 2),
+ AdvanceManualClock(100),
+ CheckAnswer(10, 5),
+ AssertOnQuery { query =>
+ assert(listener.progressEvents.nonEmpty)
+ assert(listener.progressEvents.last.json === query.lastProgress.json)
+ assert(listener.terminationEvent === null)
+ true
+ },
+
+ // Termination event generated when stopped cleanly
+ StopStream,
+ AssertOnQuery { query =>
+ eventually(Timeout(streamingTimeout)) {
+ assert(listener.terminationEvent !== null)
+ assert(listener.terminationEvent.id === query.id)
+ assert(listener.terminationEvent.exception === None)
+ }
+ listener.checkAsyncErrors()
+ listener.reset()
+ true
+ },
+
+ // Termination event generated with exception message when stopped with error
+ StartStream(ProcessingTime(100), triggerClock = clock),
+ AddData(inputData, 0),
+ AdvanceManualClock(100),
+ ExpectFailure[SparkException],
+ AssertOnQuery { query =>
+ assert(listener.terminationEvent !== null)
+ assert(listener.terminationEvent.id === query.id)
+ assert(listener.terminationEvent.exception.nonEmpty)
+ listener.checkAsyncErrors()
+ true
}
- offset
- }
-
- // Wait for manual clock to be 300 first time there is data
- override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
- clock.waitTillTime(300)
- super.getBatch(start, end)
- }
- }
-
- // This is to make sure thatquery waits for manual clock to be 600 first time there is data
- val mapped = inputData.toDS().agg(count("*")).as[Long].coalesce(1).map { x =>
- clock.waitTillTime(600)
- x
+ )
+ } finally {
+ spark.streams.removeListener(listener)
}
-
- testStream(mapped, OutputMode.Complete)(
- StartStream(triggerClock = clock),
- AddData(inputData, 1, 2),
- AdvanceManualClock(100), // unblock getOffset, will block on getBatch
- AdvanceManualClock(200), // unblock getBatch, will block on computation
- AdvanceManualClock(300), // unblock computation
- AssertOnQuery { _ => clock.getTimeMillis() === 600 },
- AssertOnLastQueryStatus { status: StreamingQueryStatus =>
- // Check the correctness of the trigger info of the last completed batch reported by
- // onQueryProgress
- assert(status.triggerDetails.containsKey("batchId"))
- assert(status.triggerDetails.get("isTriggerActive") === "false")
- assert(status.triggerDetails.get("isDataPresentInTrigger") === "true")
-
- assert(status.triggerDetails.get("timestamp.triggerStart") === "0")
- assert(status.triggerDetails.get("timestamp.afterGetOffset") === "100")
- assert(status.triggerDetails.get("timestamp.afterGetBatch") === "300")
- assert(status.triggerDetails.get("timestamp.triggerFinish") === "600")
-
- assert(status.triggerDetails.get("latency.getOffset.total") === "100")
- assert(status.triggerDetails.get("latency.getBatch.total") === "200")
- assert(status.triggerDetails.get("latency.optimizer") === "0")
- assert(status.triggerDetails.get("latency.offsetLogWrite") === "0")
- assert(status.triggerDetails.get("latency.fullTrigger") === "600")
-
- assert(status.triggerDetails.get("numRows.input.total") === "2")
- assert(status.triggerDetails.get("numRows.state.aggregation1.total") === "1")
- assert(status.triggerDetails.get("numRows.state.aggregation1.updated") === "1")
-
- assert(status.sourceStatuses.length === 1)
- assert(status.sourceStatuses(0).triggerDetails.containsKey("batchId"))
- assert(status.sourceStatuses(0).triggerDetails.get("latency.getOffset.source") === "100")
- assert(status.sourceStatuses(0).triggerDetails.get("latency.getBatch.source") === "200")
- assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "2")
- },
- CheckAnswer(2)
- )
}
test("adding and removing listener") {
- def isListenerActive(listener: QueryStatusCollector): Boolean = {
+ def isListenerActive(listener: EventCollector): Boolean = {
listener.reset()
testStream(MemoryStream[Int].toDS)(
StartStream(),
StopStream
)
- listener.startStatus != null
+ listener.startEvent != null
}
try {
- val listener1 = new QueryStatusCollector
- val listener2 = new QueryStatusCollector
+ val listener1 = new EventCollector
+ val listener2 = new EventCollector
spark.streams.addListener(listener1)
assert(isListenerActive(listener1) === true)
@@ -142,14 +144,14 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
test("event ordering") {
- val listener = new QueryStatusCollector
+ val listener = new EventCollector
withListenerAdded(listener) {
for (i <- 1 to 100) {
listener.reset()
- require(listener.startStatus === null)
+ require(listener.startEvent === null)
testStream(MemoryStream[Int].toDS)(
StartStream(),
- Assert(listener.startStatus !== null, "onQueryStarted not called before query returned"),
+ Assert(listener.startEvent !== null, "onQueryStarted not called before query returned"),
StopStream,
Assert { listener.checkAsyncErrors() }
)
@@ -158,7 +160,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
testQuietly("exception should be reported in QueryTerminated") {
- val listener = new QueryStatusCollector
+ val listener = new EventCollector
withListenerAdded(listener) {
val input = MemoryStream[Int]
testStream(input.toDS.map(_ / 0))(
@@ -167,49 +169,46 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
ExpectFailure[SparkException](),
Assert {
spark.sparkContext.listenerBus.waitUntilEmpty(10000)
- assert(listener.terminationStatus !== null)
- assert(listener.terminationException.isDefined)
+ assert(listener.terminationEvent !== null)
+ assert(listener.terminationEvent.exception.nonEmpty)
// Make sure that the exception message reported through listener
// contains the actual exception and relevant stack trace
- assert(!listener.terminationException.get.contains("StreamingQueryException"))
- assert(listener.terminationException.get.contains("java.lang.ArithmeticException"))
- assert(listener.terminationException.get.contains("StreamingQueryListenerSuite"))
+ assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
+ assert(listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
+ assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
}
)
}
}
- test("QueryStarted serialization") {
- val queryStarted = new StreamingQueryListener.QueryStartedEvent(StreamingQueryStatus.testStatus)
+ test("QueryStartedEvent serialization") {
+ val queryStarted = new StreamingQueryListener.QueryStartedEvent(UUID.randomUUID(), "name")
val json = JsonProtocol.sparkEventToJson(queryStarted)
val newQueryStarted = JsonProtocol.sparkEventFromJson(json)
.asInstanceOf[StreamingQueryListener.QueryStartedEvent]
- assertStreamingQueryInfoEquals(queryStarted.queryStatus, newQueryStarted.queryStatus)
}
- test("QueryProgress serialization") {
- val queryProcess = new StreamingQueryListener.QueryProgressEvent(
- StreamingQueryStatus.testStatus)
- val json = JsonProtocol.sparkEventToJson(queryProcess)
- val newQueryProcess = JsonProtocol.sparkEventFromJson(json)
+ test("QueryProgressEvent serialization") {
+ val event = new StreamingQueryListener.QueryProgressEvent(
+ StreamingQueryProgressSuite.testProgress)
+ val json = JsonProtocol.sparkEventToJson(event)
+ val newEvent = JsonProtocol.sparkEventFromJson(json)
.asInstanceOf[StreamingQueryListener.QueryProgressEvent]
- assertStreamingQueryInfoEquals(queryProcess.queryStatus, newQueryProcess.queryStatus)
+ assert(event.progress.json === newEvent.progress.json)
}
- test("QueryTerminated serialization") {
+ test("QueryTerminatedEvent serialization") {
val exception = new RuntimeException("exception")
val queryQueryTerminated = new StreamingQueryListener.QueryTerminatedEvent(
- StreamingQueryStatus.testStatus,
- Some(exception.getMessage))
- val json =
- JsonProtocol.sparkEventToJson(queryQueryTerminated)
+ UUID.randomUUID, Some(exception.getMessage))
+ val json = JsonProtocol.sparkEventToJson(queryQueryTerminated)
val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
.asInstanceOf[StreamingQueryListener.QueryTerminatedEvent]
- assertStreamingQueryInfoEquals(queryQueryTerminated.queryStatus, newQueryTerminated.queryStatus)
+ assert(queryQueryTerminated.id === newQueryTerminated.id)
assert(queryQueryTerminated.exception === newQueryTerminated.exception)
}
- test("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") {
+ testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") {
// query-event-logs-version-2.0.0.txt has all types of events generated by
// Structured Streaming in Spark 2.0.0.
// SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
@@ -217,7 +216,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt")
}
- test("ReplayListenerBus should ignore broken event jsons generated in 2.0.1") {
+ testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.1") {
// query-event-logs-version-2.0.1.txt has all types of events generated by
// Structured Streaming in Spark 2.0.1.
// SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
@@ -248,28 +247,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
}
- private def assertStreamingQueryInfoEquals(
- expected: StreamingQueryStatus,
- actual: StreamingQueryStatus): Unit = {
- assert(expected.name === actual.name)
- assert(expected.sourceStatuses.size === actual.sourceStatuses.size)
- expected.sourceStatuses.zip(actual.sourceStatuses).foreach {
- case (expectedSource, actualSource) =>
- assertSourceStatus(expectedSource, actualSource)
- }
- assertSinkStatus(expected.sinkStatus, actual.sinkStatus)
- }
-
- private def assertSourceStatus(expected: SourceStatus, actual: SourceStatus): Unit = {
- assert(expected.description === actual.description)
- assert(expected.offsetDesc === actual.offsetDesc)
- }
-
- private def assertSinkStatus(expected: SinkStatus, actual: SinkStatus): Unit = {
- assert(expected.description === actual.description)
- assert(expected.offsetDesc === actual.offsetDesc)
- }
-
private def withListenerAdded(listener: StreamingQueryListener)(body: => Unit): Unit = {
try {
failAfter(streamingTimeout) {
@@ -287,9 +264,51 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
val listenerBus = spark.streams invokePrivate listenerBusMethod()
listenerBus.listeners.toArray.map(_.asInstanceOf[StreamingQueryListener])
}
-}
-object StreamingQueryListenerSuite {
- // Singleton reference to clock that does not get serialized in task closures
- @volatile var clock: ManualClock = null
+ /** Collects events from the StreamingQueryListener for testing */
+ class EventCollector extends StreamingQueryListener {
+ // to catch errors in the async listener events
+ @volatile private var asyncTestWaiter = new Waiter
+
+ @volatile var startEvent: QueryStartedEvent = null
+ @volatile var terminationEvent: QueryTerminatedEvent = null
+
+ private val _progressEvents = new mutable.Queue[StreamingQueryProgress]
+
+ def progressEvents: Seq[StreamingQueryProgress] = _progressEvents.synchronized {
+ _progressEvents.filter(_.numInputRows > 0)
+ }
+
+ def reset(): Unit = {
+ startEvent = null
+ terminationEvent = null
+ _progressEvents.clear()
+ asyncTestWaiter = new Waiter
+ }
+
+ def checkAsyncErrors(): Unit = {
+ asyncTestWaiter.await(timeout(streamingTimeout))
+ }
+
+ override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
+ asyncTestWaiter {
+ startEvent = queryStarted
+ }
+ }
+
+ override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
+ asyncTestWaiter {
+ assert(startEvent != null, "onQueryProgress called before onQueryStarted")
+ _progressEvents.synchronized { _progressEvents += queryProgress.progress }
+ }
+ }
+
+ override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
+ asyncTestWaiter {
+ assert(startEvent != null, "onQueryTerminated called before onQueryStarted")
+ terminationEvent = queryTerminated
+ }
+ asyncTestWaiter.dismiss()
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 41ffd56cf1..268b8ff7b4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -62,7 +62,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
assert(spark.streams.get(q1.id).eq(q1))
assert(spark.streams.get(q2.id).eq(q2))
assert(spark.streams.get(q3.id).eq(q3))
- assert(spark.streams.get(-1) === null) // non-existent id
+ assert(spark.streams.get(java.util.UUID.randomUUID()) === null) // non-existent id
q1.stop()
assert(spark.streams.active.toSet === Set(q2, q3))
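
The one-line change above reflects that queries are now identified by java.util.UUID rather than a numeric id. A small sketch of the corresponding lookups against the manager (nothing here beyond get and active is implied by the patch):

    import java.util.UUID
    import org.apache.spark.sql.SparkSession

    object QueryLookupExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("lookup-demo").getOrCreate()

        // get returns null (not an exception) when no active query has the given id.
        val missing = spark.streams.get(UUID.randomUUID())
        assert(missing == null)

        println(s"active queries: ${spark.streams.active.length}")
      }
    }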
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
new file mode 100644
index 0000000000..45d29f6b35
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.streaming.StreamingQueryProgressSuite._
+
+
+class StreamingQueryProgressSuite extends SparkFunSuite {
+
+ test("prettyJson") {
+ val json = testProgress.prettyJson
+ assert(json ===
+ s"""
+ |{
+ | "id" : "${testProgress.id.toString}",
+ | "name" : "name",
+ | "timestamp" : 1,
+ | "numInputRows" : 678,
+ | "inputRowsPerSecond" : 10.0,
+ | "durationMs" : {
+ | "total" : 0
+ | },
+ | "currentWatermark" : 3,
+ | "stateOperators" : [ {
+ | "numRowsTotal" : 0,
+ | "numRowsUpdated" : 1
+ | } ],
+ | "sources" : [ {
+ | "description" : "source",
+ | "startOffset" : 123,
+ | "endOffset" : 456,
+ | "numInputRows" : 678,
+ | "inputRowsPerSecond" : 10.0
+ | } ],
+ | "sink" : {
+ | "description" : "sink"
+ | }
+ |}
+ """.stripMargin.trim)
+ assert(compact(parse(json)) === testProgress.json)
+
+ }
+
+ test("json") {
+ assert(compact(parse(testProgress.json)) === testProgress.json)
+ }
+
+ test("toString") {
+ assert(testProgress.toString === testProgress.prettyJson)
+ }
+}
+
+object StreamingQueryProgressSuite {
+ val testProgress = new StreamingQueryProgress(
+ id = UUID.randomUUID(),
+ name = "name",
+ timestamp = 1L,
+ batchId = 2L,
+ durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
+ currentWatermark = 3L,
+ stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
+ sources = Array(
+ new SourceProgress(
+ description = "source",
+ startOffset = "123",
+ endOffset = "456",
+ numInputRows = 678,
+ inputRowsPerSecond = 10.0,
+ processedRowsPerSecond = Double.PositiveInfinity // should not be present in the json
+ )
+ ),
+ sink = new SinkProgress("sink")
+ )
+}
+
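
The suite above pins down the JSON shape of the new StreamingQueryProgress. For reference, the same json/prettyJson accessors are available on a live query; a sketch under assumed settings (the socket host/port and query name are illustrative only):

    import org.apache.spark.sql.SparkSession

    object ProgressJsonExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("progress-json").getOrCreate()

        val lines = spark.readStream
          .format("socket")
          .option("host", "localhost")
          .option("port", "9999")
          .load()

        val query = lines.writeStream
          .format("console")
          .queryName("progress-demo")
          .start()

        // lastProgress is null until the first trigger completes, so guard the access.
        while (query.isActive) {
          Option(query.lastProgress).foreach(p => println(p.prettyJson))
          Thread.sleep(1000)
        }
      }
    }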
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
deleted file mode 100644
index 50a7d92ede..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import org.apache.spark.SparkFunSuite
-
-class StreamingQueryStatusSuite extends SparkFunSuite {
- test("toString") {
- assert(StreamingQueryStatus.testStatus.sourceStatuses(0).toString ===
- """
- |Status of source MySource1
- | Available offset: 0
- | Input rate: 15.5 rows/sec
- | Processing rate: 23.5 rows/sec
- | Trigger details:
- | numRows.input.source: 100
- | latency.getOffset.source: 10
- | latency.getBatch.source: 20
- """.stripMargin.trim, "SourceStatus.toString does not match")
-
- assert(StreamingQueryStatus.testStatus.sinkStatus.toString ===
- """
- |Status of sink MySink
- | Committed offsets: [1, -]
- """.stripMargin.trim, "SinkStatus.toString does not match")
-
- assert(StreamingQueryStatus.testStatus.toString ===
- """
- |Status of query 'query'
- | Query id: 1
- | Status timestamp: 123
- | Input rate: 15.5 rows/sec
- | Processing rate 23.5 rows/sec
- | Latency: 345.0 ms
- | Trigger details:
- | batchId: 5
- | isDataPresentInTrigger: true
- | isTriggerActive: true
- | latency.getBatch.total: 20
- | latency.getOffset.total: 10
- | numRows.input.total: 100
- | Source statuses [1 source]:
- | Source 1 - MySource1
- | Available offset: 0
- | Input rate: 15.5 rows/sec
- | Processing rate: 23.5 rows/sec
- | Trigger details:
- | numRows.input.source: 100
- | latency.getOffset.source: 10
- | latency.getBatch.source: 20
- | Sink status - MySink
- | Committed offsets: [1, -]
- """.stripMargin.trim, "StreamingQueryStatus.toString does not match")
-
- }
-
- test("json") {
- assert(StreamingQueryStatus.testStatus.json ===
- """
- |{"name":"query","id":1,"timestamp":123,"inputRate":15.5,"processingRate":23.5,
- |"latency":345.0,"triggerDetails":{"latency.getBatch.total":"20",
- |"numRows.input.total":"100","isTriggerActive":"true","batchId":"5",
- |"latency.getOffset.total":"10","isDataPresentInTrigger":"true"},
- |"sourceStatuses":[{"description":"MySource1","offsetDesc":"0","inputRate":15.5,
- |"processingRate":23.5,"triggerDetails":{"numRows.input.source":"100",
- |"latency.getOffset.source":"10","latency.getBatch.source":"20"}}],
- |"sinkStatus":{"description":"MySink","offsetDesc":"[1, -]"}}
- """.stripMargin.replace("\n", "").trim)
- }
-
- test("prettyJson") {
- assert(
- StreamingQueryStatus.testStatus.prettyJson ===
- """
- |{
- | "name" : "query",
- | "id" : 1,
- | "timestamp" : 123,
- | "inputRate" : 15.5,
- | "processingRate" : 23.5,
- | "latency" : 345.0,
- | "triggerDetails" : {
- | "latency.getBatch.total" : "20",
- | "numRows.input.total" : "100",
- | "isTriggerActive" : "true",
- | "batchId" : "5",
- | "latency.getOffset.total" : "10",
- | "isDataPresentInTrigger" : "true"
- | },
- | "sourceStatuses" : [ {
- | "description" : "MySource1",
- | "offsetDesc" : "0",
- | "inputRate" : 15.5,
- | "processingRate" : 23.5,
- | "triggerDetails" : {
- | "numRows.input.source" : "100",
- | "latency.getOffset.source" : "10",
- | "latency.getBatch.source" : "20"
- | }
- | } ],
- | "sinkStatus" : {
- | "description" : "MySink",
- | "offsetDesc" : "[1, -]"
- | }
- |}
- """.stripMargin.trim)
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 8ecb33cf9d..4f3b4a2d75 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -20,14 +20,15 @@ package org.apache.spark.sql.streaming
import org.scalactic.TolerantNumerics
import org.scalatest.concurrent.Eventually._
import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.apache.spark.internal.Logging
import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.apache.spark.sql.types.StructType
import org.apache.spark.SparkException
import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.util.Utils
+import org.apache.spark.sql.functions._
+import org.apache.spark.util.{ManualClock, Utils}
class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
@@ -109,85 +110,139 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
)
}
- testQuietly("query statuses") {
- val inputData = MemoryStream[Int]
- val mapped = inputData.toDS().map(6 / _)
- testStream(mapped)(
- AssertOnQuery(q => q.status.name === q.name),
- AssertOnQuery(q => q.status.id === q.id),
- AssertOnQuery(_.status.timestamp <= System.currentTimeMillis),
- AssertOnQuery(_.status.inputRate === 0.0),
- AssertOnQuery(_.status.processingRate === 0.0),
- AssertOnQuery(_.status.sourceStatuses.length === 1),
- AssertOnQuery(_.status.sourceStatuses(0).description.contains("Memory")),
- AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === "-"),
- AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0),
- AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0),
- AssertOnQuery(_.status.sinkStatus.description.contains("Memory")),
- AssertOnQuery(_.status.sinkStatus.offsetDesc === OffsetSeq(None :: Nil).toString),
- AssertOnQuery(_.sourceStatuses(0).description.contains("Memory")),
- AssertOnQuery(_.sourceStatuses(0).offsetDesc === "-"),
- AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0),
- AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0),
- AssertOnQuery(_.sinkStatus.description.contains("Memory")),
- AssertOnQuery(_.sinkStatus.offsetDesc === new OffsetSeq(None :: Nil).toString),
+ testQuietly("query statuses and progresses") {
+ import StreamingQuerySuite._
+ clock = new StreamManualClock
+
+ /** Custom MemoryStream that waits for manual clock to reach a time */
+ val inputData = new MemoryStream[Int](0, sqlContext) {
+ // Block getOffset until the manual clock reaches 300 the first time data is available
+ override def getOffset: Option[Offset] = {
+ val offset = super.getOffset
+ if (offset.nonEmpty) {
+ clock.waitTillTime(300)
+ }
+ offset
+ }
- AddData(inputData, 1, 2),
- CheckAnswer(6, 3),
- AssertOnQuery(_.status.timestamp <= System.currentTimeMillis),
- AssertOnQuery(_.status.inputRate >= 0.0),
- AssertOnQuery(_.status.processingRate >= 0.0),
- AssertOnQuery(_.status.sourceStatuses.length === 1),
- AssertOnQuery(_.status.sourceStatuses(0).description.contains("Memory")),
- AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(0).json),
- AssertOnQuery(_.status.sourceStatuses(0).inputRate >= 0.0),
- AssertOnQuery(_.status.sourceStatuses(0).processingRate >= 0.0),
- AssertOnQuery(_.status.sinkStatus.description.contains("Memory")),
- AssertOnQuery(_.status.sinkStatus.offsetDesc ===
- OffsetSeq.fill(LongOffset(0)).toString),
- AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(0).json),
- AssertOnQuery(_.sourceStatuses(0).inputRate >= 0.0),
- AssertOnQuery(_.sourceStatuses(0).processingRate >= 0.0),
- AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(0)).toString),
+ // Block getBatch until the manual clock reaches 600 the first time data is fetched
+ override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+ clock.waitTillTime(600)
+ super.getBatch(start, end)
+ }
+ }
- AddData(inputData, 1, 2),
- CheckAnswer(6, 3, 6, 3),
- AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(1).json),
- AssertOnQuery(_.status.sinkStatus.offsetDesc ===
- OffsetSeq.fill(LongOffset(1)).toString),
- AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(1).json),
- AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(1)).toString),
+ // This is to make sure that the query blocks inside the Spark job until the manual clock reaches 1100
+ val mapped = inputData.toDS().agg(count("*")).as[Long].coalesce(1).map { x =>
+ clock.waitTillTime(1100)
+ x
+ }
- StopStream,
- AssertOnQuery(_.status.inputRate === 0.0),
- AssertOnQuery(_.status.processingRate === 0.0),
- AssertOnQuery(_.status.sourceStatuses.length === 1),
- AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(1).json),
- AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0),
- AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0),
- AssertOnQuery(_.status.sinkStatus.offsetDesc ===
- OffsetSeq.fill(LongOffset(1)).toString),
- AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(1).json),
- AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0),
- AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0),
- AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(1)).toString),
- AssertOnQuery(_.status.triggerDetails.isEmpty),
+ case class AssertStreamExecThreadToWaitForClock()
+ extends AssertOnQuery(q => {
+ eventually(Timeout(streamingTimeout)) {
+ if (q.exception.isEmpty) {
+ assert(clock.asInstanceOf[StreamManualClock].isStreamWaitingAt(clock.getTimeMillis))
+ }
+ }
+ if (q.exception.isDefined) {
+ throw q.exception.get
+ }
+ true
+ }, "")
+
+ testStream(mapped, OutputMode.Complete)(
+ StartStream(ProcessingTime(100), triggerClock = clock),
+ AssertStreamExecThreadToWaitForClock(),
+ AssertOnQuery(_.status.isDataAvailable === false),
+ AssertOnQuery(_.status.isTriggerActive === false),
+ // TODO: test status.message before trigger has started
+ // AssertOnQuery(_.lastProgress === null) // there is an empty trigger as soon as the query starts
+ AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+
+ // Test status while offset is being fetched
+ AddData(inputData, 1, 2),
+ AdvanceManualClock(100), // time = 100 to start new trigger, will block on getOffset
+ AssertStreamExecThreadToWaitForClock(),
+ AssertOnQuery(_.status.isDataAvailable === false),
+ AssertOnQuery(_.status.isTriggerActive === true),
+ AssertOnQuery(_.status.message.toLowerCase.contains("getting offsets from")),
+ AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+
+ // Test status while batch is being fetched
+ AdvanceManualClock(200), // time = 300 to unblock getOffset, will block on getBatch
+ AssertStreamExecThreadToWaitForClock(),
+ AssertOnQuery(_.status.isDataAvailable === true),
+ AssertOnQuery(_.status.isTriggerActive === true),
+ AssertOnQuery(_.status.message === "Processing new data"),
+ AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+
+ // Test status while batch is being processed
+ AdvanceManualClock(300), // time = 600 to unblock getBatch, will block in Spark job
+ AssertOnQuery(_.status.isDataAvailable === true),
+ AssertOnQuery(_.status.isTriggerActive === true),
+ AssertOnQuery(_.status.message === "Processing new data"),
+ AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+
+ // Test status while batch processing has completed
+ AdvanceManualClock(500), // time = 1100 to unblock job
+ AssertOnQuery { _ => clock.getTimeMillis() === 1100 },
+ CheckAnswer(2),
+ AssertOnQuery(_.status.isDataAvailable === true),
+ AssertOnQuery(_.status.isTriggerActive === false),
+ AssertOnQuery(_.status.message === "Waiting for next trigger"),
+ AssertOnQuery { query =>
+ assert(query.lastProgress != null)
+ assert(query.recentProgresses.exists(_.numInputRows > 0))
+ assert(query.recentProgresses.last.eq(query.lastProgress))
+
+ val progress = query.lastProgress
+ assert(progress.id === query.id)
+ assert(progress.name === query.name)
+ assert(progress.batchId === 0)
+ assert(progress.timestamp === 100)
+ assert(progress.numInputRows === 2)
+ assert(progress.processedRowsPerSecond === 2.0)
+
+ assert(progress.durationMs.get("getOffset") === 200)
+ assert(progress.durationMs.get("getBatch") === 300)
+ assert(progress.durationMs.get("queryPlanning") === 0)
+ assert(progress.durationMs.get("walCommit") === 0)
+ assert(progress.durationMs.get("triggerExecution") === 1000)
+
+ assert(progress.sources.length === 1)
+ assert(progress.sources(0).description contains "MemoryStream")
+ assert(progress.sources(0).startOffset === null)
+ assert(progress.sources(0).endOffset !== null)
+ assert(progress.sources(0).processedRowsPerSecond === 2.0)
+
+ assert(progress.stateOperators.length === 1)
+ assert(progress.stateOperators(0).numRowsUpdated === 1)
+ assert(progress.stateOperators(0).numRowsTotal === 1)
+
+ assert(progress.sink.description contains "MemorySink")
+ true
+ },
- StartStream(),
- AddData(inputData, 0),
- ExpectFailure[SparkException],
- AssertOnQuery(_.status.inputRate === 0.0),
- AssertOnQuery(_.status.processingRate === 0.0),
- AssertOnQuery(_.status.sourceStatuses.length === 1),
- AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(2).json),
- AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0),
- AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0),
- AssertOnQuery(_.status.sinkStatus.offsetDesc ===
- OffsetSeq.fill(LongOffset(1)).toString),
- AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(2).json),
- AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0),
- AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0),
- AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(1)).toString)
+ AddData(inputData, 1, 2),
+ AdvanceManualClock(100), // allow another trigger
+ CheckAnswer(4),
+ AssertOnQuery(_.status.isDataAvailable === true),
+ AssertOnQuery(_.status.isTriggerActive === false),
+ AssertOnQuery(_.status.message === "Waiting for next trigger"),
+ AssertOnQuery { query =>
+ assert(query.recentProgresses.last.eq(query.lastProgress))
+ assert(query.lastProgress.batchId === 1)
+ assert(query.lastProgress.sources(0).inputRowsPerSecond === 1.818)
+ true
+ },
+
+ // Test status after data is not available for a trigger
+ AdvanceManualClock(100), // allow another trigger
+ AssertStreamExecThreadToWaitForClock(),
+ AssertOnQuery(_.status.isDataAvailable === false),
+ AssertOnQuery(_.status.isTriggerActive === false),
+ AssertOnQuery(_.status.message === "Waiting for next trigger")
)
}
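
The rewritten test drives the trigger with a manual clock so that each phase (getOffset, getBatch, job execution) is observable through query.status and, once a trigger completes, through lastProgress. Outside of tests the same fields can simply be polled; a minimal sketch assuming a text-file source and console sink:

    import org.apache.spark.sql.SparkSession

    object StatusPollingExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("status-demo").getOrCreate()

        val query = spark.readStream
          .text("/tmp/stream-input")      // assumed input directory
          .writeStream
          .format("console")
          .start()

        // status is always populated (unlike lastProgress) and describes what the
        // stream execution thread is doing right now.
        val s = query.status
        println(s"message:        ${s.message}")
        println(s"data available: ${s.isDataAvailable}")
        println(s"trigger active: ${s.isTriggerActive}")

        query.awaitTermination(10000)
      }
    }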
@@ -196,7 +251,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
/** Whether metrics of a query is registered for reporting */
def isMetricsRegistered(query: StreamingQuery): Boolean = {
- val sourceName = s"StructuredStreaming.${query.name}"
+ val sourceName = s"spark.streaming.${query.name}"
val sources = spark.sparkContext.env.metricsSystem.getSourcesByName(sourceName)
require(sources.size <= 1)
sources.nonEmpty
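
The metric source is now registered under spark.streaming.<queryName> instead of StructuredStreaming.<queryName>. Reporting is opt-in; the sketch below assumes the spark.sql.streaming.metricsEnabled setting and an ad-hoc query, neither of which appears in this hunk:

    import org.apache.spark.sql.SparkSession

    object MetricsEnableExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("metrics-demo").getOrCreate()

        // Dropwizard metrics for streaming queries are off by default; once enabled,
        // each active query registers a source named "spark.streaming.<queryName>".
        spark.conf.set("spark.sql.streaming.metricsEnabled", "true")

        val query = spark.readStream
          .text("/tmp/stream-input")        // assumed input directory
          .writeStream
          .format("console")
          .queryName("metrics-demo-query")  // the name becomes part of the metric source
          .start()

        query.awaitTermination(10000)
      }
    }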
@@ -229,23 +284,23 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
// Trigger input has 10 rows, static input has 2 rows,
// therefore after the first trigger, the calculated input rows should be 10
- val status = getFirstTriggerStatus(streamingInputDF.join(staticInputDF, "value"))
- assert(status.triggerDetails.get("numRows.input.total") === "10")
- assert(status.sourceStatuses.size === 1)
- assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "10")
+ val progress = getFirstProgress(streamingInputDF.join(staticInputDF, "value"))
+ assert(progress.numInputRows === 10)
+ assert(progress.sources.size === 1)
+ assert(progress.sources(0).numInputRows === 10)
}
- test("input row calculation with trigger DF having multiple leaves") {
+ test("input row calculation with trigger input DF having multiple leaves") {
val streamingTriggerDF =
spark.createDataset(1 to 5).toDF.union(spark.createDataset(6 to 10).toDF)
require(streamingTriggerDF.logicalPlan.collectLeaves().size > 1)
val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF)
// After the first trigger, the calculated input rows should be 10
- val status = getFirstTriggerStatus(streamingInputDF)
- assert(status.triggerDetails.get("numRows.input.total") === "10")
- assert(status.sourceStatuses.size === 1)
- assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "10")
+ val progress = getFirstProgress(streamingInputDF)
+ assert(progress.numInputRows === 10)
+ assert(progress.sources.size === 1)
+ assert(progress.sources(0).numInputRows === 10)
}
testQuietly("StreamExecution metadata garbage collection") {
@@ -285,34 +340,14 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
StreamingExecutionRelation(source)
}
- /** Returns the query status at the end of the first trigger of streaming DF */
- private def getFirstTriggerStatus(streamingDF: DataFrame): StreamingQueryStatus = {
- // A StreamingQueryListener that gets the query status after the first completed trigger
- val listener = new StreamingQueryListener {
- @volatile var firstStatus: StreamingQueryStatus = null
- @volatile var queryStartedEvent = 0
- override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
- queryStartedEvent += 1
- }
- override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
- if (firstStatus == null) firstStatus = queryProgress.queryStatus
- }
- override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { }
- }
-
+ /** Returns the query progress at the end of the first trigger of streaming DF */
+ private def getFirstProgress(streamingDF: DataFrame): StreamingQueryProgress = {
try {
- spark.streams.addListener(listener)
val q = streamingDF.writeStream.format("memory").queryName("test").start()
q.processAllAvailable()
- eventually(timeout(streamingTimeout)) {
- assert(listener.firstStatus != null)
- // test if QueryStartedEvent callback is called for only once
- assert(listener.queryStartedEvent === 1)
- }
- listener.firstStatus
+ q.recentProgresses.head
} finally {
spark.streams.active.map(_.stop())
- spark.streams.removeListener(listener)
}
}
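
getFirstProgress replaces the old listener-based helper with a much shorter pattern: block with processAllAvailable() and read the buffered progress objects directly. The same pattern works in application code; a sketch with assumed source and sink settings:

    import org.apache.spark.sql.SparkSession

    object FirstProgressExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("first-progress").getOrCreate()

        val query = spark.readStream
          .text("/tmp/stream-input")           // assumed input directory
          .writeStream
          .format("memory")
          .queryName("first_progress_demo")    // results are queryable as a temp view
          .start()

        // Wait for all currently available data, then inspect the retained progresses.
        query.processAllAvailable()
        val totalRows = query.recentProgresses.map(_.numInputRows).sum
        println(s"rows processed so far: $totalRows")
        query.stop()
      }
    }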
@@ -369,3 +404,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
}
}
}
+
+object StreamingQuerySuite {
+ // Singleton reference to clock that does not get serialized in task closures
+ var clock: ManualClock = null
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
index 3e9488c7dc..12f3c3e5ff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
@@ -51,6 +51,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
test("watermark metric") {
+
val inputData = MemoryStream[Int]
val windowedAggregation = inputData.toDF()
@@ -62,16 +63,19 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
testStream(windowedAggregation)(
AddData(inputData, 15),
- AssertOnLastQueryStatus { status =>
- status.triggerDetails.get(StreamMetrics.EVENT_TIME_WATERMARK) === "5000"
+ CheckAnswer(),
+ AssertOnQuery { query =>
+ query.lastProgress.currentWatermark === 5000
},
AddData(inputData, 15),
- AssertOnLastQueryStatus { status =>
- status.triggerDetails.get(StreamMetrics.EVENT_TIME_WATERMARK) === "5000"
+ CheckAnswer(),
+ AssertOnQuery { query =>
+ query.lastProgress.currentWatermark === 5000
},
AddData(inputData, 25),
- AssertOnLastQueryStatus { status =>
- status.triggerDetails.get(StreamMetrics.EVENT_TIME_WATERMARK) === "15000"
+ CheckAnswer(),
+ AssertOnQuery { query =>
+ query.lastProgress.currentWatermark === 15000
}
)
}
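
With this change, watermark assertions read lastProgress.currentWatermark instead of a trigger-details string. For context, a hedged sketch of the kind of event-time aggregation that produces this value, mirroring the suite's structure (the column names, the 10-second delay, and the input path are assumptions):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    object WatermarkProgressExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("watermark-demo").getOrCreate()

        // Interpret each input line as epoch seconds; the parsing is illustrative only.
        val events = spark.readStream
          .text("/tmp/events")                                  // assumed input directory
          .select(col("value").cast("long").cast("timestamp").as("eventTime"))

        val counts = events
          .withWatermark("eventTime", "10 seconds")             // tolerate 10s of late data
          .groupBy(window(col("eventTime"), "5 seconds"))
          .count()

        val query = counts.writeStream
          .outputMode("append")
          .format("console")
          .start()

        query.processAllAvailable()
        // currentWatermark is milliseconds since the epoch: max event time minus the delay.
        Option(query.lastProgress).foreach(p => println(s"watermark: ${p.currentWatermark}"))
        query.stop()
      }
    }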