diff options
author | Tyson Condie <tcondie@gmail.com> | 2016-11-09 15:03:22 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-11-09 15:03:22 -0800 |
commit | 3f62e1b5d9e75dc07bac3aa4db3e8d0615cc3cc3 (patch) | |
tree | 626afd38724496d5630f54d17acef35424c0746a /python/pyspark/sql/streaming.py | |
parent | 64fbdf1aa90b66269daec29f62dc9431c1173bab (diff) | |
download | spark-3f62e1b5d9e75dc07bac3aa4db3e8d0615cc3cc3.tar.gz spark-3f62e1b5d9e75dc07bac3aa4db3e8d0615cc3cc3.tar.bz2 spark-3f62e1b5d9e75dc07bac3aa4db3e8d0615cc3cc3.zip |
[SPARK-17829][SQL] Stable format for offset log
## What changes were proposed in this pull request?
Currently we use java serialization for the WAL that stores the offsets contained in each batch. This has two main issues:
It can break across spark releases (though this is not the only thing preventing us from upgrading a running query)
It is unnecessarily opaque to the user.
I'd propose we require offsets to provide a user readable serialization and use that instead. JSON is probably a good option.
## How was this patch tested?
Tests were added for KafkaSourceOffset in [KafkaSourceOffsetSuite](external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala) and for LongOffset in [OffsetSuite](sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala)
Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request.
zsxwing marmbrus
Author: Tyson Condie <tcondie@gmail.com>
Author: Tyson Condie <tcondie@clash.local>
Closes #15626 from tcondie/spark-8360.
Diffstat (limited to 'python/pyspark/sql/streaming.py')
-rw-r--r-- | python/pyspark/sql/streaming.py | 12 |
1 files changed, 6 insertions, 6 deletions
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 1c94413e3c..f326f16232 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -220,7 +220,7 @@ class StreamingQueryStatus(object): triggerId: 5 Source statuses [1 source]: Source 1 - MySource1 - Available offset: #0 + Available offset: 0 Input rate: 15.5 rows/sec Processing rate: 23.5 rows/sec Trigger details: @@ -228,7 +228,7 @@ class StreamingQueryStatus(object): latency.getOffset.source: 10 latency.getBatch.source: 20 Sink status - MySink - Committed offsets: [#1, -] + Committed offsets: [1, -] """ return self._jsqs.toString() @@ -366,7 +366,7 @@ class SourceStatus(object): >>> print(sqs.sourceStatuses[0]) Status of source MySource1 - Available offset: #0 + Available offset: 0 Input rate: 15.5 rows/sec Processing rate: 23.5 rows/sec Trigger details: @@ -396,7 +396,7 @@ class SourceStatus(object): Description of the current offset if known. >>> sqs.sourceStatuses[0].offsetDesc - u'#0' + u'0' """ return self._jss.offsetDesc() @@ -457,7 +457,7 @@ class SinkStatus(object): >>> print(sqs.sinkStatus) Status of sink MySink - Committed offsets: [#1, -] + Committed offsets: [1, -] """ return self._jss.toString() @@ -481,7 +481,7 @@ class SinkStatus(object): Description of the current offsets up to which data has been written by the sink. >>> sqs.sinkStatus.offsetDesc - u'[#1, -]' + u'[1, -]' """ return self._jss.offsetDesc() |