author     Tathagata Das <tathagata.das1565@gmail.com>   2013-01-20 01:13:56 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>   2013-01-20 01:13:56 -0800
commit     4f8fe58b2579b6040bb9a7d1fcf9adc19843a97b (patch)
tree       ef25dc458f27f1a0ecc628d024ee04019bdf448f /examples/src
parent     214345ceace634ec9cc83c4c85b233b699e0d219 (diff)
parent     11bbe231408a3223d110c89519a70184d58408af (diff)
Merge branch 'mesos-streaming' into streaming
Conflicts:
core/src/main/scala/spark/api/java/JavaRDDLike.scala
core/src/main/scala/spark/api/java/JavaSparkContext.scala
core/src/test/scala/spark/JavaAPISuite.java
Diffstat (limited to 'examples/src')
 examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java  | 50 ++++++
 examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java | 62 +++++++
 examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java      | 62 +++++++
 examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala    |  2 +-
 4 files changed, 175 insertions(+), 1 deletion(-)
diff --git a/examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java
new file mode 100644
index 0000000000..cddce16e39
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java
@@ -0,0 +1,50 @@
+package spark.streaming.examples;
+
+import spark.api.java.function.Function;
+import spark.streaming.*;
+import spark.streaming.api.java.*;
+import spark.streaming.dstream.SparkFlumeEvent;
+
+/**
+ * Produces a count of events received from Flume.
+ *
+ * This should be used in conjunction with an AvroSink in Flume. It will start
+ * an Avro server at the requested host:port address and listen for requests.
+ * Your Flume AvroSink should be pointed to this address.
+ *
+ * Usage: JavaFlumeEventCount <master> <host> <port>
+ *
+ *   <master> is a Spark master URL
+ *   <host> is the host on which the Flume receiver will be started - a receiver
+ *          creates a server and listens for Flume events.
+ *   <port> is the port the Flume receiver will listen on.
+ */
+public class JavaFlumeEventCount {
+  public static void main(String[] args) {
+    if (args.length != 3) {
+      System.err.println("Usage: JavaFlumeEventCount <master> <host> <port>");
+      System.exit(1);
+    }
+
+    String master = args[0];
+    String host = args[1];
+    int port = Integer.parseInt(args[2]);
+
+    Duration batchInterval = new Duration(2000);
+
+    JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval);
+
+    JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream(host, port);
+
+    flumeStream.count();
+
+    flumeStream.count().map(new Function<Long, String>() {
+      @Override
+      public String call(Long in) {
+        return "Received " + in + " flume events.";
+      }
+    }).print();
+
+    sc.start();
+  }
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java
new file mode 100644
index 0000000000..4299febfd6
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java
@@ -0,0 +1,62 @@
+package spark.streaming.examples;
+
+import com.google.common.collect.Lists;
+import scala.Tuple2;
+import spark.api.java.function.FlatMapFunction;
+import spark.api.java.function.Function2;
+import spark.api.java.function.PairFunction;
+import spark.streaming.Duration;
+import spark.streaming.api.java.JavaDStream;
+import spark.streaming.api.java.JavaPairDStream;
+import spark.streaming.api.java.JavaStreamingContext;
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ * Usage: JavaNetworkWordCount <master> <hostname> <port>
+ *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *    `$ nc -lk 9999`
+ * and then run the example
+ *    `$ ./run spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999`
+ */
+public class JavaNetworkWordCount {
+  public static void main(String[] args) {
+    if (args.length < 3) {
+      System.err.println("Usage: JavaNetworkWordCount <master> <hostname> <port>\n" +
+          "In local mode, <master> should be 'local[n]' with n > 1");
+      System.exit(1);
+    }
+
+    // Create the context with a 1 second batch size
+    JavaStreamingContext ssc = new JavaStreamingContext(
+        args[0], "NetworkWordCount", new Duration(1000));
+
+    // Create a NetworkInputDStream on target ip:port and count the
+    // words in the input stream of '\n' delimited text (e.g. generated by 'nc')
+    JavaDStream<String> lines = ssc.networkTextStream(args[1], Integer.parseInt(args[2]));
+    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+      @Override
+      public Iterable<String> call(String x) {
+        return Lists.newArrayList(x.split(" "));
+      }
+    });
+    JavaPairDStream<String, Integer> wordCounts = words.map(
+        new PairFunction<String, String, Integer>() {
+          @Override
+          public Tuple2<String, Integer> call(String s) throws Exception {
+            return new Tuple2<String, Integer>(s, 1);
+          }
+        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+          @Override
+          public Integer call(Integer i1, Integer i2) throws Exception {
+            return i1 + i2;
+          }
+        });
+
+    wordCounts.print();
+    ssc.start();
+
+  }
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java
new file mode 100644
index 0000000000..43c3cd4dfa
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java
@@ -0,0 +1,62 @@
+package spark.streaming.examples;
+
+import com.google.common.collect.Lists;
+import scala.Tuple2;
+import spark.api.java.JavaRDD;
+import spark.api.java.function.Function2;
+import spark.api.java.function.PairFunction;
+import spark.streaming.Duration;
+import spark.streaming.api.java.JavaDStream;
+import spark.streaming.api.java.JavaPairDStream;
+import spark.streaming.api.java.JavaStreamingContext;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+public class JavaQueueStream {
+  public static void main(String[] args) throws InterruptedException {
+    if (args.length < 1) {
+      System.err.println("Usage: JavaQueueStream <master>");
+      System.exit(1);
+    }
+
+    // Create the context
+    JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000));
+
+    // Create the queue through which RDDs can be pushed to
+    // a QueueInputDStream
+    Queue<JavaRDD<Integer>> rddQueue = new LinkedList<JavaRDD<Integer>>();
+
+    // Create and push some RDDs into the queue
+    List<Integer> list = Lists.newArrayList();
+    for (int i = 0; i < 1000; i++) {
+      list.add(i);
+    }
+
+    for (int i = 0; i < 30; i++) {
+      rddQueue.add(ssc.sc().parallelize(list));
+    }
+
+
+    // Create the QueueInputDStream and use it to do some processing
+    JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue);
+    JavaPairDStream<Integer, Integer> mappedStream = inputStream.map(
+        new PairFunction<Integer, Integer, Integer>() {
+          @Override
+          public Tuple2<Integer, Integer> call(Integer i) throws Exception {
+            return new Tuple2<Integer, Integer>(i % 10, 1);
+          }
+        });
+    JavaPairDStream<Integer, Integer> reducedStream = mappedStream.reduceByKey(
+        new Function2<Integer, Integer, Integer>() {
+          @Override
+          public Integer call(Integer i1, Integer i2) throws Exception {
+            return i1 + i2;
+          }
+        });
+
+    reducedStream.print();
+    ssc.start();
+  }
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
index 43c01d5db2..32f7d57bea 100644
--- a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
@@ -22,7 +22,7 @@ object NetworkWordCount {
       System.exit(1)
     }
 
-    // Create the context and set the batch size
+    // Create the context with a 1 second batch size
     val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1))
 
     // Create a NetworkInputDStream on target ip:port and count the
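
For context, the last hunk changes only one comment line of NetworkWordCount.scala, the Scala example that JavaNetworkWordCount above ports to the Java API. A minimal sketch of what the surrounding Scala file plausibly looks like, reconstructed from the hunk context and the Java version; the imports and exact wording are assumptions, not the verbatim file contents:

package spark.streaming.examples

import spark.streaming.{Seconds, StreamingContext}
// Implicit conversions that add pair operations such as reduceByKey to DStreams of tuples
import spark.streaming.StreamingContext._

// Counts words in '\n'-delimited text received over a TCP socket,
// printing the counts for each 1 second batch.
object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
        "In local mode, <master> should be 'local[n]' with n > 1")
      System.exit(1)
    }

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1))

    // Create a NetworkInputDStream on target ip:port and count the
    // words in the input stream of '\n'-delimited text (e.g. generated by 'nc')
    val lines = ssc.networkTextStream(args(1), args(2).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    wordCounts.print()
    ssc.start()
  }
}

As with the Java version, this would presumably be exercised by running `nc -lk 9999` in one terminal and `./run spark.streaming.examples.NetworkWordCount local[2] localhost 9999` in another.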