From 3f62e1b5d9e75dc07bac3aa4db3e8d0615cc3cc3 Mon Sep 17 00:00:00 2001 From: Tyson Condie Date: Wed, 9 Nov 2016 15:03:22 -0800 Subject: [SPARK-17829][SQL] Stable format for offset log ## What changes were proposed in this pull request? Currently we use java serialization for the WAL that stores the offsets contained in each batch. This has two main issues: It can break across spark releases (though this is not the only thing preventing us from upgrading a running query) It is unnecessarily opaque to the user. I'd propose we require offsets to provide a user readable serialization and use that instead. JSON is probably a good option. ## How was this patch tested? Tests were added for KafkaSourceOffset in [KafkaSourceOffsetSuite](external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala) and for LongOffset in [OffsetSuite](sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala) Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request. zsxwing marmbrus Author: Tyson Condie Author: Tyson Condie Closes #15626 from tcondie/spark-8360. --- python/pyspark/sql/streaming.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'python/pyspark/sql/streaming.py') diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 1c94413e3c..f326f16232 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -220,7 +220,7 @@ class StreamingQueryStatus(object): triggerId: 5 Source statuses [1 source]: Source 1 - MySource1 - Available offset: #0 + Available offset: 0 Input rate: 15.5 rows/sec Processing rate: 23.5 rows/sec Trigger details: @@ -228,7 +228,7 @@ class StreamingQueryStatus(object): latency.getOffset.source: 10 latency.getBatch.source: 20 Sink status - MySink - Committed offsets: [#1, -] + Committed offsets: [1, -] """ return self._jsqs.toString() @@ -366,7 +366,7 @@ class SourceStatus(object): >>> print(sqs.sourceStatuses[0]) Status of source MySource1 - Available offset: #0 + Available offset: 0 Input rate: 15.5 rows/sec Processing rate: 23.5 rows/sec Trigger details: @@ -396,7 +396,7 @@ class SourceStatus(object): Description of the current offset if known. >>> sqs.sourceStatuses[0].offsetDesc - u'#0' + u'0' """ return self._jss.offsetDesc() @@ -457,7 +457,7 @@ class SinkStatus(object): >>> print(sqs.sinkStatus) Status of sink MySink - Committed offsets: [#1, -] + Committed offsets: [1, -] """ return self._jss.toString() @@ -481,7 +481,7 @@ class SinkStatus(object): Description of the current offsets up to which data has been written by the sink. >>> sqs.sinkStatus.offsetDesc - u'[#1, -]' + u'[1, -]' """ return self._jss.offsetDesc() -- cgit v1.2.3