diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-01-11 12:05:04 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-01-14 09:42:36 -0800 |
commit | 3461cd99b7b680be9c9dc263382b42f30c9edd7d (patch) | |
tree | 17affcb7fcea5f47de09645012760e6a3977dbc5 | |
parent | 5bcb048167fe0b90f749910233342c09fff3fce7 (diff) | |
download | spark-3461cd99b7b680be9c9dc263382b42f30c9edd7d.tar.gz spark-3461cd99b7b680be9c9dc263382b42f30c9edd7d.tar.bz2 spark-3461cd99b7b680be9c9dc263382b42f30c9edd7d.zip |
Flume example and bug fix
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java | 50 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala | 9 |
2 files changed, 57 insertions, 2 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java new file mode 100644 index 0000000000..6592d9bc2e --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java @@ -0,0 +1,50 @@ +package spark.streaming.examples; + +import spark.api.java.function.Function; +import spark.streaming.*; +import spark.streaming.api.java.*; +import spark.streaming.dstream.SparkFlumeEvent; + +/** + * Produces a count of events received from Flume. + * + * This should be used in conjunction with an AvroSink in Flume. It will start + * an Avro server at the requested host:port address and listen for requests. + * Your Flume AvroSink should be pointed to this address. + * + * Usage: JavaFlumeEventCount <master> <host> <port> + * + * <master> is a Spark master URL + * <host> is the host the Flume receiver will be started on - a receiver + * creates a server and listens for flume events. + * <port> is the port the Flume receiver will listen on. 
+ */ +public class JavaFlumeEventCount { + public static void main(String[] args) { + if (args.length != 3) { + System.err.println("Usage: JavaFlumeEventCount <master> <host> <port>"); + System.exit(1); + } + + String master = args[0]; + String host = args[1]; + int port = Integer.parseInt(args[2]); + + Duration batchInterval = new Duration(2000); + + JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval); + + JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("localhost", port); + + flumeStream.count(); + + flumeStream.count().map(new Function<Integer, String>() { + @Override + public String call(Integer in) { + return "Received " + in + " flume events."; + } + }).print(); + + sc.start(); + } +} diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala index cb58c1351d..91bcca9afa 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala @@ -1,6 +1,7 @@ package spark.streaming.api.java import java.util.{List => JList} +import java.lang.{Integer => JInt} import scala.collection.JavaConversions._ @@ -16,6 +17,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable def dstream: DStream[T] + implicit def scalaIntToJavaInteger(in: DStream[Int]): JavaDStream[JInt] = { + in.map(new JInt(_)) + } + /** * Prints the first ten elements of each RDD generated in this DStream. This is an output * operator, so this DStream will be registered as an output stream and there materialized. @@ -26,14 +31,14 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable * Returns a new DStream in which each RDD has a single element generated by counting each RDD * of this DStream. 
*/ - def count(): JavaDStream[Int] = dstream.count() + def count(): JavaDStream[JInt] = dstream.count() /** * Returns a new DStream in which each RDD has a single element generated by counting the number * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the * window() operation. This is equivalent to window(windowDuration, slideDuration).count() */ - def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[Int] = { + def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[JInt] = { dstream.countByWindow(windowDuration, slideDuration) } |