aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/scala/JavaTestUtils.scala
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/test/scala/JavaTestUtils.scala')
-rw-r--r--streaming/src/test/scala/JavaTestUtils.scala42
1 files changed, 42 insertions, 0 deletions
diff --git a/streaming/src/test/scala/JavaTestUtils.scala b/streaming/src/test/scala/JavaTestUtils.scala
new file mode 100644
index 0000000000..776b0e6bb6
--- /dev/null
+++ b/streaming/src/test/scala/JavaTestUtils.scala
@@ -0,0 +1,42 @@
+package spark.streaming
+
+import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import java.util.{List => JList}
+import spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import spark.streaming._
+import java.util.ArrayList
+import collection.JavaConversions._
+
+/** Exposes core test functionality in a Java-friendly way. */
+object JavaTestUtils extends TestSuiteBase {
+ def attachTestInputStream[T](ssc: JavaStreamingContext,
+ data: JList[JList[T]], numPartitions: Int) = {
+ val seqData = data.map(Seq(_:_*))
+
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions)
+ ssc.ssc.registerInputStream(dstream)
+ new JavaDStream[T](dstream)
+ }
+
+ def attachTestOutputStream[T](dstream: JavaDStream[T]) = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ val ostream = new TestOutputStream(dstream.dstream,
+ new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]])
+ dstream.dstream.ssc.registerOutputStream(ostream)
+ }
+
+ def runStreams[V](
+ ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
+ implicit val cm: ClassManifest[V] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
+ val out = new ArrayList[JList[V]]()
+ res.map(entry => out.append(new ArrayList[V](entry)))
+ out
+ }
+
+}
+