path: root/examples
author    Patrick Wendell <pwendell@gmail.com>    2013-01-17 21:07:09 -0800
committer Patrick Wendell <pwendell@gmail.com>    2013-01-17 21:25:49 -0800
commit    e0165bf7141086e28f88cd68ab7bc6249061c924 (patch)
tree      eed0fbf2db1b1b93395632e57985a4fd072c76f6 /examples
parent    6fba7683c29b64a33a6daa28cc56bb5d20574314 (diff)
Adding queueStream and some slight refactoring
Diffstat (limited to 'examples')
-rw-r--r-- examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java | 62
1 file changed, 62 insertions(+), 0 deletions(-)
diff --git a/examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java
new file mode 100644
index 0000000000..43c3cd4dfa
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java
@@ -0,0 +1,62 @@
+package spark.streaming.examples;
+
+import com.google.common.collect.Lists;
+import scala.Tuple2;
+import spark.api.java.JavaRDD;
+import spark.api.java.function.Function2;
+import spark.api.java.function.PairFunction;
+import spark.streaming.Duration;
+import spark.streaming.api.java.JavaDStream;
+import spark.streaming.api.java.JavaPairDStream;
+import spark.streaming.api.java.JavaStreamingContext;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+public class JavaQueueStream {
+ public static void main(String[] args) throws InterruptedException {
+ if (args.length < 1) {
+ System.err.println("Usage: JavaQueueStream <master>");
+ System.exit(1);
+ }
+
+ // Create the context
+ JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000));
+
+ // Create the queue through which RDDs can be pushed to
+ // a QueueInputDStream
+ Queue<JavaRDD<Integer>> rddQueue = new LinkedList<JavaRDD<Integer>>();
+
+ // Create and push some RDDs into the queue
+ List<Integer> list = Lists.newArrayList();
+ for (int i = 0; i < 1000; i++) {
+ list.add(i);
+ }
+
+ for (int i = 0; i < 30; i++) {
+ rddQueue.add(ssc.sc().parallelize(list));
+ }
+
+
+ // Create the QueueInputDStream and use it do some processing
+ JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue);
+ JavaPairDStream<Integer, Integer> mappedStream = inputStream.map(
+ new PairFunction<Integer, Integer, Integer>() {
+ @Override
+ public Tuple2<Integer, Integer> call(Integer i) throws Exception {
+ return new Tuple2<Integer, Integer>(i % 10, 1);
+ }
+ });
+ JavaPairDStream<Integer, Integer> reducedStream = mappedStream.reduceByKey(
+ new Function2<Integer, Integer, Integer>() {
+ @Override
+ public Integer call(Integer i1, Integer i2) throws Exception {
+ return i1 + i2;
+ }
+ });
+
+ reducedStream.print();
+ ssc.start();
+ }
+}
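
For comparison, the same queue-backed pattern can be written against the Scala StreamingContext.queueStream API. This is only a minimal sketch: the object name ScalaQueueStreamSketch is made up for illustration, and details such as Seconds(1), the implicit import for reduceByKey, and the ssc.sc field are assumptions that may differ between Spark versions of this era.

    import scala.collection.mutable.Queue

    import spark.RDD
    import spark.streaming.{Seconds, StreamingContext}
    import spark.streaming.StreamingContext._  // implicit pair-DStream operations (assumed import path)

    // Hypothetical Scala counterpart of the JavaQueueStream example above.
    object ScalaQueueStreamSketch {
      def main(args: Array[String]) {
        if (args.length < 1) {
          System.err.println("Usage: ScalaQueueStreamSketch <master>")
          System.exit(1)
        }

        // One-second batches, matching the Java example's new Duration(1000).
        val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1))

        // Queue of RDDs that backs the QueueInputDStream; by default one RDD
        // is consumed per batch interval.
        val rddQueue = new Queue[RDD[Int]]()
        for (i <- 1 to 30) {
          rddQueue += ssc.sc.parallelize(1 to 1000)  // ssc.sc: underlying SparkContext (assumed field name)
        }

        // Bucket each element by i % 10 and count per bucket in every batch.
        val inputStream = ssc.queueStream(rddQueue)
        val reducedStream = inputStream.map(i => (i % 10, 1)).reduceByKey(_ + _)
        reducedStream.print()
        ssc.start()
      }
    }

As with the Java version, queueStream is mainly useful for testing a streaming job against locally generated RDDs, since more RDDs can be pushed onto the queue while the context is running. In this era the bundled examples were typically launched through the repository's run script, e.g. ./run spark.streaming.examples.JavaQueueStream local[2]; the exact invocation is an assumption and depends on how the project is built.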