path: root/docs/streaming-programming-guide.md
author    Sean Owen <sowen@cloudera.com>    2014-05-03 12:31:31 -0700
committer Patrick Wendell <pwendell@gmail.com>    2014-05-03 12:31:31 -0700
commit    11d54941760f86706e28f7ace8ece664c9164ba6 (patch)
tree      000924e1280bdb5ce92a6bbe37e5bca6ac4dd9ee /docs/streaming-programming-guide.md
parent    3d0a02dff3011e8894d98d903cd086bc95e56807 (diff)
SPARK-1663. Corrections for several compile errors in streaming code examples, and updates to follow API changes
I gave the Streaming code examples, both Scala and Java, a test run today. I turned up a number of small errors, mostly compile errors in the Java examples. There were a few typos in the Scala too.

I also took the liberty of adding things like imports, since in several cases they are not obvious. Feel free to push back on some changes.

There's one thing I haven't quite addressed in the changes. `JavaPairDStream` uses the Java API version of `Function2` in almost all cases, as `JFunction2`. However it uses `scala.Function2` in:

```
def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration)
  :JavaPairDStream[K, V] = {
  dstream.reduceByKeyAndWindow(reduceFunc, windowDuration)
}
```

Is that a typo?

Also, in Scala, I could not get this to compile:

```
val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

error: missing parameter type for expanded function ((x$1, x$2) => x$1.$plus(x$2))
```

You can see my fix below but am I missing something? Otherwise I can say these all worked for me!

Author: Sean Owen <sowen@cloudera.com>

Closes #589 from srowen/SPARK-1663 and squashes the following commits:

65a906b [Sean Owen] Corrections for several compile errors in streaming code examples, and updates to follow API changes
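For context, here is a minimal, self-contained sketch of the working form of that window operation. It is not part of this patch; it assumes a local master and a socket source on localhost:9999, as in the guide's quick example. Annotating the reduce function's parameter types is what lets the compiler resolve the overloaded `reduceByKeyAndWindow`:

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object WindowedWordCountSketch {
  def main(args: Array[String]) {
    // Local master and a 1 second batch interval, as in the guide's quick example
    val ssc = new StreamingContext("local[2]", "WindowedWordCountSketch", Seconds(1))

    // (word, 1) pairs from a socket source on localhost:9999 (an assumption for this sketch)
    val pairs = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    // pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10)) does not compile:
    // because reduceByKeyAndWindow is overloaded, the compiler cannot infer the
    // parameter types of `_ + _`. Spelling them out selects the
    // (reduceFunc, windowDuration, slideDuration) overload.
    val windowedWordCounts =
      pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))

    windowedWordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

The explicit `(a: Int, b: Int) => a + b` mirrors the fix this patch applies to the Scala window example in the guide below.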
Diffstat (limited to 'docs/streaming-programming-guide.md')
-rw-r--r--  docs/streaming-programming-guide.md  62
1 file changed, 36 insertions, 26 deletions
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 7ad06427ca..b22bb45828 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -76,16 +76,19 @@ Besides Spark's configuration, we specify that any DStream will be processed
in 1 second batches.
{% highlight scala %}
-// Create a StreamingContext with a SparkConf configuration
-val ssc = new StreamingContext(sparkConf, Seconds(1))
+import org.apache.spark.api.java.function._
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.api._
+// Create a StreamingContext with a local master
+val ssc = new StreamingContext("local", "NetworkWordCount", Seconds(1))
{% endhighlight %}
Using this context, we then create a new DStream
by specifying the IP address and port of the data server.
{% highlight scala %}
-// Create a DStream that will connect to serverIP:serverPort
-val lines = ssc.socketTextStream(serverIP, serverPort)
+// Create a DStream that will connect to serverIP:serverPort, like localhost:9999
+val lines = ssc.socketTextStream("localhost", 9999)
{% endhighlight %}
This `lines` DStream represents the stream of data that will be received from the data
@@ -103,6 +106,7 @@ each line will be split into multiple words and the stream of words is represent
`words` DStream. Next, we want to count these words.
{% highlight scala %}
+import org.apache.spark.streaming.StreamingContext._
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
@@ -138,16 +142,20 @@ functionality. Besides Spark's configuration, we specify that any DStream would
in 1 second batches.
{% highlight java %}
-// Create a StreamingContext with a SparkConf configuration
-JavaStreamingContext jssc = StreamingContext(sparkConf, new Duration(1000))
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.streaming.*;
+import org.apache.spark.streaming.api.java.*;
+import scala.Tuple2;
+// Create a StreamingContext with a local master
+JavaStreamingContext jssc = new JavaStreamingContext("local", "JavaNetworkWordCount", new Duration(1000))
{% endhighlight %}
Using this context, we then create a new DStream
by specifying the IP address and port of the data server.
{% highlight java %}
-// Create a DStream that will connect to serverIP:serverPort
-JavaDStream<String> lines = jssc.socketTextStream(serverIP, serverPort);
+// Create a DStream that will connect to serverIP:serverPort, like localhost:9999
+JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
{% endhighlight %}
This `lines` DStream represents the stream of data that will be received from the data
@@ -159,7 +167,7 @@ space into words.
JavaDStream<String> words = lines.flatMap(
new FlatMapFunction<String, String>() {
@Override public Iterable<String> call(String x) {
- return Lists.newArrayList(x.split(" "));
+ return Arrays.asList(x.split(" "));
}
});
{% endhighlight %}
@@ -359,7 +367,7 @@ as explained earlier. Finally, the last two parameters are needed to deploy your
if running in distributed mode, as described in the
[Spark programming guide](scala-programming-guide.html#deploying-code-on-a-cluster).
Additionally, the underlying SparkContext can be accessed as
-`streamingContext.sparkContext`.
+`ssc.sparkContext`.
The batch interval must be set based on the latency requirements of your application
and available cluster resources. See the [Performance Tuning](#setting-the-right-batch-size)
@@ -399,7 +407,7 @@ These operations are discussed in detail in later sections.
## Input Sources
-We have already taken a look at the `streamingContext.socketTextStream(...)` in the [quick
+We have already taken a look at the `ssc.socketTextStream(...)` in the [quick
example](#a-quick-example) which creates a DStream from text
data received over a TCP socket connection. Besides sockets, the core Spark Streaming API provides
methods for creating DStreams from files and Akka actors as input sources.
@@ -409,12 +417,12 @@ Specifically, for files, the DStream can be created as
<div class="codetabs">
<div data-lang="scala">
{% highlight scala %}
-streamingContext.fileStream(dataDirectory)
+ssc.fileStream(dataDirectory)
{% endhighlight %}
</div>
<div data-lang="java">
{% highlight java %}
-javaStreamingContext.fileStream(dataDirectory);
+jssc.fileStream(dataDirectory);
{% endhighlight %}
</div>
</div>
@@ -443,13 +451,13 @@ project dependencies, you can create a DStream from Kafka as
<div data-lang="scala">
{% highlight scala %}
import org.apache.spark.streaming.kafka._
-KafkaUtils.createStream(streamingContext, kafkaParams, ...)
+KafkaUtils.createStream(ssc, kafkaParams, ...)
{% endhighlight %}
</div>
<div data-lang="java">
{% highlight java %}
-import org.apache.spark.streaming.kafka.*
-KafkaUtils.createStream(javaStreamingContext, kafkaParams, ...);
+import org.apache.spark.streaming.kafka.*;
+KafkaUtils.createStream(jssc, kafkaParams, ...);
{% endhighlight %}
</div>
</div>
@@ -578,13 +586,14 @@ val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
<div data-lang="java" markdown="1">
{% highlight java %}
+import com.google.common.base.Optional;
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
@Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
Integer newSum = ... // add the new values with the previous running count to get the new count
- return Optional.of(newSum)
+ return Optional.of(newSum);
}
- }
+ };
{% endhighlight %}
This is applied on a DStream containing words (say, the `pairs` DStream containing `(word,
@@ -617,9 +626,9 @@ spam information (maybe generated with Spark as well) and then filtering based o
<div data-lang="scala" markdown="1">
{% highlight scala %}
-val spamInfoRDD = sparkContext.hadoopFile(...) // RDD containing spam information
+val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
-val cleanedDStream = inputDStream.transform(rdd => {
+val cleanedDStream = wordCounts.transform(rdd => {
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
...
})
@@ -629,13 +638,14 @@ val cleanedDStream = inputDStream.transform(rdd => {
<div data-lang="java" markdown="1">
{% highlight java %}
+import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
-JavaPairRDD<String, Double> spamInfoRDD = javaSparkContext.hadoopFile(...);
+final JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);
-JavaPairDStream<String, Integer> cleanedDStream = inputDStream.transform(
+JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(
new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
@Override public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> rdd) throws Exception {
- rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
+ rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
...
}
});
@@ -684,7 +694,7 @@ operation `reduceByKeyAndWindow`.
{% highlight scala %}
// Reduce last 30 seconds of data, every 10 seconds
-val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
+val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
{% endhighlight %}
</div>
@@ -699,7 +709,7 @@ Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer
};
// Reduce last 30 seconds of data, every 10 seconds
-JavaPairDStream<String, Integer> windowedWordCounts = pair.reduceByKeyAndWindow(reduceFunc, new Duration(30000), new Duration(10000));
+JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(reduceFunc, new Duration(30000), new Duration(10000));
{% endhighlight %}
</div>
@@ -1087,7 +1097,7 @@ This behavior is made simple by using `JavaStreamingContext.getOrCreate`. This i
{% highlight java %}
// Create a factory object that can create a and setup a new JavaStreamingContext
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
- JavaStreamingContextFactory create() {
+ @Override public JavaStreamingContext create() {
JavaStreamingContext jssc = new JavaStreamingContext(...); // new context
JavaDStream<String> lines = jssc.socketTextStream(...); // create DStreams
...