From 05db2da7dc256822cdb602c4821cbb9fb84dac98 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Sat, 18 Oct 2014 19:14:48 -0700 Subject: [SPARK-3952] [Streaming] [PySpark] add Python examples in Streaming Programming Guide Having Python examples in Streaming Programming Guide. Also add RecoverableNetworkWordCount example. Author: Davies Liu Author: Davies Liu Closes #2808 from davies/pyguide and squashes the following commits: 8d4bec4 [Davies Liu] update readme 26a7e37 [Davies Liu] fix format 3821c4d [Davies Liu] address comments, add missing file 7e4bb8a [Davies Liu] add Python examples in Streaming Programming Guide --- docs/streaming-programming-guide.md | 304 ++++++++++++++++++++++++++++++++++-- 1 file changed, 295 insertions(+), 9 deletions(-) (limited to 'docs/streaming-programming-guide.md') diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 738309c668..8bbba88b31 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -212,6 +212,67 @@ The complete code can be found in the Spark Streaming example [JavaNetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java).
+ +
+First, we import StreamingContext, which is the main entry point for all streaming functionality. We create a local StreamingContext with two execution threads and a batch interval of 1 second. + +{% highlight python %} +from pyspark import SparkContext +from pyspark.streaming import StreamingContext + +# Create a local StreamingContext with two working threads and a batch interval of 1 second +sc = SparkContext("local[2]", "NetworkWordCount") +ssc = StreamingContext(sc, 1) +{% endhighlight %} + +Using this context, we can create a DStream that represents streaming data from a TCP +source, specified as a hostname (e.g. `localhost`) and port (e.g. `9999`). + +{% highlight python %} +# Create a DStream that will connect to hostname:port, like localhost:9999 +lines = ssc.socketTextStream("localhost", 9999) +{% endhighlight %} + +This `lines` DStream represents the stream of data that will be received from the data +server. Each record in this DStream is a line of text. Next, we want to split the lines by +space into words. + +{% highlight python %} +# Split each line into words +words = lines.flatMap(lambda line: line.split(" ")) +{% endhighlight %} + +`flatMap` is a one-to-many DStream operation that creates a new DStream by +generating multiple new records from each record in the source DStream. In this case, +each line will be split into multiple words and the stream of words is represented as the +`words` DStream. Next, we want to count these words. + +{% highlight python %} +# Count each word in each batch +pairs = words.map(lambda word: (word, 1)) +wordCounts = pairs.reduceByKey(lambda x, y: x + y) + +# Print the first ten elements of each RDD generated in this DStream to the console +wordCounts.pprint() +{% endhighlight %} + +The `words` DStream is further mapped (one-to-one transformation) to a DStream of `(word, +1)` pairs, which is then reduced to get the frequency of words in each batch of data. +Finally, `wordCounts.pprint()` will print a few of the counts generated every second. + +Note that when these lines are executed, Spark Streaming only sets up the computation it +will perform when it is started, and no real processing has started yet. To start the processing +after all the transformations have been set up, we finally call + +{% highlight python %} +ssc.start() # Start the computation +ssc.awaitTermination() # Wait for the computation to terminate +{% endhighlight %} + +The complete code can be found in the Spark Streaming example +[NetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/network_wordcount.py).
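For reference, the snippets above assemble into a single runnable script, roughly equivalent to the linked example (a sketch, not the canonical file): run it with `spark-submit` while `nc -lk 9999` is feeding text in another terminal.

{% highlight python %}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Local two-thread master and 1 second batches, as in the walkthrough above
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

# Count words received on localhost:9999 and print a sample of each batch
lines = ssc.socketTextStream("localhost", 9999)
counts = lines.flatMap(lambda line: line.split(" ")) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda x, y: x + y)
counts.pprint()

ssc.start()
ssc.awaitTermination()
{% endhighlight %}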
+
@@ -236,6 +297,11 @@ $ ./bin/run-example streaming.NetworkWordCount localhost 9999 $ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999 {% endhighlight %} +
+{% highlight bash %} +$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999 +{% endhighlight %} +
@@ -259,8 +325,11 @@ hello world +
+ +
{% highlight bash %} -# TERMINAL 2: RUNNING NetworkWordCount or JavaNetworkWordCount +# TERMINAL 2: RUNNING NetworkWordCount $ ./bin/run-example streaming.NetworkWordCount localhost 9999 ... @@ -271,6 +340,37 @@ Time: 1357008430000 ms (world,1) ... {% endhighlight %} +
+ +
+{% highlight bash %} +# TERMINAL 2: RUNNING JavaNetworkWordCount + +$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999 +... +------------------------------------------- +Time: 1357008430000 ms +------------------------------------------- +(hello,1) +(world,1) +... +{% endhighlight %} +
+
+{% highlight bash %} +# TERMINAL 2: RUNNING network_wordcount.py + +$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999 +... +------------------------------------------- +Time: 2014-10-14 15:25:21 +------------------------------------------- +(hello,1) +(world,1) +... +{% endhighlight %} +
+
@@ -398,9 +498,34 @@ JavaSparkContext sc = ... //existing JavaSparkContext JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000)); {% endhighlight %} +
+ +A [StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext) object can be created from a [SparkContext](api/python/pyspark.html#pyspark.SparkContext) object. + +{% highlight python %} +from pyspark import SparkContext +from pyspark.streaming import StreamingContext + +sc = SparkContext(master, appName) +ssc = StreamingContext(sc, 1) +{% endhighlight %} + +The `appName` parameter is a name for your application to show on the cluster UI. +`master` is a [Spark, Mesos or YARN cluster URL](submitting-applications.html#master-urls), +or a special __"local[\*]"__ string to run in local mode. In practice, when running on a cluster, +you will not want to hardcode `master` in the program, +but rather [launch the application with `spark-submit`](submitting-applications.html) and +receive it there. However, for local testing and unit tests, you can pass "local[\*]" to run Spark Streaming +in-process (detects the number of cores in the local system). + +The batch interval must be set based on the latency requirements of your application +and available cluster resources. See the [Performance Tuning](#setting-the-right-batch-size) +section for more details. +
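For example, a minimal sketch of the pattern described above (the application name is illustrative): the master is left out of the code so that `spark-submit` can supply it, e.g. `./bin/spark-submit --master local[*] my_app.py`, and the batch interval is chosen explicitly.

{% highlight python %}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# No master hardcoded here; spark-submit supplies it (e.g. --master local[*])
sc = SparkContext(appName="MyStreamingApp")
ssc = StreamingContext(sc, 5)  # 5 second batch interval
{% endhighlight %}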
After a context is defined, you have to do the following steps. + 1. Define the input sources. 1. Set up the streaming computations. 1. Start the receiving and processing of data using `streamingContext.start()`. @@ -483,6 +608,9 @@ methods for creating DStreams from files and Akka actors as input sources.
streamingContext.fileStream(dataDirectory);
+
+ streamingContext.textFileStream(dataDirectory) +
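For instance, a self-contained Python sketch (not from the guide; the directory path and app name are illustrative) that counts words in new text files dropped into a monitored directory could look like this.

{% highlight python %}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "FileWordCount")
ssc = StreamingContext(sc, 10)                  # 10 second batches
dataDirectory = "hdfs:///tmp/streaming-input"   # assumed path to monitor

# Each new file that appears under dataDirectory becomes part of a batch
counts = ssc.textFileStream(dataDirectory) \
            .flatMap(lambda line: line.split(" ")) \
            .map(lambda word: (word, 1)) \
            .reduceByKey(lambda a, b: a + b)
counts.pprint()

ssc.start()
ssc.awaitTermination()
{% endhighlight %}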
Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory (files written in nested directories not supported). Note that @@ -684,13 +812,30 @@ This is applied on a DStream containing words (say, the `pairs` DStream containing JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction); {% endhighlight %} + +
+ +{% highlight python %} +def updateFunction(newValues, runningCount): + if runningCount is None: + runningCount = 0 + return sum(newValues, runningCount) # add the new values with the previous running count to get the new count +{% endhighlight %} + +This is applied on a DStream containing words (say, the `pairs` DStream containing `(word, +1)` pairs in the [earlier example](#a-quick-example)). + +{% highlight python %} +runningCounts = pairs.updateStateByKey(updateFunction) +{% endhighlight %} +
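One practical note on the snippet above: stateful transformations such as `updateStateByKey` require a checkpoint directory to be configured on the StreamingContext, for example (the directory name here is only illustrative).

{% highlight python %}
ssc.checkpoint("checkpoint")  # any HDFS-compatible or local directory
{% endhighlight %}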
The update function will be called for each word, with `newValues` having a sequence of 1's (from the `(word, 1)` pairs) and the `runningCount` having the previous count. For the complete Python code, take a look at the example -[StatefulNetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala). +[stateful_network_wordcount.py]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/stateful_network_wordcount.py). #### Transform Operation {:.no_toc} @@ -732,6 +877,15 @@ JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform( }); {% endhighlight %} +
+ +{% highlight python %} +spamInfoRDD = sc.pickleFile(...) # RDD containing spam information + +# join data stream with spam information to do data cleaning +cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...)) +{% endhighlight %}
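To make the elided pieces concrete, a hypothetical filled-in version might look like the following (the file path, the `(word, spamScore)` layout of `spamInfoRDD`, and the 0.5 threshold are all illustrative, not from the guide).

{% highlight python %}
spamInfoRDD = sc.pickleFile("hdfs:///data/spam_info")   # assumed (word, spamScore) pairs

cleanedDStream = wordCounts.transform(
    lambda rdd: rdd.join(spamInfoRDD)                   # (word, (count, spamScore))
                   .filter(lambda kv: kv[1][1] <= 0.5)  # drop likely spam words
                   .map(lambda kv: (kv[0], kv[1][0])))  # back to (word, count)
{% endhighlight %}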
@@ -793,6 +947,14 @@ Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer, Integer>() JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(reduceFunc, new Duration(30000), new Duration(10000)); {% endhighlight %} +
+ +{% highlight python %} +# Reduce last 30 seconds of data, every 10 seconds +windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10) +{% endhighlight %} +
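The second lambda above (`lambda x, y: x - y`) is the inverse reduce function, which lets Spark incrementally subtract the data that slides out of the window; this incremental form requires checkpointing to be enabled. A plainer sketch without an inverse function (assuming the PySpark API accepts `None` for it) recomputes each window from scratch instead.

{% highlight python %}
# Recompute the full 30 second window every 10 seconds (no inverse function)
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, None, 30, 10)
{% endhighlight %}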
@@ -860,6 +1022,7 @@ see [DStream](api/scala/index.html#org.apache.spark.streaming.dstream.DStream) and [PairDStreamFunctions](api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions). For the Java API, see [JavaDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaDStream.html) and [JavaPairDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaPairDStream.html). +For the Python API, see [DStream](api/python/pyspark.streaming.html#pyspark.streaming.DStream). *** @@ -872,9 +1035,12 @@ Currently, the following output operations are defined: + This is useful for development and debugging. +
+ PS: called pprint() in Python. + @@ -915,17 +1081,41 @@ For this purpose, a developer may inadvertently try creating a connection object at the Spark driver, but try to use it in a Spark worker to save records in the RDDs. For example (in Scala), +
+
+ +{% highlight scala %} dstream.foreachRDD(rdd => { val connection = createNewConnection() // executed at the driver rdd.foreach(record => { connection.send(record) // executed at the worker }) }) +{% endhighlight %} + +
+
+ +{% highlight python %} +def sendRecord(rdd): + connection = createNewConnection() # executed at the driver + rdd.foreach(lambda record: connection.send(record)) + connection.close() + +dstream.foreachRDD(sendRecord) +{% endhighlight %} + +
+
- This is incorrect as this requires the connection object to be serialized and sent from the driver to the worker. Such connection objects are rarely transferrable across machines. This error may manifest as serialization errors (connection object not serializable), initialization errors (connection object needs to be initialized at the workers), etc. The correct solution is to create the connection object at the worker. + This is incorrect as this requires the connection object to be serialized and sent from the driver to the worker. Such connection objects are rarely transferrable across machines. This error may manifest as serialization errors (connection object not serializable), initialization errors (connection object needs to be initialized at the workers), etc. The correct solution is to create the connection object at the worker. - However, this can lead to another common mistake - creating a new connection for every record. For example, +
+
+ +{% highlight scala %} dstream.foreachRDD(rdd => { rdd.foreach(record => { val connection = createNewConnection() @@ -933,9 +1123,28 @@ For example (in Scala), connection.close() }) }) +{% endhighlight %} - Typically, creating a connection object has time and resource overheads. Therefore, creating and destroying a connection object for each record can incur unnecessarily high overheads and can significantly reduce the overall throughput of the system. A better solution is to use `rdd.foreachPartition` - create a single connection object and send all the records in a RDD partition using that connection. +
+
+ +{% highlight python %} +def sendRecord(record): + connection = createNewConnection() + connection.send(record) + connection.close() + +dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord)) +{% endhighlight %} +
+
+ + Typically, creating a connection object has time and resource overheads. Therefore, creating and destroying a connection object for each record can incur unnecessarily high overheads and can significantly reduce the overall throughput of the system. A better solution is to use `rdd.foreachPartition` - create a single connection object and send all the records in a RDD partition using that connection. + +
+
+{% highlight scala %} dstream.foreachRDD(rdd => { rdd.foreachPartition(partitionOfRecords => { val connection = createNewConnection() @@ -943,13 +1152,31 @@ For example (in Scala), connection.close() }) }) +{% endhighlight %} +
+ +
+{% highlight python %} +def sendPartition(iter): + connection = createNewConnection() + for record in iter: + connection.send(record) + connection.close() + +dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition)) +{% endhighlight %} +
+
- This amortizes the connection creation overheads over many records. + This amortizes the connection creation overheads over many records. - Finally, this can be further optimized by reusing connection objects across multiple RDDs/batches. One can maintain a static pool of connection objects that can be reused as RDDs of multiple batches are pushed to the external system, thus further reducing the overheads. - +
+
+{% highlight scala %} dstream.foreachRDD(rdd => { rdd.foreachPartition(partitionOfRecords => { // ConnectionPool is a static, lazily initialized pool of connections @@ -958,8 +1185,25 @@ For example (in Scala), ConnectionPool.returnConnection(connection) // return to the pool for future reuse }) }) +{% endhighlight %} +
- Note that the connections in the pool should be lazily created on demand and timed out if not used for a while. This achieves the most efficient sending of data to external systems. +
+{% highlight python %} +def sendPartition(iter): + # ConnectionPool is a static, lazily initialized pool of connections + connection = ConnectionPool.getConnection() + for record in iter: + connection.send(record) + # return to the pool for future reuse + ConnectionPool.returnConnection(connection) + +dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition)) +{% endhighlight %} +
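The guide leaves `ConnectionPool` abstract. A purely illustrative Python sketch of such a pool (not part of Spark or the guide) could be built on the same user-supplied `createNewConnection()` helper used in the snippets above; a production version would also time out idle connections.

{% highlight python %}
try:
    from queue import Queue, Empty   # Python 3
except ImportError:
    from Queue import Queue, Empty   # Python 2

class ConnectionPool(object):
    _pool = Queue()  # one pool per executor process

    @staticmethod
    def getConnection():
        try:
            return ConnectionPool._pool.get_nowait()  # reuse an idle connection
        except Empty:
            return createNewConnection()              # lazily create on demand

    @staticmethod
    def returnConnection(connection):
        ConnectionPool._pool.put(connection)
{% endhighlight %}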
+
+ +Note that the connections in the pool should be lazily created on demand and timed out if not used for a while. This achieves the most efficient sending of data to external systems. ##### Other points to remember: @@ -1376,6 +1620,44 @@ You can also explicitly create a `JavaStreamingContext` from the checkpoint data the computation by using `new JavaStreamingContext(checkpointDirectory)`. +
+ +This behavior is made simple by using `StreamingContext.getOrCreate`. This is used as follows. + +{% highlight python %} +# Function to create and set up a new StreamingContext +def functionToCreateContext(): + sc = SparkContext(...) # new context + ssc = StreamingContext(...) + lines = ssc.socketTextStream(...) # create DStreams + ... + ssc.checkpoint(checkpointDirectory) # set checkpoint directory + return ssc + +# Get StreamingContext from checkpoint data or create a new one +context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext) + +# Do additional setup on context that needs to be done, +# irrespective of whether it is being started or restarted +context. ... + +# Start the context +context.start() +context.awaitTermination() +{% endhighlight %} + +If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data. +If the directory does not exist (i.e., running for the first time), +then the function `functionToCreateContext` will be called to create a new +context and set up the DStreams. See the Python example +[recoverable_network_wordcount.py]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/streaming/recoverable_network_wordcount.py). +This example appends the word counts of network data into a file. + +You can also explicitly create a `StreamingContext` from the checkpoint data and start the + computation by using `StreamingContext.getOrCreate(checkpointDirectory, None)`. +
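To make the pattern above concrete, here is a hypothetical end-to-end sketch in the spirit of `recoverable_network_wordcount.py` (the checkpoint path, host, and port are illustrative, not from the guide).

{% highlight python %}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

checkpointDirectory = "hdfs:///tmp/streaming-checkpoint"  # assumed fault-tolerant directory

def functionToCreateContext():
    sc = SparkContext(appName="RecoverableNetworkWordCount")
    ssc = StreamingContext(sc, 1)
    lines = ssc.socketTextStream("localhost", 9999)
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()
    ssc.checkpoint(checkpointDirectory)  # must be set before returning
    return ssc

# Recreated from checkpoint data if it exists, otherwise built afresh
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
context.start()
context.awaitTermination()
{% endhighlight %}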
+ **Note**: If Spark Streaming and/or the Spark Streaming program is recompiled, @@ -1572,7 +1854,11 @@ package and renamed for better clarity. [TwitterUtils](api/java/index.html?org/apache/spark/streaming/twitter/TwitterUtils.html), [ZeroMQUtils](api/java/index.html?org/apache/spark/streaming/zeromq/ZeroMQUtils.html), and [MQTTUtils](api/java/index.html?org/apache/spark/streaming/mqtt/MQTTUtils.html) + - Python docs + * [StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext) + * [DStream](api/python/pyspark.streaming.html#pyspark.streaming.DStream) * More examples in [Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming) and [Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming) + and [Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/streaming) * [Paper](http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf) and [video](http://youtu.be/g171ndOHgJ0) describing Spark Streaming. -- cgit v1.2.3