author     Tyson Condie <tcondie@gmail.com>          2016-11-09 15:03:22 -0800
committer  Shixiong Zhu <shixiong@databricks.com>    2016-11-09 15:03:22 -0800
commit     3f62e1b5d9e75dc07bac3aa4db3e8d0615cc3cc3 (patch)
tree       626afd38724496d5630f54d17acef35424c0746a /python/pyspark/sql/streaming.py
parent     64fbdf1aa90b66269daec29f62dc9431c1173bab (diff)
[SPARK-17829][SQL] Stable format for offset log
## What changes were proposed in this pull request?

Currently we use Java serialization for the WAL that stores the offsets contained in each batch. This has two main issues:

- It can break across Spark releases (though this is not the only thing preventing us from upgrading a running query).
- It is unnecessarily opaque to the user.

I'd propose we require offsets to provide a user-readable serialization and use that instead. JSON is probably a good option.

## How was this patch tested?

Tests were added for KafkaSourceOffset in [KafkaSourceOffsetSuite](external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala) and for LongOffset in [OffsetSuite](sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala).

Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request.

zsxwing marmbrus

Author: Tyson Condie <tcondie@gmail.com>
Author: Tyson Condie <tcondie@clash.local>

Closes #15626 from tcondie/spark-8360.
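As a rough illustration of the proposed approach (Spark's actual offset classes are implemented in Scala; this Python sketch is hypothetical, though the `LongOffset` name mirrors the class exercised by OffsetSuite), an offset type with a user-readable JSON serialization might look like:

```python
import json

# Hypothetical sketch: an offset that serializes to user-readable JSON
# instead of opaque Java serialization. Names and fields are illustrative,
# not Spark's actual (Scala) implementation.
class LongOffset:
    def __init__(self, offset):
        self.offset = offset

    def json(self):
        # A plain JSON number renders as "0", replacing the old "#0"
        # display form seen in the doctest updates below.
        return json.dumps(self.offset)

    @classmethod
    def from_json(cls, s):
        return cls(json.loads(s))

# Round-trip: what would be written to and read back from the offset log (WAL).
off = LongOffset(5)
assert LongOffset.from_json(off.json()).offset == 5
```

A serialization like this stays stable across releases and lets a user inspect the offset log directly, which is the motivation stated above.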
Diffstat (limited to 'python/pyspark/sql/streaming.py')
-rw-r--r--  python/pyspark/sql/streaming.py | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 1c94413e3c..f326f16232 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -220,7 +220,7 @@ class StreamingQueryStatus(object):
triggerId: 5
Source statuses [1 source]:
Source 1 - MySource1
- Available offset: #0
+ Available offset: 0
Input rate: 15.5 rows/sec
Processing rate: 23.5 rows/sec
Trigger details:
@@ -228,7 +228,7 @@ class StreamingQueryStatus(object):
latency.getOffset.source: 10
latency.getBatch.source: 20
Sink status - MySink
- Committed offsets: [#1, -]
+ Committed offsets: [1, -]
"""
return self._jsqs.toString()
@@ -366,7 +366,7 @@ class SourceStatus(object):
>>> print(sqs.sourceStatuses[0])
Status of source MySource1
- Available offset: #0
+ Available offset: 0
Input rate: 15.5 rows/sec
Processing rate: 23.5 rows/sec
Trigger details:
@@ -396,7 +396,7 @@ class SourceStatus(object):
Description of the current offset if known.
>>> sqs.sourceStatuses[0].offsetDesc
- u'#0'
+ u'0'
"""
return self._jss.offsetDesc()
@@ -457,7 +457,7 @@ class SinkStatus(object):
>>> print(sqs.sinkStatus)
Status of sink MySink
- Committed offsets: [#1, -]
+ Committed offsets: [1, -]
"""
return self._jss.toString()
@@ -481,7 +481,7 @@ class SinkStatus(object):
Description of the current offsets up to which data has been written by the sink.
>>> sqs.sinkStatus.offsetDesc
- u'[#1, -]'
+ u'[1, -]'
"""
return self._jss.offsetDesc()