aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java/JavaTestUtils.scala
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/test/java/JavaTestUtils.scala')
-rw-r--r--streaming/src/test/java/JavaTestUtils.scala65
1 files changed, 65 insertions, 0 deletions
diff --git a/streaming/src/test/java/JavaTestUtils.scala b/streaming/src/test/java/JavaTestUtils.scala
new file mode 100644
index 0000000000..56349837e5
--- /dev/null
+++ b/streaming/src/test/java/JavaTestUtils.scala
@@ -0,0 +1,65 @@
+package spark.streaming
+
+import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import java.util.{List => JList}
+import spark.streaming.api.java.{JavaPairDStream, JavaDStreamLike, JavaDStream, JavaStreamingContext}
+import spark.streaming._
+import java.util.ArrayList
+import collection.JavaConversions._
+
+/** Exposes streaming test functionality in a Java-friendly way. */
+trait JavaTestBase extends TestSuiteBase {
+
+ /**
+ * Create a [[spark.streaming.TestInputStream]] and attach it to the supplied context.
+ * The stream will be derived from the supplied lists of Java objects.
+ **/
+ def attachTestInputStream[T](
+ ssc: JavaStreamingContext,
+ data: JList[JList[T]],
+ numPartitions: Int) = {
+ val seqData = data.map(Seq(_:_*))
+
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions)
+ ssc.ssc.registerInputStream(dstream)
+ new JavaDStream[T](dstream)
+ }
+
+ /**
+ * Attach a provided stream to it's associated StreamingContext as a
+ * [[spark.streaming.TestOutputStream]].
+ **/
+ def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T,This]](
+ dstream: JavaDStreamLike[T, This]) = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ val ostream = new TestOutputStream(dstream.dstream,
+ new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]])
+ dstream.dstream.ssc.registerOutputStream(ostream)
+ }
+
+ /**
+ * Process all registered streams for a numBatches batches, failing if
+ * numExpectedOutput RDD's are not generated. Generated RDD's are collected
+ * and returned, represented as a list for each batch interval.
+ */
+ def runStreams[V](
+ ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
+ implicit val cm: ClassManifest[V] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
+ val out = new ArrayList[JList[V]]()
+ res.map(entry => out.append(new ArrayList[V](entry)))
+ out
+ }
+}
+
+object JavaTestUtils extends JavaTestBase {
+
+}
+
+object JavaCheckpointTestUtils extends JavaTestBase {
+ override def actuallyWait = true
+} \ No newline at end of file