From d89cc38b33815e7b99fb3389b5038a543527065d Mon Sep 17 00:00:00 2001 From: jerryshao Date: Fri, 21 Aug 2015 13:10:11 -0700 Subject: [SPARK-10122] [PYSPARK] [STREAMING] Fix getOffsetRanges bug in PySpark-Streaming transform function Details of the bug and explanations can be seen in [SPARK-10122](https://issues.apache.org/jira/browse/SPARK-10122). tdas, please help to review. Author: jerryshao Closes #8347 from jerryshao/SPARK-10122 and squashes the following commits: 4039b16 [jerryshao] Fix getOffsetRanges in transform() bug --- python/pyspark/streaming/dstream.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'python/pyspark/streaming/dstream.py') diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index 8dcb9645cd..698336cfce 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -610,7 +610,10 @@ class TransformedDStream(DStream): self.is_checkpointed = False self._jdstream_val = None - if (isinstance(prev, TransformedDStream) and + # Use type() rather than isinstance() so that functions are folded and DStreams are compacted + # only when prev is exactly a TransformedDStream, not an instance of a subclass. + # This avoids a bug in KafkaTransformedDStream when calling offsetRanges(). + if (type(prev) is TransformedDStream and not prev.is_cached and not prev.is_checkpointed): prev_func = prev.func self.func = lambda t, rdd: func(t, prev_func(t, rdd)) -- cgit v1.2.3