diff options
Diffstat (limited to 'streaming')
4 files changed, 105 insertions, 55 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index 4dcd0e4c51..2c7ff87744 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -127,7 +127,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Return a new DStream by applying `groupByKey` on each RDD of `this` DStream. * Therefore, the values for each key in `this` DStream's RDDs are grouped into a - * single sequence to generate the RDDs of the new DStream. [[org.apache.spark.Partitioner]] + * single sequence to generate the RDDs of the new DStream. org.apache.spark.Partitioner * is used to control the partitioning of each RDD. */ def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] = @@ -151,7 +151,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are - * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control + * merged using the supplied reduce function. org.apache.spark.Partitioner is used to control * thepartitioning of each RDD. */ def reduceByKey(func: JFunction2[V, V, V], partitioner: Partitioner): JavaPairDStream[K, V] = { @@ -161,7 +161,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Combine elements of each key in DStream's RDDs using custom function. This is similar to the * combineByKey for RDDs. Please refer to combineByKey in - * [[org.apache.spark.rdd.PairRDDFunctions]] for more information. + * org.apache.spark.rdd.PairRDDFunctions for more information. */ def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], @@ -176,7 +176,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Combine elements of each key in DStream's RDDs using custom function. This is similar to the * combineByKey for RDDs. Please refer to combineByKey in - * [[org.apache.spark.rdd.PairRDDFunctions]] for more information. + * org.apache.spark.rdd.PairRDDFunctions for more information. */ def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], @@ -479,7 +479,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of the key. - * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * org.apache.spark.Partitioner is used to control the partitioning of each RDD. * @param updateFunc State update function. If `this` function returns None, then * corresponding state key-value pair will be eliminated. * @param partitioner Partitioner for controlling the partitioning of each RDD in the new @@ -579,7 +579,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. - * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD. */ def join[W]( other: JavaPairDStream[K, W], @@ -619,7 +619,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. - * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD. */ def leftOuterJoin[W]( other: JavaPairDStream[K, W], @@ -660,7 +660,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and - * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control + * `other` DStream. The supplied org.apache.spark.Partitioner is used to control * the partitioning of each RDD. */ def rightOuterJoin[W]( diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 2268160dcc..b082bb0585 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -406,7 +406,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { * JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using * [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream(). * In the transform function, convert the JavaRDD corresponding to that JavaDStream to - * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD(). + * a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD(). */ def transform[T]( dstreams: JList[JavaDStream[_]], @@ -429,7 +429,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { * JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using * [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream(). * In the transform function, convert the JavaRDD corresponding to that JavaDStream to - * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD(). + * a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD(). */ def transform[K, V]( dstreams: JList[JavaDStream[_]], diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index f3c58aede0..2473496949 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -65,7 +65,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) /** * Return a new DStream by applying `groupByKey` on each RDD. The supplied - * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * org.apache.spark.Partitioner is used to control the partitioning of each RDD. */ def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = { val createCombiner = (v: V) => ArrayBuffer[V](v) @@ -95,7 +95,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) /** * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are - * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control + * merged using the supplied reduce function. org.apache.spark.Partitioner is used to control * the partitioning of each RDD. */ def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = { @@ -376,7 +376,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) /** * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of the key. - * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * org.apache.spark.Partitioner is used to control the partitioning of each RDD. * @param updateFunc State update function. If `this` function returns None, then * corresponding state key-value pair will be eliminated. * @param partitioner Partitioner for controlling the partitioning of each RDD in the new @@ -396,7 +396,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) /** * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of each key. - * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * org.apache.spark.Partitioner is used to control the partitioning of each RDD. * @param updateFunc State update function. If `this` function returns None, then * corresponding state key-value pair will be eliminated. Note, that * this function may generate a different a tuple with a different key @@ -453,7 +453,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) /** * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. - * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs. + * The supplied org.apache.spark.Partitioner is used to partition the generated RDDs. */ def cogroup[W: ClassTag]( other: DStream[(K, W)], @@ -483,7 +483,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) /** * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. - * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD. */ def join[W: ClassTag]( other: DStream[(K, W)], @@ -518,7 +518,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) /** * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and - * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control + * `other` DStream. The supplied org.apache.spark.Partitioner is used to control * the partitioning of each RDD. */ def leftOuterJoin[W: ClassTag]( @@ -554,7 +554,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) /** * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and - * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control + * `other` DStream. The supplied org.apache.spark.Partitioner is used to control * the partitioning of each RDD. */ def rightOuterJoin[W: ClassTag]( diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 4fbbce9b8b..54a0791d04 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -19,7 +19,6 @@ package org.apache.spark.streaming; import scala.Tuple2; -import org.junit.After; import org.junit.Assert; import org.junit.Test; import java.io.*; @@ -30,7 +29,6 @@ import com.google.common.collect.Lists; import com.google.common.io.Files; import com.google.common.collect.Sets; -import org.apache.spark.SparkConf; import org.apache.spark.HashPartitioner; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; @@ -38,6 +36,7 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.*; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaDStreamLike; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; @@ -45,6 +44,8 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext; // serialized, as an alternative to converting these anonymous classes to static inner classes; // see http://stackoverflow.com/questions/758570/. public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable { + + @SuppressWarnings("unchecked") @Test public void testCount() { List<List<Integer>> inputData = Arrays.asList( @@ -64,6 +65,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa assertOrderInvariantEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testMap() { List<List<String>> inputData = Arrays.asList( @@ -87,6 +89,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa assertOrderInvariantEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testWindow() { List<List<Integer>> inputData = Arrays.asList( @@ -108,6 +111,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa assertOrderInvariantEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testWindowWithSlideDuration() { List<List<Integer>> inputData = Arrays.asList( @@ -132,6 +136,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa assertOrderInvariantEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testFilter() { List<List<String>> inputData = Arrays.asList( @@ -155,13 +160,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa assertOrderInvariantEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testRepartitionMorePartitions() { List<List<Integer>> inputData = Arrays.asList( Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 2); - JavaDStream repartitioned = stream.repartition(4); + JavaDStream<Integer> stream = + JavaTestUtils.attachTestInputStream(ssc, inputData, 2); + JavaDStreamLike<Integer,JavaDStream<Integer>,JavaRDD<Integer>> repartitioned = + stream.repartition(4); JavaTestUtils.attachTestOutputStream(repartitioned); List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2); Assert.assertEquals(2, result.size()); @@ -172,13 +180,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } } + @SuppressWarnings("unchecked") @Test public void testRepartitionFewerPartitions() { List<List<Integer>> inputData = Arrays.asList( Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 4); - JavaDStream repartitioned = stream.repartition(2); + JavaDStream<Integer> stream = + JavaTestUtils.attachTestInputStream(ssc, inputData, 4); + JavaDStreamLike<Integer,JavaDStream<Integer>,JavaRDD<Integer>> repartitioned = + stream.repartition(2); JavaTestUtils.attachTestOutputStream(repartitioned); List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2); Assert.assertEquals(2, result.size()); @@ -188,6 +199,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } } + @SuppressWarnings("unchecked") @Test public void testGlom() { List<List<String>> inputData = Arrays.asList( @@ -206,6 +218,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testMapPartitions() { List<List<String>> inputData = Arrays.asList( @@ -217,16 +230,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList("YANKEESRED SOCKS")); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<String> mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() { - @Override - public Iterable<String> call(Iterator<String> in) { - String out = ""; - while (in.hasNext()) { - out = out + in.next().toUpperCase(); - } - return Lists.newArrayList(out); - } - }); + JavaDStream<String> mapped = stream.mapPartitions( + new FlatMapFunction<Iterator<String>, String>() { + @Override + public Iterable<String> call(Iterator<String> in) { + String out = ""; + while (in.hasNext()) { + out = out + in.next().toUpperCase(); + } + return Lists.newArrayList(out); + } + }); JavaTestUtils.attachTestOutputStream(mapped); List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -247,6 +261,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } } + @SuppressWarnings("unchecked") @Test public void testReduce() { List<List<Integer>> inputData = Arrays.asList( @@ -267,6 +282,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testReduceByWindow() { List<List<Integer>> inputData = Arrays.asList( @@ -289,6 +305,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testQueueStream() { List<List<Integer>> expected = Arrays.asList( @@ -312,6 +329,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testTransform() { List<List<Integer>> inputData = Arrays.asList( @@ -344,6 +362,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa assertOrderInvariantEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testVariousTransform() { // tests whether all variations of transform can be called from Java @@ -423,6 +442,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } + @SuppressWarnings("unchecked") @Test public void testTransformWith() { List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( @@ -492,6 +512,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } + @SuppressWarnings("unchecked") @Test public void testVariousTransformWith() { // tests whether all variations of transformWith can be called from Java @@ -591,6 +612,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa ); } + @SuppressWarnings("unchecked") @Test public void testStreamingContextTransform(){ List<List<Integer>> stream1input = Arrays.asList( @@ -658,6 +680,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testFlatMap() { List<List<String>> inputData = Arrays.asList( @@ -683,6 +706,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa assertOrderInvariantEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testPairFlatMap() { List<List<String>> inputData = Arrays.asList( @@ -718,22 +742,24 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa new Tuple2<Integer, String>(9, "s"))); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<Integer,String> flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() { - @Override - public Iterable<Tuple2<Integer, String>> call(String in) throws Exception { - List<Tuple2<Integer, String>> out = Lists.newArrayList(); - for (String letter: in.split("(?!^)")) { - out.add(new Tuple2<Integer, String>(in.length(), letter)); - } - return out; - } - }); + JavaPairDStream<Integer,String> flatMapped = stream.flatMap( + new PairFlatMapFunction<String, Integer, String>() { + @Override + public Iterable<Tuple2<Integer, String>> call(String in) throws Exception { + List<Tuple2<Integer, String>> out = Lists.newArrayList(); + for (String letter: in.split("(?!^)")) { + out.add(new Tuple2<Integer, String>(in.length(), letter)); + } + return out; + } + }); JavaTestUtils.attachTestOutputStream(flatMapped); List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3); Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testUnion() { List<List<Integer>> inputData1 = Arrays.asList( @@ -778,6 +804,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa // PairDStream Functions + @SuppressWarnings("unchecked") @Test public void testPairFilter() { List<List<String>> inputData = Arrays.asList( @@ -810,7 +837,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } - List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList( + @SuppressWarnings("unchecked") + private List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList( Arrays.asList(new Tuple2<String, String>("california", "dodgers"), new Tuple2<String, String>("california", "giants"), new Tuple2<String, String>("new york", "yankees"), @@ -820,7 +848,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa new Tuple2<String, String>("new york", "rangers"), new Tuple2<String, String>("new york", "islanders"))); - List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList( + @SuppressWarnings("unchecked") + private List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList( Arrays.asList( new Tuple2<String, Integer>("california", 1), new Tuple2<String, Integer>("california", 3), @@ -832,6 +861,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa new Tuple2<String, Integer>("new york", 3), new Tuple2<String, Integer>("new york", 1))); + @SuppressWarnings("unchecked") @Test public void testPairMap() { // Maps pair -> pair of different type List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; @@ -864,6 +894,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testPairMapPartitions() { // Maps pair -> pair of different type List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; @@ -901,6 +932,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testPairMap2() { // Maps pair -> single List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; @@ -925,6 +957,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair List<List<Tuple2<String, Integer>>> inputData = Arrays.asList( @@ -967,6 +1000,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testPairGroupByKey() { List<List<Tuple2<String, String>>> inputData = stringStringKVStream; @@ -989,6 +1023,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testPairReduceByKey() { List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; @@ -1013,6 +1048,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testCombineByKey() { List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; @@ -1043,6 +1079,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testCountByValue() { List<List<String>> inputData = Arrays.asList( @@ -1068,6 +1105,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testGroupByKeyAndWindow() { List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; @@ -1113,6 +1151,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa return new Tuple2<String, HashSet<Integer>>(tuple._1(), new HashSet<Integer>(tuple._2())); } + @SuppressWarnings("unchecked") @Test public void testReduceByKeyAndWindow() { List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; @@ -1136,6 +1175,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testUpdateStateByKey() { List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; @@ -1171,6 +1211,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testReduceByKeyAndWindowWithInverse() { List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; @@ -1194,6 +1235,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testCountByValueAndWindow() { List<List<String>> inputData = Arrays.asList( @@ -1227,6 +1269,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, unorderedResult); } + @SuppressWarnings("unchecked") @Test public void testPairTransform() { List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList( @@ -1271,6 +1314,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testPairToNormalRDDTransform() { List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList( @@ -1312,6 +1356,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") + @Test public void testMapValues() { List<List<Tuple2<String, String>>> inputData = stringStringKVStream; @@ -1342,6 +1388,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testFlatMapValues() { List<List<Tuple2<String, String>>> inputData = stringStringKVStream; @@ -1386,6 +1433,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testCoGroup() { List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( @@ -1429,6 +1477,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testJoin() { List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( @@ -1472,6 +1521,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testLeftOuterJoin() { List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( @@ -1503,6 +1553,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testCheckpointMasterRecovery() throws InterruptedException { List<List<String>> inputData = Arrays.asList( @@ -1541,7 +1592,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } - /** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD + /* TEST DISABLED: Pending a discussion about checkpoint() semantics with TD + @SuppressWarnings("unchecked") @Test public void testCheckpointofIndividualStream() throws InterruptedException { List<List<String>> inputData = Arrays.asList( @@ -1581,16 +1633,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa @Test public void testSocketString() { class Converter extends Function<InputStream, Iterable<String>> { - public Iterable<String> call(InputStream in) { + public Iterable<String> call(InputStream in) throws IOException { BufferedReader reader = new BufferedReader(new InputStreamReader(in)); List<String> out = new ArrayList<String>(); - try { - while (true) { - String line = reader.readLine(); - if (line == null) { break; } - out.add(line); - } - } catch (IOException e) { } + while (true) { + String line = reader.readLine(); + if (line == null) { break; } + out.add(line); + } return out; } } |