aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/streaming/dstream.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/streaming/dstream.py')
-rw-r--r--python/pyspark/streaming/dstream.py5
1 files changed, 4 insertions, 1 deletions
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 8dcb9645cd..698336cfce 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -610,7 +610,10 @@ class TransformedDStream(DStream):
self.is_checkpointed = False
self._jdstream_val = None
- if (isinstance(prev, TransformedDStream) and
+ # Using type() to avoid folding the functions and compacting the DStreams which is not
+ # not strictly a object of TransformedDStream.
+ # Changed here is to avoid bug in KafkaTransformedDStream when calling offsetRanges().
+ if (type(prev) is TransformedDStream and
not prev.is_cached and not prev.is_checkpointed):
prev_func = prev.func
self.func = lambda t, rdd: func(t, prev_func(t, rdd))