diff options
Diffstat (limited to 'examples')
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java | 62 |
1 files changed, 62 insertions, 0 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java new file mode 100644 index 0000000000..43c3cd4dfa --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java @@ -0,0 +1,62 @@ +package spark.streaming.examples; + +import com.google.common.collect.Lists; +import scala.Tuple2; +import spark.api.java.JavaRDD; +import spark.api.java.function.Function2; +import spark.api.java.function.PairFunction; +import spark.streaming.Duration; +import spark.streaming.api.java.JavaDStream; +import spark.streaming.api.java.JavaPairDStream; +import spark.streaming.api.java.JavaStreamingContext; + +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +public class JavaQueueStream { + public static void main(String[] args) throws InterruptedException { + if (args.length < 1) { + System.err.println("Usage: JavaQueueStream <master>"); + System.exit(1); + } + + // Create the context + JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000)); + + // Create the queue through which RDDs can be pushed to + // a QueueInputDStream + Queue<JavaRDD<Integer>> rddQueue = new LinkedList<JavaRDD<Integer>>(); + + // Create and push some RDDs into the queue + List<Integer> list = Lists.newArrayList(); + for (int i = 0; i < 1000; i++) { + list.add(i); + } + + for (int i = 0; i < 30; i++) { + rddQueue.add(ssc.sc().parallelize(list)); + } + + + // Create the QueueInputDStream and use it do some processing + JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue); + JavaPairDStream<Integer, Integer> mappedStream = inputStream.map( + new PairFunction<Integer, Integer, Integer>() { + @Override + public Tuple2<Integer, Integer> call(Integer i) throws Exception { + return new Tuple2<Integer, Integer>(i % 10, 1); + } + }); + JavaPairDStream<Integer, Integer> reducedStream = mappedStream.reduceByKey( + new Function2<Integer, Integer, Integer>() { + @Override + public Integer call(Integer i1, Integer i2) throws Exception { + return i1 + i2; + } + }); + + reducedStream.print(); + ssc.start(); + } +} |