aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2014-02-27 11:12:21 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-02-27 11:12:21 -0800
commit12bbca20657c17d5ebfceaacb37dddc851772675 (patch)
tree53d717c10b2b7ede275608b1829e6f44389daa2d /streaming/src/test
parentaace2c097ed2ca8bca33a3a3f07fb8bf772b3c50 (diff)
downloadspark-12bbca20657c17d5ebfceaacb37dddc851772675.tar.gz
spark-12bbca20657c17d5ebfceaacb37dddc851772675.tar.bz2
spark-12bbca20657c17d5ebfceaacb37dddc851772675.zip
SPARK 1084.1 (resubmitted)
(Ported from https://github.com/apache/incubator-spark/pull/637 ) Author: Sean Owen <sowen@cloudera.com> Closes #31 from srowen/SPARK-1084.1 and squashes the following commits: 6c4a32c [Sean Owen] Suppress warnings about legitimate unchecked array creations, or change code to avoid it f35b833 [Sean Owen] Fix two misc javadoc problems 254e8ef [Sean Owen] Fix one new style error introduced in scaladoc warning commit 5b2fce2 [Sean Owen] Fix scaladoc invocation warning, and enable javac warnings properly, with plugin config updates 007762b [Sean Owen] Remove dead scaladoc links b8ff8cb [Sean Owen] Replace deprecated Ant <tasks> with <target>
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java124
1 files changed, 87 insertions, 37 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 4fbbce9b8b..54a0791d04 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -19,7 +19,6 @@ package org.apache.spark.streaming;
import scala.Tuple2;
-import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import java.io.*;
@@ -30,7 +29,6 @@ import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.google.common.collect.Sets;
-import org.apache.spark.SparkConf;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
@@ -38,6 +36,7 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaDStreamLike;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
@@ -45,6 +44,8 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
// serialized, as an alternative to converting these anonymous classes to static inner classes;
// see http://stackoverflow.com/questions/758570/.
public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable {
+
+ @SuppressWarnings("unchecked")
@Test
public void testCount() {
List<List<Integer>> inputData = Arrays.asList(
@@ -64,6 +65,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testMap() {
List<List<String>> inputData = Arrays.asList(
@@ -87,6 +89,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testWindow() {
List<List<Integer>> inputData = Arrays.asList(
@@ -108,6 +111,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testWindowWithSlideDuration() {
List<List<Integer>> inputData = Arrays.asList(
@@ -132,6 +136,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testFilter() {
List<List<String>> inputData = Arrays.asList(
@@ -155,13 +160,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testRepartitionMorePartitions() {
List<List<Integer>> inputData = Arrays.asList(
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 2);
- JavaDStream repartitioned = stream.repartition(4);
+ JavaDStream<Integer> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 2);
+ JavaDStreamLike<Integer,JavaDStream<Integer>,JavaRDD<Integer>> repartitioned =
+ stream.repartition(4);
JavaTestUtils.attachTestOutputStream(repartitioned);
List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2);
Assert.assertEquals(2, result.size());
@@ -172,13 +180,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
}
+ @SuppressWarnings("unchecked")
@Test
public void testRepartitionFewerPartitions() {
List<List<Integer>> inputData = Arrays.asList(
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 4);
- JavaDStream repartitioned = stream.repartition(2);
+ JavaDStream<Integer> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 4);
+ JavaDStreamLike<Integer,JavaDStream<Integer>,JavaRDD<Integer>> repartitioned =
+ stream.repartition(2);
JavaTestUtils.attachTestOutputStream(repartitioned);
List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2);
Assert.assertEquals(2, result.size());
@@ -188,6 +199,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
}
+ @SuppressWarnings("unchecked")
@Test
public void testGlom() {
List<List<String>> inputData = Arrays.asList(
@@ -206,6 +218,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testMapPartitions() {
List<List<String>> inputData = Arrays.asList(
@@ -217,16 +230,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList("YANKEESRED SOCKS"));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<String> mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
- @Override
- public Iterable<String> call(Iterator<String> in) {
- String out = "";
- while (in.hasNext()) {
- out = out + in.next().toUpperCase();
- }
- return Lists.newArrayList(out);
- }
- });
+ JavaDStream<String> mapped = stream.mapPartitions(
+ new FlatMapFunction<Iterator<String>, String>() {
+ @Override
+ public Iterable<String> call(Iterator<String> in) {
+ String out = "";
+ while (in.hasNext()) {
+ out = out + in.next().toUpperCase();
+ }
+ return Lists.newArrayList(out);
+ }
+ });
JavaTestUtils.attachTestOutputStream(mapped);
List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -247,6 +261,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
}
+ @SuppressWarnings("unchecked")
@Test
public void testReduce() {
List<List<Integer>> inputData = Arrays.asList(
@@ -267,6 +282,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testReduceByWindow() {
List<List<Integer>> inputData = Arrays.asList(
@@ -289,6 +305,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testQueueStream() {
List<List<Integer>> expected = Arrays.asList(
@@ -312,6 +329,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testTransform() {
List<List<Integer>> inputData = Arrays.asList(
@@ -344,6 +362,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testVariousTransform() {
// tests whether all variations of transform can be called from Java
@@ -423,6 +442,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
+ @SuppressWarnings("unchecked")
@Test
public void testTransformWith() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
@@ -492,6 +512,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
+ @SuppressWarnings("unchecked")
@Test
public void testVariousTransformWith() {
// tests whether all variations of transformWith can be called from Java
@@ -591,6 +612,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
);
}
+ @SuppressWarnings("unchecked")
@Test
public void testStreamingContextTransform(){
List<List<Integer>> stream1input = Arrays.asList(
@@ -658,6 +680,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testFlatMap() {
List<List<String>> inputData = Arrays.asList(
@@ -683,6 +706,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairFlatMap() {
List<List<String>> inputData = Arrays.asList(
@@ -718,22 +742,24 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
new Tuple2<Integer, String>(9, "s")));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<Integer,String> flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() {
- @Override
- public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
- List<Tuple2<Integer, String>> out = Lists.newArrayList();
- for (String letter: in.split("(?!^)")) {
- out.add(new Tuple2<Integer, String>(in.length(), letter));
- }
- return out;
- }
- });
+ JavaPairDStream<Integer,String> flatMapped = stream.flatMap(
+ new PairFlatMapFunction<String, Integer, String>() {
+ @Override
+ public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
+ List<Tuple2<Integer, String>> out = Lists.newArrayList();
+ for (String letter: in.split("(?!^)")) {
+ out.add(new Tuple2<Integer, String>(in.length(), letter));
+ }
+ return out;
+ }
+ });
JavaTestUtils.attachTestOutputStream(flatMapped);
List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testUnion() {
List<List<Integer>> inputData1 = Arrays.asList(
@@ -778,6 +804,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
// PairDStream Functions
+ @SuppressWarnings("unchecked")
@Test
public void testPairFilter() {
List<List<String>> inputData = Arrays.asList(
@@ -810,7 +837,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
- List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
+ @SuppressWarnings("unchecked")
+ private List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
new Tuple2<String, String>("california", "giants"),
new Tuple2<String, String>("new york", "yankees"),
@@ -820,7 +848,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
new Tuple2<String, String>("new york", "rangers"),
new Tuple2<String, String>("new york", "islanders")));
- List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
+ @SuppressWarnings("unchecked")
+ private List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
Arrays.asList(
new Tuple2<String, Integer>("california", 1),
new Tuple2<String, Integer>("california", 3),
@@ -832,6 +861,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
new Tuple2<String, Integer>("new york", 3),
new Tuple2<String, Integer>("new york", 1)));
+ @SuppressWarnings("unchecked")
@Test
public void testPairMap() { // Maps pair -> pair of different type
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -864,6 +894,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairMapPartitions() { // Maps pair -> pair of different type
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -901,6 +932,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairMap2() { // Maps pair -> single
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -925,6 +957,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
@@ -967,6 +1000,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairGroupByKey() {
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
@@ -989,6 +1023,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairReduceByKey() {
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -1013,6 +1048,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testCombineByKey() {
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -1043,6 +1079,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testCountByValue() {
List<List<String>> inputData = Arrays.asList(
@@ -1068,6 +1105,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testGroupByKeyAndWindow() {
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -1113,6 +1151,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
return new Tuple2<String, HashSet<Integer>>(tuple._1(), new HashSet<Integer>(tuple._2()));
}
+ @SuppressWarnings("unchecked")
@Test
public void testReduceByKeyAndWindow() {
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -1136,6 +1175,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testUpdateStateByKey() {
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -1171,6 +1211,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testReduceByKeyAndWindowWithInverse() {
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -1194,6 +1235,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testCountByValueAndWindow() {
List<List<String>> inputData = Arrays.asList(
@@ -1227,6 +1269,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, unorderedResult);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairTransform() {
List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
@@ -1271,6 +1314,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairToNormalRDDTransform() {
List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
@@ -1312,6 +1356,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
+ @Test
public void testMapValues() {
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
@@ -1342,6 +1388,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testFlatMapValues() {
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
@@ -1386,6 +1433,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testCoGroup() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
@@ -1429,6 +1477,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testJoin() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
@@ -1472,6 +1521,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testLeftOuterJoin() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
@@ -1503,6 +1553,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testCheckpointMasterRecovery() throws InterruptedException {
List<List<String>> inputData = Arrays.asList(
@@ -1541,7 +1592,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
- /** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD
+ /* TEST DISABLED: Pending a discussion about checkpoint() semantics with TD
+ @SuppressWarnings("unchecked")
@Test
public void testCheckpointofIndividualStream() throws InterruptedException {
List<List<String>> inputData = Arrays.asList(
@@ -1581,16 +1633,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@Test
public void testSocketString() {
class Converter extends Function<InputStream, Iterable<String>> {
- public Iterable<String> call(InputStream in) {
+ public Iterable<String> call(InputStream in) throws IOException {
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
List<String> out = new ArrayList<String>();
- try {
- while (true) {
- String line = reader.readLine();
- if (line == null) { break; }
- out.add(line);
- }
- } catch (IOException e) { }
+ while (true) {
+ String line = reader.readLine();
+ if (line == null) { break; }
+ out.add(line);
+ }
return out;
}
}