diff options
author | giwa <ugw.gi.world@gmail.com> | 2014-10-12 02:46:56 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-10-12 02:46:56 -0700 |
commit | 69c67abaa9d4bb4b95792d1862bc65efc764c194 (patch) | |
tree | 756bd148c002a34f1277fc1ee0704c936e917920 /python/pyspark/streaming/dstream.py | |
parent | 7a3f589ef86200f99624fea8322e5af0cad774a7 (diff) | |
download | spark-69c67abaa9d4bb4b95792d1862bc65efc764c194.tar.gz spark-69c67abaa9d4bb4b95792d1862bc65efc764c194.tar.bz2 spark-69c67abaa9d4bb4b95792d1862bc65efc764c194.zip |
[SPARK-2377] Python API for Streaming
This patch brings Python API for Streaming.
This patch is based on work from @giwa
Author: giwa <ugw.gi.world@gmail.com>
Author: Ken Takagiwa <ken@Kens-MacBook-Pro.local>
Author: Davies Liu <davies.liu@gmail.com>
Author: Ken Takagiwa <ken@kens-mbp.gateway.sonic.net>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Ken <ugw.gi.world@gmail.com>
Author: Ken Takagiwa <ugw.gi.world@gmail.com>
Author: Matthew Farrellee <matt@redhat.com>
Closes #2538 from davies/streaming and squashes the following commits:
64561e4 [Davies Liu] fix tests
331ecce [Davies Liu] fix example
3e2492b [Davies Liu] change updateStateByKey() to easy API
182be73 [Davies Liu] Merge branch 'master' of github.com:apache/spark into streaming
02d0575 [Davies Liu] add wrapper for foreachRDD()
bebeb4a [Davies Liu] address all comments
6db00da [Davies Liu] Merge branch 'master' of github.com:apache/spark into streaming
8380064 [Davies Liu] Merge branch 'master' of github.com:apache/spark into streaming
52c535b [Davies Liu] remove fix for sum()
e108ec1 [Davies Liu] address comments
37fe06f [Davies Liu] use random port for callback server
d05871e [Davies Liu] remove reuse of PythonRDD
be5e5ff [Davies Liu] merge branch of env, make tests stable.
8071541 [Davies Liu] Merge branch 'env' into streaming
c7bbbce [Davies Liu] fix sphinx docs
6bb9d91 [Davies Liu] Merge branch 'master' of github.com:apache/spark into streaming
4d0ea8b [Davies Liu] clear reference of SparkEnv after stop
54bd92b [Davies Liu] improve tests
c2b31cb [Davies Liu] Merge branch 'master' of github.com:apache/spark into streaming
7a88f9f [Davies Liu] rollback RDD.setContext(), use textFileStream() to test checkpointing
bd8a4c2 [Davies Liu] fix scala style
7797c70 [Davies Liu] refactor
ff88bec [Davies Liu] rename RDDFunction to TransformFunction
d328aca [Davies Liu] fix serializer in queueStream
6f0da2f [Davies Liu] recover from checkpoint
fa7261b [Davies Liu] refactor
a13ff34 [Davies Liu] address comments
8466916 [Davies Liu] support checkpoint
9a16bd1 [Davies Liu] change number of partitions during tests
b98d63f [Davies Liu] change private[spark] to private[python]
eed6e2a [Davies Liu] rollback not needed changes
e00136b [Davies Liu] address comments
069a94c [Davies Liu] fix the number of partitions during window()
338580a [Davies Liu] change _first(), _take(), _collect() as private API
19797f9 [Davies Liu] clean up
6ebceca [Davies Liu] add more tests
c40c52d [Davies Liu] change first(), take(n) to has the same behavior as RDD
98ac6c2 [Davies Liu] support ssc.transform()
b983f0f [Davies Liu] address comments
847f9b9 [Davies Liu] add more docs, add first(), take()
e059ca2 [Davies Liu] move check of window into Python
fce0ef5 [Davies Liu] rafactor of foreachRDD()
7001b51 [Davies Liu] refactor of queueStream()
26ea396 [Davies Liu] refactor
74df565 [Davies Liu] fix print and docs
b32774c [Davies Liu] move java_import into streaming
604323f [Davies Liu] enable streaming tests
c499ba0 [Davies Liu] remove Time and Duration
3f0fb4b [Davies Liu] refactor fix tests
c28f520 [Davies Liu] support updateStateByKey
d357b70 [Davies Liu] support windowed dstream
bd13026 [Davies Liu] fix examples
eec401e [Davies Liu] refactor, combine TransformedRDD, fix reuse PythonRDD, fix union
9a57685 [Davies Liu] fix python style
bd27874 [Davies Liu] fix scala style
7339be0 [Davies Liu] delete tests
7f53086 [Davies Liu] support transform(), refactor and cleanup
df098fc [Davies Liu] Merge branch 'master' into giwa
550dfd9 [giwa] WIP fixing 1.1 merge
5cdb6fa [giwa] changed for SCCallSiteSync
e685853 [giwa] meged with rebased 1.1 branch
2d32a74 [giwa] added some StreamingContextTestSuite
4a59e1e [giwa] WIP:added more test for StreamingContext
8ffdbf1 [giwa] added atexit to handle callback server
d5f5fcb [giwa] added comment for StreamingContext.sparkContext
63c881a [giwa] added StreamingContext.sparkContext
d39f102 [giwa] added StreamingContext.remember
d542743 [giwa] clean up code
2fdf0de [Matthew Farrellee] Fix scalastyle errors
c0a06bc [giwa] delete not implemented functions
f385976 [giwa] delete inproper comments
b0f2015 [giwa] added comment in dstream._test_output
bebb3f3 [giwa] remove the last brank line
fbed8da [giwa] revert pom.xml
8ed93af [giwa] fixed explanaiton
066ba90 [giwa] revert pom.xml
fa4af88 [giwa] remove duplicated import
6ae3caa [giwa] revert pom.xml
7dc7391 [giwa] fixed typo
62dc7a3 [giwa] clean up exmples
f04882c [giwa] clen up examples
b171ec3 [giwa] fixed pep8 violation
f198d14 [giwa] clean up code
3166d31 [giwa] clean up
c00e091 [giwa] change test case not to use awaitTermination
e80647e [giwa] adopted the latest compression way of python command
58e41ff [giwa] merge with master
455e5af [giwa] removed wasted print in DStream
af336b7 [giwa] add comments
ddd4ee1 [giwa] added TODO coments
99ce042 [giwa] added saveAsTextFiles and saveAsPickledFiles
2a06cdb [giwa] remove waste duplicated code
c5ecfc1 [giwa] basic function test cases are passed
8dcda84 [giwa] all tests are passed if numSlice is 2 and the numver of each input is over 4
795b2cd [giwa] broke something
1e126bf [giwa] WIP: solved partitioned and None is not recognized
f67cf57 [giwa] added mapValues and flatMapVaules WIP for glom and mapPartitions test
953deb0 [giwa] edited the comment to add more precise description
af610d3 [giwa] removed unnesessary changes
c1d546e [giwa] fixed PEP-008 violation
99410be [giwa] delete waste file
b3b0362 [giwa] added basic operation test cases
9cde7c9 [giwa] WIP added test case
bd3ba53 [giwa] WIP
5c04a5f [giwa] WIP: added PythonTestInputStream
019ef38 [giwa] WIP
1934726 [giwa] update comment
376e3ac [giwa] WIP
932372a [giwa] clean up dstream.py
0b09cff [giwa] added stop in StreamingContext
92e333e [giwa] implemented reduce and count function in Dstream
1b83354 [giwa] Removed the waste line
88f7506 [Ken Takagiwa] Kill py4j callback server properly
54b5358 [Ken Takagiwa] tried to restart callback server
4f07163 [Tathagata Das] Implemented DStream.foreachRDD in the Python API using Py4J callback server.
fe02547 [Ken Takagiwa] remove waste file
2ad7bd3 [Ken Takagiwa] clean up codes
6197a11 [Ken Takagiwa] clean up code
eb4bf48 [Ken Takagiwa] fix map function
98c2a00 [Ken Takagiwa] added count operation but this implementation need double check
58591d2 [Ken Takagiwa] reduceByKey is working
0df7111 [Ken Takagiwa] delete old file
f485b1d [Ken Takagiwa] fied input of socketTextDStream
dd6de81 [Ken Takagiwa] initial commit for socketTextStream
247fd74 [Ken Takagiwa] modified the code base on comment in https://github.com/tdas/spark/pull/10
4bcb318 [Ken Takagiwa] implementing transform function in Python
38adf95 [Ken Takagiwa] added reducedByKey not working yet
66fcfff [Ken Takagiwa] modify dstream.py to fix indent error
41886c2 [Ken Takagiwa] comment PythonDStream.PairwiseDStream
0b99bec [Ken] initial commit for pySparkStreaming
c214199 [giwa] added testcase for combineByKey
5625bdc [giwa] added gorupByKey testcase
10ab87b [giwa] added sparkContext as input parameter in StreamingContext
10b5b04 [giwa] removed wasted print in DStream
e54f986 [giwa] add comments
16aa64f [giwa] added TODO coments
74535d4 [giwa] added saveAsTextFiles and saveAsPickledFiles
f76c182 [giwa] remove waste duplicated code
18c8723 [giwa] modified streaming test case to add coment
13fb44c [giwa] basic function test cases are passed
3000b2b [giwa] all tests are passed if numSlice is 2 and the numver of each input is over 4
ff14070 [giwa] broke something
bcdec33 [giwa] WIP: solved partitioned and None is not recognized
270a9e1 [giwa] added mapValues and flatMapVaules WIP for glom and mapPartitions test
bb10956 [giwa] edited the comment to add more precise description
253a863 [giwa] removed unnesessary changes
3d37822 [giwa] fixed PEP-008 violation
f21cab3 [giwa] delete waste file
878bad7 [giwa] added basic operation test cases
ce2acd2 [giwa] WIP added test case
9ad6855 [giwa] WIP
1df77f5 [giwa] WIP: added PythonTestInputStream
1523b66 [giwa] WIP
8a0fbbc [giwa] update comment
fe648e3 [giwa] WIP
29c2bc5 [giwa] initial commit for testcase
4d40d63 [giwa] clean up dstream.py
c462bb3 [giwa] added stop in StreamingContext
d2c01ba [giwa] clean up examples
3c45cd2 [giwa] implemented reduce and count function in Dstream
b349649 [giwa] Removed the waste line
3b498e1 [Ken Takagiwa] Kill py4j callback server properly
84a9668 [Ken Takagiwa] tried to restart callback server
9ab8952 [Tathagata Das] Added extra line.
05e991b [Tathagata Das] Added missing file
b1d2a30 [Tathagata Das] Implemented DStream.foreachRDD in the Python API using Py4J callback server.
678e854 [Ken Takagiwa] remove waste file
0a8bbbb [Ken Takagiwa] clean up codes
bab31c1 [Ken Takagiwa] clean up code
72b9738 [Ken Takagiwa] fix map function
d3ee86a [Ken Takagiwa] added count operation but this implementation need double check
15feea9 [Ken Takagiwa] edit python sparkstreaming example
6f98e50 [Ken Takagiwa] reduceByKey is working
c455c8d [Ken Takagiwa] added reducedByKey not working yet
dc6995d [Ken Takagiwa] delete old file
b31446a [Ken Takagiwa] fixed typo of network_workdcount.py
ccfd214 [Ken Takagiwa] added doctest for pyspark.streaming.duration
0d1b954 [Ken Takagiwa] fied input of socketTextDStream
f746109 [Ken Takagiwa] initial commit for socketTextStream
bb7ccf3 [Ken Takagiwa] remove unused import in python
224fc5e [Ken Takagiwa] add empty line
d2099d8 [Ken Takagiwa] sorted the import following Spark coding convention
5bac7ec [Ken Takagiwa] revert streaming/pom.xml
e1df940 [Ken Takagiwa] revert pom.xml
494cae5 [Ken Takagiwa] remove not implemented DStream functions in python
17a74c6 [Ken Takagiwa] modified the code base on comment in https://github.com/tdas/spark/pull/10
1a0f065 [Ken Takagiwa] implementing transform function in Python
d7b4d6f [Ken Takagiwa] added reducedByKey not working yet
87438e2 [Ken Takagiwa] modify dstream.py to fix indent error
b406252 [Ken Takagiwa] comment PythonDStream.PairwiseDStream
454981d [Ken] initial commit for pySparkStreaming
150b94c [giwa] added some StreamingContextTestSuite
f7bc8f9 [giwa] WIP:added more test for StreamingContext
ee50c5a [giwa] added atexit to handle callback server
fdc9125 [giwa] added comment for StreamingContext.sparkContext
f5bfb70 [giwa] added StreamingContext.sparkContext
da09768 [giwa] added StreamingContext.remember
d68b568 [giwa] clean up code
4afa390 [giwa] clean up code
1fd6bc7 [Ken Takagiwa] Merge pull request #2 from mattf/giwa-master
d9d59fe [Matthew Farrellee] Fix scalastyle errors
67473a9 [giwa] delete not implemented functions
c97377c [giwa] delete inproper comments
2ea769e [giwa] added comment in dstream._test_output
3b27bd4 [giwa] remove the last brank line
acfcaeb [giwa] revert pom.xml
93f7637 [giwa] fixed explanaiton
50fd6f9 [giwa] revert pom.xml
4f82c89 [giwa] remove duplicated import
9d1de23 [giwa] revert pom.xml
7339df2 [giwa] fixed typo
9c85e48 [giwa] clean up exmples
24f95db [giwa] clen up examples
0d30109 [giwa] fixed pep8 violation
b7dab85 [giwa] improve test case
583e66d [giwa] move tests for streaming inside streaming directory
1d84142 [giwa] remove unimplement test
f0ea311 [giwa] clean up code
171edeb [giwa] clean up
4dedd2d [giwa] change test case not to use awaitTermination
268a6a5 [giwa] Changed awaitTermination not to call awaitTermincation in Scala. Just use time.sleep instread
09a28bf [giwa] improve testcases
58150f5 [giwa] Changed the test case to focus the test operation
199e37f [giwa] adopted the latest compression way of python command
185fdbf [giwa] merge with master
f1798c4 [giwa] merge with master
e70f706 [giwa] added testcase for combineByKey
e162822 [giwa] added gorupByKey testcase
97742fe [giwa] added sparkContext as input parameter in StreamingContext
14d4c0e [giwa] removed wasted print in DStream
6d8190a [giwa] add comments
4aa99e4 [giwa] added TODO coments
e9fab72 [giwa] added saveAsTextFiles and saveAsPickledFiles
94f2b65 [giwa] remove waste duplicated code
580fbc2 [giwa] modified streaming test case to add coment
99e4bb3 [giwa] basic function test cases are passed
7051a84 [giwa] all tests are passed if numSlice is 2 and the numver of each input is over 4
35933e1 [giwa] broke something
9767712 [giwa] WIP: solved partitioned and None is not recognized
4f2d7e6 [giwa] added mapValues and flatMapVaules WIP for glom and mapPartitions test
33c0f94d [giwa] edited the comment to add more precise description
774f18d [giwa] removed unnesessary changes
3a671cc [giwa] remove export PYSPARK_PYTHON in spark submit
8efa266 [giwa] fixed PEP-008 violation
fa75d71 [giwa] delete waste file
7f96294 [giwa] added basic operation test cases
3dda31a [giwa] WIP added test case
1f68b78 [giwa] WIP
c05922c [giwa] WIP: added PythonTestInputStream
1fd12ae [giwa] WIP
c880a33 [giwa] update comment
5d22c92 [giwa] WIP
ea4b06b [giwa] initial commit for testcase
5a9b525 [giwa] clean up dstream.py
79c5809 [giwa] added stop in StreamingContext
189dcea [giwa] clean up examples
b8d7d24 [giwa] implemented reduce and count function in Dstream
b6468e6 [giwa] Removed the waste line
b47b5fd [Ken Takagiwa] Kill py4j callback server properly
19ddcdd [Ken Takagiwa] tried to restart callback server
c9fc124 [Tathagata Das] Added extra line.
4caae3f [Tathagata Das] Added missing file
4eff053 [Tathagata Das] Implemented DStream.foreachRDD in the Python API using Py4J callback server.
5e822d4 [Ken Takagiwa] remove waste file
aeaf8a5 [Ken Takagiwa] clean up codes
9fa249b [Ken Takagiwa] clean up code
05459c6 [Ken Takagiwa] fix map function
a9f4ecb [Ken Takagiwa] added count operation but this implementation need double check
d1ee6ca [Ken Takagiwa] edit python sparkstreaming example
0b8b7d0 [Ken Takagiwa] reduceByKey is working
d25d5cf [Ken Takagiwa] added reducedByKey not working yet
7f7c5d1 [Ken Takagiwa] delete old file
967dc26 [Ken Takagiwa] fixed typo of network_workdcount.py
57fb740 [Ken Takagiwa] added doctest for pyspark.streaming.duration
4b69fb1 [Ken Takagiwa] fied input of socketTextDStream
02f618a [Ken Takagiwa] initial commit for socketTextStream
4ce4058 [Ken Takagiwa] remove unused import in python
856d98e [Ken Takagiwa] add empty line
490e338 [Ken Takagiwa] sorted the import following Spark coding convention
5594bd4 [Ken Takagiwa] revert pom.xml
2adca84 [Ken Takagiwa] remove not implemented DStream functions in python
e551e13 [Ken Takagiwa] add coment for hack why PYSPARK_PYTHON is needed in spark-submit
3758175 [Ken Takagiwa] add coment for hack why PYSPARK_PYTHON is needed in spark-submit
c5518b4 [Ken Takagiwa] modified the code base on comment in https://github.com/tdas/spark/pull/10
dcf243f [Ken Takagiwa] implementing transform function in Python
9af03f4 [Ken Takagiwa] added reducedByKey not working yet
6e0d9c7 [Ken Takagiwa] modify dstream.py to fix indent error
e497b9b [Ken Takagiwa] comment PythonDStream.PairwiseDStream
5c3a683 [Ken] initial commit for pySparkStreaming
665bfdb [giwa] added testcase for combineByKey
a3d2379 [giwa] added gorupByKey testcase
636090a [giwa] added sparkContext as input parameter in StreamingContext
e7ebb08 [giwa] removed wasted print in DStream
d8b593b [giwa] add comments
ea9c873 [giwa] added TODO coments
89ae38a [giwa] added saveAsTextFiles and saveAsPickledFiles
e3033fc [giwa] remove waste duplicated code
a14c7e1 [giwa] modified streaming test case to add coment
536def4 [giwa] basic function test cases are passed
2112638 [giwa] all tests are passed if numSlice is 2 and the numver of each input is over 4
080541a [giwa] broke something
0704b86 [giwa] WIP: solved partitioned and None is not recognized
90a6484 [giwa] added mapValues and flatMapVaules WIP for glom and mapPartitions test
a65f302 [giwa] edited the comment to add more precise description
bdde697 [giwa] removed unnesessary changes
e8c7bfc [giwa] remove export PYSPARK_PYTHON in spark submit
3334169 [giwa] fixed PEP-008 violation
db0a303 [giwa] delete waste file
2cfd3a0 [giwa] added basic operation test cases
90ae568 [giwa] WIP added test case
a120d07 [giwa] WIP
f671cdb [giwa] WIP: added PythonTestInputStream
56fae45 [giwa] WIP
e35e101 [giwa] Merge branch 'master' into testcase
ba5112d [giwa] update comment
28aa56d [giwa] WIP
fb08559 [giwa] initial commit for testcase
a613b85 [giwa] clean up dstream.py
c40c0ef [giwa] added stop in StreamingContext
31e4260 [giwa] clean up examples
d2127d6 [giwa] implemented reduce and count function in Dstream
48f7746 [giwa] Removed the waste line
0f83eaa [Ken Takagiwa] delete py4j 0.8.1
1679808 [Ken Takagiwa] Kill py4j callback server properly
f96cd4e [Ken Takagiwa] tried to restart callback server
fe86198 [Ken Takagiwa] add py4j 0.8.2.1 but server is not launched
1064fe0 [Ken Takagiwa] Merge branch 'master' of https://github.com/giwa/spark
28c6620 [Ken Takagiwa] Implemented DStream.foreachRDD in the Python API using Py4J callback server
85b0fe1 [Ken Takagiwa] Merge pull request #1 from tdas/python-foreach
54e2e8c [Tathagata Das] Added extra line.
e185338 [Tathagata Das] Added missing file
a778d4b [Tathagata Das] Implemented DStream.foreachRDD in the Python API using Py4J callback server.
cc2092b [Ken Takagiwa] remove waste file
d042ac6 [Ken Takagiwa] clean up codes
84a021f [Ken Takagiwa] clean up code
bd20e17 [Ken Takagiwa] fix map function
d01a125 [Ken Takagiwa] added count operation but this implementation need double check
7d05109 [Ken Takagiwa] merge with remote branch
ae464e0 [Ken Takagiwa] edit python sparkstreaming example
04af046 [Ken Takagiwa] reduceByKey is working
3b6d7b0 [Ken Takagiwa] implementing transform function in Python
571d52d [Ken Takagiwa] added reducedByKey not working yet
5720979 [Ken Takagiwa] delete old file
e604fcb [Ken Takagiwa] fixed typo of network_workdcount.py
4b7c08b [Ken Takagiwa] Merge branch 'master' of https://github.com/giwa/spark
ce7d426 [Ken Takagiwa] added doctest for pyspark.streaming.duration
a8c9fd5 [Ken Takagiwa] fixed for socketTextStream
a61fa9e [Ken Takagiwa] fied input of socketTextDStream
1e84f41 [Ken Takagiwa] initial commit for socketTextStream
6d012f7 [Ken Takagiwa] remove unused import in python
25d30d5 [Ken Takagiwa] add empty line
6e0a64a [Ken Takagiwa] sorted the import following Spark coding convention
fa4a7fc [Ken Takagiwa] revert streaming/pom.xml
8f8202b [Ken Takagiwa] revert streaming pom.xml
c9d79dd [Ken Takagiwa] revert pom.xml
57e3e52 [Ken Takagiwa] remove not implemented DStream functions in python
0a516f5 [Ken Takagiwa] add coment for hack why PYSPARK_PYTHON is needed in spark-submit
a7a0b5c [Ken Takagiwa] add coment for hack why PYSPARK_PYTHON is needed in spark-submit
72bfc66 [Ken Takagiwa] modified the code base on comment in https://github.com/tdas/spark/pull/10
69e9cd3 [Ken Takagiwa] implementing transform function in Python
94a0787 [Ken Takagiwa] added reducedByKey not working yet
88068cf [Ken Takagiwa] modify dstream.py to fix indent error
1367be5 [Ken Takagiwa] comment PythonDStream.PairwiseDStream
eb2b3ba [Ken] Merge remote-tracking branch 'upstream/master'
d8e51f9 [Ken] initial commit for pySparkStreaming
Diffstat (limited to 'python/pyspark/streaming/dstream.py')
-rw-r--r-- | python/pyspark/streaming/dstream.py | 621 |
1 files changed, 621 insertions, 0 deletions
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py new file mode 100644 index 0000000000..5ae5cf07f0 --- /dev/null +++ b/python/pyspark/streaming/dstream.py @@ -0,0 +1,621 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from itertools import chain, ifilter, imap +import operator +import time +from datetime import datetime + +from py4j.protocol import Py4JJavaError + +from pyspark import RDD +from pyspark.storagelevel import StorageLevel +from pyspark.streaming.util import rddToFileName, TransformFunction +from pyspark.rdd import portable_hash +from pyspark.resultiterable import ResultIterable + +__all__ = ["DStream"] + + +class DStream(object): + """ + A Discretized Stream (DStream), the basic abstraction in Spark Streaming, + is a continuous sequence of RDDs (of the same type) representing a + continuous stream of data (see L{RDD} in the Spark core documentation + for more details on RDDs). + + DStreams can either be created from live data (such as, data from TCP + sockets, Kafka, Flume, etc.) using a L{StreamingContext} or it can be + generated by transforming existing DStreams using operations such as + `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming + program is running, each DStream periodically generates a RDD, either + from live data or by transforming the RDD generated by a parent DStream. + + DStreams internally is characterized by a few basic properties: + - A list of other DStreams that the DStream depends on + - A time interval at which the DStream generates an RDD + - A function that is used to generate an RDD after each time interval + """ + def __init__(self, jdstream, ssc, jrdd_deserializer): + self._jdstream = jdstream + self._ssc = ssc + self._sc = ssc._sc + self._jrdd_deserializer = jrdd_deserializer + self.is_cached = False + self.is_checkpointed = False + + def context(self): + """ + Return the StreamingContext associated with this DStream + """ + return self._ssc + + def count(self): + """ + Return a new DStream in which each RDD has a single element + generated by counting each RDD of this DStream. + """ + return self.mapPartitions(lambda i: [sum(1 for _ in i)]).reduce(operator.add) + + def filter(self, f): + """ + Return a new DStream containing only the elements that satisfy predicate. + """ + def func(iterator): + return ifilter(f, iterator) + return self.mapPartitions(func, True) + + def flatMap(self, f, preservesPartitioning=False): + """ + Return a new DStream by applying a function to all elements of + this DStream, and then flattening the results + """ + def func(s, iterator): + return chain.from_iterable(imap(f, iterator)) + return self.mapPartitionsWithIndex(func, preservesPartitioning) + + def map(self, f, preservesPartitioning=False): + """ + Return a new DStream by applying a function to each element of DStream. + """ + def func(iterator): + return imap(f, iterator) + return self.mapPartitions(func, preservesPartitioning) + + def mapPartitions(self, f, preservesPartitioning=False): + """ + Return a new DStream in which each RDD is generated by applying + mapPartitions() to each RDDs of this DStream. + """ + def func(s, iterator): + return f(iterator) + return self.mapPartitionsWithIndex(func, preservesPartitioning) + + def mapPartitionsWithIndex(self, f, preservesPartitioning=False): + """ + Return a new DStream in which each RDD is generated by applying + mapPartitionsWithIndex() to each RDDs of this DStream. + """ + return self.transform(lambda rdd: rdd.mapPartitionsWithIndex(f, preservesPartitioning)) + + def reduce(self, func): + """ + Return a new DStream in which each RDD has a single element + generated by reducing each RDD of this DStream. + """ + return self.map(lambda x: (None, x)).reduceByKey(func, 1).map(lambda x: x[1]) + + def reduceByKey(self, func, numPartitions=None): + """ + Return a new DStream by applying reduceByKey to each RDD. + """ + if numPartitions is None: + numPartitions = self._sc.defaultParallelism + return self.combineByKey(lambda x: x, func, func, numPartitions) + + def combineByKey(self, createCombiner, mergeValue, mergeCombiners, + numPartitions=None): + """ + Return a new DStream by applying combineByKey to each RDD. + """ + if numPartitions is None: + numPartitions = self._sc.defaultParallelism + + def func(rdd): + return rdd.combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions) + return self.transform(func) + + def partitionBy(self, numPartitions, partitionFunc=portable_hash): + """ + Return a copy of the DStream in which each RDD are partitioned + using the specified partitioner. + """ + return self.transform(lambda rdd: rdd.partitionBy(numPartitions, partitionFunc)) + + def foreachRDD(self, func): + """ + Apply a function to each RDD in this DStream. + """ + if func.func_code.co_argcount == 1: + old_func = func + func = lambda t, rdd: old_func(rdd) + jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer) + api = self._ssc._jvm.PythonDStream + api.callForeachRDD(self._jdstream, jfunc) + + def pprint(self): + """ + Print the first ten elements of each RDD generated in this DStream. + """ + def takeAndPrint(time, rdd): + taken = rdd.take(11) + print "-------------------------------------------" + print "Time: %s" % time + print "-------------------------------------------" + for record in taken[:10]: + print record + if len(taken) > 10: + print "..." + print + + self.foreachRDD(takeAndPrint) + + def mapValues(self, f): + """ + Return a new DStream by applying a map function to the value of + each key-value pairs in this DStream without changing the key. + """ + map_values_fn = lambda (k, v): (k, f(v)) + return self.map(map_values_fn, preservesPartitioning=True) + + def flatMapValues(self, f): + """ + Return a new DStream by applying a flatmap function to the value + of each key-value pairs in this DStream without changing the key. + """ + flat_map_fn = lambda (k, v): ((k, x) for x in f(v)) + return self.flatMap(flat_map_fn, preservesPartitioning=True) + + def glom(self): + """ + Return a new DStream in which RDD is generated by applying glom() + to RDD of this DStream. + """ + def func(iterator): + yield list(iterator) + return self.mapPartitions(func) + + def cache(self): + """ + Persist the RDDs of this DStream with the default storage level + (C{MEMORY_ONLY_SER}). + """ + self.is_cached = True + self.persist(StorageLevel.MEMORY_ONLY_SER) + return self + + def persist(self, storageLevel): + """ + Persist the RDDs of this DStream with the given storage level + """ + self.is_cached = True + javaStorageLevel = self._sc._getJavaStorageLevel(storageLevel) + self._jdstream.persist(javaStorageLevel) + return self + + def checkpoint(self, interval): + """ + Enable periodic checkpointing of RDDs of this DStream + + @param interval: time in seconds, after each period of that, generated + RDD will be checkpointed + """ + self.is_checkpointed = True + self._jdstream.checkpoint(self._ssc._jduration(interval)) + return self + + def groupByKey(self, numPartitions=None): + """ + Return a new DStream by applying groupByKey on each RDD. + """ + if numPartitions is None: + numPartitions = self._sc.defaultParallelism + return self.transform(lambda rdd: rdd.groupByKey(numPartitions)) + + def countByValue(self): + """ + Return a new DStream in which each RDD contains the counts of each + distinct value in each RDD of this DStream. + """ + return self.map(lambda x: (x, None)).reduceByKey(lambda x, y: None).count() + + def saveAsTextFiles(self, prefix, suffix=None): + """ + Save each RDD in this DStream as at text file, using string + representation of elements. + """ + def saveAsTextFile(t, rdd): + path = rddToFileName(prefix, suffix, t) + try: + rdd.saveAsTextFile(path) + except Py4JJavaError as e: + # after recovered from checkpointing, the foreachRDD may + # be called twice + if 'FileAlreadyExistsException' not in str(e): + raise + return self.foreachRDD(saveAsTextFile) + + # TODO: uncomment this until we have ssc.pickleFileStream() + # def saveAsPickleFiles(self, prefix, suffix=None): + # """ + # Save each RDD in this DStream as at binary file, the elements are + # serialized by pickle. + # """ + # def saveAsPickleFile(t, rdd): + # path = rddToFileName(prefix, suffix, t) + # try: + # rdd.saveAsPickleFile(path) + # except Py4JJavaError as e: + # # after recovered from checkpointing, the foreachRDD may + # # be called twice + # if 'FileAlreadyExistsException' not in str(e): + # raise + # return self.foreachRDD(saveAsPickleFile) + + def transform(self, func): + """ + Return a new DStream in which each RDD is generated by applying a function + on each RDD of this DStream. + + `func` can have one argument of `rdd`, or have two arguments of + (`time`, `rdd`) + """ + if func.func_code.co_argcount == 1: + oldfunc = func + func = lambda t, rdd: oldfunc(rdd) + assert func.func_code.co_argcount == 2, "func should take one or two arguments" + return TransformedDStream(self, func) + + def transformWith(self, func, other, keepSerializer=False): + """ + Return a new DStream in which each RDD is generated by applying a function + on each RDD of this DStream and 'other' DStream. + + `func` can have two arguments of (`rdd_a`, `rdd_b`) or have three + arguments of (`time`, `rdd_a`, `rdd_b`) + """ + if func.func_code.co_argcount == 2: + oldfunc = func + func = lambda t, a, b: oldfunc(a, b) + assert func.func_code.co_argcount == 3, "func should take two or three arguments" + jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer, other._jrdd_deserializer) + dstream = self._sc._jvm.PythonTransformed2DStream(self._jdstream.dstream(), + other._jdstream.dstream(), jfunc) + jrdd_serializer = self._jrdd_deserializer if keepSerializer else self._sc.serializer + return DStream(dstream.asJavaDStream(), self._ssc, jrdd_serializer) + + def repartition(self, numPartitions): + """ + Return a new DStream with an increased or decreased level of parallelism. + """ + return self.transform(lambda rdd: rdd.repartition(numPartitions)) + + @property + def _slideDuration(self): + """ + Return the slideDuration in seconds of this DStream + """ + return self._jdstream.dstream().slideDuration().milliseconds() / 1000.0 + + def union(self, other): + """ + Return a new DStream by unifying data of another DStream with this DStream. + + @param other: Another DStream having the same interval (i.e., slideDuration) + as this DStream. + """ + if self._slideDuration != other._slideDuration: + raise ValueError("the two DStream should have same slide duration") + return self.transformWith(lambda a, b: a.union(b), other, True) + + def cogroup(self, other, numPartitions=None): + """ + Return a new DStream by applying 'cogroup' between RDDs of this + DStream and `other` DStream. + + Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + """ + if numPartitions is None: + numPartitions = self._sc.defaultParallelism + return self.transformWith(lambda a, b: a.cogroup(b, numPartitions), other) + + def join(self, other, numPartitions=None): + """ + Return a new DStream by applying 'join' between RDDs of this DStream and + `other` DStream. + + Hash partitioning is used to generate the RDDs with `numPartitions` + partitions. + """ + if numPartitions is None: + numPartitions = self._sc.defaultParallelism + return self.transformWith(lambda a, b: a.join(b, numPartitions), other) + + def leftOuterJoin(self, other, numPartitions=None): + """ + Return a new DStream by applying 'left outer join' between RDDs of this DStream and + `other` DStream. + + Hash partitioning is used to generate the RDDs with `numPartitions` + partitions. + """ + if numPartitions is None: + numPartitions = self._sc.defaultParallelism + return self.transformWith(lambda a, b: a.leftOuterJoin(b, numPartitions), other) + + def rightOuterJoin(self, other, numPartitions=None): + """ + Return a new DStream by applying 'right outer join' between RDDs of this DStream and + `other` DStream. + + Hash partitioning is used to generate the RDDs with `numPartitions` + partitions. + """ + if numPartitions is None: + numPartitions = self._sc.defaultParallelism + return self.transformWith(lambda a, b: a.rightOuterJoin(b, numPartitions), other) + + def fullOuterJoin(self, other, numPartitions=None): + """ + Return a new DStream by applying 'full outer join' between RDDs of this DStream and + `other` DStream. + + Hash partitioning is used to generate the RDDs with `numPartitions` + partitions. + """ + if numPartitions is None: + numPartitions = self._sc.defaultParallelism + return self.transformWith(lambda a, b: a.fullOuterJoin(b, numPartitions), other) + + def _jtime(self, timestamp): + """ Convert datetime or unix_timestamp into Time + """ + if isinstance(timestamp, datetime): + timestamp = time.mktime(timestamp.timetuple()) + return self._sc._jvm.Time(long(timestamp * 1000)) + + def slice(self, begin, end): + """ + Return all the RDDs between 'begin' to 'end' (both included) + + `begin`, `end` could be datetime.datetime() or unix_timestamp + """ + jrdds = self._jdstream.slice(self._jtime(begin), self._jtime(end)) + return [RDD(jrdd, self._sc, self._jrdd_deserializer) for jrdd in jrdds] + + def _validate_window_param(self, window, slide): + duration = self._jdstream.dstream().slideDuration().milliseconds() + if int(window * 1000) % duration != 0: + raise ValueError("windowDuration must be multiple of the slide duration (%d ms)" + % duration) + if slide and int(slide * 1000) % duration != 0: + raise ValueError("slideDuration must be multiple of the slide duration (%d ms)" + % duration) + + def window(self, windowDuration, slideDuration=None): + """ + Return a new DStream in which each RDD contains all the elements in seen in a + sliding window of time over this DStream. + + @param windowDuration: width of the window; must be a multiple of this DStream's + batching interval + @param slideDuration: sliding interval of the window (i.e., the interval after which + the new DStream will generate RDDs); must be a multiple of this + DStream's batching interval + """ + self._validate_window_param(windowDuration, slideDuration) + d = self._ssc._jduration(windowDuration) + if slideDuration is None: + return DStream(self._jdstream.window(d), self._ssc, self._jrdd_deserializer) + s = self._ssc._jduration(slideDuration) + return DStream(self._jdstream.window(d, s), self._ssc, self._jrdd_deserializer) + + def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuration): + """ + Return a new DStream in which each RDD has a single element generated by reducing all + elements in a sliding window over this DStream. + + if `invReduceFunc` is not None, the reduction is done incrementally + using the old window's reduced value : + 1. reduce the new values that entered the window (e.g., adding new counts) + 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) + This is more efficient than `invReduceFunc` is None. + + @param reduceFunc: associative reduce function + @param invReduceFunc: inverse reduce function of `reduceFunc` + @param windowDuration: width of the window; must be a multiple of this DStream's + batching interval + @param slideDuration: sliding interval of the window (i.e., the interval after which + the new DStream will generate RDDs); must be a multiple of this + DStream's batching interval + """ + keyed = self.map(lambda x: (1, x)) + reduced = keyed.reduceByKeyAndWindow(reduceFunc, invReduceFunc, + windowDuration, slideDuration, 1) + return reduced.map(lambda (k, v): v) + + def countByWindow(self, windowDuration, slideDuration): + """ + Return a new DStream in which each RDD has a single element generated + by counting the number of elements in a window over this DStream. + windowDuration and slideDuration are as defined in the window() operation. + + This is equivalent to window(windowDuration, slideDuration).count(), + but will be more efficient if window is large. + """ + return self.map(lambda x: 1).reduceByWindow(operator.add, operator.sub, + windowDuration, slideDuration) + + def countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=None): + """ + Return a new DStream in which each RDD contains the count of distinct elements in + RDDs in a sliding window over this DStream. + + @param windowDuration: width of the window; must be a multiple of this DStream's + batching interval + @param slideDuration: sliding interval of the window (i.e., the interval after which + the new DStream will generate RDDs); must be a multiple of this + DStream's batching interval + @param numPartitions: number of partitions of each RDD in the new DStream. + """ + keyed = self.map(lambda x: (x, 1)) + counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub, + windowDuration, slideDuration, numPartitions) + return counted.filter(lambda (k, v): v > 0).count() + + def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None): + """ + Return a new DStream by applying `groupByKey` over a sliding window. + Similar to `DStream.groupByKey()`, but applies it over a sliding window. + + @param windowDuration: width of the window; must be a multiple of this DStream's + batching interval + @param slideDuration: sliding interval of the window (i.e., the interval after which + the new DStream will generate RDDs); must be a multiple of this + DStream's batching interval + @param numPartitions: Number of partitions of each RDD in the new DStream. + """ + ls = self.mapValues(lambda x: [x]) + grouped = ls.reduceByKeyAndWindow(lambda a, b: a.extend(b) or a, lambda a, b: a[len(b):], + windowDuration, slideDuration, numPartitions) + return grouped.mapValues(ResultIterable) + + def reduceByKeyAndWindow(self, func, invFunc, windowDuration, slideDuration=None, + numPartitions=None, filterFunc=None): + """ + Return a new DStream by applying incremental `reduceByKey` over a sliding window. + + The reduced value of over a new window is calculated using the old window's reduce value : + 1. reduce the new values that entered the window (e.g., adding new counts) + 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) + + `invFunc` can be None, then it will reduce all the RDDs in window, could be slower + than having `invFunc`. + + @param reduceFunc: associative reduce function + @param invReduceFunc: inverse function of `reduceFunc` + @param windowDuration: width of the window; must be a multiple of this DStream's + batching interval + @param slideDuration: sliding interval of the window (i.e., the interval after which + the new DStream will generate RDDs); must be a multiple of this + DStream's batching interval + @param numPartitions: number of partitions of each RDD in the new DStream. + @param filterFunc: function to filter expired key-value pairs; + only pairs that satisfy the function are retained + set this to null if you do not want to filter + """ + self._validate_window_param(windowDuration, slideDuration) + if numPartitions is None: + numPartitions = self._sc.defaultParallelism + + reduced = self.reduceByKey(func, numPartitions) + + def reduceFunc(t, a, b): + b = b.reduceByKey(func, numPartitions) + r = a.union(b).reduceByKey(func, numPartitions) if a else b + if filterFunc: + r = r.filter(filterFunc) + return r + + def invReduceFunc(t, a, b): + b = b.reduceByKey(func, numPartitions) + joined = a.leftOuterJoin(b, numPartitions) + return joined.mapValues(lambda (v1, v2): invFunc(v1, v2) if v2 is not None else v1) + + jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer) + if invReduceFunc: + jinvReduceFunc = TransformFunction(self._sc, invReduceFunc, reduced._jrdd_deserializer) + else: + jinvReduceFunc = None + if slideDuration is None: + slideDuration = self._slideDuration + dstream = self._sc._jvm.PythonReducedWindowedDStream(reduced._jdstream.dstream(), + jreduceFunc, jinvReduceFunc, + self._ssc._jduration(windowDuration), + self._ssc._jduration(slideDuration)) + return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer) + + def updateStateByKey(self, updateFunc, numPartitions=None): + """ + Return a new "state" DStream where the state for each key is updated by applying + the given function on the previous state of the key and the new values of the key. + + @param updateFunc: State update function. If this function returns None, then + corresponding state key-value pair will be eliminated. + """ + if numPartitions is None: + numPartitions = self._sc.defaultParallelism + + def reduceFunc(t, a, b): + if a is None: + g = b.groupByKey(numPartitions).mapValues(lambda vs: (list(vs), None)) + else: + g = a.cogroup(b, numPartitions) + g = g.mapValues(lambda (va, vb): (list(vb), list(va)[0] if len(va) else None)) + state = g.mapValues(lambda (vs, s): updateFunc(vs, s)) + return state.filter(lambda (k, v): v is not None) + + jreduceFunc = TransformFunction(self._sc, reduceFunc, + self._sc.serializer, self._jrdd_deserializer) + dstream = self._sc._jvm.PythonStateDStream(self._jdstream.dstream(), jreduceFunc) + return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer) + + +class TransformedDStream(DStream): + """ + TransformedDStream is an DStream generated by an Python function + transforming each RDD of an DStream to another RDDs. + + Multiple continuous transformations of DStream can be combined into + one transformation. + """ + def __init__(self, prev, func): + self._ssc = prev._ssc + self._sc = self._ssc._sc + self._jrdd_deserializer = self._sc.serializer + self.is_cached = False + self.is_checkpointed = False + self._jdstream_val = None + + if (isinstance(prev, TransformedDStream) and + not prev.is_cached and not prev.is_checkpointed): + prev_func = prev.func + self.func = lambda t, rdd: func(t, prev_func(t, rdd)) + self.prev = prev.prev + else: + self.prev = prev + self.func = func + + @property + def _jdstream(self): + if self._jdstream_val is not None: + return self._jdstream_val + + jfunc = TransformFunction(self._sc, self.func, self.prev._jrdd_deserializer) + dstream = self._sc._jvm.PythonTransformedDStream(self.prev._jdstream.dstream(), jfunc) + self._jdstream_val = dstream.asJavaDStream() + return self._jdstream_val |