diff options
author | Bryan Cutler <bjcutler@us.ibm.com> | 2015-06-19 00:07:53 -0700 |
---|---|---|
committer | Davies Liu <davies@databricks.com> | 2015-06-19 00:07:53 -0700 |
commit | a2016b4bc4ef13339f168c3f4e135fa422046137 (patch) | |
tree | fc3e275a8e2c3d7738d73be70d81680f2b05d0bf | |
parent | 754929b153aba3a8f8fbafa1581957da4ccc18be (diff) | |
download | spark-a2016b4bc4ef13339f168c3f4e135fa422046137.tar.gz spark-a2016b4bc4ef13339f168c3f4e135fa422046137.tar.bz2 spark-a2016b4bc4ef13339f168c3f4e135fa422046137.zip |
[SPARK-8444] [STREAMING] Adding Python streaming example for queueStream
A Python example similar to the existing one for Scala.
Author: Bryan Cutler <bjcutler@us.ibm.com>
Closes #6884 from BryanCutler/streaming-queueStream-example-8444 and squashes the following commits:
435ba7e [Bryan Cutler] [SPARK-8444] Fixed style checks, increased sleep time to show empty queue
257abb0 [Bryan Cutler] [SPARK-8444] Stop context gracefully, Removed unused import, Added description comment
376ef6e [Bryan Cutler] [SPARK-8444] Fixed bug causing DStream.pprint to append empty parenthesis to output instead of blank line
1ff5f8b [Bryan Cutler] [SPARK-8444] Adding Python streaming example for queue_stream
-rw-r--r-- | examples/src/main/python/streaming/queue_stream.py | 50 | ||||
-rw-r--r-- | python/pyspark/streaming/dstream.py | 2 |
2 files changed, 51 insertions, 1 deletions
```diff
diff --git a/examples/src/main/python/streaming/queue_stream.py b/examples/src/main/python/streaming/queue_stream.py
new file mode 100644
index 0000000000..dcd6a0fc6f
--- /dev/null
+++ b/examples/src/main/python/streaming/queue_stream.py
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Create a queue of RDDs that will be mapped/reduced one at a time in
+ 1 second intervals.
+
+ To run this example use
+ `$ bin/spark-submit examples/src/main/python/streaming/queue_stream.py
+"""
+import sys
+import time
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+
+if __name__ == "__main__":
+
+    sc = SparkContext(appName="PythonStreamingQueueStream")
+    ssc = StreamingContext(sc, 1)
+
+    # Create the queue through which RDDs can be pushed to
+    # a QueueInputDStream
+    rddQueue = []
+    for i in xrange(5):
+        rddQueue += [ssc.sparkContext.parallelize([j for j in xrange(1, 1001)], 10)]
+
+    # Create the QueueInputDStream and use it do some processing
+    inputStream = ssc.queueStream(rddQueue)
+    mappedStream = inputStream.map(lambda x: (x % 10, 1))
+    reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)
+    reducedStream.pprint()
+
+    ssc.start()
+    time.sleep(6)
+    ssc.stop(stopSparkContext=True, stopGraceFully=True)
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index ff097985fa..8dcb9645cd 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -176,7 +176,7 @@ class DStream(object):
                 print(record)
             if len(taken) > num:
                 print("...")
-            print()
+            print("")

         self.foreachRDD(takeAndPrint)
```