From 3461cd99b7b680be9c9dc263382b42f30c9edd7d Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Fri, 11 Jan 2013 12:05:04 -0800 Subject: Flume example and bug fix --- .../streaming/examples/JavaFlumeEventCount.java | 50 ++++++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java (limited to 'examples/src/main/scala') diff --git a/examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java new file mode 100644 index 0000000000..6592d9bc2e --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java @@ -0,0 +1,50 @@ +package spark.streaming.examples; + +import spark.api.java.function.Function; +import spark.streaming.*; +import spark.streaming.api.java.*; +import spark.streaming.dstream.SparkFlumeEvent; + +/** + * Produces a count of events received from Flume. + * + * This should be used in conjunction with an AvroSink in Flume. It will start + * an Avro server on at the request host:port address and listen for requests. + * Your Flume AvroSink should be pointed to this address. + * + * Usage: FlumeEventCount + * + * is a Spark master URL + * is the host the Flume receiver will be started on - a receiver + * creates a server and listens for flume events. + * is the port the Flume receiver will listen on. + */ +public class JavaFlumeEventCount { + public static void main(String[] args) { + if (args.length != 3) { + System.err.println("Usage: JavaFlumeEventCount "); + System.exit(1); + } + + String master = args[0]; + String host = args[1]; + int port = Integer.parseInt(args[2]); + + Duration batchInterval = new Duration(2000); + + JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval); + + JavaDStream flumeStream = sc.flumeStream("localhost", port); + + flumeStream.count(); + + flumeStream.count().map(new Function() { + @Override + public String call(Integer in) { + return "Received " + in + " flume events."; + } + }).print(); + + sc.start(); + } +} -- cgit v1.2.3