author: Tathagata Das <tathagata.das1565@gmail.com> 2016-10-13 13:36:26 -0700
committer: Tathagata Das <tathagata.das1565@gmail.com> 2016-10-13 13:36:26 -0700
commit: 7106866c220c73960c6fe2a70e4911516617e21f (patch)
tree: b468a9cd08d1f3ea8ef939f4b1d8e6b33cf76693 /python
parent: 08eac356095c7faa2b19d52f2fb0cbc47eb7d1d1 (diff)
[SPARK-17731][SQL][STREAMING] Metrics for structured streaming
## What changes were proposed in this pull request?

Metrics are needed for monitoring structured streaming apps. Here is the design doc for implementing the necessary metrics: https://docs.google.com/document/d/1NIdcGuR1B3WIe8t7VxLrt58TJB4DtipWEbj5I_mzJys/edit?usp=sharing

Specifically, this PR adds the following public API changes.

### New APIs
- `StreamingQuery.status` returns a `StreamingQueryStatus` object (renamed from `StreamingQueryInfo`, see later)
- `StreamingQueryStatus` has the following important fields
  - `inputRate` - current rate (rows/sec) at which data is being generated by all the sources
  - `processingRate` - current rate (rows/sec) at which the query is processing data from all the sources
  - ~~`outputRate`~~ - *does not work with whole-stage codegen*
  - `latency` - current average latency between the data being available in a source and the sink writing the corresponding output
  - `sourceStatuses: Array[SourceStatus]` - current statuses of the sources
  - `sinkStatus: SinkStatus` - current status of the sink
  - `triggerStatus` - low-level detailed status of the last completed/currently active trigger
    - latencies - getOffset, getBatch, full trigger, WAL writes
    - timestamps - trigger start, finish, after getOffset, after getBatch
    - numRows - input, output, and state total/updated rows for aggregations
- `SourceStatus` has the following important fields
  - `inputRate` - current rate (rows/sec) at which data is being generated by the source
  - `processingRate` - current rate (rows/sec) at which the query is processing data from the source
  - `triggerStatus` - low-level detailed status of the last completed/currently active trigger
- Python API for `StreamingQuery.status()` (a usage sketch follows this message)

### Breaking changes to existing APIs

**Existing direct public-facing APIs**
- Deprecated the direct public-facing APIs `StreamingQuery.sourceStatuses` and `StreamingQuery.sinkStatus` in favour of `StreamingQuery.status.sourceStatuses/sinkStatus`. Branch 2.0 should have them deprecated; master should have them removed.

**Existing advanced listener APIs**
- `StreamingQueryInfo` renamed to `StreamingQueryStatus` for consistency with `SourceStatus` and `SinkStatus`. Earlier, `StreamingQueryInfo` was used only in the advanced listener API, but now it is also used in the direct public-facing API (`StreamingQuery.status`).
- Field `queryInfo` in the listener events `QueryStarted`, `QueryProgress`, and `QueryTerminated` changed to have the name `queryStatus` and the return type `StreamingQueryStatus`.
- Field `offsetDesc` in `SourceStatus` was `Option[String]`; converted it to `String`.
- For `SourceStatus` and `SinkStatus`, made the constructors private instead of `private[sql]` to make them more Java-safe, and instead added `private[sql] object SourceStatus/SinkStatus.apply()`, which are harder to accidentally use in Java.

## How was this patch tested?

Old and new unit tests.
- Rate calculation and other internal logic of `StreamMetrics` are tested by `StreamMetricsSuite`.
- New info in the statuses returned through `StreamingQueryListener` is tested in `StreamingQueryListenerSuite`.
- New and old info returned through `StreamingQuery.status` is tested in `StreamingQuerySuite`.
- Source-specific tests for making sure input rows are counted are in source-specific test suites.
- Additional tests cover minor additions in `LocalTableScanExec`, `StateStore`, etc.

Metrics were also manually tested using the Ganglia sink.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #15307 from tdas/SPARK-17731.
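The following is a minimal monitoring sketch of the new Python API. It assumes the `StreamingQuery.status` property described above (the property itself is not shown in the diff excerpt below); the input path, query name, and polling interval are illustrative only, not part of this patch.

```python
import time

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("status-demo").getOrCreate()

# Hypothetical text source and in-memory sink; any streaming pair would do.
query = (spark.readStream.format("text").load("/tmp/streaming-input")
         .writeStream.format("memory").queryName("demo").start())

while query.isActive:
    status = query.status  # StreamingQueryStatus snapshot added by this patch
    print("%s: input %s rows/sec, processing %s rows/sec, latency %s ms"
          % (status.name, status.inputRate, status.processingRate, status.latency))
    # Prefer status.sourceStatuses over the now-deprecated query.sourceStatuses.
    for src in status.sourceStatuses:
        print("  source %s at offset %s" % (src.description, src.offsetDesc))
    print("  sink committed offsets: %s" % status.sinkStatus.offsetDesc)
    time.sleep(10)  # poll interval is arbitrary
```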
Diffstat (limited to 'python')
-rw-r--r-- python/pyspark/sql/streaming.py | 301
1 file changed, 301 insertions(+), 0 deletions(-)
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 4e438fd5be..ce47bd1640 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -189,6 +189,304 @@ class StreamingQueryManager(object):
self._jsqm.resetTerminated()
+class StreamingQueryStatus(object):
+ """A class used to report information about the progress of a StreamingQuery.
+
+ .. note:: Experimental
+
+ .. versionadded:: 2.1
+ """
+
+ def __init__(self, jsqs):
+ self._jsqs = jsqs
+
+ def __str__(self):
+ """
+ Pretty string of this query status.
+
+ >>> print(sqs)
+ StreamingQueryStatus:
+ Query name: query
+ Query id: 1
+ Status timestamp: 123
+ Input rate: 15.5 rows/sec
+ Processing rate: 23.5 rows/sec
+ Latency: 345.0 ms
+ Trigger details:
+ isDataPresentInTrigger: true
+ isTriggerActive: true
+ latency.getBatch.total: 20
+ latency.getOffset.total: 10
+ numRows.input.total: 100
+ triggerId: 5
+ Source statuses [1 source]:
+ Source 1: MySource1
+ Available offset: #0
+ Input rate: 15.5 rows/sec
+ Processing rate: 23.5 rows/sec
+ Trigger details:
+ numRows.input.source: 100
+ latency.getOffset.source: 10
+ latency.getBatch.source: 20
+ Sink status: MySink
+ Committed offsets: [#1, -]
+ """
+ return self._jsqs.toString()
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def name(self):
+ """
+ Name of the query. This name is unique across all active queries.
+
+ >>> sqs.name
+ u'query'
+ """
+ return self._jsqs.name()
+
+ @property
+ @since(2.1)
+ def id(self):
+ """
+ Id of the query. This id is unique across all queries that have been started in
+ the current process.
+
+ >>> int(sqs.id)
+ 1
+ """
+ return self._jsqs.id()
+
+ @property
+ @since(2.1)
+ def timestamp(self):
+ """
+ Timestamp (ms) of when this query status was generated.
+
+ >>> int(sqs.timestamp)
+ 123
+ """
+ return self._jsqs.timestamp()
+
+ @property
+ @since(2.1)
+ def inputRate(self):
+ """
+ Current total rate (rows/sec) at which data is being generated by all the sources.
+
+ >>> sqs.inputRate
+ 15.5
+ """
+ return self._jsqs.inputRate()
+
+ @property
+ @since(2.1)
+ def processingRate(self):
+ """
+ Current rate (rows/sec) at which the query is processing data from all the sources.
+
+ >>> sqs.processingRate
+ 23.5
+ """
+ return self._jsqs.processingRate()
+
+ @property
+ @since(2.1)
+ def latency(self):
+ """
+ Current average latency between the data being available in a source and the sink
+ writing the corresponding output.
+
+ >>> sqs.latency
+ 345.0
+ """
+ if self._jsqs.latency().nonEmpty():
+ return self._jsqs.latency().get()
+ else:
+ return None
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def sourceStatuses(self):
+ """
+ Current statuses of the sources as a list.
+
+ >>> len(sqs.sourceStatuses)
+ 1
+ >>> sqs.sourceStatuses[0].description
+ u'MySource1'
+ """
+ return [SourceStatus(ss) for ss in self._jsqs.sourceStatuses()]
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def sinkStatus(self):
+ """
+ Current status of the sink.
+
+ >>> sqs.sinkStatus.description
+ u'MySink'
+ """
+ return SinkStatus(self._jsqs.sinkStatus())
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def triggerDetails(self):
+ """
+ Low-level details of the currently active trigger (e.g. number of rows processed
+ in trigger, latency of intermediate steps, etc.).
+
+ If no trigger is currently active, then it will have details of the last completed trigger.
+
+ >>> sqs.triggerDetails
+ {u'triggerId': u'5', u'latency.getBatch.total': u'20', u'numRows.input.total': u'100',
+ u'isTriggerActive': u'true', u'latency.getOffset.total': u'10',
+ u'isDataPresentInTrigger': u'true'}
+ """
+ return self._jsqs.triggerDetails()
+
+
+class SourceStatus(object):
+ """
+ Status and metrics of a streaming Source.
+
+ .. note:: Experimental
+
+ .. versionadded:: 2.1
+ """
+
+ def __init__(self, jss):
+ self._jss = jss
+
+ def __str__(self):
+ """
+ Pretty string of this source status.
+
+ >>> print(sqs.sourceStatuses[0])
+ SourceStatus: MySource1
+ Available offset: #0
+ Input rate: 15.5 rows/sec
+ Processing rate: 23.5 rows/sec
+ Trigger details:
+ numRows.input.source: 100
+ latency.getOffset.source: 10
+ latency.getBatch.source: 20
+ """
+ return self._jss.toString()
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def description(self):
+ """
+ Description of the source corresponding to this status.
+
+ >>> sqs.sourceStatuses[0].description
+ u'MySource1'
+ """
+ return self._jss.description()
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def offsetDesc(self):
+ """
+ Description of the current offset if known.
+
+ >>> sqs.sourceStatuses[0].offsetDesc
+ u'#0'
+ """
+ return self._jss.offsetDesc()
+
+ @property
+ @since(2.1)
+ def inputRate(self):
+ """
+ Current rate (rows/sec) at which data is being generated by the source.
+
+ >>> sqs.sourceStatuses[0].inputRate
+ 15.5
+ """
+ return self._jss.inputRate()
+
+ @property
+ @since(2.1)
+ def processingRate(self):
+ """
+ Current rate (rows/sec) at which the query is processing data from the source.
+
+ >>> sqs.sourceStatuses[0].processingRate
+ 23.5
+ """
+ return self._jss.processingRate()
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def triggerDetails(self):
+ """
+ Low-level details of the currently active trigger (e.g. number of rows processed
+ in trigger, latency of intermediate steps, etc.).
+
+ If no trigger is currently active, then it will have details of the last completed trigger.
+
+ >>> sqs.sourceStatuses[0].triggerDetails
+ {u'numRows.input.source': u'100', u'latency.getOffset.source': u'10',
+ u'latency.getBatch.source': u'20'}
+ """
+ return self._jss.triggerDetails()
+
+
+class SinkStatus(object):
+ """
+ Status and metrics of a streaming Sink.
+
+ .. note:: Experimental
+
+ .. versionadded:: 2.1
+ """
+
+ def __init__(self, jss):
+ self._jss = jss
+
+ def __str__(self):
+ """
+ Pretty string of this sink status.
+
+ >>> print(sqs.sinkStatus)
+ SinkStatus: MySink
+ Committed offsets: [#1, -]
+ """
+ return self._jss.toString()
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def description(self):
+ """
+ Description of the sink corresponding to this status.
+
+ >>> sqs.sinkStatus.description
+ u'MySink'
+ """
+ return self._jss.description()
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def offsetDesc(self):
+ """
+ Description of the current offsets up to which data has been written by the sink.
+
+ >>> sqs.sinkStatus.offsetDesc
+ u'[#1, -]'
+ """
+ return self._jss.offsetDesc()
+
+
class Trigger(object):
"""Used to indicate how often results should be produced by a :class:`StreamingQuery`.
@@ -753,11 +1051,14 @@ def _test():
globs['sdf_schema'] = StructType([StructField("data", StringType(), False)])
globs['df'] = \
globs['spark'].readStream.format('text').load('python/test_support/sql/streaming')
+ globs['sqs'] = StreamingQueryStatus(
+ spark.sparkContext._jvm.org.apache.spark.sql.streaming.StreamingQueryStatus.testStatus())
(failure_count, test_count) = doctest.testmod(
pyspark.sql.streaming, globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
globs['spark'].stop()
+
if failure_count:
exit(-1)
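One follow-up note on `triggerDetails`: as the doctests above show, the map is string-keyed and string-valued, so consumers must parse values themselves. Below is a hedged helper sketch that uses only keys appearing in the doctests (`isTriggerActive`, `latency.getOffset.total`, `latency.getBatch.total`, `numRows.input.total`); the helper name and the summary shape are illustrative, not part of the patch.

```python
def summarize_trigger(details):
    """Summarize one trigger from a StreamingQueryStatus.triggerDetails map.

    All keys and values are strings, as shown in the doctests above.
    """
    return {
        'active': details.get('isTriggerActive') == 'true',
        'inputRows': int(details.get('numRows.input.total', 0)),
        # combined getOffset + getBatch latency, in ms
        'planningLatencyMs': (float(details.get('latency.getOffset.total', 0)) +
                              float(details.get('latency.getBatch.total', 0))),
    }

# Using the values from the triggerDetails doctest above:
print(summarize_trigger({
    'triggerId': '5',
    'isTriggerActive': 'true',
    'isDataPresentInTrigger': 'true',
    'latency.getOffset.total': '10',
    'latency.getBatch.total': '20',
    'numRows.input.total': '100',
}))
# {'active': True, 'inputRows': 100, 'planningLatencyMs': 30.0}
```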