aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/streaming/tests.py
diff options
context:
space:
mode:
authorjerryshao <sshao@hortonworks.com>2015-12-28 10:43:23 +0000
committerSean Owen <sowen@cloudera.com>2015-12-28 10:43:23 +0000
commit8d4940092141c0909b673607f393cdd53f093ed6 (patch)
treeab9a08c8d074d8880cb873306d128ee134864433 /python/pyspark/streaming/tests.py
parent5aa2710c1e587c340a66ed36af75214aea85a97a (diff)
downloadspark-8d4940092141c0909b673607f393cdd53f093ed6.tar.gz
spark-8d4940092141c0909b673607f393cdd53f093ed6.tar.bz2
spark-8d4940092141c0909b673607f393cdd53f093ed6.zip
[SPARK-12353][STREAMING][PYSPARK] Fix countByValue inconsistent output in Python API
The semantics of Python countByValue is different from Scala API, it is more like countDistinctValue, so here change to make it consistent with Scala/Java API. Author: jerryshao <sshao@hortonworks.com> Closes #10350 from jerryshao/SPARK-12353.
Diffstat (limited to 'python/pyspark/streaming/tests.py')
-rw-r--r--python/pyspark/streaming/tests.py17
1 files changed, 14 insertions, 3 deletions
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 4949cd68e3..86b05d9fd2 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -279,8 +279,10 @@ class BasicOperationTests(PySparkStreamingTestCase):
def func(dstream):
return dstream.countByValue()
- expected = [[4], [4], [3]]
- self._test_func(input, func, expected)
+ expected = [[(1, 2), (2, 2), (3, 2), (4, 2)],
+ [(5, 2), (6, 2), (7, 1), (8, 1)],
+ [("a", 2), ("b", 1), ("", 1)]]
+ self._test_func(input, func, expected, sort=True)
def test_groupByKey(self):
"""Basic operation test for DStream.groupByKey."""
@@ -651,7 +653,16 @@ class WindowFunctionTests(PySparkStreamingTestCase):
def func(dstream):
return dstream.countByValueAndWindow(2.5, .5)
- expected = [[1], [2], [3], [4], [5], [6], [6], [6], [6], [6]]
+ expected = [[(0, 1)],
+ [(0, 2), (1, 1)],
+ [(0, 3), (1, 2), (2, 1)],
+ [(0, 4), (1, 3), (2, 2), (3, 1)],
+ [(0, 5), (1, 4), (2, 3), (3, 2), (4, 1)],
+ [(0, 5), (1, 5), (2, 4), (3, 3), (4, 2), (5, 1)],
+ [(0, 4), (1, 4), (2, 4), (3, 3), (4, 2), (5, 1)],
+ [(0, 3), (1, 3), (2, 3), (3, 3), (4, 2), (5, 1)],
+ [(0, 2), (1, 2), (2, 2), (3, 2), (4, 2), (5, 1)],
+ [(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1)]]
self._test_func(input, func, expected)
def test_group_by_key_and_window(self):