aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/scala
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-01-04 11:19:20 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-01-14 09:42:36 -0800
commit867a7455e27af9e8a6b95c87c882c0eebcaed0ad (patch)
tree241d5af64e4467e0620da03d77a7ffd276c6ec06 /streaming/src/test/scala
parentb607c9e9165a996289e4fb78bf7f2792121183d0 (diff)
downloadspark-867a7455e27af9e8a6b95c87c882c0eebcaed0ad.tar.gz
spark-867a7455e27af9e8a6b95c87c882c0eebcaed0ad.tar.bz2
spark-867a7455e27af9e8a6b95c87c882c0eebcaed0ad.zip
Adding some initial tests to streaming API.
Diffstat (limited to 'streaming/src/test/scala')
-rw-r--r--streaming/src/test/scala/JavaTestUtils.scala42
-rw-r--r--streaming/src/test/scala/spark/streaming/JavaAPISuite.java97
2 files changed, 139 insertions, 0 deletions
diff --git a/streaming/src/test/scala/JavaTestUtils.scala b/streaming/src/test/scala/JavaTestUtils.scala
new file mode 100644
index 0000000000..776b0e6bb6
--- /dev/null
+++ b/streaming/src/test/scala/JavaTestUtils.scala
@@ -0,0 +1,42 @@
+package spark.streaming
+
+import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import java.util.{List => JList}
+import spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import spark.streaming._
+import java.util.ArrayList
+import collection.JavaConversions._
+
+/** Exposes core test functionality in a Java-friendly way. */
+object JavaTestUtils extends TestSuiteBase {
+ def attachTestInputStream[T](ssc: JavaStreamingContext,
+ data: JList[JList[T]], numPartitions: Int) = {
+ val seqData = data.map(Seq(_:_*))
+
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions)
+ ssc.ssc.registerInputStream(dstream)
+ new JavaDStream[T](dstream)
+ }
+
+ def attachTestOutputStream[T](dstream: JavaDStream[T]) = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ val ostream = new TestOutputStream(dstream.dstream,
+ new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]])
+ dstream.dstream.ssc.registerOutputStream(ostream)
+ }
+
+ def runStreams[V](
+ ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
+ implicit val cm: ClassManifest[V] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
+ val out = new ArrayList[JList[V]]()
+ res.map(entry => out.append(new ArrayList[V](entry)))
+ out
+ }
+
+}
+
diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
new file mode 100644
index 0000000000..5327edfd5d
--- /dev/null
+++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
@@ -0,0 +1,97 @@
+package spark.streaming;
+
+import org.junit.Assert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import spark.api.java.JavaRDD;
+import spark.api.java.function.Function;
+import spark.api.java.function.Function2;
+import spark.streaming.JavaTestUtils;
+import spark.streaming.api.java.JavaDStream;
+import spark.streaming.api.java.JavaStreamingContext;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+// The test suite itself is Serializable so that anonymous Function implementations can be
+// serialized, as an alternative to converting these anonymous classes to static inner classes;
+// see http://stackoverflow.com/questions/758570/.
+public class JavaAPISuite implements Serializable {
+ private transient JavaStreamingContext sc;
+
+ @Before
+ public void setUp() {
+ sc = new JavaStreamingContext("local[2]", "test", new Time(1000));
+ }
+
+ @After
+ public void tearDown() {
+ sc.stop();
+ sc = null;
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port");
+ }
+
+ @Test
+ public void testCount() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,2,3,4), Arrays.asList(3,4,5), Arrays.asList(3));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream count = stream.count();
+ JavaTestUtils.attachTestOutputStream(count);
+ List<List<Integer>> result = JavaTestUtils.runStreams(sc, 3, 3);
+
+ Assert.assertTrue(result.equals(
+ Arrays.asList(Arrays.asList(4), Arrays.asList(3), Arrays.asList(1))));
+ }
+
+ @Test
+ public void testMap() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("hello", "world"), Arrays.asList("goodnight", "moon"));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream letterCount = stream.map(new Function<String, Integer>() {
+ @Override
+ public Integer call(String s) throws Exception {
+ return s.length();
+ }
+ });
+ JavaTestUtils.attachTestOutputStream(letterCount);
+ List<List<Integer>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+ Assert.assertTrue(result.equals(
+ Arrays.asList(Arrays.asList(5, 5), Arrays.asList(9, 4))));
+ }
+
+ public static void main(String[] args) {
+ JavaStreamingContext sc = new JavaStreamingContext("local[2]", "test", new Time(1000));
+
+ sc.networkTextStream("localhost", 12345).map(new Function<String, Integer>() {
+ @Override
+ public Integer call(String s) throws Exception {
+ return s.length();
+ }
+ }).reduce(new Function2<Integer, Integer, Integer>() {
+ @Override
+ public Integer call(Integer i1, Integer i2) throws Exception {
+ return i1 + i2;
+ }
+ }).foreach(new Function2<JavaRDD<Integer>, Time, Void>() {
+ @Override
+ public Void call(JavaRDD<Integer> integerJavaRDD, Time t) throws Exception {
+ System.out.println("Contents @ " + t.toFormattedString());
+ for (int i: integerJavaRDD.collect()) {
+ System.out.println(i + "\n");
+ }
+ return null;
+ }
+ });
+
+ sc.start();
+ }
+}