author    Tathagata Das <tathagata.das1565@gmail.com>  2013-02-14 18:10:37 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2013-02-14 18:10:37 -0800
commit    4b8402e900c803e64b8a4e2094fd845ccfc9df36 (patch)
tree      1cf98b2f70c8b4eeb2c3eb38c839c67308fb0afa /examples/src/main/scala
parent    def8126d7788a8bd991ac6f9f9403de701a39dc5 (diff)
Moved the Java streaming examples to examples/src/main/java/spark/streaming/... and fixed the logging in NetworkInputTracker to highlight errors when a receiver deregisters or shuts down.
Diffstat (limited to 'examples/src/main/scala')
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java  |  50
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java |  62
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java      |  62
3 files changed, 0 insertions(+), 174 deletions(-)
diff --git a/examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java
deleted file mode 100644
index cddce16e39..0000000000
--- a/examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package spark.streaming.examples;
-
-import spark.api.java.function.Function;
-import spark.streaming.*;
-import spark.streaming.api.java.*;
-import spark.streaming.dstream.SparkFlumeEvent;
-
-/**
- * Produces a count of events received from Flume.
- *
- * This should be used in conjunction with an AvroSink in Flume. It will start
- * an Avro server at the requested host:port address and listen for requests.
- * Your Flume AvroSink should be pointed to this address.
- *
- * Usage: JavaFlumeEventCount <master> <host> <port>
- *
- * <master> is a Spark master URL
- * <host> is the host the Flume receiver will be started on - a receiver
- * creates a server and listens for flume events.
- * <port> is the port the Flume receiver will listen on.
- */
-public class JavaFlumeEventCount {
- public static void main(String[] args) {
- if (args.length != 3) {
- System.err.println("Usage: JavaFlumeEventCount <master> <host> <port>");
- System.exit(1);
- }
-
- String master = args[0];
- String host = args[1];
- int port = Integer.parseInt(args[2]);
-
- Duration batchInterval = new Duration(2000);
-
- JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval);
-
- JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream(host, port);
-
- flumeStream.count();
-
- flumeStream.count().map(new Function<Long, String>() {
- @Override
- public String call(Long in) {
- return "Received " + in + " flume events.";
- }
- }).print();
-
- sc.start();
- }
-}
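For reference, a hedged usage sketch in the style of the run command shown in JavaNetworkWordCount below (41414 is only an illustrative port, not one mandated by this code); a Flume agent's AvroSink would then be pointed at localhost:41414, as the class header above describes:

`$ ./run spark.streaming.examples.JavaFlumeEventCount local[2] localhost 41414`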
diff --git a/examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java
deleted file mode 100644
index 4299febfd6..0000000000
--- a/examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package spark.streaming.examples;
-
-import com.google.common.collect.Lists;
-import scala.Tuple2;
-import spark.api.java.function.FlatMapFunction;
-import spark.api.java.function.Function2;
-import spark.api.java.function.PairFunction;
-import spark.streaming.Duration;
-import spark.streaming.api.java.JavaDStream;
-import spark.streaming.api.java.JavaPairDStream;
-import spark.streaming.api.java.JavaStreamingContext;
-
-/**
- * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
- * Usage: JavaNetworkWordCount <master> <hostname> <port>
- * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
- * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
- *
- * To run this on your local machine, you need to first run a Netcat server
- * `$ nc -lk 9999`
- * and then run the example
- * `$ ./run spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999`
- */
-public class JavaNetworkWordCount {
- public static void main(String[] args) {
- if (args.length < 3) {
- System.err.println("Usage: JavaNetworkWordCount <master> <hostname> <port>\n" +
- "In local mode, <master> should be 'local[n]' with n > 1");
- System.exit(1);
- }
-
- // Create the context with a 1 second batch size
- JavaStreamingContext ssc = new JavaStreamingContext(
- args[0], "NetworkWordCount", new Duration(1000));
-
- // Create a NetworkInputDStream on target ip:port and count the
- // words in the input stream of \n delimited text (e.g. generated by 'nc')
- JavaDStream<String> lines = ssc.networkTextStream(args[1], Integer.parseInt(args[2]));
- JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterable<String> call(String x) {
- return Lists.newArrayList(x.split(" "));
- }
- });
- JavaPairDStream<String, Integer> wordCounts = words.map(
- new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) throws Exception {
- return new Tuple2<String, Integer>(s, 1);
- }
- }).reduceByKey(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer i1, Integer i2) throws Exception {
- return i1 + i2;
- }
- });
-
- wordCounts.print();
- ssc.start();
-
- }
-}
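As a worked illustration of the pipeline above (the input is hypothetical, not from the source): after starting `$ nc -lk 9999` and typing the line `to be or not to be`, the flatMap/map/reduceByKey chain splits the line into six words, pairs each with 1, and sums per key, so the batch containing it should print pairs along these lines (exact print() formatting and ordering aside):

(to,2)
(be,2)
(or,1)
(not,1)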
diff --git a/examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java
deleted file mode 100644
index 43c3cd4dfa..0000000000
--- a/examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package spark.streaming.examples;
-
-import com.google.common.collect.Lists;
-import scala.Tuple2;
-import spark.api.java.JavaRDD;
-import spark.api.java.function.Function2;
-import spark.api.java.function.PairFunction;
-import spark.streaming.Duration;
-import spark.streaming.api.java.JavaDStream;
-import spark.streaming.api.java.JavaPairDStream;
-import spark.streaming.api.java.JavaStreamingContext;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-
-public class JavaQueueStream {
- public static void main(String[] args) throws InterruptedException {
- if (args.length < 1) {
- System.err.println("Usage: JavaQueueStream <master>");
- System.exit(1);
- }
-
- // Create the context
- JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000));
-
- // Create the queue through which RDDs can be pushed to
- // a QueueInputDStream
- Queue<JavaRDD<Integer>> rddQueue = new LinkedList<JavaRDD<Integer>>();
-
- // Create and push some RDDs into the queue
- List<Integer> list = Lists.newArrayList();
- for (int i = 0; i < 1000; i++) {
- list.add(i);
- }
-
- for (int i = 0; i < 30; i++) {
- rddQueue.add(ssc.sc().parallelize(list));
- }
-
-
- // Create the QueueInputDStream and use it to do some processing
- JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue);
- JavaPairDStream<Integer, Integer> mappedStream = inputStream.map(
- new PairFunction<Integer, Integer, Integer>() {
- @Override
- public Tuple2<Integer, Integer> call(Integer i) throws Exception {
- return new Tuple2<Integer, Integer>(i % 10, 1);
- }
- });
- JavaPairDStream<Integer, Integer> reducedStream = mappedStream.reduceByKey(
- new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer i1, Integer i2) throws Exception {
- return i1 + i2;
- }
- });
-
- reducedStream.print();
- ssc.start();
- }
-}
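A quick sanity check on what this example should print: the queued list holds the integers 0 through 999, so each residue class i % 10 contains exactly 100 elements. Assuming queueStream consumes the queued RDDs one per batch interval, every one-second batch should therefore reduce to the same ten pairs, in some order:

(0,100)
(1,100)
(2,100)
(3,100)
(4,100)
(5,100)
(6,100)
(7,100)
(8,100)
(9,100)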