aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-01-11 12:05:04 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-01-14 09:42:36 -0800
commit3461cd99b7b680be9c9dc263382b42f30c9edd7d (patch)
tree17affcb7fcea5f47de09645012760e6a3977dbc5
parent5bcb048167fe0b90f749910233342c09fff3fce7 (diff)
downloadspark-3461cd99b7b680be9c9dc263382b42f30c9edd7d.tar.gz
spark-3461cd99b7b680be9c9dc263382b42f30c9edd7d.tar.bz2
spark-3461cd99b7b680be9c9dc263382b42f30c9edd7d.zip
Flume example and bug fix
-rw-r--r--examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java50
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala9
2 files changed, 57 insertions, 2 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java
new file mode 100644
index 0000000000..6592d9bc2e
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java
@@ -0,0 +1,50 @@
+package spark.streaming.examples;
+
+import spark.api.java.function.Function;
+import spark.streaming.*;
+import spark.streaming.api.java.*;
+import spark.streaming.dstream.SparkFlumeEvent;
+
+/**
+ * Produces a count of events received from Flume.
+ *
+ * This should be used in conjunction with an AvroSink in Flume. It will start
+ * an Avro server at the requested host:port address and listen for requests.
+ * Your Flume AvroSink should be pointed to this address.
+ *
+ * Usage: JavaFlumeEventCount <master> <host> <port>
+ *
+ * <master> is a Spark master URL
+ * <host> is the host the Flume receiver will be started on - a receiver
+ * creates a server and listens for flume events.
+ * <port> is the port the Flume receiver will listen on.
+ */
+public class JavaFlumeEventCount {
+  public static void main(String[] args) {
+    if (args.length != 3) {
+      System.err.println("Usage: JavaFlumeEventCount <master> <host> <port>");
+      System.exit(1);
+    }
+
+    String master = args[0];
+    String host = args[1];
+    int port = Integer.parseInt(args[2]);
+
+    Duration batchInterval = new Duration(2000);
+
+    JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval);
+
+    // Start the receiver on the <host> given on the command line, not a fixed "localhost".
+    JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream(host, port);
+
+    // Count the events in each batch once and print a one-line summary for it.
+    flumeStream.count().map(new Function<Integer, String>() {
+      @Override
+      public String call(Integer in) {
+        return "Received " + in + " flume events.";
+      }
+    }).print();
+
+    sc.start();
+  }
+}
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index cb58c1351d..91bcca9afa 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -1,6 +1,7 @@
package spark.streaming.api.java
import java.util.{List => JList}
+import java.lang.{Integer => JInt}
import scala.collection.JavaConversions._
@@ -16,6 +17,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
def dstream: DStream[T]
+ implicit def scalaIntToJavaInteger(in: DStream[Int]): JavaDStream[JInt] = {
+ in.map(new JInt(_)) // box each Scala Int as java.lang.Integer for Java callers; NOTE(review): the DStream-to-JavaDStream wrapping presumably comes from an implicit not shown here - confirm
+ }
+
/**
* Prints the first ten elements of each RDD generated in this DStream. This is an output
* operator, so this DStream will be registered as an output stream and there materialized.
@@ -26,14 +31,14 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
* Returns a new DStream in which each RDD has a single element generated by counting each RDD
* of this DStream.
*/
- def count(): JavaDStream[Int] = dstream.count()
+ def count(): JavaDStream[JInt] = dstream.count()
/**
* Returns a new DStream in which each RDD has a single element generated by counting the number
* of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
* window() operation. This is equivalent to window(windowDuration, slideDuration).count()
*/
- def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[Int] = {
+ def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[JInt] = {
dstream.countByWindow(windowDuration, slideDuration)
}