From 2fe39a4468798b5b125c4c3436ee1180b3a7b470 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Wed, 9 Jan 2013 21:59:06 -0800 Subject: Some docs for the JavaTestUtils --- streaming/src/test/scala/JavaTestUtils.scala | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) (limited to 'streaming/src') diff --git a/streaming/src/test/scala/JavaTestUtils.scala b/streaming/src/test/scala/JavaTestUtils.scala index 9f3a80df8b..24ebc15e38 100644 --- a/streaming/src/test/scala/JavaTestUtils.scala +++ b/streaming/src/test/scala/JavaTestUtils.scala @@ -2,15 +2,22 @@ package spark.streaming import collection.mutable.{SynchronizedBuffer, ArrayBuffer} import java.util.{List => JList} -import api.java.{JavaPairDStream, JavaDStreamLike, JavaDStream, JavaStreamingContext} +import spark.streaming.api.java.{JavaPairDStream, JavaDStreamLike, JavaDStream, JavaStreamingContext} import spark.streaming._ import java.util.ArrayList import collection.JavaConversions._ -/** Exposes core test functionality in a Java-friendly way. */ +/** Exposes streaming test functionality in a Java-friendly way. */ object JavaTestUtils extends TestSuiteBase { - def attachTestInputStream[T](ssc: JavaStreamingContext, - data: JList[JList[T]], numPartitions: Int) = { + + /** + * Create a [[spark.streaming.TestInputStream]] and attach it to the supplied context. + * The stream will be derived from the supplied lists of Java objects. + **/ + def attachTestInputStream[T]( + ssc: JavaStreamingContext, + data: JList[JList[T]], + numPartitions: Int) = { val seqData = data.map(Seq(_:_*)) implicit val cm: ClassManifest[T] = @@ -20,8 +27,12 @@ object JavaTestUtils extends TestSuiteBase { new JavaDStream[T](dstream) } - def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T,This]] - (dstream: JavaDStreamLike[T, This]) = { + /** + * Attach a provided stream to it's associated StreamingContext as a + * [[spark.streaming.TestOutputStream]]. + **/ + def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T,This]]( + dstream: JavaDStreamLike[T, This]) = { implicit val cm: ClassManifest[T] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] val ostream = new TestOutputStream(dstream.dstream, @@ -29,6 +40,11 @@ object JavaTestUtils extends TestSuiteBase { dstream.dstream.ssc.registerOutputStream(ostream) } + /** + * Process all registered streams for a numBatches batches, failing if + * numExpectedOutput RDD's are not generated. Generated RDD's are collected + * and returned, represented as a list for each batch interval. + */ def runStreams[V]( ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { implicit val cm: ClassManifest[V] = -- cgit v1.2.3