From 11d54941760f86706e28f7ace8ece664c9164ba6 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Sat, 3 May 2014 12:31:31 -0700 Subject: SPARK-1663. Corrections for several compile errors in streaming code examples, and updates to follow API changes I gave the Streaming code examples, both Scala and Java, a test run today. I turned up a number of small errors, mostly compile errors in the Java examples. There were a few typos in the Scala too. I also took the liberty of adding things like imports, since in several cases they are not obvious. Feel free to push back on some changes. There's one thing I haven't quite addressed in the changes. `JavaPairDStream` uses the Java API version of `Function2` in almost all cases, as `JFunction2`. However it uses `scala.Function2` in: ``` def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration) :JavaPairDStream[K, V] = { dstream.reduceByKeyAndWindow(reduceFunc, windowDuration) } ``` Is that a typo? Also, in Scala, I could not get this to compile: ``` val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10)) error: missing parameter type for expanded function ((x$1, x$2) => x$1.$plus(x$2)) ``` You can see my fix below but am I missing something? Otherwise I can say these all worked for me! Author: Sean Owen Closes #589 from srowen/SPARK-1663 and squashes the following commits: 65a906b [Sean Owen] Corrections for several compile errors in streaming code examples, and updates to follow API changes --- docs/streaming-programming-guide.md | 62 +++++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 26 deletions(-) (limited to 'docs') diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 7ad06427ca..b22bb45828 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -76,16 +76,19 @@ Besides Spark's configuration, we specify that any DStream will be processed in 1 second batches. {% highlight scala %} -// Create a StreamingContext with a SparkConf configuration -val ssc = new StreamingContext(sparkConf, Seconds(1)) +import org.apache.spark.api.java.function._ +import org.apache.spark.streaming._ +import org.apache.spark.streaming.api._ +// Create a StreamingContext with a local master +val ssc = new StreamingContext("local", "NetworkWordCount", Seconds(1)) {% endhighlight %} Using this context, we then create a new DStream by specifying the IP address and port of the data server. {% highlight scala %} -// Create a DStream that will connect to serverIP:serverPort -val lines = ssc.socketTextStream(serverIP, serverPort) +// Create a DStream that will connect to serverIP:serverPort, like localhost:9999 +val lines = ssc.socketTextStream("localhost", 9999) {% endhighlight %} This `lines` DStream represents the stream of data that will be received from the data @@ -103,6 +106,7 @@ each line will be split into multiple words and the stream of words is represent `words` DStream. Next, we want to count these words. {% highlight scala %} +import org.apache.spark.streaming.StreamingContext._ // Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) @@ -138,16 +142,20 @@ functionality. Besides Spark's configuration, we specify that any DStream would in 1 second batches. 
{% highlight java %} -// Create a StreamingContext with a SparkConf configuration -JavaStreamingContext jssc = StreamingContext(sparkConf, new Duration(1000)) +import org.apache.spark.api.java.function.*; +import org.apache.spark.streaming.*; +import org.apache.spark.streaming.api.java.*; +import scala.Tuple2; +// Create a StreamingContext with a local master +JavaStreamingContext jssc = new JavaStreamingContext("local", "JavaNetworkWordCount", new Duration(1000)) {% endhighlight %} Using this context, we then create a new DStream by specifying the IP address and port of the data server. {% highlight java %} -// Create a DStream that will connect to serverIP:serverPort -JavaDStream<String> lines = jssc.socketTextStream(serverIP, serverPort); +// Create a DStream that will connect to serverIP:serverPort, like localhost:9999 +JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999); {% endhighlight %} This `lines` DStream represents the stream of data that will be received from the data @@ -159,7 +167,7 @@ space into words. JavaDStream<String> words = lines.flatMap( new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String x) { - return Lists.newArrayList(x.split(" ")); + return Arrays.asList(x.split(" ")); } }); {% endhighlight %} @@ -359,7 +367,7 @@ as explained earlier. Finally, the last two parameters are needed to deploy your if running in distributed mode, as described in the [Spark programming guide](scala-programming-guide.html#deploying-code-on-a-cluster). Additionally, the underlying SparkContext can be accessed as -`streamingContext.sparkContext`. +`ssc.sparkContext`. The batch interval must be set based on the latency requirements of your application and available cluster resources. See the [Performance Tuning](#setting-the-right-batch-size) @@ -399,7 +407,7 @@ These operations are discussed in detail in later sections. ## Input Sources -We have already taken a look at the `streamingContext.socketTextStream(...)` in the [quick +We have already taken a look at the `ssc.socketTextStream(...)` in the [quick example](#a-quick-example) which creates a DStream from text data received over a TCP socket connection. Besides sockets, the core Spark Streaming API provides methods for creating DStreams from files and Akka actors as input sources. @@ -409,12 +417,12 @@ Specifically, for files, the DStream can be created as
{% highlight scala %} -streamingContext.fileStream(dataDirectory) +ssc.fileStream(dataDirectory) {% endhighlight %}
{% highlight java %} -javaStreamingContext.fileStream(dataDirectory); +jssc.fileStream(dataDirectory); {% endhighlight %}
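Note (not part of the patch): `fileStream` also expects key, value, and input-format type parameters, so for plain text files the `textFileStream` shorthand is usually the simpler call. A minimal Scala sketch, assuming an `ssc` created as in the quick example; the HDFS path is a placeholder:

```
// Monitor a directory for new plain text files; the path below is a placeholder.
val logLines = ssc.textFileStream("hdfs://namenode:8020/user/logs/")
// Keep only lines containing ERROR and print a sample from each batch.
val errorLines = logLines.filter(_.contains("ERROR"))
errorLines.print()
```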
@@ -443,13 +451,13 @@ project dependencies, you can create a DStream from Kafka as
{% highlight scala %} import org.apache.spark.streaming.kafka._ -KafkaUtils.createStream(streamingContext, kafkaParams, ...) +KafkaUtils.createStream(ssc, kafkaParams, ...) {% endhighlight %}
{% highlight java %} -import org.apache.spark.streaming.kafka.* -KafkaUtils.createStream(javaStreamingContext, kafkaParams, ...); +import org.apache.spark.streaming.kafka.*; +KafkaUtils.createStream(jssc, kafkaParams, ...); {% endhighlight %}
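Note (not part of the patch): besides the `kafkaParams` overload shown above, the external Kafka module also provides a simpler `createStream` overload that takes the ZooKeeper quorum, a consumer group id, and a map of topic to receiver-thread count. A hedged Scala sketch, assuming that overload; the hosts, group, and topic names are placeholders:

```
import org.apache.spark.streaming.kafka._

// All connection strings and names below are placeholders.
val kafkaStream = KafkaUtils.createStream(
  ssc,
  "zk1:2181,zk2:2181",           // ZooKeeper connection string
  "streaming-wordcount-group",   // Kafka consumer group id
  Map("pageviews" -> 1))         // topic -> number of receiver threads
kafkaStream.map(_._2).print()    // the tuple's second element is the message payload
```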
@@ -578,13 +586,14 @@ val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
{% highlight java %} +import com.google.common.base.Optional; Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction = new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { Integer newSum = ... // add the new values with the previous running count to get the new count - return Optional.of(newSum) + return Optional.of(newSum); } - } + }; {% endhighlight %} This is applied on a DStream containing words (say, the `pairs` DStream containing `(word, @@ -617,9 +626,9 @@ spam information (maybe generated with Spark as well) and then filtering based o
{% highlight scala %} -val spamInfoRDD = sparkContext.hadoopFile(...) // RDD containing spam information +val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information -val cleanedDStream = inputDStream.transform(rdd => { +val cleanedDStream = wordCounts.transform(rdd => { rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning ... }) @@ -629,13 +638,14 @@ val cleanedDStream = inputDStream.transform(rdd => {
{% highlight java %} +import org.apache.spark.streaming.api.java.*; // RDD containing spam information -JavaPairRDD<String, Double> spamInfoRDD = javaSparkContext.hadoopFile(...); +final JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...); -JavaPairDStream<String, Integer> cleanedDStream = inputDStream.transform( +JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform( new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() { @Override public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> rdd) throws Exception { - rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning + rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning ... } }); @@ -684,7 +694,7 @@ operation `reduceByKeyAndWindow`. {% highlight scala %} // Reduce last 30 seconds of data, every 10 seconds -val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10)) +val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10)) {% endhighlight %}
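Note (not part of the patch), on the "missing parameter type" question in the description: `reduceByKeyAndWindow` has several overloads (some with default arguments) that can match a three-argument call, so the compiler has no single expected type from which to fill in the parameter types of `_ + _`; `reduceByKey(_ + _)` works because only one overload matches a one-argument call. Either of the following sketches compiles, assuming `pairs` is the `DStream[(String, Int)]` from the quick example:

```
// Option 1: name the reduce function once, with explicit Int parameters.
val reduceFunc = (a: Int, b: Int) => a + b
val windowedWordCounts = pairs.reduceByKeyAndWindow(reduceFunc, Seconds(30), Seconds(10))

// Option 2: annotate the parameters inline, as the patch does.
val windowedWordCounts2 = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
```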
@@ -699,7 +709,7 @@ Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer -JavaPairDStream<String, Integer> windowedWordCounts = pair.reduceByKeyAndWindow(reduceFunc, new Duration(30000), new Duration(10000)); +JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(reduceFunc, new Duration(30000), new Duration(10000)); {% endhighlight %}
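Note (not part of the patch): the window examples above recompute each window from scratch. The same API also offers an incremental overload that adds batches entering the window and "inverse reduces" batches leaving it; it requires checkpointing, which the recovery example in the next hunk relies on as well. A hedged Scala sketch; the checkpoint directory is a placeholder:

```
ssc.checkpoint("checkpoint/")    // required for the incremental form; path is a placeholder

val windowedWordCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,     // add counts from batches entering the window
  (a: Int, b: Int) => a - b,     // subtract counts from batches leaving the window
  Seconds(30), Seconds(10))
```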
@@ -1087,7 +1097,7 @@ This behavior is made simple by using `JavaStreamingContext.getOrCreate`. This i {% highlight java %} // Create a factory object that can create a and setup a new JavaStreamingContext JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() { - JavaStreamingContextFactory create() { + @Override public JavaStreamingContext create() { JavaStreamingContext jssc = new JavaStreamingContext(...); // new context JavaDStream<String> lines = jssc.socketTextStream(...); // create DStreams ... -- cgit v1.2.3
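A closing note (not part of the patch): for completeness, the Scala side of the same recovery pattern, sketched under the assumption that `StreamingContext.getOrCreate(checkpointDir, creatingFunc)` is available as the guide describes for the Java factory; the checkpoint directory and master are placeholders:

```
def createContext(): StreamingContext = {
  val ssc = new StreamingContext("local", "NetworkWordCount", Seconds(1))  // new context
  ssc.checkpoint("checkpointDir")                        // placeholder checkpoint directory
  val lines = ssc.socketTextStream("localhost", 9999)    // create DStreams
  // ... set up the rest of the computation ...
  ssc
}

// Recreate the context from checkpoint data if present, otherwise build a new one.
val context = StreamingContext.getOrCreate("checkpointDir", createContext _)
context.start()
context.awaitTermination()
```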