diff options
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r-- | streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 425 | ||||
-rw-r--r-- | streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala | 36 |
2 files changed, 433 insertions, 28 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 076fb53fa1..daeb99f5b7 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -25,6 +25,7 @@ import com.google.common.io.Files; import kafka.serializer.StringDecoder; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.spark.streaming.api.java.JavaDStreamLike; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -186,6 +187,39 @@ public class JavaAPISuite implements Serializable { } @Test + public void testRepartitionMorePartitions() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), + Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 2); + JavaDStream repartitioned = stream.repartition(4); + JavaTestUtils.attachTestOutputStream(repartitioned); + List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2); + Assert.assertEquals(2, result.size()); + for (List<List<Integer>> rdd : result) { + Assert.assertEquals(4, rdd.size()); + Assert.assertEquals( + 10, rdd.get(0).size() + rdd.get(1).size() + rdd.get(2).size() + rdd.get(3).size()); + } + } + + @Test + public void testRepartitionFewerPartitions() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), + Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 4); + JavaDStream repartitioned = stream.repartition(2); + JavaTestUtils.attachTestOutputStream(repartitioned); + List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2); + Assert.assertEquals(2, result.size()); + for (List<List<Integer>> rdd : result) { + Assert.assertEquals(2, rdd.size()); + Assert.assertEquals(10, rdd.get(0).size() + rdd.get(1).size()); + } + } + + @Test public void testGlom() { List<List<String>> inputData = Arrays.asList( Arrays.asList("giants", "dodgers"), @@ -225,7 +259,7 @@ public class JavaAPISuite implements Serializable { } }); JavaTestUtils.attachTestOutputStream(mapped); - List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); } @@ -294,8 +328,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(7,8,9)); JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc()); - JavaRDD<Integer> rdd1 = ssc.sc().parallelize(Arrays.asList(1,2,3)); - JavaRDD<Integer> rdd2 = ssc.sc().parallelize(Arrays.asList(4,5,6)); + JavaRDD<Integer> rdd1 = ssc.sc().parallelize(Arrays.asList(1, 2, 3)); + JavaRDD<Integer> rdd2 = ssc.sc().parallelize(Arrays.asList(4, 5, 6)); JavaRDD<Integer> rdd3 = ssc.sc().parallelize(Arrays.asList(7,8,9)); LinkedList<JavaRDD<Integer>> rdds = Lists.newLinkedList(); @@ -322,17 +356,19 @@ public class JavaAPISuite implements Serializable { Arrays.asList(9,10,11)); JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> transformed = - stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() { - @Override - public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception { - return in.map(new Function<Integer, Integer>() { - @Override - public Integer call(Integer i) throws Exception { - return i + 2; - } - }); - }}); + JavaDStream<Integer> transformed = stream.transform( + new Function<JavaRDD<Integer>, JavaRDD<Integer>>() { + @Override + public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception { + return in.map(new Function<Integer, Integer>() { + @Override + public Integer call(Integer i) throws Exception { + return i + 2; + } + }); + } + }); + JavaTestUtils.attachTestOutputStream(transformed); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -340,6 +376,316 @@ public class JavaAPISuite implements Serializable { } @Test + public void testVariousTransform() { + // tests whether all variations of transform can be called from Java + + List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1)); + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + + List<List<Tuple2<String, Integer>>> pairInputData = + Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1))); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream( + JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1)); + + JavaDStream<Integer> transformed1 = stream.transform( + new Function<JavaRDD<Integer>, JavaRDD<Integer>>() { + @Override + public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception { + return null; + } + } + ); + + JavaDStream<Integer> transformed2 = stream.transform( + new Function2<JavaRDD<Integer>, Time, JavaRDD<Integer>>() { + @Override public JavaRDD<Integer> call(JavaRDD<Integer> in, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream<String, Integer> transformed3 = stream.transform( + new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() { + @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) throws Exception { + return null; + } + } + ); + + JavaPairDStream<String, Integer> transformed4 = stream.transform( + new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() { + @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) throws Exception { + return null; + } + } + ); + + JavaDStream<Integer> pairTransformed1 = pairStream.transform( + new Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>>() { + @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in) throws Exception { + return null; + } + } + ); + + JavaDStream<Integer> pairTransformed2 = pairStream.transform( + new Function2<JavaPairRDD<String, Integer>, Time, JavaRDD<Integer>>() { + @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream<String, String> pairTransformed3 = pairStream.transform( + new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, String>>() { + @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) throws Exception { + return null; + } + } + ); + + JavaPairDStream<String, String> pairTransformed4 = pairStream.transform( + new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, String>>() { + @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, Time time) throws Exception { + return null; + } + } + ); + + } + + @Test + public void testTransformWith() { + List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( + Arrays.asList( + new Tuple2<String, String>("california", "dodgers"), + new Tuple2<String, String>("new york", "yankees")), + Arrays.asList( + new Tuple2<String, String>("california", "sharks"), + new Tuple2<String, String>("new york", "rangers"))); + + List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( + Arrays.asList( + new Tuple2<String, String>("california", "giants"), + new Tuple2<String, String>("new york", "mets")), + Arrays.asList( + new Tuple2<String, String>("california", "ducks"), + new Tuple2<String, String>("new york", "islanders"))); + + + List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<String, Tuple2<String, String>>("california", + new Tuple2<String, String>("dodgers", "giants")), + new Tuple2<String, Tuple2<String, String>>("new york", + new Tuple2<String, String>("yankees", "mets"))), + Arrays.asList( + new Tuple2<String, Tuple2<String, String>>("california", + new Tuple2<String, String>("sharks", "ducks")), + new Tuple2<String, Tuple2<String, String>>("new york", + new Tuple2<String, String>("rangers", "islanders")))); + + JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream1, 1); + JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1); + + JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream2, 1); + JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2); + + JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWith( + pairStream2, + new Function3< + JavaPairRDD<String, String>, + JavaPairRDD<String, String>, + Time, + JavaPairRDD<String, Tuple2<String, String>> + >() { + @Override + public JavaPairRDD<String, Tuple2<String, String>> call( + JavaPairRDD<String, String> rdd1, + JavaPairRDD<String, String> rdd2, + Time time + ) throws Exception { + return rdd1.join(rdd2); + } + } + ); + + JavaTestUtils.attachTestOutputStream(joined); + List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + + @Test + public void testVariousTransformWith() { + // tests whether all variations of transformWith can be called from Java + + List<List<Integer>> inputData1 = Arrays.asList(Arrays.asList(1)); + List<List<String>> inputData2 = Arrays.asList(Arrays.asList("x")); + JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1); + JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1); + + List<List<Tuple2<String, Integer>>> pairInputData1 = + Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1))); + List<List<Tuple2<Double, Character>>> pairInputData2 = + Arrays.asList(Arrays.asList(new Tuple2<Double, Character>(1.0, 'x'))); + JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream( + JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1)); + JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream( + JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1)); + + JavaDStream<Double> transformed1 = stream1.transformWith( + stream2, + new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() { + @Override + public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaDStream<Double> transformed2 = stream1.transformWith( + pairStream1, + new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() { + @Override + public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream<Double, Double> transformed3 = stream1.transformWith( + stream2, + new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() { + @Override + public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream<Double, Double> transformed4 = stream1.transformWith( + pairStream1, + new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaPairRDD<Double, Double>>() { + @Override + public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaDStream<Double> pairTransformed1 = pairStream1.transformWith( + stream2, + new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() { + @Override + public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaDStream<Double> pairTransformed2_ = pairStream1.transformWith( + pairStream1, + new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() { + @Override + public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWith( + stream2, + new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() { + @Override + public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWith( + pairStream2, + new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time, JavaPairRDD<Double, Double>>() { + @Override + public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<Double, Character> rdd2, Time time) throws Exception { + return null; + } + } + ); + } + + @Test + public void testStreamingContextTransform(){ + List<List<Integer>> stream1input = Arrays.asList( + Arrays.asList(1), + Arrays.asList(2) + ); + + List<List<Integer>> stream2input = Arrays.asList( + Arrays.asList(3), + Arrays.asList(4) + ); + + List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList( + Arrays.asList(new Tuple2<Integer, String>(1, "x")), + Arrays.asList(new Tuple2<Integer, String>(2, "y")) + ); + + List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(1, new Tuple2<Integer, String>(1, "x"))), + Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(2, new Tuple2<Integer, String>(2, "y"))) + ); + + JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1); + JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, stream2input, 1); + JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream( + JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1)); + + List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2); + + // This is just to test whether this transform to JavaStream compiles + JavaDStream<Long> transformed1 = ssc.transform( + listOfDStreams1, + new Function2<List<JavaRDD<?>>, Time, JavaRDD<Long>>() { + public JavaRDD<Long> call(List<JavaRDD<?>> listOfRDDs, Time time) { + assert(listOfRDDs.size() == 2); + return null; + } + } + ); + + List<JavaDStream<?>> listOfDStreams2 = + Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream()); + + JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transform( + listOfDStreams2, + new Function2<List<JavaRDD<?>>, Time, JavaPairRDD<Integer, Tuple2<Integer, String>>>() { + public JavaPairRDD<Integer, Tuple2<Integer, String>> call(List<JavaRDD<?>> listOfRDDs, Time time) { + assert(listOfRDDs.size() == 3); + JavaRDD<Integer> rdd1 = (JavaRDD<Integer>)listOfRDDs.get(0); + JavaRDD<Integer> rdd2 = (JavaRDD<Integer>)listOfRDDs.get(1); + JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>)listOfRDDs.get(2); + JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3); + PairFunction<Integer, Integer, Integer> mapToTuple = new PairFunction<Integer, Integer, Integer>() { + @Override + public Tuple2<Integer, Integer> call(Integer i) throws Exception { + return new Tuple2<Integer, Integer>(i, i); + } + }; + return rdd1.union(rdd2).map(mapToTuple).join(prdd3); + } + } + ); + JavaTestUtils.attachTestOutputStream(transformed2); + List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + Assert.assertEquals(expected, result); + } + + @Test public void testFlatMap() { List<List<String>> inputData = Arrays.asList( Arrays.asList("go", "giants"), @@ -1101,7 +1447,7 @@ public class JavaAPISuite implements Serializable { JavaPairDStream<String, Tuple2<List<String>, List<String>>> grouped = pairStream1.cogroup(pairStream2); JavaTestUtils.attachTestOutputStream(grouped); - List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); } @@ -1144,7 +1490,38 @@ public class JavaAPISuite implements Serializable { JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.join(pairStream2); JavaTestUtils.attachTestOutputStream(joined); - List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testLeftOuterJoin() { + List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "dodgers"), + new Tuple2<String, String>("new york", "yankees")), + Arrays.asList(new Tuple2<String, String>("california", "sharks") )); + + List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "giants") ), + Arrays.asList(new Tuple2<String, String>("new york", "islanders") ) + + ); + + List<List<Long>> expected = Arrays.asList(Arrays.asList(2L), Arrays.asList(1L)); + + JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream1, 1); + JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1); + + JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream2, 1); + JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2); + + JavaPairDStream<String, Tuple2<String, Optional<String>>> joined = pairStream1.leftOuterJoin(pairStream2); + JavaDStream<Long> counted = joined.count(); + JavaTestUtils.attachTestOutputStream(counted); + List<List<Long>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); } @@ -1222,14 +1599,20 @@ public class JavaAPISuite implements Serializable { @Test public void testKafkaStream() { HashMap<String, Integer> topics = Maps.newHashMap(); - JavaDStream<String> test1 = ssc.kafkaStream("localhost:12345", "group", topics); - JavaDStream<String> test2 = ssc.kafkaStream("localhost:12345", "group", topics, + JavaPairDStream<String, String> test1 = ssc.kafkaStream("localhost:12345", "group", topics); + JavaPairDStream<String, String> test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK()); HashMap<String, String> kafkaParams = Maps.newHashMap(); - kafkaParams.put("zk.connect","localhost:12345"); - kafkaParams.put("groupid","consumer-group"); - JavaDStream<String> test3 = ssc.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics, + kafkaParams.put("zookeeper.connect","localhost:12345"); + kafkaParams.put("group.id","consumer-group"); + JavaPairDStream<String, String> test3 = ssc.kafkaStream( + String.class, + String.class, + StringDecoder.class, + StringDecoder.class, + kafkaParams, + topics, StorageLevel.MEMORY_AND_DISK()); } diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala index d5cdad4998..42ab9590d6 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala @@ -35,9 +35,9 @@ trait JavaTestBase extends TestSuiteBase { * The stream will be derived from the supplied lists of Java objects. */ def attachTestInputStream[T]( - ssc: JavaStreamingContext, - data: JList[JList[T]], - numPartitions: Int) = { + ssc: JavaStreamingContext, + data: JList[JList[T]], + numPartitions: Int) = { val seqData = data.map(Seq(_:_*)) implicit val cm: ClassTag[T] = @@ -52,12 +52,11 @@ trait JavaTestBase extends TestSuiteBase { * [[org.apache.spark.streaming.TestOutputStream]]. **/ def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]( - dstream: JavaDStreamLike[T, This, R]) = + dstream: JavaDStreamLike[T, This, R]) = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - val ostream = new TestOutputStream(dstream.dstream, - new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]) + val ostream = new TestOutputStreamWithPartitions(dstream.dstream) dstream.dstream.ssc.registerOutputStream(ostream) } @@ -65,9 +64,11 @@ trait JavaTestBase extends TestSuiteBase { * Process all registered streams for a numBatches batches, failing if * numExpectedOutput RDD's are not generated. Generated RDD's are collected * and returned, represented as a list for each batch interval. + * + * Returns a list of items for each RDD. */ def runStreams[V]( - ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { + ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { implicit val cm: ClassTag[V] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput) @@ -75,6 +76,27 @@ trait JavaTestBase extends TestSuiteBase { res.map(entry => out.append(new ArrayList[V](entry))) out } + + /** + * Process all registered streams for a numBatches batches, failing if + * numExpectedOutput RDD's are not generated. Generated RDD's are collected + * and returned, represented as a list for each batch interval. + * + * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each + * representing one partition. + */ + def runStreamsWithPartitions[V](ssc: JavaStreamingContext, numBatches: Int, + numExpectedOutput: Int): JList[JList[JList[V]]] = { + implicit val cm: ClassTag[V] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] + val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput) + val out = new ArrayList[JList[JList[V]]]() + res.map{entry => + val lists = entry.map(new ArrayList[V](_)) + out.append(new ArrayList[JList[V]](lists)) + } + out + } } object JavaTestUtils extends JavaTestBase { |