diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-14 18:10:37 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-14 18:10:37 -0800 |
commit | 4b8402e900c803e64b8a4e2094fd845ccfc9df36 (patch) | |
tree | 1cf98b2f70c8b4eeb2c3eb38c839c67308fb0afa /examples/src/main/scala | |
parent | def8126d7788a8bd991ac6f9f9403de701a39dc5 (diff) | |
download | spark-4b8402e900c803e64b8a4e2094fd845ccfc9df36.tar.gz spark-4b8402e900c803e64b8a4e2094fd845ccfc9df36.tar.bz2 spark-4b8402e900c803e64b8a4e2094fd845ccfc9df36.zip |
Moved Java streaming examples to examples/src/main/java/spark/streaming/... and fixed logging in NetworkInputTracker to highlight errors when receiver deregisters/shuts down.
Diffstat (limited to 'examples/src/main/scala')
3 files changed, 0 insertions, 174 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java deleted file mode 100644 index cddce16e39..0000000000 --- a/examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java +++ /dev/null @@ -1,50 +0,0 @@ -package spark.streaming.examples; - -import spark.api.java.function.Function; -import spark.streaming.*; -import spark.streaming.api.java.*; -import spark.streaming.dstream.SparkFlumeEvent; - -/** - * Produces a count of events received from Flume. - * - * This should be used in conjunction with an AvroSink in Flume. It will start - * an Avro server on at the request host:port address and listen for requests. - * Your Flume AvroSink should be pointed to this address. - * - * Usage: JavaFlumeEventCount <master> <host> <port> - * - * <master> is a Spark master URL - * <host> is the host the Flume receiver will be started on - a receiver - * creates a server and listens for flume events. - * <port> is the port the Flume receiver will listen on. - */ -public class JavaFlumeEventCount { - public static void main(String[] args) { - if (args.length != 3) { - System.err.println("Usage: JavaFlumeEventCount <master> <host> <port>"); - System.exit(1); - } - - String master = args[0]; - String host = args[1]; - int port = Integer.parseInt(args[2]); - - Duration batchInterval = new Duration(2000); - - JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval); - - JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("localhost", port); - - flumeStream.count(); - - flumeStream.count().map(new Function<Long, String>() { - @Override - public String call(Long in) { - return "Received " + in + " flume events."; - } - }).print(); - - sc.start(); - } -} diff --git a/examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java deleted file mode 100644 index 4299febfd6..0000000000 --- a/examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java +++ /dev/null @@ -1,62 +0,0 @@ -package spark.streaming.examples; - -import com.google.common.collect.Lists; -import scala.Tuple2; -import spark.api.java.function.FlatMapFunction; -import spark.api.java.function.Function2; -import spark.api.java.function.PairFunction; -import spark.streaming.Duration; -import spark.streaming.api.java.JavaDStream; -import spark.streaming.api.java.JavaPairDStream; -import spark.streaming.api.java.JavaStreamingContext; - -/** - * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. - * Usage: NetworkWordCount <master> <hostname> <port> - * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. - * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data. - * - * To run this on your local machine, you need to first run a Netcat server - * `$ nc -lk 9999` - * and then run the example - * `$ ./run spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999` - */ -public class JavaNetworkWordCount { - public static void main(String[] args) { - if (args.length < 2) { - System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" + - "In local mode, <master> should be 'local[n]' with n > 1"); - System.exit(1); - } - - // Create the context with a 1 second batch size - JavaStreamingContext ssc = new JavaStreamingContext( - args[0], "NetworkWordCount", new Duration(1000)); - - // Create a NetworkInputDStream on target ip:port and count the - // words in input stream of \n delimited test (eg. generated by 'nc') - JavaDStream<String> lines = ssc.networkTextStream(args[1], Integer.parseInt(args[2])); - JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { - @Override - public Iterable<String> call(String x) { - return Lists.newArrayList(x.split(" ")); - } - }); - JavaPairDStream<String, Integer> wordCounts = words.map( - new PairFunction<String, String, Integer>() { - @Override - public Tuple2<String, Integer> call(String s) throws Exception { - return new Tuple2<String, Integer>(s, 1); - } - }).reduceByKey(new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer i1, Integer i2) throws Exception { - return i1 + i2; - } - }); - - wordCounts.print(); - ssc.start(); - - } -} diff --git a/examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java deleted file mode 100644 index 43c3cd4dfa..0000000000 --- a/examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java +++ /dev/null @@ -1,62 +0,0 @@ -package spark.streaming.examples; - -import com.google.common.collect.Lists; -import scala.Tuple2; -import spark.api.java.JavaRDD; -import spark.api.java.function.Function2; -import spark.api.java.function.PairFunction; -import spark.streaming.Duration; -import spark.streaming.api.java.JavaDStream; -import spark.streaming.api.java.JavaPairDStream; -import spark.streaming.api.java.JavaStreamingContext; - -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; - -public class JavaQueueStream { - public static void main(String[] args) throws InterruptedException { - if (args.length < 1) { - System.err.println("Usage: JavaQueueStream <master>"); - System.exit(1); - } - - // Create the context - JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000)); - - // Create the queue through which RDDs can be pushed to - // a QueueInputDStream - Queue<JavaRDD<Integer>> rddQueue = new LinkedList<JavaRDD<Integer>>(); - - // Create and push some RDDs into the queue - List<Integer> list = Lists.newArrayList(); - for (int i = 0; i < 1000; i++) { - list.add(i); - } - - for (int i = 0; i < 30; i++) { - rddQueue.add(ssc.sc().parallelize(list)); - } - - - // Create the QueueInputDStream and use it do some processing - JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue); - JavaPairDStream<Integer, Integer> mappedStream = inputStream.map( - new PairFunction<Integer, Integer, Integer>() { - @Override - public Tuple2<Integer, Integer> call(Integer i) throws Exception { - return new Tuple2<Integer, Integer>(i % 10, 1); - } - }); - JavaPairDStream<Integer, Integer> reducedStream = mappedStream.reduceByKey( - new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer i1, Integer i2) throws Exception { - return i1 + i2; - } - }); - - reducedStream.print(); - ssc.start(); - } -} |