aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/streaming/tests.py
diff options
context:
space:
mode:
authorDavid Tolpin <david.tolpin@gmail.com>2015-11-19 13:57:23 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2015-11-19 13:57:23 -0800
commit599a8c6e2bf7da70b20ef3046f5ce099dfd637f8 (patch)
tree620b913655b307547d9aee37c1f7ec6d68a3d0ef /python/pyspark/streaming/tests.py
parent4700074530d9a398843e13f0ef514be97a237cea (diff)
downloadspark-599a8c6e2bf7da70b20ef3046f5ce099dfd637f8.tar.gz
spark-599a8c6e2bf7da70b20ef3046f5ce099dfd637f8.tar.bz2
spark-599a8c6e2bf7da70b20ef3046f5ce099dfd637f8.zip
[SPARK-11812][PYSPARK] invFunc=None works properly with python's reduceByKeyAndWindow
invFunc is optional and can be None. Instead of invFunc (the parameter) invReduceFunc (a local function) was checked for trueness (that is, not None, in this context). A local function is never None, thus the case of invFunc=None (a common one when inverse reduction is not defined) was treated incorrectly, resulting in loss of data. In addition, the docstring used wrong parameter names, also fixed. Author: David Tolpin <david.tolpin@gmail.com> Closes #9775 from dtolpin/master.
Diffstat (limited to 'python/pyspark/streaming/tests.py')
-rw-r--r--python/pyspark/streaming/tests.py11
1 files changed, 11 insertions, 0 deletions
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 0bcd1f1553..3403f6d20d 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -582,6 +582,17 @@ class WindowFunctionTests(PySparkStreamingTestCase):
self.assertRaises(ValueError, lambda: d1.reduceByKeyAndWindow(None, None, 0.1, 0.1))
self.assertRaises(ValueError, lambda: d1.reduceByKeyAndWindow(None, None, 1, 0.1))
+ def test_reduce_by_key_and_window_with_none_invFunc(self):
+ input = [range(1), range(2), range(3), range(4), range(5), range(6)]
+
+ def func(dstream):
+ return dstream.map(lambda x: (x, 1))\
+ .reduceByKeyAndWindow(operator.add, None, 5, 1)\
+ .filter(lambda kv: kv[1] > 0).count()
+
+ expected = [[2], [4], [6], [6], [6], [6]]
+ self._test_func(input, func, expected)
+
class StreamingContextTests(PySparkStreamingTestCase):