aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2014-02-27 11:12:21 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-02-27 11:12:21 -0800
commit12bbca20657c17d5ebfceaacb37dddc851772675 (patch)
tree53d717c10b2b7ede275608b1829e6f44389daa2d /streaming
parentaace2c097ed2ca8bca33a3a3f07fb8bf772b3c50 (diff)
downloadspark-12bbca20657c17d5ebfceaacb37dddc851772675.tar.gz
spark-12bbca20657c17d5ebfceaacb37dddc851772675.tar.bz2
spark-12bbca20657c17d5ebfceaacb37dddc851772675.zip
SPARK 1084.1 (resubmitted)
(Ported from https://github.com/apache/incubator-spark/pull/637 ) Author: Sean Owen <sowen@cloudera.com> Closes #31 from srowen/SPARK-1084.1 and squashes the following commits: 6c4a32c [Sean Owen] Suppress warnings about legitimate unchecked array creations, or change code to avoid it f35b833 [Sean Owen] Fix two misc javadoc problems 254e8ef [Sean Owen] Fix one new style error introduced in scaladoc warning commit 5b2fce2 [Sean Owen] Fix scaladoc invocation warning, and enable javac warnings properly, with plugin config updates 007762b [Sean Owen] Remove dead scaladoc links b8ff8cb [Sean Owen] Replace deprecated Ant <tasks> with <target>
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala16
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala16
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java124
4 files changed, 105 insertions, 55 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 4dcd0e4c51..2c7ff87744 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -127,7 +127,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Return a new DStream by applying `groupByKey` on each RDD of `this` DStream.
* Therefore, the values for each key in `this` DStream's RDDs are grouped into a
- * single sequence to generate the RDDs of the new DStream. [[org.apache.spark.Partitioner]]
+ * single sequence to generate the RDDs of the new DStream. org.apache.spark.Partitioner
* is used to control the partitioning of each RDD.
*/
def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] =
@@ -151,7 +151,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
- * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control
+ * merged using the supplied reduce function. org.apache.spark.Partitioner is used to control
* thepartitioning of each RDD.
*/
def reduceByKey(func: JFunction2[V, V, V], partitioner: Partitioner): JavaPairDStream[K, V] = {
@@ -161,7 +161,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Combine elements of each key in DStream's RDDs using custom function. This is similar to the
* combineByKey for RDDs. Please refer to combineByKey in
- * [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
+ * org.apache.spark.rdd.PairRDDFunctions for more information.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
@@ -176,7 +176,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Combine elements of each key in DStream's RDDs using custom function. This is similar to the
* combineByKey for RDDs. Please refer to combineByKey in
- * [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
+ * org.apache.spark.rdd.PairRDDFunctions for more information.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
@@ -479,7 +479,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of the key.
- * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated.
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new
@@ -579,7 +579,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
- * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD.
*/
def join[W](
other: JavaPairDStream[K, W],
@@ -619,7 +619,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
- * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD.
*/
def leftOuterJoin[W](
other: JavaPairDStream[K, W],
@@ -660,7 +660,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
- * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+ * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
* the partitioning of each RDD.
*/
def rightOuterJoin[W](
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 2268160dcc..b082bb0585 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -406,7 +406,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using
* [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream().
* In the transform function, convert the JavaRDD corresponding to that JavaDStream to
- * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD().
+ * a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD().
*/
def transform[T](
dstreams: JList[JavaDStream[_]],
@@ -429,7 +429,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using
* [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream().
* In the transform function, convert the JavaRDD corresponding to that JavaDStream to
- * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD().
+ * a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD().
*/
def transform[K, V](
dstreams: JList[JavaDStream[_]],
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index f3c58aede0..2473496949 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -65,7 +65,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
/**
* Return a new DStream by applying `groupByKey` on each RDD. The supplied
- * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
*/
def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
val createCombiner = (v: V) => ArrayBuffer[V](v)
@@ -95,7 +95,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
/**
* Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
- * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control
+ * merged using the supplied reduce function. org.apache.spark.Partitioner is used to control
* the partitioning of each RDD.
*/
def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = {
@@ -376,7 +376,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
/**
* Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of the key.
- * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated.
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new
@@ -396,7 +396,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
/**
* Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
- * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated. Note, that
* this function may generate a different a tuple with a different key
@@ -453,7 +453,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
/**
* Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
- * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs.
+ * The supplied org.apache.spark.Partitioner is used to partition the generated RDDs.
*/
def cogroup[W: ClassTag](
other: DStream[(K, W)],
@@ -483,7 +483,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
/**
* Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
- * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD.
*/
def join[W: ClassTag](
other: DStream[(K, W)],
@@ -518,7 +518,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
/**
* Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
- * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+ * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
* the partitioning of each RDD.
*/
def leftOuterJoin[W: ClassTag](
@@ -554,7 +554,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
/**
* Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
- * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+ * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
* the partitioning of each RDD.
*/
def rightOuterJoin[W: ClassTag](
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 4fbbce9b8b..54a0791d04 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -19,7 +19,6 @@ package org.apache.spark.streaming;
import scala.Tuple2;
-import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import java.io.*;
@@ -30,7 +29,6 @@ import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.google.common.collect.Sets;
-import org.apache.spark.SparkConf;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
@@ -38,6 +36,7 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaDStreamLike;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
@@ -45,6 +44,8 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
// serialized, as an alternative to converting these anonymous classes to static inner classes;
// see http://stackoverflow.com/questions/758570/.
public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable {
+
+ @SuppressWarnings("unchecked")
@Test
public void testCount() {
List<List<Integer>> inputData = Arrays.asList(
@@ -64,6 +65,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testMap() {
List<List<String>> inputData = Arrays.asList(
@@ -87,6 +89,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testWindow() {
List<List<Integer>> inputData = Arrays.asList(
@@ -108,6 +111,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testWindowWithSlideDuration() {
List<List<Integer>> inputData = Arrays.asList(
@@ -132,6 +136,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testFilter() {
List<List<String>> inputData = Arrays.asList(
@@ -155,13 +160,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testRepartitionMorePartitions() {
List<List<Integer>> inputData = Arrays.asList(
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 2);
- JavaDStream repartitioned = stream.repartition(4);
+ JavaDStream<Integer> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 2);
+ JavaDStreamLike<Integer,JavaDStream<Integer>,JavaRDD<Integer>> repartitioned =
+ stream.repartition(4);
JavaTestUtils.attachTestOutputStream(repartitioned);
List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2);
Assert.assertEquals(2, result.size());
@@ -172,13 +180,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
}
+ @SuppressWarnings("unchecked")
@Test
public void testRepartitionFewerPartitions() {
List<List<Integer>> inputData = Arrays.asList(
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 4);
- JavaDStream repartitioned = stream.repartition(2);
+ JavaDStream<Integer> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 4);
+ JavaDStreamLike<Integer,JavaDStream<Integer>,JavaRDD<Integer>> repartitioned =
+ stream.repartition(2);
JavaTestUtils.attachTestOutputStream(repartitioned);
List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2);
Assert.assertEquals(2, result.size());
@@ -188,6 +199,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
}
+ @SuppressWarnings("unchecked")
@Test
public void testGlom() {
List<List<String>> inputData = Arrays.asList(
@@ -206,6 +218,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testMapPartitions() {
List<List<String>> inputData = Arrays.asList(
@@ -217,16 +230,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList("YANKEESRED SOCKS"));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<String> mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
- @Override
- public Iterable<String> call(Iterator<String> in) {
- String out = "";
- while (in.hasNext()) {
- out = out + in.next().toUpperCase();
- }
- return Lists.newArrayList(out);
- }
- });
+ JavaDStream<String> mapped = stream.mapPartitions(
+ new FlatMapFunction<Iterator<String>, String>() {
+ @Override
+ public Iterable<String> call(Iterator<String> in) {
+ String out = "";
+ while (in.hasNext()) {
+ out = out + in.next().toUpperCase();
+ }
+ return Lists.newArrayList(out);
+ }
+ });
JavaTestUtils.attachTestOutputStream(mapped);
List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -247,6 +261,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
}
+ @SuppressWarnings("unchecked")
@Test
public void testReduce() {
List<List<Integer>> inputData = Arrays.asList(
@@ -267,6 +282,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testReduceByWindow() {
List<List<Integer>> inputData = Arrays.asList(
@@ -289,6 +305,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testQueueStream() {
List<List<Integer>> expected = Arrays.asList(
@@ -312,6 +329,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testTransform() {
List<List<Integer>> inputData = Arrays.asList(
@@ -344,6 +362,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testVariousTransform() {
// tests whether all variations of transform can be called from Java
@@ -423,6 +442,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
+ @SuppressWarnings("unchecked")
@Test
public void testTransformWith() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
@@ -492,6 +512,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
+ @SuppressWarnings("unchecked")
@Test
public void testVariousTransformWith() {
// tests whether all variations of transformWith can be called from Java
@@ -591,6 +612,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
);
}
+ @SuppressWarnings("unchecked")
@Test
public void testStreamingContextTransform(){
List<List<Integer>> stream1input = Arrays.asList(
@@ -658,6 +680,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testFlatMap() {
List<List<String>> inputData = Arrays.asList(
@@ -683,6 +706,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairFlatMap() {
List<List<String>> inputData = Arrays.asList(
@@ -718,22 +742,24 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
new Tuple2<Integer, String>(9, "s")));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<Integer,String> flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() {
- @Override
- public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
- List<Tuple2<Integer, String>> out = Lists.newArrayList();
- for (String letter: in.split("(?!^)")) {
- out.add(new Tuple2<Integer, String>(in.length(), letter));
- }
- return out;
- }
- });
+ JavaPairDStream<Integer,String> flatMapped = stream.flatMap(
+ new PairFlatMapFunction<String, Integer, String>() {
+ @Override
+ public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
+ List<Tuple2<Integer, String>> out = Lists.newArrayList();
+ for (String letter: in.split("(?!^)")) {
+ out.add(new Tuple2<Integer, String>(in.length(), letter));
+ }
+ return out;
+ }
+ });
JavaTestUtils.attachTestOutputStream(flatMapped);
List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testUnion() {
List<List<Integer>> inputData1 = Arrays.asList(
@@ -778,6 +804,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
// PairDStream Functions
+ @SuppressWarnings("unchecked")
@Test
public void testPairFilter() {
List<List<String>> inputData = Arrays.asList(
@@ -810,7 +837,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
- List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
+ @SuppressWarnings("unchecked")
+ private List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
new Tuple2<String, String>("california", "giants"),
new Tuple2<String, String>("new york", "yankees"),
@@ -820,7 +848,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
new Tuple2<String, String>("new york", "rangers"),
new Tuple2<String, String>("new york", "islanders")));
- List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
+ @SuppressWarnings("unchecked")
+ private List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
Arrays.asList(
new Tuple2<String, Integer>("california", 1),
new Tuple2<String, Integer>("california", 3),
@@ -832,6 +861,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
new Tuple2<String, Integer>("new york", 3),
new Tuple2<String, Integer>("new york", 1)));
+ @SuppressWarnings("unchecked")
@Test
public void testPairMap() { // Maps pair -> pair of different type
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -864,6 +894,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairMapPartitions() { // Maps pair -> pair of different type
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -901,6 +932,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairMap2() { // Maps pair -> single
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -925,6 +957,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
@@ -967,6 +1000,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairGroupByKey() {
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
@@ -989,6 +1023,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairReduceByKey() {
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -1013,6 +1048,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testCombineByKey() {
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -1043,6 +1079,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testCountByValue() {
List<List<String>> inputData = Arrays.asList(
@@ -1068,6 +1105,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testGroupByKeyAndWindow() {
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -1113,6 +1151,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
return new Tuple2<String, HashSet<Integer>>(tuple._1(), new HashSet<Integer>(tuple._2()));
}
+ @SuppressWarnings("unchecked")
@Test
public void testReduceByKeyAndWindow() {
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -1136,6 +1175,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testUpdateStateByKey() {
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -1171,6 +1211,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testReduceByKeyAndWindowWithInverse() {
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -1194,6 +1235,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testCountByValueAndWindow() {
List<List<String>> inputData = Arrays.asList(
@@ -1227,6 +1269,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, unorderedResult);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairTransform() {
List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
@@ -1271,6 +1314,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairToNormalRDDTransform() {
List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
@@ -1312,6 +1356,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
+ @Test
public void testMapValues() {
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
@@ -1342,6 +1388,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testFlatMapValues() {
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
@@ -1386,6 +1433,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testCoGroup() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
@@ -1429,6 +1477,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testJoin() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
@@ -1472,6 +1521,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testLeftOuterJoin() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
@@ -1503,6 +1553,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testCheckpointMasterRecovery() throws InterruptedException {
List<List<String>> inputData = Arrays.asList(
@@ -1541,7 +1592,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
- /** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD
+ /* TEST DISABLED: Pending a discussion about checkpoint() semantics with TD
+ @SuppressWarnings("unchecked")
@Test
public void testCheckpointofIndividualStream() throws InterruptedException {
List<List<String>> inputData = Arrays.asList(
@@ -1581,16 +1633,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@Test
public void testSocketString() {
class Converter extends Function<InputStream, Iterable<String>> {
- public Iterable<String> call(InputStream in) {
+ public Iterable<String> call(InputStream in) throws IOException {
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
List<String> out = new ArrayList<String>();
- try {
- while (true) {
- String line = reader.readLine();
- if (line == null) { break; }
- out.add(line);
- }
- } catch (IOException e) { }
+ while (true) {
+ String line = reader.readLine();
+ if (line == null) { break; }
+ out.add(line);
+ }
return out;
}
}