From 8d4940092141c0909b673607f393cdd53f093ed6 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 28 Dec 2015 10:43:23 +0000 Subject: [SPARK-12353][STREAMING][PYSPARK] Fix countByValue inconsistent output in Python API The semantics of Python countByValue is different from Scala API, it is more like countDistinctValue, so here change to make it consistent with Scala/Java API. Author: jerryshao Closes #10350 from jerryshao/SPARK-12353. --- python/pyspark/streaming/dstream.py | 4 ++-- python/pyspark/streaming/tests.py | 17 ++++++++++++++--- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index adc2651740..86447f5e58 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -247,7 +247,7 @@ class DStream(object): Return a new DStream in which each RDD contains the counts of each distinct value in each RDD of this DStream. """ - return self.map(lambda x: (x, None)).reduceByKey(lambda x, y: None).count() + return self.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y) def saveAsTextFiles(self, prefix, suffix=None): """ @@ -493,7 +493,7 @@ class DStream(object): keyed = self.map(lambda x: (x, 1)) counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub, windowDuration, slideDuration, numPartitions) - return counted.filter(lambda kv: kv[1] > 0).count() + return counted.filter(lambda kv: kv[1] > 0) def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None): """ diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 4949cd68e3..86b05d9fd2 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -279,8 +279,10 @@ class BasicOperationTests(PySparkStreamingTestCase): def func(dstream): return dstream.countByValue() - expected = [[4], [4], [3]] - self._test_func(input, func, expected) + expected = [[(1, 2), (2, 2), (3, 2), (4, 2)], + [(5, 2), (6, 2), (7, 1), (8, 1)], + [("a", 2), ("b", 1), ("", 1)]] + self._test_func(input, func, expected, sort=True) def test_groupByKey(self): """Basic operation test for DStream.groupByKey.""" @@ -651,7 +653,16 @@ class WindowFunctionTests(PySparkStreamingTestCase): def func(dstream): return dstream.countByValueAndWindow(2.5, .5) - expected = [[1], [2], [3], [4], [5], [6], [6], [6], [6], [6]] + expected = [[(0, 1)], + [(0, 2), (1, 1)], + [(0, 3), (1, 2), (2, 1)], + [(0, 4), (1, 3), (2, 2), (3, 1)], + [(0, 5), (1, 4), (2, 3), (3, 2), (4, 1)], + [(0, 5), (1, 5), (2, 4), (3, 3), (4, 2), (5, 1)], + [(0, 4), (1, 4), (2, 4), (3, 3), (4, 2), (5, 1)], + [(0, 3), (1, 3), (2, 3), (3, 3), (4, 2), (5, 1)], + [(0, 2), (1, 2), (2, 2), (3, 2), (4, 2), (5, 1)], + [(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1)]] self._test_func(input, func, expected) def test_group_by_key_and_window(self): -- cgit v1.2.3