diff options
author | Bryan Cutler <bjcutler@us.ibm.com> | 2015-12-10 14:21:15 -0800 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2015-12-10 14:21:15 -0800 |
commit | 6a6c1fc5c807ba4e8aba3e260537aa527ff5d46a (patch) | |
tree | a3b24c4b3c8682e6a4dfb659f600d7474636b9d3 /python/pyspark/streaming/tests.py | |
parent | 4a46b8859d3314b5b45a67cdc5c81fecb6e9e78c (diff) | |
download | spark-6a6c1fc5c807ba4e8aba3e260537aa527ff5d46a.tar.gz spark-6a6c1fc5c807ba4e8aba3e260537aa527ff5d46a.tar.bz2 spark-6a6c1fc5c807ba4e8aba3e260537aa527ff5d46a.zip |
[SPARK-11713] [PYSPARK] [STREAMING] Initial RDD updateStateByKey for PySpark
Adding ability to define an initial state RDD for use with updateStateByKey PySpark. Added unit test and changed stateful_network_wordcount example to use initial RDD.
Author: Bryan Cutler <bjcutler@us.ibm.com>
Closes #10082 from BryanCutler/initial-rdd-updateStateByKey-SPARK-11713.
Diffstat (limited to 'python/pyspark/streaming/tests.py')
-rw-r--r-- | python/pyspark/streaming/tests.py | 20 |
1 files changed, 20 insertions, 0 deletions
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index a2bfd79e1a..4949cd68e3 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -403,6 +403,26 @@ class BasicOperationTests(PySparkStreamingTestCase): expected = [[('k', v)] for v in expected] self._test_func(input, func, expected) + def test_update_state_by_key_initial_rdd(self): + + def updater(vs, s): + if not s: + s = [] + s.extend(vs) + return s + + initial = [('k', [0, 1])] + initial = self.sc.parallelize(initial, 1) + + input = [[('k', i)] for i in range(2, 5)] + + def func(dstream): + return dstream.updateStateByKey(updater, initialRDD=initial) + + expected = [[0, 1, 2], [0, 1, 2, 3], [0, 1, 2, 3, 4]] + expected = [[('k', v)] for v in expected] + self._test_func(input, func, expected) + def test_failed_func(self): # Test failure in # TransformFunction.apply(rdd: Option[RDD[_]], time: Time) |