aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/streaming/dstream.py4
-rw-r--r--python/pyspark/streaming/tests.py17
2 files changed, 16 insertions, 5 deletions
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index adc2651740..86447f5e58 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -247,7 +247,7 @@ class DStream(object):
Return a new DStream in which each RDD contains the counts of each
distinct value in each RDD of this DStream.
"""
- return self.map(lambda x: (x, None)).reduceByKey(lambda x, y: None).count()
+ return self.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y)
def saveAsTextFiles(self, prefix, suffix=None):
"""
@@ -493,7 +493,7 @@ class DStream(object):
keyed = self.map(lambda x: (x, 1))
counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub,
windowDuration, slideDuration, numPartitions)
- return counted.filter(lambda kv: kv[1] > 0).count()
+ return counted.filter(lambda kv: kv[1] > 0)
def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None):
"""
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 4949cd68e3..86b05d9fd2 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -279,8 +279,10 @@ class BasicOperationTests(PySparkStreamingTestCase):
def func(dstream):
return dstream.countByValue()
- expected = [[4], [4], [3]]
- self._test_func(input, func, expected)
+ expected = [[(1, 2), (2, 2), (3, 2), (4, 2)],
+ [(5, 2), (6, 2), (7, 1), (8, 1)],
+ [("a", 2), ("b", 1), ("", 1)]]
+ self._test_func(input, func, expected, sort=True)
def test_groupByKey(self):
"""Basic operation test for DStream.groupByKey."""
@@ -651,7 +653,16 @@ class WindowFunctionTests(PySparkStreamingTestCase):
def func(dstream):
return dstream.countByValueAndWindow(2.5, .5)
- expected = [[1], [2], [3], [4], [5], [6], [6], [6], [6], [6]]
+ expected = [[(0, 1)],
+ [(0, 2), (1, 1)],
+ [(0, 3), (1, 2), (2, 1)],
+ [(0, 4), (1, 3), (2, 2), (3, 1)],
+ [(0, 5), (1, 4), (2, 3), (3, 2), (4, 1)],
+ [(0, 5), (1, 5), (2, 4), (3, 3), (4, 2), (5, 1)],
+ [(0, 4), (1, 4), (2, 4), (3, 3), (4, 2), (5, 1)],
+ [(0, 3), (1, 3), (2, 3), (3, 3), (4, 2), (5, 1)],
+ [(0, 2), (1, 2), (2, 2), (3, 2), (4, 2), (5, 1)],
+ [(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1)]]
self._test_func(input, func, expected)
def test_group_by_key_and_window(self):