author     Tathagata Das <tathagata.das1565@gmail.com>  2016-11-29 17:24:17 -0800
committer  Michael Armbrust <michael@databricks.com>    2016-11-29 17:24:17 -0800
commit     c3d08e2f29baeebe09bf4c059ace4336af9116b5 (patch)
tree       bc677be4760fdd9dfabcb5bb990c576fd5c65e12 /python/pyspark/sql/streaming.py
parent     9a02f6821265ff67ba3f7b095cd1afaebd25a898 (diff)
[SPARK-18516][SQL] Split state and progress in streaming
This PR separates the status of a `StreamingQuery` into two separate APIs:

- `status` - describes the status of a `StreamingQuery` at this moment, including what phase of processing is currently happening and if data is available.
- `recentProgress` - an array of statistics about the most recent microbatches that have executed.

A recent progress contains the following information:

```
{
  "id" : "2be8670a-fce1-4859-a530-748f29553bb6",
  "name" : "query-29",
  "timestamp" : 1479705392724,
  "inputRowsPerSecond" : 230.76923076923077,
  "processedRowsPerSecond" : 10.869565217391303,
  "durationMs" : {
    "triggerExecution" : 276,
    "queryPlanning" : 3,
    "getBatch" : 5,
    "getOffset" : 3,
    "addBatch" : 234,
    "walCommit" : 30
  },
  "currentWatermark" : 0,
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-14]]",
    "startOffset" : {
      "topic-14" : {
        "2" : 0,
        "4" : 1,
        "1" : 0,
        "3" : 0,
        "0" : 0
      }
    },
    "endOffset" : {
      "topic-14" : {
        "2" : 1,
        "4" : 2,
        "1" : 0,
        "3" : 0,
        "0" : 1
      }
    },
    "numRecords" : 3,
    "inputRowsPerSecond" : 230.76923076923077,
    "processedRowsPerSecond" : 10.869565217391303
  } ]
}
```

Additionally, in order to make it possible to correlate progress updates across restarts, we change the `id` field from an integer that is unique within the JVM to a `UUID` that is globally unique.

Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Michael Armbrust <michael@databricks.com>

Closes #15954 from marmbrus/queryProgress.
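For a sense of how the Python-side properties added in this patch surface to users, here is a minimal, hedged sketch; the socket source, host, port, query name, and sleep interval are illustrative assumptions, not part of this patch:

```
# Hedged sketch: polling the new progress API from PySpark 2.1. The socket
# source, host/port, and query name below are illustrative placeholders.
import time

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("progress-demo").getOrCreate()

lines = (spark.readStream
         .format("socket")
         .option("host", "localhost")
         .option("port", 9999)
         .load())

query = (lines.writeStream
         .format("memory")
         .queryName("demo")
         .start())

# `id` is now a globally unique UUID string, not a per-JVM integer.
print("query id:", query.id)

time.sleep(10)  # let a few micro-batches run

# Each progress update is exposed to Python as a plain dict, parsed from the
# JSON document shown above.
for p in query.recentProgresses:
    print(p["timestamp"], p["durationMs"]["triggerExecution"])

# `lastProgress` is the most recent of those dicts.
if query.recentProgresses:
    print(query.lastProgress["inputRowsPerSecond"])

query.stop()
```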
Diffstat (limited to 'python/pyspark/sql/streaming.py')
-rw-r--r--  python/pyspark/sql/streaming.py | 326
1 file changed, 22 insertions, 304 deletions
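Before the diff itself, one more hedged sketch of the commit message's point that UUID ids let progress records be correlated across restarts; `progress_log`, `record_progress`, and `dump_progress` are hypothetical helpers, not part of this patch:

```
# Hedged sketch: accumulating progress updates keyed by the globally unique
# query id, so records from before and after a restart of the same query
# can be merged later.
import json

progress_log = {}  # query id (UUID string) -> list of progress dicts


def record_progress(query):
    """Append the query's retained progress updates under its UUID id."""
    progress_log.setdefault(query.id, []).extend(query.recentProgresses)


def dump_progress(path="progress_log.json"):
    # Globally unique ids mean records from separate runs of the same query
    # no longer collide the way per-JVM integer ids could.
    with open(path, "w") as f:
        json.dump(progress_log, f, indent=2)
```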
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 9c3a237699..c420b0d016 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -16,6 +16,8 @@
#

import sys
+import json
+
if sys.version >= '3':
    intlike = int
    basestring = unicode = str
@@ -48,10 +50,9 @@ class StreamingQuery(object):
    @property
    @since(2.0)
    def id(self):
-        """The id of the streaming query. This id is unique across all queries that have been
-        started in the current process.
+        """The id of the streaming query.
        """
-        return self._jsq.id()
+        return self._jsq.id().toString()

    @property
    @since(2.0)
@@ -87,6 +88,24 @@ class StreamingQuery(object):
        else:
            return self._jsq.awaitTermination()

+    @property
+    @since(2.1)
+    def recentProgresses(self):
+        """Returns an array of the most recent :class:`StreamingQueryProgress` updates for this query.
+        The number of progress updates retained for each stream is configured by Spark session
+        configuration `spark.sql.streaming.numRecentProgresses`.
+        """
+        return [json.loads(p.json()) for p in self._jsq.recentProgresses()]
+
+    @property
+    @since(2.1)
+    def lastProgress(self):
+        """
+        Returns the most recent :class:`StreamingQueryProgress` update of this streaming query.
+        :return: a map
+        """
+        return json.loads(self._jsq.lastProgress().json())
+
    @since(2.0)
    def processAllAvailable(self):
        """Blocks until all available data in the source has been processed and committed to the
@@ -149,8 +168,6 @@ class StreamingQueryManager(object):
        True
        >>> sq.stop()
        """
-        if not isinstance(id, intlike):
-            raise ValueError("The id for the query must be an integer. Got: %s" % id)
        return StreamingQuery(self._jsqm.get(id))

    @since(2.0)
@@ -191,303 +208,6 @@ class StreamingQueryManager(object):
        self._jsqm.resetTerminated()

-class StreamingQueryStatus(object):
-    """A class used to report information about the progress of a StreamingQuery.
-
-    .. note:: Experimental
-
-    .. versionadded:: 2.1
-    """
-
-    def __init__(self, jsqs):
-        self._jsqs = jsqs
-
-    def __str__(self):
-        """
-        Pretty string of this query status.
-
-        >>> print(sqs)
-        Status of query 'query'
-            Query id: 1
-            Status timestamp: 123
-            Input rate: 15.5 rows/sec
-            Processing rate 23.5 rows/sec
-            Latency: 345.0 ms
-            Trigger details:
-                batchId: 5
-                isDataPresentInTrigger: true
-                isTriggerActive: true
-                latency.getBatch.total: 20
-                latency.getOffset.total: 10
-                numRows.input.total: 100
-            Source statuses [1 source]:
-                Source 1 - MySource1
-                    Available offset: 0
-                    Input rate: 15.5 rows/sec
-                    Processing rate: 23.5 rows/sec
-                    Trigger details:
-                        numRows.input.source: 100
-                        latency.getOffset.source: 10
-                        latency.getBatch.source: 20
-            Sink status - MySink
-                Committed offsets: [1, -]
-        """
-        return self._jsqs.toString()
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def name(self):
-        """
-        Name of the query. This name is unique across all active queries.
-
-        >>> sqs.name
-        u'query'
-        """
-        return self._jsqs.name()
-
-    @property
-    @since(2.1)
-    def id(self):
-        """
-        Id of the query. This id is unique across all queries that have been started in
-        the current process.
-
-        >>> int(sqs.id)
-        1
-        """
-        return self._jsqs.id()
-
-    @property
-    @since(2.1)
-    def timestamp(self):
-        """
-        Timestamp (ms) of when this query was generated.
-
-        >>> int(sqs.timestamp)
-        123
-        """
-        return self._jsqs.timestamp()
-
-    @property
-    @since(2.1)
-    def inputRate(self):
-        """
-        Current total rate (rows/sec) at which data is being generated by all the sources.
-
-        >>> sqs.inputRate
-        15.5
-        """
-        return self._jsqs.inputRate()
-
-    @property
-    @since(2.1)
-    def processingRate(self):
-        """
-        Current rate (rows/sec) at which the query is processing data from all the sources.
-
-        >>> sqs.processingRate
-        23.5
-        """
-        return self._jsqs.processingRate()
-
-    @property
-    @since(2.1)
-    def latency(self):
-        """
-        Current average latency between the data being available in source and the sink
-        writing the corresponding output.
-
-        >>> sqs.latency
-        345.0
-        """
-        if (self._jsqs.latency().nonEmpty()):
-            return self._jsqs.latency().get()
-        else:
-            return None
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def sourceStatuses(self):
-        """
-        Current statuses of the sources as a list.
-
-        >>> len(sqs.sourceStatuses)
-        1
-        >>> sqs.sourceStatuses[0].description
-        u'MySource1'
-        """
-        return [SourceStatus(ss) for ss in self._jsqs.sourceStatuses()]
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def sinkStatus(self):
-        """
-        Current status of the sink.
-
-        >>> sqs.sinkStatus.description
-        u'MySink'
-        """
-        return SinkStatus(self._jsqs.sinkStatus())
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def triggerDetails(self):
-        """
-        Low-level details of the currently active trigger (e.g. number of rows processed
-        in trigger, latency of intermediate steps, etc.).
-
-        If no trigger is currently active, then it will have details of the last completed trigger.
-
-        >>> sqs.triggerDetails
-        {u'latency.getBatch.total': u'20', u'numRows.input.total': u'100',
-        u'isTriggerActive': u'true', u'batchId': u'5', u'latency.getOffset.total': u'10',
-        u'isDataPresentInTrigger': u'true'}
-        """
-        return self._jsqs.triggerDetails()
-
-
-class SourceStatus(object):
-    """
-    Status and metrics of a streaming Source.
-
-    .. note:: Experimental
-
-    .. versionadded:: 2.1
-    """
-
-    def __init__(self, jss):
-        self._jss = jss
-
-    def __str__(self):
-        """
-        Pretty string of this source status.
-
-        >>> print(sqs.sourceStatuses[0])
-        Status of source MySource1
-            Available offset: 0
-            Input rate: 15.5 rows/sec
-            Processing rate: 23.5 rows/sec
-            Trigger details:
-                numRows.input.source: 100
-                latency.getOffset.source: 10
-                latency.getBatch.source: 20
-        """
-        return self._jss.toString()
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def description(self):
-        """
-        Description of the source corresponding to this status.
-
-        >>> sqs.sourceStatuses[0].description
-        u'MySource1'
-        """
-        return self._jss.description()
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def offsetDesc(self):
-        """
-        Description of the current offset if known.
-
-        >>> sqs.sourceStatuses[0].offsetDesc
-        u'0'
-        """
-        return self._jss.offsetDesc()
-
-    @property
-    @since(2.1)
-    def inputRate(self):
-        """
-        Current rate (rows/sec) at which data is being generated by the source.
-
-        >>> sqs.sourceStatuses[0].inputRate
-        15.5
-        """
-        return self._jss.inputRate()
-
-    @property
-    @since(2.1)
-    def processingRate(self):
-        """
-        Current rate (rows/sec) at which the query is processing data from the source.
-
-        >>> sqs.sourceStatuses[0].processingRate
-        23.5
-        """
-        return self._jss.processingRate()
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def triggerDetails(self):
-        """
-        Low-level details of the currently active trigger (e.g. number of rows processed
-        in trigger, latency of intermediate steps, etc.).
-
-        If no trigger is currently active, then it will have details of the last completed trigger.
-
-        >>> sqs.sourceStatuses[0].triggerDetails
-        {u'numRows.input.source': u'100', u'latency.getOffset.source': u'10',
-        u'latency.getBatch.source': u'20'}
-        """
-        return self._jss.triggerDetails()
-
-
-class SinkStatus(object):
-    """
-    Status and metrics of a streaming Sink.
-
-    .. note:: Experimental
-
-    .. versionadded:: 2.1
-    """
-
-    def __init__(self, jss):
-        self._jss = jss
-
-    def __str__(self):
-        """
-        Pretty string of this sink status.
-
-        >>> print(sqs.sinkStatus)
-        Status of sink MySink
-            Committed offsets: [1, -]
-        """
-        return self._jss.toString()
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def description(self):
-        """
-        Description of the sink corresponding to this status.
-
-        >>> sqs.sinkStatus.description
-        u'MySink'
-        """
-        return self._jss.description()
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def offsetDesc(self):
-        """
-        Description of the current offsets up to which data has been written by the sink.
-
-        >>> sqs.sinkStatus.offsetDesc
-        u'[1, -]'
-        """
-        return self._jss.offsetDesc()
-
-
class Trigger(object):
"""Used to indicate how often results should be produced by a :class:`StreamingQuery`.
@@ -1053,8 +773,6 @@ def _test():
    globs['sdf_schema'] = StructType([StructField("data", StringType(), False)])
    globs['df'] = \
        globs['spark'].readStream.format('text').load('python/test_support/sql/streaming')
-    globs['sqs'] = StreamingQueryStatus(
-        spark.sparkContext._jvm.org.apache.spark.sql.streaming.StreamingQueryStatus.testStatus())
    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.streaming, globs=globs,