aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/sql/streaming.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/sql/streaming.py')
-rw-r--r--python/pyspark/sql/streaming.py301
1 files changed, 301 insertions, 0 deletions
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 4e438fd5be..ce47bd1640 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -189,6 +189,304 @@ class StreamingQueryManager(object):
self._jsqm.resetTerminated()
+class StreamingQueryStatus(object):
+ """A class used to report information about the progress of a StreamingQuery.
+
+ .. note:: Experimental
+
+ .. versionadded:: 2.1
+ """
+
+ def __init__(self, jsqs):
+ self._jsqs = jsqs
+
+ def __str__(self):
+ """
+ Pretty string of this query status.
+
+ >>> print(sqs)
+ StreamingQueryStatus:
+ Query name: query
+ Query id: 1
+ Status timestamp: 123
+ Input rate: 15.5 rows/sec
+ Processing rate 23.5 rows/sec
+ Latency: 345.0 ms
+ Trigger details:
+ isDataPresentInTrigger: true
+ isTriggerActive: true
+ latency.getBatch.total: 20
+ latency.getOffset.total: 10
+ numRows.input.total: 100
+ triggerId: 5
+ Source statuses [1 source]:
+ Source 1: MySource1
+ Available offset: #0
+ Input rate: 15.5 rows/sec
+ Processing rate: 23.5 rows/sec
+ Trigger details:
+ numRows.input.source: 100
+ latency.getOffset.source: 10
+ latency.getBatch.source: 20
+ Sink status: MySink
+ Committed offsets: [#1, -]
+ """
+ return self._jsqs.toString()
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def name(self):
+ """
+ Name of the query. This name is unique across all active queries.
+
+ >>> sqs.name
+ u'query'
+ """
+ return self._jsqs.name()
+
+ @property
+ @since(2.1)
+ def id(self):
+ """
+ Id of the query. This id is unique across all queries that have been started in
+ the current process.
+
+ >>> int(sqs.id)
+ 1
+ """
+ return self._jsqs.id()
+
+ @property
+ @since(2.1)
+ def timestamp(self):
+ """
+ Timestamp (ms) of when this query was generated.
+
+ >>> int(sqs.timestamp)
+ 123
+ """
+ return self._jsqs.timestamp()
+
+ @property
+ @since(2.1)
+ def inputRate(self):
+ """
+ Current total rate (rows/sec) at which data is being generated by all the sources.
+
+ >>> sqs.inputRate
+ 15.5
+ """
+ return self._jsqs.inputRate()
+
+ @property
+ @since(2.1)
+ def processingRate(self):
+ """
+ Current rate (rows/sec) at which the query is processing data from all the sources.
+
+ >>> sqs.processingRate
+ 23.5
+ """
+ return self._jsqs.processingRate()
+
+ @property
+ @since(2.1)
+ def latency(self):
+ """
+ Current average latency between the data being available in source and the sink
+ writing the corresponding output.
+
+ >>> sqs.latency
+ 345.0
+ """
+ if (self._jsqs.latency().nonEmpty()):
+ return self._jsqs.latency().get()
+ else:
+ return None
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def sourceStatuses(self):
+ """
+ Current statuses of the sources as a list.
+
+ >>> len(sqs.sourceStatuses)
+ 1
+ >>> sqs.sourceStatuses[0].description
+ u'MySource1'
+ """
+ return [SourceStatus(ss) for ss in self._jsqs.sourceStatuses()]
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def sinkStatus(self):
+ """
+ Current status of the sink.
+
+ >>> sqs.sinkStatus.description
+ u'MySink'
+ """
+ return SinkStatus(self._jsqs.sinkStatus())
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def triggerDetails(self):
+ """
+ Low-level details of the currently active trigger (e.g. number of rows processed
+ in trigger, latency of intermediate steps, etc.).
+
+ If no trigger is currently active, then it will have details of the last completed trigger.
+
+ >>> sqs.triggerDetails
+ {u'triggerId': u'5', u'latency.getBatch.total': u'20', u'numRows.input.total': u'100',
+ u'isTriggerActive': u'true', u'latency.getOffset.total': u'10',
+ u'isDataPresentInTrigger': u'true'}
+ """
+ return self._jsqs.triggerDetails()
+
+
+class SourceStatus(object):
+ """
+ Status and metrics of a streaming Source.
+
+ .. note:: Experimental
+
+ .. versionadded:: 2.1
+ """
+
+ def __init__(self, jss):
+ self._jss = jss
+
+ def __str__(self):
+ """
+ Pretty string of this source status.
+
+ >>> print(sqs.sourceStatuses[0])
+ SourceStatus: MySource1
+ Available offset: #0
+ Input rate: 15.5 rows/sec
+ Processing rate: 23.5 rows/sec
+ Trigger details:
+ numRows.input.source: 100
+ latency.getOffset.source: 10
+ latency.getBatch.source: 20
+ """
+ return self._jss.toString()
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def description(self):
+ """
+ Description of the source corresponding to this status.
+
+ >>> sqs.sourceStatuses[0].description
+ u'MySource1'
+ """
+ return self._jss.description()
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def offsetDesc(self):
+ """
+ Description of the current offset if known.
+
+ >>> sqs.sourceStatuses[0].offsetDesc
+ u'#0'
+ """
+ return self._jss.offsetDesc()
+
+ @property
+ @since(2.1)
+ def inputRate(self):
+ """
+ Current rate (rows/sec) at which data is being generated by the source.
+
+ >>> sqs.sourceStatuses[0].inputRate
+ 15.5
+ """
+ return self._jss.inputRate()
+
+ @property
+ @since(2.1)
+ def processingRate(self):
+ """
+ Current rate (rows/sec) at which the query is processing data from the source.
+
+ >>> sqs.sourceStatuses[0].processingRate
+ 23.5
+ """
+ return self._jss.processingRate()
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def triggerDetails(self):
+ """
+ Low-level details of the currently active trigger (e.g. number of rows processed
+ in trigger, latency of intermediate steps, etc.).
+
+ If no trigger is currently active, then it will have details of the last completed trigger.
+
+ >>> sqs.sourceStatuses[0].triggerDetails
+ {u'numRows.input.source': u'100', u'latency.getOffset.source': u'10',
+ u'latency.getBatch.source': u'20'}
+ """
+ return self._jss.triggerDetails()
+
+
+class SinkStatus(object):
+ """
+ Status and metrics of a streaming Sink.
+
+ .. note:: Experimental
+
+ .. versionadded:: 2.1
+ """
+
+ def __init__(self, jss):
+ self._jss = jss
+
+ def __str__(self):
+ """
+ Pretty string of this source status.
+
+ >>> print(sqs.sinkStatus)
+ SinkStatus: MySink
+ Committed offsets: [#1, -]
+ """
+ return self._jss.toString()
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def description(self):
+ """
+ Description of the source corresponding to this status.
+
+ >>> sqs.sinkStatus.description
+ u'MySink'
+ """
+ return self._jss.description()
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def offsetDesc(self):
+ """
+ Description of the current offsets up to which data has been written by the sink.
+
+ >>> sqs.sinkStatus.offsetDesc
+ u'[#1, -]'
+ """
+ return self._jss.offsetDesc()
+
+
class Trigger(object):
"""Used to indicate how often results should be produced by a :class:`StreamingQuery`.
@@ -753,11 +1051,14 @@ def _test():
globs['sdf_schema'] = StructType([StructField("data", StringType(), False)])
globs['df'] = \
globs['spark'].readStream.format('text').load('python/test_support/sql/streaming')
+ globs['sqs'] = StreamingQueryStatus(
+ spark.sparkContext._jvm.org.apache.spark.sql.streaming.StreamingQueryStatus.testStatus())
(failure_count, test_count) = doctest.testmod(
pyspark.sql.streaming, globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
globs['spark'].stop()
+
if failure_count:
exit(-1)