aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/streaming/tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/streaming/tests.py')
-rw-r--r--python/pyspark/streaming/tests.py20
1 files changed, 20 insertions, 0 deletions
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index a2bfd79e1a..4949cd68e3 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -403,6 +403,26 @@ class BasicOperationTests(PySparkStreamingTestCase):
expected = [[('k', v)] for v in expected]
self._test_func(input, func, expected)
+ def test_update_state_by_key_initial_rdd(self):
+
+ def updater(vs, s):
+ if not s:
+ s = []
+ s.extend(vs)
+ return s
+
+ initial = [('k', [0, 1])]
+ initial = self.sc.parallelize(initial, 1)
+
+ input = [[('k', i)] for i in range(2, 5)]
+
+ def func(dstream):
+ return dstream.updateStateByKey(updater, initialRDD=initial)
+
+ expected = [[0, 1, 2], [0, 1, 2, 3], [0, 1, 2, 3, 4]]
+ expected = [[('k', v)] for v in expected]
+ self._test_func(input, func, expected)
+
def test_failed_func(self):
# Test failure in
# TransformFunction.apply(rdd: Option[RDD[_]], time: Time)