From b607c9e9165a996289e4fb78bf7f2792121183d0 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Thu, 3 Jan 2013 18:07:39 -0800
Subject: A very rough, early cut at some Java functionality for Streaming.

---
 .../main/scala/spark/streaming/JavaAPISuite.java   | 64 +++++++++++++++
 .../spark/streaming/api/java/JavaDStream.scala     | 95 ++++++++++++++++++++++
 .../streaming/api/java/JavaStreamingContext.scala  | 29 +++++++
 3 files changed, 188 insertions(+)
 create mode 100644 streaming/src/main/scala/spark/streaming/JavaAPISuite.java
 create mode 100644 streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
 create mode 100644 streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala

diff --git a/streaming/src/main/scala/spark/streaming/JavaAPISuite.java b/streaming/src/main/scala/spark/streaming/JavaAPISuite.java
new file mode 100644
index 0000000000..bcaaa4fa80
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/JavaAPISuite.java
@@ -0,0 +1,64 @@
+package spark.streaming;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import spark.api.java.JavaRDD;
+import spark.api.java.function.Function;
+import spark.api.java.function.Function2;
+import spark.streaming.api.java.JavaStreamingContext;
+
+import java.io.Serializable;
+
+// The test suite itself is Serializable so that anonymous Function implementations can be
+// serialized, as an alternative to converting these anonymous classes to static inner classes;
+// see http://stackoverflow.com/questions/758570/.
+public class JavaAPISuite implements Serializable {
+  private transient JavaStreamingContext sc;
+
+  @Before
+  public void setUp() {
+    sc = new JavaStreamingContext("local[2]", "test", new Time(1000));
+  }
+
+  @After
+  public void tearDown() {
+    sc.stop();
+    sc = null;
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.master.port");
+  }
+
+  @Test
+  public void simpleTest() {
+    sc.textFileStream("/var/log/syslog").print();
+    sc.start();
+  }
+
+  public static void main(String[] args) {
+    JavaStreamingContext sc = new JavaStreamingContext("local[2]", "test", new Time(1000));
+
+    sc.networkTextStream("localhost", 12345).map(new Function<String, Integer>() {
+      @Override
+      public Integer call(String s) throws Exception {
+        return s.length();
+      }
+    }).reduce(new Function2<Integer, Integer, Integer>() {
+      @Override
+      public Integer call(Integer i1, Integer i2) throws Exception {
+        return i1 + i2;
+      }
+    }).foreach(new Function2<JavaRDD<Integer>, Time, Void>() {
+      @Override
+      public Void call(JavaRDD<Integer> integerJavaRDD, Time t) throws Exception {
+        System.out.println("Contents @ " + t.toFormattedString());
+        for (int i: integerJavaRDD.collect()) {
+          System.out.println(i + "\n");
+        }
+        return null;
+      }
+    });
+
+    sc.start();
+  }
+}
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
new file mode 100644
index 0000000000..e9391642f8
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
@@ -0,0 +1,95 @@
+package spark.streaming.api.java
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+
+import spark.streaming._
+import spark.api.java.JavaRDD
+import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
+import java.util
+import spark.RDD
+
+class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T]) {
+  def print() = dstream.print()
+
+  // TODO move to type-specific implementations
+  def cache() : JavaDStream[T] = {
+    dstream.cache()
+  }
+
+  def count() : JavaDStream[Int] = {
+    dstream.count()
+  }
+
+  def countByWindow(windowTime: Time, slideTime: Time) : JavaDStream[Int] = {
+    dstream.countByWindow(windowTime, slideTime)
+  }
+
+  def compute(validTime: Time): JavaRDD[T] = {
+    dstream.compute(validTime) match {
+      case Some(rdd) => new JavaRDD(rdd)
+      case None => null
+    }
+  }
+
+  def context(): StreamingContext = dstream.context()
+
+  def window(windowTime: Time) = {
+    dstream.window(windowTime)
+  }
+
+  def window(windowTime: Time, slideTime: Time): JavaDStream[T] = {
+    dstream.window(windowTime, slideTime)
+  }
+
+  def tumble(batchTime: Time): JavaDStream[T] = {
+    dstream.tumble(batchTime)
+  }
+
+  def map[R](f: JFunction[T, R]): JavaDStream[R] = {
+    new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType())
+  }
+
+  def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] = {
+    dstream.filter((x => f(x).booleanValue()))
+  }
+
+  def glom(): JavaDStream[JList[T]] = {
+    new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
+  }
+
+  // TODO: Other map partitions
+  def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
+    def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+    new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType())
+  }
+
+  def reduce(f: JFunction2[T, T, T]): JavaDStream[T] = dstream.reduce(f)
+
+  def reduceByWindow(
+    reduceFunc: JFunction2[T, T, T],
+    invReduceFunc: JFunction2[T, T, T],
+    windowTime: Time,
+    slideTime: Time): JavaDStream[T] = {
+    dstream.reduceByWindow(reduceFunc, invReduceFunc, windowTime, slideTime)
+  }
+
+  def slice(fromTime: Time, toTime: Time): JList[JavaRDD[T]] = {
+    new util.ArrayList(dstream.slice(fromTime, toTime).map(new JavaRDD(_)).toSeq)
+  }
+
+  def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) = {
+    dstream.foreach(rdd => foreachFunc.call(new JavaRDD(rdd)))
+  }
+
+  def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) = {
+    dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time))
+  }
+}
+
+object JavaDStream {
+  implicit def fromDStream[T: ClassManifest](dstream: DStream[T]): JavaDStream[T] =
+    new JavaDStream[T](dstream)
+
+}
\ No newline at end of file
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
new file mode 100644
index 0000000000..46f8cffd0b
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -0,0 +1,29 @@
+package spark.streaming.api.java
+
+import scala.collection.JavaConversions._
+
+import spark.streaming._
+import dstream.SparkFlumeEvent
+import spark.storage.StorageLevel
+
+class JavaStreamingContext(val ssc: StreamingContext) {
+  def this(master: String, frameworkName: String, batchDuration: Time) =
+    this(new StreamingContext(master, frameworkName, batchDuration))
+
+  def textFileStream(directory: String): JavaDStream[String] = {
+    ssc.textFileStream(directory)
+  }
+
+  def networkTextStream(hostname: String, port: Int): JavaDStream[String] = {
+    ssc.networkTextStream(hostname, port)
+  }
+
+  def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel):
+    JavaDStream[SparkFlumeEvent] = {
+    ssc.flumeStream(hostname, port, storageLevel)
+  }
+
+  def start() = ssc.start()
+  def stop() = ssc.stop()
+
+}
-- 
cgit v1.2.3
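
Note (not part of the patch): the sketch below shows one way the Java API introduced by this
commit could be driven from an application's main class, using only methods defined above.
It is an illustration under assumptions, not code from the commit: it assumes some process is
writing text lines to localhost:9999 (for example "nc -lk 9999"), and the class name, master
string, host, port, 1-second batch interval, and 30s/10s window parameters are all example
values rather than defaults of the API.

// Usage sketch only; hypothetical driver class, not part of this commit.
import spark.api.java.function.Function;
import spark.streaming.Time;
import spark.streaming.api.java.JavaDStream;
import spark.streaming.api.java.JavaStreamingContext;

public class WindowedErrorLines {
  public static void main(String[] args) {
    // Illustrative master, app name, and 1-second batch interval.
    JavaStreamingContext ssc =
        new JavaStreamingContext("local[2]", "WindowedErrorLines", new Time(1000));

    // Assumed source: text lines arriving on localhost:9999.
    JavaDStream<String> lines = ssc.networkTextStream("localhost", 9999);

    // Keep only lines containing "ERROR".
    JavaDStream<String> errors = lines.filter(new Function<String, Boolean>() {
      @Override
      public Boolean call(String s) throws Exception {
        return s.contains("ERROR");
      }
    });

    // Print the filtered lines over a 30-second window that slides every 10 seconds.
    errors.window(new Time(30000), new Time(10000)).print();

    ssc.start();
  }
}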