diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2016-11-29 17:24:17 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2016-11-29 17:24:17 -0800 |
commit | c3d08e2f29baeebe09bf4c059ace4336af9116b5 (patch) | |
tree | bc677be4760fdd9dfabcb5bb990c576fd5c65e12 /python/pyspark/sql/streaming.py | |
parent | 9a02f6821265ff67ba3f7b095cd1afaebd25a898 (diff) | |
download | spark-c3d08e2f29baeebe09bf4c059ace4336af9116b5.tar.gz spark-c3d08e2f29baeebe09bf4c059ace4336af9116b5.tar.bz2 spark-c3d08e2f29baeebe09bf4c059ace4336af9116b5.zip |
[SPARK-18516][SQL] Split state and progress in streaming
This PR separates the status of a `StreamingQuery` into two separate APIs:
- `status` - describes the status of a `StreamingQuery` at this moment, including what phase of processing is currently happening and if data is available.
- `recentProgress` - an array of statistics about the most recent microbatches that have executed.
A recent progress contains the following information:
```
{
"id" : "2be8670a-fce1-4859-a530-748f29553bb6",
"name" : "query-29",
"timestamp" : 1479705392724,
"inputRowsPerSecond" : 230.76923076923077,
"processedRowsPerSecond" : 10.869565217391303,
"durationMs" : {
"triggerExecution" : 276,
"queryPlanning" : 3,
"getBatch" : 5,
"getOffset" : 3,
"addBatch" : 234,
"walCommit" : 30
},
"currentWatermark" : 0,
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[topic-14]]",
"startOffset" : {
"topic-14" : {
"2" : 0,
"4" : 1,
"1" : 0,
"3" : 0,
"0" : 0
}
},
"endOffset" : {
"topic-14" : {
"2" : 1,
"4" : 2,
"1" : 0,
"3" : 0,
"0" : 1
}
},
"numRecords" : 3,
"inputRowsPerSecond" : 230.76923076923077,
"processedRowsPerSecond" : 10.869565217391303
} ]
}
```
Additionally, in order to make it possible to correlate progress updates across restarts, we change the `id` field from an integer that is unique within the JVM to a `UUID` that is globally unique.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Michael Armbrust <michael@databricks.com>
Closes #15954 from marmbrus/queryProgress.
Diffstat (limited to 'python/pyspark/sql/streaming.py')
-rw-r--r-- | python/pyspark/sql/streaming.py | 326 |
1 files changed, 22 insertions, 304 deletions
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 9c3a237699..c420b0d016 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -16,6 +16,8 @@ # import sys +import json + if sys.version >= '3': intlike = int basestring = unicode = str @@ -48,10 +50,9 @@ class StreamingQuery(object): @property @since(2.0) def id(self): - """The id of the streaming query. This id is unique across all queries that have been - started in the current process. + """The id of the streaming query. """ - return self._jsq.id() + return self._jsq.id().toString() @property @since(2.0) @@ -87,6 +88,24 @@ class StreamingQuery(object): else: return self._jsq.awaitTermination() + @property + @since(2.1) + def recentProgresses(self): + """Returns an array of the most recent [[StreamingQueryProgress]] updates for this query. + The number of progress updates retained for each stream is configured by Spark session + configuration `spark.sql.streaming.numRecentProgresses`. + """ + return [json.loads(p.json()) for p in self._jsq.recentProgresses()] + + @property + @since(2.1) + def lastProgress(self): + """ + Returns the most recent :class:`StreamingQueryProgress` update of this streaming query. + :return: a map + """ + return json.loads(self._jsq.lastProgress().json()) + @since(2.0) def processAllAvailable(self): """Blocks until all available data in the source has been processed and committed to the @@ -149,8 +168,6 @@ class StreamingQueryManager(object): True >>> sq.stop() """ - if not isinstance(id, intlike): - raise ValueError("The id for the query must be an integer. Got: %s" % id) return StreamingQuery(self._jsqm.get(id)) @since(2.0) @@ -191,303 +208,6 @@ class StreamingQueryManager(object): self._jsqm.resetTerminated() -class StreamingQueryStatus(object): - """A class used to report information about the progress of a StreamingQuery. - - .. note:: Experimental - - .. 
versionadded:: 2.1 - """ - - def __init__(self, jsqs): - self._jsqs = jsqs - - def __str__(self): - """ - Pretty string of this query status. - - >>> print(sqs) - Status of query 'query' - Query id: 1 - Status timestamp: 123 - Input rate: 15.5 rows/sec - Processing rate 23.5 rows/sec - Latency: 345.0 ms - Trigger details: - batchId: 5 - isDataPresentInTrigger: true - isTriggerActive: true - latency.getBatch.total: 20 - latency.getOffset.total: 10 - numRows.input.total: 100 - Source statuses [1 source]: - Source 1 - MySource1 - Available offset: 0 - Input rate: 15.5 rows/sec - Processing rate: 23.5 rows/sec - Trigger details: - numRows.input.source: 100 - latency.getOffset.source: 10 - latency.getBatch.source: 20 - Sink status - MySink - Committed offsets: [1, -] - """ - return self._jsqs.toString() - - @property - @ignore_unicode_prefix - @since(2.1) - def name(self): - """ - Name of the query. This name is unique across all active queries. - - >>> sqs.name - u'query' - """ - return self._jsqs.name() - - @property - @since(2.1) - def id(self): - """ - Id of the query. This id is unique across all queries that have been started in - the current process. - - >>> int(sqs.id) - 1 - """ - return self._jsqs.id() - - @property - @since(2.1) - def timestamp(self): - """ - Timestamp (ms) of when this query was generated. - - >>> int(sqs.timestamp) - 123 - """ - return self._jsqs.timestamp() - - @property - @since(2.1) - def inputRate(self): - """ - Current total rate (rows/sec) at which data is being generated by all the sources. - - >>> sqs.inputRate - 15.5 - """ - return self._jsqs.inputRate() - - @property - @since(2.1) - def processingRate(self): - """ - Current rate (rows/sec) at which the query is processing data from all the sources. 
- - >>> sqs.processingRate - 23.5 - """ - return self._jsqs.processingRate() - - @property - @since(2.1) - def latency(self): - """ - Current average latency between the data being available in source and the sink - writing the corresponding output. - - >>> sqs.latency - 345.0 - """ - if (self._jsqs.latency().nonEmpty()): - return self._jsqs.latency().get() - else: - return None - - @property - @ignore_unicode_prefix - @since(2.1) - def sourceStatuses(self): - """ - Current statuses of the sources as a list. - - >>> len(sqs.sourceStatuses) - 1 - >>> sqs.sourceStatuses[0].description - u'MySource1' - """ - return [SourceStatus(ss) for ss in self._jsqs.sourceStatuses()] - - @property - @ignore_unicode_prefix - @since(2.1) - def sinkStatus(self): - """ - Current status of the sink. - - >>> sqs.sinkStatus.description - u'MySink' - """ - return SinkStatus(self._jsqs.sinkStatus()) - - @property - @ignore_unicode_prefix - @since(2.1) - def triggerDetails(self): - """ - Low-level details of the currently active trigger (e.g. number of rows processed - in trigger, latency of intermediate steps, etc.). - - If no trigger is currently active, then it will have details of the last completed trigger. - - >>> sqs.triggerDetails - {u'latency.getBatch.total': u'20', u'numRows.input.total': u'100', - u'isTriggerActive': u'true', u'batchId': u'5', u'latency.getOffset.total': u'10', - u'isDataPresentInTrigger': u'true'} - """ - return self._jsqs.triggerDetails() - - -class SourceStatus(object): - """ - Status and metrics of a streaming Source. - - .. note:: Experimental - - .. versionadded:: 2.1 - """ - - def __init__(self, jss): - self._jss = jss - - def __str__(self): - """ - Pretty string of this source status. 
- - >>> print(sqs.sourceStatuses[0]) - Status of source MySource1 - Available offset: 0 - Input rate: 15.5 rows/sec - Processing rate: 23.5 rows/sec - Trigger details: - numRows.input.source: 100 - latency.getOffset.source: 10 - latency.getBatch.source: 20 - """ - return self._jss.toString() - - @property - @ignore_unicode_prefix - @since(2.1) - def description(self): - """ - Description of the source corresponding to this status. - - >>> sqs.sourceStatuses[0].description - u'MySource1' - """ - return self._jss.description() - - @property - @ignore_unicode_prefix - @since(2.1) - def offsetDesc(self): - """ - Description of the current offset if known. - - >>> sqs.sourceStatuses[0].offsetDesc - u'0' - """ - return self._jss.offsetDesc() - - @property - @since(2.1) - def inputRate(self): - """ - Current rate (rows/sec) at which data is being generated by the source. - - >>> sqs.sourceStatuses[0].inputRate - 15.5 - """ - return self._jss.inputRate() - - @property - @since(2.1) - def processingRate(self): - """ - Current rate (rows/sec) at which the query is processing data from the source. - - >>> sqs.sourceStatuses[0].processingRate - 23.5 - """ - return self._jss.processingRate() - - @property - @ignore_unicode_prefix - @since(2.1) - def triggerDetails(self): - """ - Low-level details of the currently active trigger (e.g. number of rows processed - in trigger, latency of intermediate steps, etc.). - - If no trigger is currently active, then it will have details of the last completed trigger. - - >>> sqs.sourceStatuses[0].triggerDetails - {u'numRows.input.source': u'100', u'latency.getOffset.source': u'10', - u'latency.getBatch.source': u'20'} - """ - return self._jss.triggerDetails() - - -class SinkStatus(object): - """ - Status and metrics of a streaming Sink. - - .. note:: Experimental - - .. versionadded:: 2.1 - """ - - def __init__(self, jss): - self._jss = jss - - def __str__(self): - """ - Pretty string of this source status. 
- - >>> print(sqs.sinkStatus) - Status of sink MySink - Committed offsets: [1, -] - """ - return self._jss.toString() - - @property - @ignore_unicode_prefix - @since(2.1) - def description(self): - """ - Description of the source corresponding to this status. - - >>> sqs.sinkStatus.description - u'MySink' - """ - return self._jss.description() - - @property - @ignore_unicode_prefix - @since(2.1) - def offsetDesc(self): - """ - Description of the current offsets up to which data has been written by the sink. - - >>> sqs.sinkStatus.offsetDesc - u'[1, -]' - """ - return self._jss.offsetDesc() - - class Trigger(object): """Used to indicate how often results should be produced by a :class:`StreamingQuery`. @@ -1053,8 +773,6 @@ def _test(): globs['sdf_schema'] = StructType([StructField("data", StringType(), False)]) globs['df'] = \ globs['spark'].readStream.format('text').load('python/test_support/sql/streaming') - globs['sqs'] = StreamingQueryStatus( - spark.sparkContext._jvm.org.apache.spark.sql.streaming.StreamingQueryStatus.testStatus()) (failure_count, test_count) = doctest.testmod( pyspark.sql.streaming, globs=globs, |