diff options
author | Davies Liu <davies.liu@gmail.com> | 2014-10-18 19:14:48 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2014-10-18 19:14:48 -0700 |
commit | 05db2da7dc256822cdb602c4821cbb9fb84dac98 (patch) | |
tree | d80715dff7672792539f9aaa66858999100324f5 /examples | |
parent | f406a8391825d8866110f29a0d656c82cd064520 (diff) | |
download | spark-05db2da7dc256822cdb602c4821cbb9fb84dac98.tar.gz spark-05db2da7dc256822cdb602c4821cbb9fb84dac98.tar.bz2 spark-05db2da7dc256822cdb602c4821cbb9fb84dac98.zip |
[SPARK-3952] [Streaming] [PySpark] add Python examples in Streaming Programming Guide
Having Python examples in Streaming Programming Guide.
Also add RecoverableNetworkWordCount example.
Author: Davies Liu <davies.liu@gmail.com>
Author: Davies Liu <davies@databricks.com>
Closes #2808 from davies/pyguide and squashes the following commits:
8d4bec4 [Davies Liu] update readme
26a7e37 [Davies Liu] fix format
3821c4d [Davies Liu] address comments, add missing file
7e4bb8a [Davies Liu] add Python examples in Streaming Programming Guide
Diffstat (limited to 'examples')
-rw-r--r-- | examples/src/main/python/streaming/recoverable_network_wordcount.py | 80 |
1 files changed, 80 insertions, 0 deletions
diff --git a/examples/src/main/python/streaming/recoverable_network_wordcount.py b/examples/src/main/python/streaming/recoverable_network_wordcount.py new file mode 100644 index 0000000000..fc6827c82b --- /dev/null +++ b/examples/src/main/python/streaming/recoverable_network_wordcount.py @@ -0,0 +1,80 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" + Counts words in text encoded with UTF8 received from the network every second. + + Usage: recoverable_network_wordcount.py <hostname> <port> <checkpoint-directory> <output-file> + <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive + data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data + <output-file> file to which the word counts will be appended + + To run this on your local machine, you need to first run a Netcat server + `$ nc -lk 9999` + + and then run the example + `$ bin/spark-submit examples/src/main/python/streaming/recoverable_network_wordcount.py \ + localhost 9999 ~/checkpoint/ ~/out` + + If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create + a new StreamingContext (will print "Creating new context" to the console). Otherwise, if + checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from + the checkpoint data. +""" + +import os +import sys + +from pyspark import SparkContext +from pyspark.streaming import StreamingContext + + +def createContext(host, port, outputPath): + # If you do not see this printed, that means the StreamingContext has been loaded + # from the new checkpoint + print "Creating new context" + if os.path.exists(outputPath): + os.remove(outputPath) + sc = SparkContext(appName="PythonStreamingRecoverableNetworkWordCount") + ssc = StreamingContext(sc, 1) + + # Create a socket stream on target ip:port and count the + # words in input stream of \n delimited text (eg. generated by 'nc') + lines = ssc.socketTextStream(host, port) + words = lines.flatMap(lambda line: line.split(" ")) + wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y) + + def echo(time, rdd): + counts = "Counts at time %s %s" % (time, rdd.collect()) + print counts + print "Appending to " + os.path.abspath(outputPath) + with open(outputPath, 'a') as f: + f.write(counts + "\n") + + wordCounts.foreachRDD(echo) + return ssc + +if __name__ == "__main__": + if len(sys.argv) != 5: + print >> sys.stderr, "Usage: recoverable_network_wordcount.py <hostname> <port> "\ + "<checkpoint-directory> <output-file>" + exit(-1) + host, port, checkpoint, output = sys.argv[1:] + ssc = StreamingContext.getOrCreate(checkpoint, + lambda: createContext(host, int(port), output)) + ssc.start() + ssc.awaitTermination() |