author    Patrick Wendell <pwendell@gmail.com>  2013-01-03 18:07:39 -0800
committer Patrick Wendell <pwendell@gmail.com>  2013-01-14 09:42:36 -0800
commit    b607c9e9165a996289e4fb78bf7f2792121183d0 (patch)
tree      64f67d7d75651ee28527e3fa172bef6a34864c20 /streaming
parent    82b0cc90cad4eb0e85a7ff4e14a90afec258a9a1 (diff)
A very rough, early cut at some Java functionality for Streaming.
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/JavaAPISuite.java                    64
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala           95
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala  29
3 files changed, 188 insertions, 0 deletions
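
To make the new surface concrete before the diffs, here is a minimal sketch of a standalone Java driver against the API this commit adds, mirroring the calls the test suite below exercises; the class name and input directory are placeholders.

import spark.streaming.Time;
import spark.streaming.api.java.JavaStreamingContext;

// Build a context with a one-second batch duration, attach a file-based text
// stream, print each batch, and start the streaming scheduler.
public class JavaStreamingSketch {
  public static void main(String[] args) {
    JavaStreamingContext ssc =
        new JavaStreamingContext("local[2]", "sketch", new Time(1000));
    ssc.textFileStream("/tmp/stream-input").print();
    ssc.start();
  }
}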
diff --git a/streaming/src/main/scala/spark/streaming/JavaAPISuite.java b/streaming/src/main/scala/spark/streaming/JavaAPISuite.java
new file mode 100644
index 0000000000..bcaaa4fa80
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/JavaAPISuite.java
@@ -0,0 +1,64 @@
+package spark.streaming;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import spark.api.java.JavaRDD;
+import spark.api.java.function.Function;
+import spark.api.java.function.Function2;
+import spark.streaming.api.java.JavaStreamingContext;
+
+import java.io.Serializable;
+
+// The test suite itself is Serializable so that anonymous Function implementations can be
+// serialized, as an alternative to converting these anonymous classes to static inner classes;
+// see http://stackoverflow.com/questions/758570/.
+public class JavaAPISuite implements Serializable {
+ private transient JavaStreamingContext sc;
+
+ @Before
+ public void setUp() {
+ sc = new JavaStreamingContext("local[2]", "test", new Time(1000));
+ }
+
+ @After
+ public void tearDown() {
+ sc.stop();
+ sc = null;
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port");
+ }
+
+ @Test
+ public void simpleTest() {
+ sc.textFileStream("/var/log/syslog").print();
+ sc.start();
+ }
+
+ public static void main(String[] args) {
+ JavaStreamingContext sc = new JavaStreamingContext("local[2]", "test", new Time(1000));
+
+ sc.networkTextStream("localhost", 12345).map(new Function<String, Integer>() {
+ @Override
+ public Integer call(String s) throws Exception {
+ return s.length();
+ }
+ }).reduce(new Function2<Integer, Integer, Integer>() {
+ @Override
+ public Integer call(Integer i1, Integer i2) throws Exception {
+ return i1 + i2;
+ }
+ }).foreach(new Function2<JavaRDD<Integer>, Time, Void>() {
+ @Override
+ public Void call(JavaRDD<Integer> integerJavaRDD, Time t) throws Exception {
+ System.out.println("Contents @ " + t.toFormattedString());
+ for (int i: integerJavaRDD.collect()) {
+ System.out.println(i + "\n");
+ }
+ return null;
+ }
+ });
+
+ sc.start();
+ }
+}
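
The comment at the top of the suite names the alternative to making the whole class Serializable: hoisting each anonymous Function into a static nested class, which captures no reference to the enclosing instance. A sketch of that variant (the class name is hypothetical; it subclasses Function directly, just as the anonymous classes above do):

// Static nested classes hold no hidden reference to the enclosing suite, so
// only the function object itself needs to be serializable.
static class StringLength extends Function<String, Integer> {
  @Override
  public Integer call(String s) {
    return s.length();
  }
}

// Usage: sc.networkTextStream("localhost", 12345).map(new StringLength());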
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
new file mode 100644
index 0000000000..e9391642f8
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
@@ -0,0 +1,95 @@
+package spark.streaming.api.java
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+
+import spark.streaming._
+import spark.api.java.JavaRDD
+import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
+import java.util
+import spark.RDD
+
+class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T]) {
+ def print() = dstream.print()
+
+ // TODO move to type-specific implementations
+ def cache() : JavaDStream[T] = {
+ dstream.cache()
+ }
+
+ def count() : JavaDStream[Int] = {
+ dstream.count()
+ }
+
+ def countByWindow(windowTime: Time, slideTime: Time) : JavaDStream[Int] = {
+ dstream.countByWindow(windowTime, slideTime)
+ }
+
+ def compute(validTime: Time): JavaRDD[T] = {
+ dstream.compute(validTime) match {
+ case Some(rdd) => new JavaRDD(rdd)
+ case None => null
+ }
+ }
+
+ def context(): StreamingContext = dstream.context()
+
+  def window(windowTime: Time): JavaDStream[T] = {
+ dstream.window(windowTime)
+ }
+
+ def window(windowTime: Time, slideTime: Time): JavaDStream[T] = {
+ dstream.window(windowTime, slideTime)
+ }
+
+ def tumble(batchTime: Time): JavaDStream[T] = {
+ dstream.tumble(batchTime)
+ }
+
+ def map[R](f: JFunction[T, R]): JavaDStream[R] = {
+ new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType())
+ }
+
+ def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] = {
+    dstream.filter(x => f(x).booleanValue())
+ }
+
+ def glom(): JavaDStream[JList[T]] = {
+ new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
+ }
+
+ // TODO: Other map partitions
+ def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
+ def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+ new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType())
+ }
+
+ def reduce(f: JFunction2[T, T, T]): JavaDStream[T] = dstream.reduce(f)
+
+ def reduceByWindow(
+ reduceFunc: JFunction2[T, T, T],
+ invReduceFunc: JFunction2[T, T, T],
+ windowTime: Time,
+ slideTime: Time): JavaDStream[T] = {
+ dstream.reduceByWindow(reduceFunc, invReduceFunc, windowTime, slideTime)
+ }
+
+ def slice(fromTime: Time, toTime: Time): JList[JavaRDD[T]] = {
+ new util.ArrayList(dstream.slice(fromTime, toTime).map(new JavaRDD(_)).toSeq)
+ }
+
+ def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) = {
+ dstream.foreach(rdd => foreachFunc.call(new JavaRDD(rdd)))
+ }
+
+ def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) = {
+ dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time))
+ }
+}
+
+object JavaDStream {
+ implicit def fromDStream[T: ClassManifest](dstream: DStream[T]): JavaDStream[T] =
+ new JavaDStream[T](dstream)
+
+}
\ No newline at end of file
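
The wrapper also exposes the windowed operations (window, tumble, reduceByWindow, countByWindow) that the test suite does not exercise. A sketch of reduceByWindow from the Java side, assuming a JavaDStream<Integer> of per-record values; the 30-second window and 1-second slide are illustrative:

import spark.api.java.function.Function2;
import spark.streaming.Time;
import spark.streaming.api.java.JavaDStream;

public class WindowSketch {
  // Sums a 30-second window that slides every second. The inverse function
  // lets the stream subtract batches that fall out of the window instead of
  // recomputing the whole window on each slide.
  static JavaDStream<Integer> windowedSum(JavaDStream<Integer> values) {
    return values.reduceByWindow(
        new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer a, Integer b) { return a + b; }
        },
        new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer a, Integer b) { return a - b; }
        },
        new Time(30000),
        new Time(1000));
  }
}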
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
new file mode 100644
index 0000000000..46f8cffd0b
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -0,0 +1,29 @@
+package spark.streaming.api.java
+
+import scala.collection.JavaConversions._
+
+import spark.streaming._
+import dstream.SparkFlumeEvent
+import spark.storage.StorageLevel
+
+class JavaStreamingContext(val ssc: StreamingContext) {
+ def this(master: String, frameworkName: String, batchDuration: Time) =
+ this(new StreamingContext(master, frameworkName, batchDuration))
+
+ def textFileStream(directory: String): JavaDStream[String] = {
+ ssc.textFileStream(directory)
+ }
+
+ def networkTextStream(hostname: String, port: Int): JavaDStream[String] = {
+ ssc.networkTextStream(hostname, port)
+ }
+
+ def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel):
+ JavaDStream[SparkFlumeEvent] = {
+ ssc.flumeStream(hostname, port, storageLevel)
+ }
+
+ def start() = ssc.start()
+ def stop() = ssc.stop()
+
+}
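
Taken together, the context wrapper gives Java callers three ingestion points. Below is a sketch of a driver that wires up all three; hostnames, ports, and the directory are placeholders, and the StorageLevels constant assumes the helper class the existing Java core API provides for choosing storage levels from Java.

import spark.api.java.StorageLevels;
import spark.streaming.Time;
import spark.streaming.api.java.JavaStreamingContext;

public class IngestSketch {
  public static void main(String[] args) {
    JavaStreamingContext ssc =
        new JavaStreamingContext("local[2]", "ingest", new Time(2000));

    // File-based text stream, socket-based text stream, and the Flume receiver.
    ssc.textFileStream("/tmp/stream-input").print();
    ssc.networkTextStream("localhost", 9999).print();
    ssc.flumeStream("localhost", 41414, StorageLevels.MEMORY_ONLY).print();

    ssc.start();
  }
}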