aboutsummaryrefslogtreecommitdiff
path: root/examples/src
diff options
context:
space:
mode:
authorDavies Liu <davies.liu@gmail.com>2014-10-18 19:14:48 -0700
committerJosh Rosen <joshrosen@databricks.com>2014-10-18 19:14:48 -0700
commit05db2da7dc256822cdb602c4821cbb9fb84dac98 (patch)
treed80715dff7672792539f9aaa66858999100324f5 /examples/src
parentf406a8391825d8866110f29a0d656c82cd064520 (diff)
downloadspark-05db2da7dc256822cdb602c4821cbb9fb84dac98.tar.gz
spark-05db2da7dc256822cdb602c4821cbb9fb84dac98.tar.bz2
spark-05db2da7dc256822cdb602c4821cbb9fb84dac98.zip
[SPARK-3952] [Streaming] [PySpark] add Python examples in Streaming Programming Guide
Having Python examples in Streaming Programming Guide. Also add RecoverableNetworkWordCount example. Author: Davies Liu <davies.liu@gmail.com> Author: Davies Liu <davies@databricks.com> Closes #2808 from davies/pyguide and squashes the following commits: 8d4bec4 [Davies Liu] update readme 26a7e37 [Davies Liu] fix format 3821c4d [Davies Liu] address comments, add missing file 7e4bb8a [Davies Liu] add Python examples in Streaming Programming Guide
Diffstat (limited to 'examples/src')
-rw-r--r--examples/src/main/python/streaming/recoverable_network_wordcount.py80
1 file changed, 80 insertions, 0 deletions
diff --git a/examples/src/main/python/streaming/recoverable_network_wordcount.py b/examples/src/main/python/streaming/recoverable_network_wordcount.py
new file mode 100644
index 0000000000..fc6827c82b
--- /dev/null
+++ b/examples/src/main/python/streaming/recoverable_network_wordcount.py
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Counts words in text encoded with UTF8 received from the network every second.
+
+ Usage: recoverable_network_wordcount.py <hostname> <port> <checkpoint-directory> <output-file>
+ <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
+ data. <checkpoint-directory> directory in an HDFS-compatible file system to which checkpoint data will be saved.
+ <output-file> file to which the word counts will be appended
+
+ To run this on your local machine, you need to first run a Netcat server
+ `$ nc -lk 9999`
+
+ and then run the example
+ `$ bin/spark-submit examples/src/main/python/streaming/recoverable_network_wordcount.py \
+ localhost 9999 ~/checkpoint/ ~/out`
+
+ If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
+ a new StreamingContext (will print "Creating new context" to the console). Otherwise, if
+ checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
+ the checkpoint data.
+"""
+
+import os
+import sys
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+
+
def createContext(host, port, outputPath):
    """Build a brand-new StreamingContext that counts words from a socket.

    Only invoked (through StreamingContext.getOrCreate below) when no usable
    checkpoint exists in the checkpoint directory; on recovery the context is
    rebuilt from checkpoint data instead and this function is never called.

    host -- hostname of the TCP server to read text lines from
    port -- port of that server
    outputPath -- local file to which per-batch word counts are appended
    """
    # If you do not see this printed, the StreamingContext has been loaded
    # from checkpoint data rather than freshly created.
    # NOTE: single-argument print(...) is valid in both Python 2 and 3,
    # unlike the Python-2-only `print "..."` statement form.
    print("Creating new context")
    # Start from a clean output file so a fresh (non-recovered) run does not
    # append after stale results left over from a previous run.
    if os.path.exists(outputPath):
        os.remove(outputPath)
    sc = SparkContext(appName="PythonStreamingRecoverableNetworkWordCount")
    ssc = StreamingContext(sc, 1)  # 1-second batch interval

    # Create a socket stream on target ip:port and count the
    # words in the input stream of \n delimited text (eg. generated by 'nc').
    lines = ssc.socketTextStream(host, port)
    words = lines.flatMap(lambda line: line.split(" "))
    wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

    def echo(time, rdd):
        # Runs on the driver once per batch: collect this batch's counts,
        # log them, and append them to the output file.
        counts = "Counts at time %s %s" % (time, rdd.collect())
        print(counts)
        print("Appending to " + os.path.abspath(outputPath))
        with open(outputPath, 'a') as f:
            f.write(counts + "\n")

    wordCounts.foreachRDD(echo)
    return ssc
+
+if __name__ == "__main__":
+ if len(sys.argv) != 5:
+ print >> sys.stderr, "Usage: recoverable_network_wordcount.py <hostname> <port> "\
+ "<checkpoint-directory> <output-file>"
+ exit(-1)
+ host, port, checkpoint, output = sys.argv[1:]
+ ssc = StreamingContext.getOrCreate(checkpoint,
+ lambda: createContext(host, int(port), output))
+ ssc.start()
+ ssc.awaitTermination()