aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/sql/streaming.py
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-12-05 18:17:38 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2016-12-05 18:17:38 -0800
commitbb57bfe97d9fb077885065b8e804b85d4c493faf (patch)
tree384926cb923ea82b7195197da27834e4d47548f1 /python/pyspark/sql/streaming.py
parent1b2785c3d0a40da2fca923af78066060dbfbcf0a (diff)
downloadspark-bb57bfe97d9fb077885065b8e804b85d4c493faf.tar.gz
spark-bb57bfe97d9fb077885065b8e804b85d4c493faf.tar.bz2
spark-bb57bfe97d9fb077885065b8e804b85d4c493faf.zip
[SPARK-18657][SPARK-18668] Make StreamingQuery.id persists across restart and not auto-generate StreamingQuery.name
## What changes were proposed in this pull request? Here are the major changes in this PR. - Added the ability to recover `StreamingQuery.id` from checkpoint location, by writing the id to `checkpointLoc/metadata`. - Added `StreamingQuery.runId` which is unique for every query started and does not persist across restarts. This is to identify each restart of a query separately (same as earlier behavior of `id`). - Removed auto-generation of `StreamingQuery.name`. The purpose of name was to have the ability to define an identifier across restarts, but since id is precisely that, there is no need for a auto-generated name. This means name becomes purely cosmetic, and is null by default. - Added `runId` to `StreamingQueryListener` events and `StreamingQueryProgress`. Implementation details - Renamed existing `StreamExecutionMetadata` to `OffsetSeqMetadata`, and moved it to the file `OffsetSeq.scala`, because that is what this metadata is tied to. Also did some refactoring to make the code cleaner (got rid of a lot of `.json` and `.getOrElse("{}")`). - Added the `id` as the new `StreamMetadata`. - When a StreamingQuery is created it gets or writes the `StreamMetadata` from `checkpointLoc/metadata`. - All internal logging in `StreamExecution` uses `(name, id, runId)` instead of just `name` TODO - [x] Test handling of name=null in json generation of StreamingQueryProgress - [x] Test handling of name=null in json generation of StreamingQueryListener events - [x] Test python API of runId ## How was this patch tested? Updated unit tests and new unit tests Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #16113 from tdas/SPARK-18657.
Diffstat (limited to 'python/pyspark/sql/streaming.py')
-rw-r--r--python/pyspark/sql/streaming.py19
1 files changed, 17 insertions, 2 deletions
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 4a7d17ba51..ee7a26d00d 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -51,14 +51,29 @@ class StreamingQuery(object):
@property
@since(2.0)
def id(self):
- """The id of the streaming query.
+ """Returns the unique id of this query that persists across restarts from checkpoint data.
+ That is, this id is generated when a query is started for the first time, and
+ will be the same every time it is restarted from checkpoint data.
+ There can only be one query with the same id active in a Spark cluster.
+ Also see, `runId`.
"""
return self._jsq.id().toString()
@property
+ @since(2.1)
+ def runId(self):
+ """Returns the unique id of this query that does not persist across restarts. That is, every
+ query that is started (or restarted from checkpoint) will have a different runId.
+ """
+ return self._jsq.runId().toString()
+
+ @property
@since(2.0)
def name(self):
- """The name of the streaming query. This name is unique across all active queries.
+ """Returns the user-specified name of the query, or null if not specified.
+ This name can be specified in the `org.apache.spark.sql.streaming.DataStreamWriter`
+ as `dataframe.writeStream.queryName("query").start()`.
+ This name, if set, must be unique across all active queries.
"""
return self._jsq.name()