diff options
author | Bryan Cutler <bjcutler@us.ibm.com> | 2015-12-10 14:21:15 -0800 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2015-12-10 14:21:15 -0800 |
commit | 6a6c1fc5c807ba4e8aba3e260537aa527ff5d46a (patch) | |
tree | a3b24c4b3c8682e6a4dfb659f600d7474636b9d3 /streaming/src | |
parent | 4a46b8859d3314b5b45a67cdc5c81fecb6e9e78c (diff) | |
download | spark-6a6c1fc5c807ba4e8aba3e260537aa527ff5d46a.tar.gz spark-6a6c1fc5c807ba4e8aba3e260537aa527ff5d46a.tar.bz2 spark-6a6c1fc5c807ba4e8aba3e260537aa527ff5d46a.zip |
[SPARK-11713] [PYSPARK] [STREAMING] Initial RDD updateStateByKey for PySpark
Adds the ability to define an initial state RDD for use with updateStateByKey in PySpark. Adds a unit test and changes the stateful_network_wordcount example to use an initial RDD.
Author: Bryan Cutler <bjcutler@us.ibm.com>
Closes #10082 from BryanCutler/initial-rdd-updateStateByKey-SPARK-11713.
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala | 14 |
1 files changed, 12 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 994309ddd0..056248ccc7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -264,9 +264,19 @@ private[python] class PythonTransformed2DStream(
   */
 private[python] class PythonStateDStream(
     parent: DStream[Array[Byte]],
-    reduceFunc: PythonTransformFunction)
+    reduceFunc: PythonTransformFunction,
+    initialRDD: Option[RDD[Array[Byte]]])
   extends PythonDStream(parent, reduceFunc) {
 
+  def this(
+    parent: DStream[Array[Byte]],
+    reduceFunc: PythonTransformFunction) = this(parent, reduceFunc, None)
+
+  def this(
+    parent: DStream[Array[Byte]],
+    reduceFunc: PythonTransformFunction,
+    initialRDD: JavaRDD[Array[Byte]]) = this(parent, reduceFunc, Some(initialRDD.rdd))
+
   super.persist(StorageLevel.MEMORY_ONLY)
   override val mustCheckpoint = true
 
@@ -274,7 +284,7 @@ private[python] class PythonStateDStream(
     val lastState = getOrCompute(validTime - slideDuration)
     val rdd = parent.getOrCompute(validTime)
     if (rdd.isDefined) {
-      func(lastState, rdd, validTime)
+      func(lastState.orElse(initialRDD), rdd, validTime)
     } else {
       lastState
     }