aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorjerryshao <sshao@hortonworks.com>2015-08-21 13:10:11 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-08-21 13:15:35 -0700
commitd89cc38b33815e7b99fb3389b5038a543527065d (patch)
treeed2d5d2395d23e5dfc6c8a0759b25287dad50007 /python
parent3c462f5d87a9654c5a68fd658a40f5062029fd9a (diff)
downloadspark-d89cc38b33815e7b99fb3389b5038a543527065d.tar.gz
spark-d89cc38b33815e7b99fb3389b5038a543527065d.tar.bz2
spark-d89cc38b33815e7b99fb3389b5038a543527065d.zip
[SPARK-10122] [PYSPARK] [STREAMING] Fix getOffsetRanges bug in PySpark-Streaming transform function
Details of the bug and explanations can be seen in [SPARK-10122](https://issues.apache.org/jira/browse/SPARK-10122). tdas , please help to review. Author: jerryshao <sshao@hortonworks.com> Closes #8347 from jerryshao/SPARK-10122 and squashes the following commits: 4039b16 [jerryshao] Fix getOffsetRanges in transform() bug
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/streaming/dstream.py5
-rw-r--r--python/pyspark/streaming/tests.py4
2 files changed, 7 insertions, 2 deletions
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 8dcb9645cd..698336cfce 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -610,7 +610,10 @@ class TransformedDStream(DStream):
self.is_checkpointed = False
self._jdstream_val = None
- if (isinstance(prev, TransformedDStream) and
+ # Using type() to avoid folding the functions and compacting the DStreams which is not
+ # not strictly a object of TransformedDStream.
+ # Changed here is to avoid bug in KafkaTransformedDStream when calling offsetRanges().
+ if (type(prev) is TransformedDStream and
not prev.is_cached and not prev.is_checkpointed):
prev_func = prev.func
self.func = lambda t, rdd: func(t, prev_func(t, rdd))
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 6108c845c1..214d5be439 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -850,7 +850,9 @@ class KafkaStreamTests(PySparkStreamingTestCase):
offsetRanges.append(o)
return rdd
- stream.transform(transformWithOffsetRanges).foreachRDD(lambda rdd: rdd.count())
+ # Test whether it is ok mixing KafkaTransformedDStream and TransformedDStream together,
+ # only the TransformedDstreams can be folded together.
+ stream.transform(transformWithOffsetRanges).map(lambda kv: kv[1]).count().pprint()
self.ssc.start()
self.wait_for(offsetRanges, 1)