diff options
author | David Tolpin <david.tolpin@gmail.com> | 2015-12-16 22:10:24 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2015-12-16 22:10:24 -0800 |
commit | 437583f692e30b8dc03b339a34e92595d7b992ba (patch) | |
tree | c478b76e814b4b5cb204ea87abc9facd8687c401 | |
parent | 97678edeaaafc19ea18d044233a952d2e2e89fbc (diff) | |
download | spark-437583f692e30b8dc03b339a34e92595d7b992ba.tar.gz spark-437583f692e30b8dc03b339a34e92595d7b992ba.tar.bz2 spark-437583f692e30b8dc03b339a34e92595d7b992ba.zip |
[SPARK-11904][PYSPARK] reduceByKeyAndWindow does not require checkpointing when invFunc is None
when invFunc is None, `reduceByKeyAndWindow(func, None, winsize, slidesize)` is equivalent to
reduceByKey(func).window(winsize, slidesize).reduceByKey(winsize, slidesize)
and no checkpoint is necessary. The corresponding Scala code does exactly that, but Python code always creates a windowed stream with obligatory checkpointing. The patch fixes this.
I do not know how to unit-test this.
Author: David Tolpin <david.tolpin@gmail.com>
Closes #9888 from dtolpin/master.
-rw-r--r-- | python/pyspark/streaming/dstream.py | 45 |
1 files changed, 23 insertions, 22 deletions
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index f61137cb88..b994a53bf2 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -542,31 +542,32 @@ class DStream(object): reduced = self.reduceByKey(func, numPartitions) - def reduceFunc(t, a, b): - b = b.reduceByKey(func, numPartitions) - r = a.union(b).reduceByKey(func, numPartitions) if a else b - if filterFunc: - r = r.filter(filterFunc) - return r - - def invReduceFunc(t, a, b): - b = b.reduceByKey(func, numPartitions) - joined = a.leftOuterJoin(b, numPartitions) - return joined.mapValues(lambda kv: invFunc(kv[0], kv[1]) - if kv[1] is not None else kv[0]) - - jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer) if invFunc: + def reduceFunc(t, a, b): + b = b.reduceByKey(func, numPartitions) + r = a.union(b).reduceByKey(func, numPartitions) if a else b + if filterFunc: + r = r.filter(filterFunc) + return r + + def invReduceFunc(t, a, b): + b = b.reduceByKey(func, numPartitions) + joined = a.leftOuterJoin(b, numPartitions) + return joined.mapValues(lambda kv: invFunc(kv[0], kv[1]) + if kv[1] is not None else kv[0]) + + jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer) jinvReduceFunc = TransformFunction(self._sc, invReduceFunc, reduced._jrdd_deserializer) + if slideDuration is None: + slideDuration = self._slideDuration + dstream = self._sc._jvm.PythonReducedWindowedDStream( + reduced._jdstream.dstream(), + jreduceFunc, jinvReduceFunc, + self._ssc._jduration(windowDuration), + self._ssc._jduration(slideDuration)) + return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer) else: - jinvReduceFunc = None - if slideDuration is None: - slideDuration = self._slideDuration - dstream = self._sc._jvm.PythonReducedWindowedDStream(reduced._jdstream.dstream(), - jreduceFunc, jinvReduceFunc, - self._ssc._jduration(windowDuration), - self._ssc._jduration(slideDuration)) - return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer) + return reduced.window(windowDuration, slideDuration).reduceByKey(func, numPartitions) def updateStateByKey(self, updateFunc, numPartitions=None, initialRDD=None): """ |