From 867a7455e27af9e8a6b95c87c882c0eebcaed0ad Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Fri, 4 Jan 2013 11:19:20 -0800 Subject: Adding some initial tests to streaming API. --- .../main/scala/spark/streaming/JavaAPISuite.java | 64 -------------- .../streaming/api/java/JavaStreamingContext.scala | 1 + streaming/src/test/scala/JavaTestUtils.scala | 42 ++++++++++ .../test/scala/spark/streaming/JavaAPISuite.java | 97 ++++++++++++++++++++++ 4 files changed, 140 insertions(+), 64 deletions(-) delete mode 100644 streaming/src/main/scala/spark/streaming/JavaAPISuite.java create mode 100644 streaming/src/test/scala/JavaTestUtils.scala create mode 100644 streaming/src/test/scala/spark/streaming/JavaAPISuite.java (limited to 'streaming') diff --git a/streaming/src/main/scala/spark/streaming/JavaAPISuite.java b/streaming/src/main/scala/spark/streaming/JavaAPISuite.java deleted file mode 100644 index bcaaa4fa80..0000000000 --- a/streaming/src/main/scala/spark/streaming/JavaAPISuite.java +++ /dev/null @@ -1,64 +0,0 @@ -package spark.streaming; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import spark.api.java.JavaRDD; -import spark.api.java.function.Function; -import spark.api.java.function.Function2; -import spark.streaming.api.java.JavaStreamingContext; - -import java.io.Serializable; - -// The test suite itself is Serializable so that anonymous Function implementations can be -// serialized, as an alternative to converting these anonymous classes to static inner classes; -// see http://stackoverflow.com/questions/758570/. -public class JavaAPISuite implements Serializable { - private transient JavaStreamingContext sc; - - @Before - public void setUp() { - sc = new JavaStreamingContext("local[2]", "test", new Time(1000)); - } - - @After - public void tearDown() { - sc.stop(); - sc = null; - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port"); - } - - @Test - public void simpleTest() { - sc.textFileStream("/var/log/syslog").print(); - sc.start(); - } - - public static void main(String[] args) { - JavaStreamingContext sc = new JavaStreamingContext("local[2]", "test", new Time(1000)); - - sc.networkTextStream("localhost", 12345).map(new Function() { - @Override - public Integer call(String s) throws Exception { - return s.length(); - } - }).reduce(new Function2() { - @Override - public Integer call(Integer i1, Integer i2) throws Exception { - return i1 + i2; - } - }).foreach(new Function2, Time, Void>() { - @Override - public Void call(JavaRDD integerJavaRDD, Time t) throws Exception { - System.out.println("Contents @ " + t.toFormattedString()); - for (int i: integerJavaRDD.collect()) { - System.out.println(i + "\n"); - } - return null; - } - }); - - sc.start(); - } -} diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index 46f8cffd0b..19cd032fc1 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -1,6 +1,7 @@ package spark.streaming.api.java import scala.collection.JavaConversions._ +import java.util.{List => JList} import spark.streaming._ import dstream.SparkFlumeEvent diff --git a/streaming/src/test/scala/JavaTestUtils.scala b/streaming/src/test/scala/JavaTestUtils.scala new file mode 100644 index 0000000000..776b0e6bb6 --- /dev/null +++ b/streaming/src/test/scala/JavaTestUtils.scala @@ -0,0 +1,42 @@ +package spark.streaming + +import collection.mutable.{SynchronizedBuffer, ArrayBuffer} +import java.util.{List => JList} +import spark.streaming.api.java.{JavaDStream, JavaStreamingContext} +import spark.streaming._ +import java.util.ArrayList +import collection.JavaConversions._ + +/** Exposes core test functionality in a Java-friendly way. */ +object JavaTestUtils extends TestSuiteBase { + def attachTestInputStream[T](ssc: JavaStreamingContext, + data: JList[JList[T]], numPartitions: Int) = { + val seqData = data.map(Seq(_:_*)) + + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions) + ssc.ssc.registerInputStream(dstream) + new JavaDStream[T](dstream) + } + + def attachTestOutputStream[T](dstream: JavaDStream[T]) = { + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + val ostream = new TestOutputStream(dstream.dstream, + new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]) + dstream.dstream.ssc.registerOutputStream(ostream) + } + + def runStreams[V]( + ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { + implicit val cm: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput) + val out = new ArrayList[JList[V]]() + res.map(entry => out.append(new ArrayList[V](entry))) + out + } + +} + diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java new file mode 100644 index 0000000000..5327edfd5d --- /dev/null +++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java @@ -0,0 +1,97 @@ +package spark.streaming; + +import org.junit.Assert; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import spark.api.java.JavaRDD; +import spark.api.java.function.Function; +import spark.api.java.function.Function2; +import spark.streaming.JavaTestUtils; +import spark.streaming.api.java.JavaDStream; +import spark.streaming.api.java.JavaStreamingContext; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +// The test suite itself is Serializable so that anonymous Function implementations can be +// serialized, as an alternative to converting these anonymous classes to static inner classes; +// see http://stackoverflow.com/questions/758570/. +public class JavaAPISuite implements Serializable { + private transient JavaStreamingContext sc; + + @Before + public void setUp() { + sc = new JavaStreamingContext("local[2]", "test", new Time(1000)); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port"); + } + + @Test + public void testCount() { + List> inputData = Arrays.asList( + Arrays.asList(1,2,3,4), Arrays.asList(3,4,5), Arrays.asList(3)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream count = stream.count(); + JavaTestUtils.attachTestOutputStream(count); + List> result = JavaTestUtils.runStreams(sc, 3, 3); + + Assert.assertTrue(result.equals( + Arrays.asList(Arrays.asList(4), Arrays.asList(3), Arrays.asList(1)))); + } + + @Test + public void testMap() { + List> inputData = Arrays.asList( + Arrays.asList("hello", "world"), Arrays.asList("goodnight", "moon")); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream letterCount = stream.map(new Function() { + @Override + public Integer call(String s) throws Exception { + return s.length(); + } + }); + JavaTestUtils.attachTestOutputStream(letterCount); + List> result = JavaTestUtils.runStreams(sc, 2, 2); + + Assert.assertTrue(result.equals( + Arrays.asList(Arrays.asList(5, 5), Arrays.asList(9, 4)))); + } + + public static void main(String[] args) { + JavaStreamingContext sc = new JavaStreamingContext("local[2]", "test", new Time(1000)); + + sc.networkTextStream("localhost", 12345).map(new Function() { + @Override + public Integer call(String s) throws Exception { + return s.length(); + } + }).reduce(new Function2() { + @Override + public Integer call(Integer i1, Integer i2) throws Exception { + return i1 + i2; + } + }).foreach(new Function2, Time, Void>() { + @Override + public Void call(JavaRDD integerJavaRDD, Time t) throws Exception { + System.out.println("Contents @ " + t.toFormattedString()); + for (int i: integerJavaRDD.collect()) { + System.out.println(i + "\n"); + } + return null; + } + }); + + sc.start(); + } +} -- cgit v1.2.3