From 31e92b72e31910be1694c348ab5de8b14f2df44b Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 24 Oct 2013 21:14:56 -0700 Subject: Adding Java versions and associated tests --- .../spark/streaming/api/java/JavaDStream.scala | 6 ++++ .../spark/streaming/api/java/JavaPairDStream.scala | 6 ++++ .../org/apache/spark/streaming/JavaAPISuite.java | 33 ++++++++++++++++++++++ .../org/apache/spark/streaming/JavaTestUtils.scala | 23 +++++++++++++++ 4 files changed, 68 insertions(+) (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala index d1932b6b05..1a2aeaa879 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala @@ -94,6 +94,12 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM */ def union(that: JavaDStream[T]): JavaDStream[T] = dstream.union(that.dstream) + + /** + * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the + * returned DStream has exactly numPartitions partitions. + */ + def repartition(numPartitions: Int): JavaDStream[T] = dstream.repartition(numPartitions) } object JavaDStream { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index 978fca33ad..faf8f36182 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -59,6 +59,12 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** Persist the RDDs of this DStream with the given storage level */ def persist(storageLevel: StorageLevel): JavaPairDStream[K, V] = dstream.persist(storageLevel) + /** + * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the + * returned DStream has exactly numPartitions partitions. + */ + def repartition(numPartitions: Int): JavaPairDStream[K, V] = dstream.repartition(numPartitions) + /** Method that generates a RDD for the given Duration */ def compute(validTime: Time): JavaPairRDD[K, V] = { dstream.compute(validTime) match { diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index dc01f1e8aa..5a9836a415 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -183,6 +183,39 @@ public class JavaAPISuite implements Serializable { assertOrderInvariantEquals(expected, result); } + @Test + public void testRepartitionMorePartitions() { + List> inputData = Arrays.asList( + Arrays.asList(1,2,3,4,5,6,7,8,9,10), + Arrays.asList(1,2,3,4,5,6,7,8,9,10)); + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 2); + JavaDStream repartitioned = stream.repartition(4); + JavaTestUtils.attachTestOutputStream(repartitioned); + List>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2); + Assert.assertEquals(2, result.size()); + for ( List> rdd : result) { + Assert.assertEquals(4, rdd.size()); + Assert.assertEquals( + 10, rdd.get(0).size() + rdd.get(1).size() + rdd.get(2).size() + rdd.get(3).size()); + } + } + + @Test + public void testRepartitionFewerPartitions() { + List> inputData = Arrays.asList( + Arrays.asList(1,2,3,4,5,6,7,8,9,10), + Arrays.asList(1,2,3,4,5,6,7,8,9,10)); + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 4); + JavaDStream repartitioned = stream.repartition(2); + JavaTestUtils.attachTestOutputStream(repartitioned); + List>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2); + Assert.assertEquals(2, result.size()); + for ( List> rdd : result) { + Assert.assertEquals(2, rdd.size()); + Assert.assertEquals(10, rdd.get(0).size() + rdd.get(1).size()); + } + } + @Test public void testGlom() { List> inputData = Arrays.asList( diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala index 5344ae7682..780f7b823b 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala @@ -62,6 +62,8 @@ trait JavaTestBase extends TestSuiteBase { * Process all registered streams for a numBatches batches, failing if * numExpectedOutput RDD's are not generated. Generated RDD's are collected * and returned, represented as a list for each batch interval. + * + * Returns a list of items for each RDD. */ def runStreams[V]( ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { @@ -72,6 +74,27 @@ trait JavaTestBase extends TestSuiteBase { res.map(entry => out.append(new ArrayList[V](entry))) out } + + /** + * Process all registered streams for a numBatches batches, failing if + * numExpectedOutput RDD's are not generated. Generated RDD's are collected + * and returned, represented as a list for each batch interval. + * + * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each + * representing one partition. + */ + def runStreamsWithPartitions[V]( + ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[JList[V]]] = { + implicit val cm: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput) + val out = new ArrayList[JList[JList[V]]]() + res.map(entry => { + val lists = entry.map(new ArrayList[V](_)) + out.append(new ArrayList[JList[V]](lists)) + }) + out + } } object JavaTestUtils extends JavaTestBase { -- cgit v1.2.3