aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/test/java/org/apache/spark/JavaAPISuite.java451
-rw-r--r--external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java24
-rw-r--r--external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java17
-rw-r--r--external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java14
-rw-r--r--external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java4
-rw-r--r--extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java46
-rw-r--r--sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java39
-rw-r--r--sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java29
-rw-r--r--sql/core/src/test/java/test/org/apache/spark/sql/JavaRowSuite.java15
-rw-r--r--sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java9
-rw-r--r--sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java10
-rw-r--r--sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java8
-rw-r--r--sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java12
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java752
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java86
15 files changed, 755 insertions, 761 deletions
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index ebd3d61ae7..fd8f7f39b7 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -90,7 +90,7 @@ public class JavaAPISuite implements Serializable {
JavaRDD<String> sUnion = sc.union(s1, s2);
Assert.assertEquals(4, sUnion.count());
// List
- List<JavaRDD<String>> list = new ArrayList<JavaRDD<String>>();
+ List<JavaRDD<String>> list = new ArrayList<>();
list.add(s2);
sUnion = sc.union(s1, list);
Assert.assertEquals(4, sUnion.count());
@@ -103,9 +103,9 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(4, dUnion.count());
// Union of JavaPairRDDs
- List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>();
- pairs.add(new Tuple2<Integer, Integer>(1, 2));
- pairs.add(new Tuple2<Integer, Integer>(3, 4));
+ List<Tuple2<Integer, Integer>> pairs = new ArrayList<>();
+ pairs.add(new Tuple2<>(1, 2));
+ pairs.add(new Tuple2<>(3, 4));
JavaPairRDD<Integer, Integer> p1 = sc.parallelizePairs(pairs);
JavaPairRDD<Integer, Integer> p2 = sc.parallelizePairs(pairs);
JavaPairRDD<Integer, Integer> pUnion = sc.union(p1, p2);
@@ -133,9 +133,9 @@ public class JavaAPISuite implements Serializable {
JavaDoubleRDD dIntersection = d1.intersection(d2);
Assert.assertEquals(2, dIntersection.count());
- List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>();
- pairs.add(new Tuple2<Integer, Integer>(1, 2));
- pairs.add(new Tuple2<Integer, Integer>(3, 4));
+ List<Tuple2<Integer, Integer>> pairs = new ArrayList<>();
+ pairs.add(new Tuple2<>(1, 2));
+ pairs.add(new Tuple2<>(3, 4));
JavaPairRDD<Integer, Integer> p1 = sc.parallelizePairs(pairs);
JavaPairRDD<Integer, Integer> p2 = sc.parallelizePairs(pairs);
JavaPairRDD<Integer, Integer> pIntersection = p1.intersection(p2);
@@ -165,47 +165,49 @@ public class JavaAPISuite implements Serializable {
@Test
public void sortByKey() {
- List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>();
- pairs.add(new Tuple2<Integer, Integer>(0, 4));
- pairs.add(new Tuple2<Integer, Integer>(3, 2));
- pairs.add(new Tuple2<Integer, Integer>(-1, 1));
+ List<Tuple2<Integer, Integer>> pairs = new ArrayList<>();
+ pairs.add(new Tuple2<>(0, 4));
+ pairs.add(new Tuple2<>(3, 2));
+ pairs.add(new Tuple2<>(-1, 1));
JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
// Default comparator
JavaPairRDD<Integer, Integer> sortedRDD = rdd.sortByKey();
- Assert.assertEquals(new Tuple2<Integer, Integer>(-1, 1), sortedRDD.first());
+ Assert.assertEquals(new Tuple2<>(-1, 1), sortedRDD.first());
List<Tuple2<Integer, Integer>> sortedPairs = sortedRDD.collect();
- Assert.assertEquals(new Tuple2<Integer, Integer>(0, 4), sortedPairs.get(1));
- Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(2));
+ Assert.assertEquals(new Tuple2<>(0, 4), sortedPairs.get(1));
+ Assert.assertEquals(new Tuple2<>(3, 2), sortedPairs.get(2));
// Custom comparator
sortedRDD = rdd.sortByKey(Collections.<Integer>reverseOrder(), false);
- Assert.assertEquals(new Tuple2<Integer, Integer>(-1, 1), sortedRDD.first());
+ Assert.assertEquals(new Tuple2<>(-1, 1), sortedRDD.first());
sortedPairs = sortedRDD.collect();
- Assert.assertEquals(new Tuple2<Integer, Integer>(0, 4), sortedPairs.get(1));
- Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(2));
+ Assert.assertEquals(new Tuple2<>(0, 4), sortedPairs.get(1));
+ Assert.assertEquals(new Tuple2<>(3, 2), sortedPairs.get(2));
}
@SuppressWarnings("unchecked")
@Test
public void repartitionAndSortWithinPartitions() {
- List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>();
- pairs.add(new Tuple2<Integer, Integer>(0, 5));
- pairs.add(new Tuple2<Integer, Integer>(3, 8));
- pairs.add(new Tuple2<Integer, Integer>(2, 6));
- pairs.add(new Tuple2<Integer, Integer>(0, 8));
- pairs.add(new Tuple2<Integer, Integer>(3, 8));
- pairs.add(new Tuple2<Integer, Integer>(1, 3));
+ List<Tuple2<Integer, Integer>> pairs = new ArrayList<>();
+ pairs.add(new Tuple2<>(0, 5));
+ pairs.add(new Tuple2<>(3, 8));
+ pairs.add(new Tuple2<>(2, 6));
+ pairs.add(new Tuple2<>(0, 8));
+ pairs.add(new Tuple2<>(3, 8));
+ pairs.add(new Tuple2<>(1, 3));
JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
Partitioner partitioner = new Partitioner() {
+ @Override
public int numPartitions() {
return 2;
}
+ @Override
public int getPartition(Object key) {
- return ((Integer)key).intValue() % 2;
+ return (Integer) key % 2;
}
};
@@ -214,10 +216,10 @@ public class JavaAPISuite implements Serializable {
Assert.assertTrue(repartitioned.partitioner().isPresent());
Assert.assertEquals(repartitioned.partitioner().get(), partitioner);
List<List<Tuple2<Integer, Integer>>> partitions = repartitioned.glom().collect();
- Assert.assertEquals(partitions.get(0), Arrays.asList(new Tuple2<Integer, Integer>(0, 5),
- new Tuple2<Integer, Integer>(0, 8), new Tuple2<Integer, Integer>(2, 6)));
- Assert.assertEquals(partitions.get(1), Arrays.asList(new Tuple2<Integer, Integer>(1, 3),
- new Tuple2<Integer, Integer>(3, 8), new Tuple2<Integer, Integer>(3, 8)));
+ Assert.assertEquals(partitions.get(0),
+ Arrays.asList(new Tuple2<>(0, 5), new Tuple2<>(0, 8), new Tuple2<>(2, 6)));
+ Assert.assertEquals(partitions.get(1),
+ Arrays.asList(new Tuple2<>(1, 3), new Tuple2<>(3, 8), new Tuple2<>(3, 8)));
}
@Test
@@ -228,35 +230,37 @@ public class JavaAPISuite implements Serializable {
@Test
public void sortBy() {
- List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>();
- pairs.add(new Tuple2<Integer, Integer>(0, 4));
- pairs.add(new Tuple2<Integer, Integer>(3, 2));
- pairs.add(new Tuple2<Integer, Integer>(-1, 1));
+ List<Tuple2<Integer, Integer>> pairs = new ArrayList<>();
+ pairs.add(new Tuple2<>(0, 4));
+ pairs.add(new Tuple2<>(3, 2));
+ pairs.add(new Tuple2<>(-1, 1));
JavaRDD<Tuple2<Integer, Integer>> rdd = sc.parallelize(pairs);
// compare on first value
JavaRDD<Tuple2<Integer, Integer>> sortedRDD = rdd.sortBy(new Function<Tuple2<Integer, Integer>, Integer>() {
- public Integer call(Tuple2<Integer, Integer> t) throws Exception {
+ @Override
+ public Integer call(Tuple2<Integer, Integer> t) {
return t._1();
}
}, true, 2);
- Assert.assertEquals(new Tuple2<Integer, Integer>(-1, 1), sortedRDD.first());
+ Assert.assertEquals(new Tuple2<>(-1, 1), sortedRDD.first());
List<Tuple2<Integer, Integer>> sortedPairs = sortedRDD.collect();
- Assert.assertEquals(new Tuple2<Integer, Integer>(0, 4), sortedPairs.get(1));
- Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(2));
+ Assert.assertEquals(new Tuple2<>(0, 4), sortedPairs.get(1));
+ Assert.assertEquals(new Tuple2<>(3, 2), sortedPairs.get(2));
// compare on second value
sortedRDD = rdd.sortBy(new Function<Tuple2<Integer, Integer>, Integer>() {
- public Integer call(Tuple2<Integer, Integer> t) throws Exception {
+ @Override
+ public Integer call(Tuple2<Integer, Integer> t) {
return t._2();
}
}, true, 2);
- Assert.assertEquals(new Tuple2<Integer, Integer>(-1, 1), sortedRDD.first());
+ Assert.assertEquals(new Tuple2<>(-1, 1), sortedRDD.first());
sortedPairs = sortedRDD.collect();
- Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(1));
- Assert.assertEquals(new Tuple2<Integer, Integer>(0, 4), sortedPairs.get(2));
+ Assert.assertEquals(new Tuple2<>(3, 2), sortedPairs.get(1));
+ Assert.assertEquals(new Tuple2<>(0, 4), sortedPairs.get(2));
}
@Test
@@ -265,7 +269,7 @@ public class JavaAPISuite implements Serializable {
JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
rdd.foreach(new VoidFunction<String>() {
@Override
- public void call(String s) throws IOException {
+ public void call(String s) {
accum.add(1);
}
});
@@ -278,7 +282,7 @@ public class JavaAPISuite implements Serializable {
JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
@Override
- public void call(Iterator<String> iter) throws IOException {
+ public void call(Iterator<String> iter) {
while (iter.hasNext()) {
iter.next();
accum.add(1);
@@ -301,7 +305,7 @@ public class JavaAPISuite implements Serializable {
List<Integer> dataArray = Arrays.asList(1, 2, 3, 4);
JavaPairRDD<Integer, Long> zip = sc.parallelize(dataArray).zipWithUniqueId();
JavaRDD<Long> indexes = zip.values();
- Assert.assertEquals(4, new HashSet<Long>(indexes.collect()).size());
+ Assert.assertEquals(4, new HashSet<>(indexes.collect()).size());
}
@Test
@@ -317,10 +321,10 @@ public class JavaAPISuite implements Serializable {
@Test
public void lookup() {
JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
- new Tuple2<String, String>("Apples", "Fruit"),
- new Tuple2<String, String>("Oranges", "Fruit"),
- new Tuple2<String, String>("Oranges", "Citrus")
- ));
+ new Tuple2<>("Apples", "Fruit"),
+ new Tuple2<>("Oranges", "Fruit"),
+ new Tuple2<>("Oranges", "Citrus")
+ ));
Assert.assertEquals(2, categories.lookup("Oranges").size());
Assert.assertEquals(2, Iterables.size(categories.groupByKey().lookup("Oranges").get(0)));
}
@@ -390,18 +394,17 @@ public class JavaAPISuite implements Serializable {
@Test
public void cogroup() {
JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
- new Tuple2<String, String>("Apples", "Fruit"),
- new Tuple2<String, String>("Oranges", "Fruit"),
- new Tuple2<String, String>("Oranges", "Citrus")
+ new Tuple2<>("Apples", "Fruit"),
+ new Tuple2<>("Oranges", "Fruit"),
+ new Tuple2<>("Oranges", "Citrus")
));
JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList(
- new Tuple2<String, Integer>("Oranges", 2),
- new Tuple2<String, Integer>("Apples", 3)
+ new Tuple2<>("Oranges", 2),
+ new Tuple2<>("Apples", 3)
));
JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<Integer>>> cogrouped =
categories.cogroup(prices);
- Assert.assertEquals("[Fruit, Citrus]",
- Iterables.toString(cogrouped.lookup("Oranges").get(0)._1()));
+ Assert.assertEquals("[Fruit, Citrus]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._1()));
Assert.assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2()));
cogrouped.collect();
@@ -411,23 +414,22 @@ public class JavaAPISuite implements Serializable {
@Test
public void cogroup3() {
JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
- new Tuple2<String, String>("Apples", "Fruit"),
- new Tuple2<String, String>("Oranges", "Fruit"),
- new Tuple2<String, String>("Oranges", "Citrus")
+ new Tuple2<>("Apples", "Fruit"),
+ new Tuple2<>("Oranges", "Fruit"),
+ new Tuple2<>("Oranges", "Citrus")
));
JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList(
- new Tuple2<String, Integer>("Oranges", 2),
- new Tuple2<String, Integer>("Apples", 3)
+ new Tuple2<>("Oranges", 2),
+ new Tuple2<>("Apples", 3)
));
JavaPairRDD<String, Integer> quantities = sc.parallelizePairs(Arrays.asList(
- new Tuple2<String, Integer>("Oranges", 21),
- new Tuple2<String, Integer>("Apples", 42)
+ new Tuple2<>("Oranges", 21),
+ new Tuple2<>("Apples", 42)
));
JavaPairRDD<String, Tuple3<Iterable<String>, Iterable<Integer>, Iterable<Integer>>> cogrouped =
categories.cogroup(prices, quantities);
- Assert.assertEquals("[Fruit, Citrus]",
- Iterables.toString(cogrouped.lookup("Oranges").get(0)._1()));
+ Assert.assertEquals("[Fruit, Citrus]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._1()));
Assert.assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2()));
Assert.assertEquals("[42]", Iterables.toString(cogrouped.lookup("Apples").get(0)._3()));
@@ -439,27 +441,26 @@ public class JavaAPISuite implements Serializable {
@Test
public void cogroup4() {
JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
- new Tuple2<String, String>("Apples", "Fruit"),
- new Tuple2<String, String>("Oranges", "Fruit"),
- new Tuple2<String, String>("Oranges", "Citrus")
+ new Tuple2<>("Apples", "Fruit"),
+ new Tuple2<>("Oranges", "Fruit"),
+ new Tuple2<>("Oranges", "Citrus")
));
JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList(
- new Tuple2<String, Integer>("Oranges", 2),
- new Tuple2<String, Integer>("Apples", 3)
+ new Tuple2<>("Oranges", 2),
+ new Tuple2<>("Apples", 3)
));
JavaPairRDD<String, Integer> quantities = sc.parallelizePairs(Arrays.asList(
- new Tuple2<String, Integer>("Oranges", 21),
- new Tuple2<String, Integer>("Apples", 42)
+ new Tuple2<>("Oranges", 21),
+ new Tuple2<>("Apples", 42)
));
JavaPairRDD<String, String> countries = sc.parallelizePairs(Arrays.asList(
- new Tuple2<String, String>("Oranges", "BR"),
- new Tuple2<String, String>("Apples", "US")
+ new Tuple2<>("Oranges", "BR"),
+ new Tuple2<>("Apples", "US")
));
JavaPairRDD<String, Tuple4<Iterable<String>, Iterable<Integer>, Iterable<Integer>, Iterable<String>>> cogrouped =
categories.cogroup(prices, quantities, countries);
- Assert.assertEquals("[Fruit, Citrus]",
- Iterables.toString(cogrouped.lookup("Oranges").get(0)._1()));
+ Assert.assertEquals("[Fruit, Citrus]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._1()));
Assert.assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2()));
Assert.assertEquals("[42]", Iterables.toString(cogrouped.lookup("Apples").get(0)._3()));
Assert.assertEquals("[BR]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._4()));
@@ -471,16 +472,16 @@ public class JavaAPISuite implements Serializable {
@Test
public void leftOuterJoin() {
JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
- new Tuple2<Integer, Integer>(1, 1),
- new Tuple2<Integer, Integer>(1, 2),
- new Tuple2<Integer, Integer>(2, 1),
- new Tuple2<Integer, Integer>(3, 1)
+ new Tuple2<>(1, 1),
+ new Tuple2<>(1, 2),
+ new Tuple2<>(2, 1),
+ new Tuple2<>(3, 1)
));
JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
- new Tuple2<Integer, Character>(1, 'x'),
- new Tuple2<Integer, Character>(2, 'y'),
- new Tuple2<Integer, Character>(2, 'z'),
- new Tuple2<Integer, Character>(4, 'w')
+ new Tuple2<>(1, 'x'),
+ new Tuple2<>(2, 'y'),
+ new Tuple2<>(2, 'z'),
+ new Tuple2<>(4, 'w')
));
List<Tuple2<Integer,Tuple2<Integer,Optional<Character>>>> joined =
rdd1.leftOuterJoin(rdd2).collect();
@@ -548,11 +549,11 @@ public class JavaAPISuite implements Serializable {
public void aggregateByKey() {
JavaPairRDD<Integer, Integer> pairs = sc.parallelizePairs(
Arrays.asList(
- new Tuple2<Integer, Integer>(1, 1),
- new Tuple2<Integer, Integer>(1, 1),
- new Tuple2<Integer, Integer>(3, 2),
- new Tuple2<Integer, Integer>(5, 1),
- new Tuple2<Integer, Integer>(5, 3)), 2);
+ new Tuple2<>(1, 1),
+ new Tuple2<>(1, 1),
+ new Tuple2<>(3, 2),
+ new Tuple2<>(5, 1),
+ new Tuple2<>(5, 3)), 2);
Map<Integer, Set<Integer>> sets = pairs.aggregateByKey(new HashSet<Integer>(),
new Function2<Set<Integer>, Integer, Set<Integer>>() {
@@ -570,20 +571,20 @@ public class JavaAPISuite implements Serializable {
}
}).collectAsMap();
Assert.assertEquals(3, sets.size());
- Assert.assertEquals(new HashSet<Integer>(Arrays.asList(1)), sets.get(1));
- Assert.assertEquals(new HashSet<Integer>(Arrays.asList(2)), sets.get(3));
- Assert.assertEquals(new HashSet<Integer>(Arrays.asList(1, 3)), sets.get(5));
+ Assert.assertEquals(new HashSet<>(Arrays.asList(1)), sets.get(1));
+ Assert.assertEquals(new HashSet<>(Arrays.asList(2)), sets.get(3));
+ Assert.assertEquals(new HashSet<>(Arrays.asList(1, 3)), sets.get(5));
}
@SuppressWarnings("unchecked")
@Test
public void foldByKey() {
List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
- new Tuple2<Integer, Integer>(2, 1),
- new Tuple2<Integer, Integer>(2, 1),
- new Tuple2<Integer, Integer>(1, 1),
- new Tuple2<Integer, Integer>(3, 2),
- new Tuple2<Integer, Integer>(3, 1)
+ new Tuple2<>(2, 1),
+ new Tuple2<>(2, 1),
+ new Tuple2<>(1, 1),
+ new Tuple2<>(3, 2),
+ new Tuple2<>(3, 1)
);
JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0,
@@ -602,11 +603,11 @@ public class JavaAPISuite implements Serializable {
@Test
public void reduceByKey() {
List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
- new Tuple2<Integer, Integer>(2, 1),
- new Tuple2<Integer, Integer>(2, 1),
- new Tuple2<Integer, Integer>(1, 1),
- new Tuple2<Integer, Integer>(3, 2),
- new Tuple2<Integer, Integer>(3, 1)
+ new Tuple2<>(2, 1),
+ new Tuple2<>(2, 1),
+ new Tuple2<>(1, 1),
+ new Tuple2<>(3, 2),
+ new Tuple2<>(3, 1)
);
JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey(
@@ -690,7 +691,7 @@ public class JavaAPISuite implements Serializable {
JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
JavaRDD<String> stringRDD = sc.parallelize(Arrays.asList("Hello", "World"));
JavaPairRDD<String, Double> cartesian = stringRDD.cartesian(doubleRDD);
- Assert.assertEquals(new Tuple2<String, Double>("Hello", 1.0), cartesian.first());
+ Assert.assertEquals(new Tuple2<>("Hello", 1.0), cartesian.first());
}
@Test
@@ -743,6 +744,7 @@ public class JavaAPISuite implements Serializable {
}
private static class DoubleComparator implements Comparator<Double>, Serializable {
+ @Override
public int compare(Double o1, Double o2) {
return o1.compareTo(o2);
}
@@ -766,14 +768,14 @@ public class JavaAPISuite implements Serializable {
public void naturalMax() {
JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
double max = rdd.max();
- Assert.assertTrue(4.0 == max);
+ Assert.assertEquals(4.0, max, 0.0);
}
@Test
public void naturalMin() {
JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
double max = rdd.min();
- Assert.assertTrue(1.0 == max);
+ Assert.assertEquals(1.0, max, 0.0);
}
@Test
@@ -809,7 +811,7 @@ public class JavaAPISuite implements Serializable {
JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
double sum = rdd.reduce(new Function2<Double, Double, Double>() {
@Override
- public Double call(Double v1, Double v2) throws Exception {
+ public Double call(Double v1, Double v2) {
return v1 + v2;
}
});
@@ -844,7 +846,7 @@ public class JavaAPISuite implements Serializable {
new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer x) {
- return new Tuple2<Integer, Integer>(x, x);
+ return new Tuple2<>(x, x);
}
}).cache();
pairs.collect();
@@ -870,26 +872,25 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals("Hello", words.first());
Assert.assertEquals(11, words.count());
- JavaPairRDD<String, String> pairs = rdd.flatMapToPair(
+ JavaPairRDD<String, String> pairsRDD = rdd.flatMapToPair(
new PairFlatMapFunction<String, String, String>() {
-
@Override
public Iterable<Tuple2<String, String>> call(String s) {
- List<Tuple2<String, String>> pairs = new LinkedList<Tuple2<String, String>>();
+ List<Tuple2<String, String>> pairs = new LinkedList<>();
for (String word : s.split(" ")) {
- pairs.add(new Tuple2<String, String>(word, word));
+ pairs.add(new Tuple2<>(word, word));
}
return pairs;
}
}
);
- Assert.assertEquals(new Tuple2<String, String>("Hello", "Hello"), pairs.first());
- Assert.assertEquals(11, pairs.count());
+ Assert.assertEquals(new Tuple2<>("Hello", "Hello"), pairsRDD.first());
+ Assert.assertEquals(11, pairsRDD.count());
JavaDoubleRDD doubles = rdd.flatMapToDouble(new DoubleFlatMapFunction<String>() {
@Override
public Iterable<Double> call(String s) {
- List<Double> lengths = new LinkedList<Double>();
+ List<Double> lengths = new LinkedList<>();
for (String word : s.split(" ")) {
lengths.add((double) word.length());
}
@@ -897,36 +898,36 @@ public class JavaAPISuite implements Serializable {
}
});
Assert.assertEquals(5.0, doubles.first(), 0.01);
- Assert.assertEquals(11, pairs.count());
+ Assert.assertEquals(11, pairsRDD.count());
}
@SuppressWarnings("unchecked")
@Test
public void mapsFromPairsToPairs() {
- List<Tuple2<Integer, String>> pairs = Arrays.asList(
- new Tuple2<Integer, String>(1, "a"),
- new Tuple2<Integer, String>(2, "aa"),
- new Tuple2<Integer, String>(3, "aaa")
- );
- JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
-
- // Regression test for SPARK-668:
- JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair(
- new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
- @Override
- public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) {
- return Collections.singletonList(item.swap());
- }
+ List<Tuple2<Integer, String>> pairs = Arrays.asList(
+ new Tuple2<>(1, "a"),
+ new Tuple2<>(2, "aa"),
+ new Tuple2<>(3, "aaa")
+ );
+ JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
+
+ // Regression test for SPARK-668:
+ JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair(
+ new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
+ @Override
+ public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) {
+ return Collections.singletonList(item.swap());
+ }
});
- swapped.collect();
+ swapped.collect();
- // There was never a bug here, but it's worth testing:
- pairRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(Tuple2<Integer, String> item) {
- return item.swap();
- }
- }).collect();
+ // There was never a bug here, but it's worth testing:
+ pairRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
+ @Override
+ public Tuple2<String, Integer> call(Tuple2<Integer, String> item) {
+ return item.swap();
+ }
+ }).collect();
}
@Test
@@ -953,7 +954,7 @@ public class JavaAPISuite implements Serializable {
JavaRDD<Integer> partitionSums = rdd.mapPartitionsWithIndex(
new Function2<Integer, Iterator<Integer>, Iterator<Integer>>() {
@Override
- public Iterator<Integer> call(Integer index, Iterator<Integer> iter) throws Exception {
+ public Iterator<Integer> call(Integer index, Iterator<Integer> iter) {
int sum = 0;
while (iter.hasNext()) {
sum += iter.next();
@@ -972,8 +973,8 @@ public class JavaAPISuite implements Serializable {
JavaRDD<Integer> repartitioned1 = in1.repartition(4);
List<List<Integer>> result1 = repartitioned1.glom().collect();
Assert.assertEquals(4, result1.size());
- for (List<Integer> l: result1) {
- Assert.assertTrue(l.size() > 0);
+ for (List<Integer> l : result1) {
+ Assert.assertFalse(l.isEmpty());
}
// Growing number of partitions
@@ -982,7 +983,7 @@ public class JavaAPISuite implements Serializable {
List<List<Integer>> result2 = repartitioned2.glom().collect();
Assert.assertEquals(2, result2.size());
for (List<Integer> l: result2) {
- Assert.assertTrue(l.size() > 0);
+ Assert.assertFalse(l.isEmpty());
}
}
@@ -994,9 +995,9 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(20, doubleRDD.sum(), 0.1);
List<Tuple2<Integer, String>> pairs = Arrays.asList(
- new Tuple2<Integer, String>(1, "a"),
- new Tuple2<Integer, String>(2, "aa"),
- new Tuple2<Integer, String>(3, "aaa")
+ new Tuple2<>(1, "a"),
+ new Tuple2<>(2, "aa"),
+ new Tuple2<>(3, "aaa")
);
JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
pairRDD = pairRDD.persist(StorageLevel.DISK_ONLY());
@@ -1046,7 +1047,7 @@ public class JavaAPISuite implements Serializable {
Files.write(content1, new File(tempDirName + "/part-00000"));
Files.write(content2, new File(tempDirName + "/part-00001"));
- Map<String, String> container = new HashMap<String, String>();
+ Map<String, String> container = new HashMap<>();
container.put(tempDirName+"/part-00000", new Text(content1).toString());
container.put(tempDirName+"/part-00001", new Text(content2).toString());
@@ -1075,16 +1076,16 @@ public class JavaAPISuite implements Serializable {
public void sequenceFile() {
String outputDir = new File(tempDir, "output").getAbsolutePath();
List<Tuple2<Integer, String>> pairs = Arrays.asList(
- new Tuple2<Integer, String>(1, "a"),
- new Tuple2<Integer, String>(2, "aa"),
- new Tuple2<Integer, String>(3, "aaa")
+ new Tuple2<>(1, "a"),
+ new Tuple2<>(2, "aa"),
+ new Tuple2<>(3, "aaa")
);
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
@Override
public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
- return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
+ return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()));
}
}).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
@@ -1093,7 +1094,7 @@ public class JavaAPISuite implements Serializable {
Text.class).mapToPair(new PairFunction<Tuple2<IntWritable, Text>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<IntWritable, Text> pair) {
- return new Tuple2<Integer, String>(pair._1().get(), pair._2().toString());
+ return new Tuple2<>(pair._1().get(), pair._2().toString());
}
});
Assert.assertEquals(pairs, readRDD.collect());
@@ -1110,7 +1111,7 @@ public class JavaAPISuite implements Serializable {
FileOutputStream fos1 = new FileOutputStream(file1);
FileChannel channel1 = fos1.getChannel();
- ByteBuffer bbuf = java.nio.ByteBuffer.wrap(content1);
+ ByteBuffer bbuf = ByteBuffer.wrap(content1);
channel1.write(bbuf);
channel1.close();
JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName, 3);
@@ -1131,14 +1132,14 @@ public class JavaAPISuite implements Serializable {
FileOutputStream fos1 = new FileOutputStream(file1);
FileChannel channel1 = fos1.getChannel();
- ByteBuffer bbuf = java.nio.ByteBuffer.wrap(content1);
+ ByteBuffer bbuf = ByteBuffer.wrap(content1);
channel1.write(bbuf);
channel1.close();
JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName).cache();
readRDD.foreach(new VoidFunction<Tuple2<String,PortableDataStream>>() {
@Override
- public void call(Tuple2<String, PortableDataStream> pair) throws Exception {
+ public void call(Tuple2<String, PortableDataStream> pair) {
pair._2().toArray(); // force the file to read
}
});
@@ -1162,7 +1163,7 @@ public class JavaAPISuite implements Serializable {
FileChannel channel1 = fos1.getChannel();
for (int i = 0; i < numOfCopies; i++) {
- ByteBuffer bbuf = java.nio.ByteBuffer.wrap(content1);
+ ByteBuffer bbuf = ByteBuffer.wrap(content1);
channel1.write(bbuf);
}
channel1.close();
@@ -1180,24 +1181,23 @@ public class JavaAPISuite implements Serializable {
public void writeWithNewAPIHadoopFile() {
String outputDir = new File(tempDir, "output").getAbsolutePath();
List<Tuple2<Integer, String>> pairs = Arrays.asList(
- new Tuple2<Integer, String>(1, "a"),
- new Tuple2<Integer, String>(2, "aa"),
- new Tuple2<Integer, String>(3, "aaa")
+ new Tuple2<>(1, "a"),
+ new Tuple2<>(2, "aa"),
+ new Tuple2<>(3, "aaa")
);
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
@Override
public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
- return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
+ return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()));
}
- }).saveAsNewAPIHadoopFile(outputDir, IntWritable.class, Text.class,
- org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
+ }).saveAsNewAPIHadoopFile(
+ outputDir, IntWritable.class, Text.class,
+ org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
- JavaPairRDD<IntWritable, Text> output = sc.sequenceFile(outputDir, IntWritable.class,
- Text.class);
- Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>,
- String>() {
+ JavaPairRDD<IntWritable, Text> output = sc.sequenceFile(outputDir, IntWritable.class, Text.class);
+ Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() {
@Override
public String call(Tuple2<IntWritable, Text> x) {
return x.toString();
@@ -1210,24 +1210,23 @@ public class JavaAPISuite implements Serializable {
public void readWithNewAPIHadoopFile() throws IOException {
String outputDir = new File(tempDir, "output").getAbsolutePath();
List<Tuple2<Integer, String>> pairs = Arrays.asList(
- new Tuple2<Integer, String>(1, "a"),
- new Tuple2<Integer, String>(2, "aa"),
- new Tuple2<Integer, String>(3, "aaa")
+ new Tuple2<>(1, "a"),
+ new Tuple2<>(2, "aa"),
+ new Tuple2<>(3, "aaa")
);
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
@Override
public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
- return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
+ return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()));
}
}).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(outputDir,
- org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class, IntWritable.class,
- Text.class, new Job().getConfiguration());
- Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>,
- String>() {
+ org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
+ IntWritable.class, Text.class, new Job().getConfiguration());
+ Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() {
@Override
public String call(Tuple2<IntWritable, Text> x) {
return x.toString();
@@ -1251,9 +1250,9 @@ public class JavaAPISuite implements Serializable {
public void objectFilesOfComplexTypes() {
String outputDir = new File(tempDir, "output").getAbsolutePath();
List<Tuple2<Integer, String>> pairs = Arrays.asList(
- new Tuple2<Integer, String>(1, "a"),
- new Tuple2<Integer, String>(2, "aa"),
- new Tuple2<Integer, String>(3, "aaa")
+ new Tuple2<>(1, "a"),
+ new Tuple2<>(2, "aa"),
+ new Tuple2<>(3, "aaa")
);
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
rdd.saveAsObjectFile(outputDir);
@@ -1267,23 +1266,22 @@ public class JavaAPISuite implements Serializable {
public void hadoopFile() {
String outputDir = new File(tempDir, "output").getAbsolutePath();
List<Tuple2<Integer, String>> pairs = Arrays.asList(
- new Tuple2<Integer, String>(1, "a"),
- new Tuple2<Integer, String>(2, "aa"),
- new Tuple2<Integer, String>(3, "aaa")
+ new Tuple2<>(1, "a"),
+ new Tuple2<>(2, "aa"),
+ new Tuple2<>(3, "aaa")
);
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
@Override
public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
- return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
+ return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()));
}
}).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
- SequenceFileInputFormat.class, IntWritable.class, Text.class);
- Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>,
- String>() {
+ SequenceFileInputFormat.class, IntWritable.class, Text.class);
+ Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() {
@Override
public String call(Tuple2<IntWritable, Text> x) {
return x.toString();
@@ -1296,16 +1294,16 @@ public class JavaAPISuite implements Serializable {
public void hadoopFileCompressed() {
String outputDir = new File(tempDir, "output_compressed").getAbsolutePath();
List<Tuple2<Integer, String>> pairs = Arrays.asList(
- new Tuple2<Integer, String>(1, "a"),
- new Tuple2<Integer, String>(2, "aa"),
- new Tuple2<Integer, String>(3, "aaa")
+ new Tuple2<>(1, "a"),
+ new Tuple2<>(2, "aa"),
+ new Tuple2<>(3, "aaa")
);
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
@Override
public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
- return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
+ return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()));
}
}).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class,
DefaultCodec.class);
@@ -1313,8 +1311,7 @@ public class JavaAPISuite implements Serializable {
JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
SequenceFileInputFormat.class, IntWritable.class, Text.class);
- Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>,
- String>() {
+ Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() {
@Override
public String call(Tuple2<IntWritable, Text> x) {
return x.toString();
@@ -1414,8 +1411,8 @@ public class JavaAPISuite implements Serializable {
return t.toString();
}
}).collect();
- Assert.assertEquals(new Tuple2<String, Integer>("1", 1), s.get(0));
- Assert.assertEquals(new Tuple2<String, Integer>("2", 2), s.get(1));
+ Assert.assertEquals(new Tuple2<>("1", 1), s.get(0));
+ Assert.assertEquals(new Tuple2<>("2", 2), s.get(1));
}
@Test
@@ -1448,20 +1445,20 @@ public class JavaAPISuite implements Serializable {
JavaRDD<Integer> originalRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));
Function<Integer, Integer> keyFunction = new Function<Integer, Integer>() {
@Override
- public Integer call(Integer v1) throws Exception {
+ public Integer call(Integer v1) {
return v1 % 3;
}
};
Function<Integer, Integer> createCombinerFunction = new Function<Integer, Integer>() {
@Override
- public Integer call(Integer v1) throws Exception {
+ public Integer call(Integer v1) {
return v1;
}
};
Function2<Integer, Integer, Integer> mergeValueFunction = new Function2<Integer, Integer, Integer>() {
@Override
- public Integer call(Integer v1, Integer v2) throws Exception {
+ public Integer call(Integer v1, Integer v2) {
return v1 + v2;
}
};
@@ -1496,21 +1493,21 @@ public class JavaAPISuite implements Serializable {
new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer i) {
- return new Tuple2<Integer, Integer>(i, i % 2);
+ return new Tuple2<>(i, i % 2);
}
});
JavaPairRDD<Integer, Integer> rdd3 = rdd2.mapToPair(
new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
- @Override
- public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) {
- return new Tuple2<Integer, Integer>(in._2(), in._1());
- }
- });
+ @Override
+ public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) {
+ return new Tuple2<>(in._2(), in._1());
+ }
+ });
Assert.assertEquals(Arrays.asList(
- new Tuple2<Integer, Integer>(1, 1),
- new Tuple2<Integer, Integer>(0, 2),
- new Tuple2<Integer, Integer>(1, 3),
- new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());
+ new Tuple2<>(1, 1),
+ new Tuple2<>(0, 2),
+ new Tuple2<>(1, 3),
+ new Tuple2<>(0, 4)), rdd3.collect());
}
@@ -1523,7 +1520,7 @@ public class JavaAPISuite implements Serializable {
new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer i) {
- return new Tuple2<Integer, Integer>(i, i % 2);
+ return new Tuple2<>(i, i % 2);
}
});
@@ -1534,23 +1531,23 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);
- Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1),
- new Tuple2<Integer, Integer>(2, 0)),
+ Assert.assertEquals(Arrays.asList(new Tuple2<>(1, 1),
+ new Tuple2<>(2, 0)),
rdd2.collectPartitions(new int[] {0})[0]);
List<Tuple2<Integer,Integer>>[] parts2 = rdd2.collectPartitions(new int[] {1, 2});
- Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
- new Tuple2<Integer, Integer>(4, 0)),
+ Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1),
+ new Tuple2<>(4, 0)),
parts2[0]);
- Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
- new Tuple2<Integer, Integer>(6, 0),
- new Tuple2<Integer, Integer>(7, 1)),
+ Assert.assertEquals(Arrays.asList(new Tuple2<>(5, 1),
+ new Tuple2<>(6, 0),
+ new Tuple2<>(7, 1)),
parts2[1]);
}
@Test
public void countApproxDistinct() {
- List<Integer> arrayData = new ArrayList<Integer>();
+ List<Integer> arrayData = new ArrayList<>();
int size = 100;
for (int i = 0; i < 100000; i++) {
arrayData.add(i % size);
@@ -1561,15 +1558,15 @@ public class JavaAPISuite implements Serializable {
@Test
public void countApproxDistinctByKey() {
- List<Tuple2<Integer, Integer>> arrayData = new ArrayList<Tuple2<Integer, Integer>>();
+ List<Tuple2<Integer, Integer>> arrayData = new ArrayList<>();
for (int i = 10; i < 100; i++) {
for (int j = 0; j < i; j++) {
- arrayData.add(new Tuple2<Integer, Integer>(i, j));
+ arrayData.add(new Tuple2<>(i, j));
}
}
double relativeSD = 0.001;
JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData);
- List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(8, 0).collect();
+ List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(relativeSD, 8).collect();
for (Tuple2<Integer, Object> resItem : res) {
double count = (double)resItem._1();
Long resCount = (Long)resItem._2();
@@ -1587,7 +1584,7 @@ public class JavaAPISuite implements Serializable {
new PairFunction<Integer, Integer, int[]>() {
@Override
public Tuple2<Integer, int[]> call(Integer x) {
- return new Tuple2<Integer, int[]>(x, new int[] { x });
+ return new Tuple2<>(x, new int[]{x});
}
});
pairRDD.collect(); // Works fine
@@ -1598,7 +1595,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void collectAsMapAndSerialize() throws Exception {
JavaPairRDD<String,Integer> rdd =
- sc.parallelizePairs(Arrays.asList(new Tuple2<String,Integer>("foo", 1)));
+ sc.parallelizePairs(Arrays.asList(new Tuple2<>("foo", 1)));
Map<String,Integer> map = rdd.collectAsMap();
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
new ObjectOutputStream(bytes).writeObject(map);
@@ -1615,7 +1612,7 @@ public class JavaAPISuite implements Serializable {
new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer i) {
- return new Tuple2<Integer, Integer>(i % 2, 1);
+ return new Tuple2<>(i % 2, 1);
}
});
Map<Integer, Object> fractions = Maps.newHashMap();
@@ -1623,12 +1620,12 @@ public class JavaAPISuite implements Serializable {
fractions.put(1, 1.0);
JavaPairRDD<Integer, Integer> wr = rdd2.sampleByKey(true, fractions, 1L);
Map<Integer, Long> wrCounts = (Map<Integer, Long>) (Object) wr.countByKey();
- Assert.assertTrue(wrCounts.size() == 2);
+ Assert.assertEquals(2, wrCounts.size());
Assert.assertTrue(wrCounts.get(0) > 0);
Assert.assertTrue(wrCounts.get(1) > 0);
JavaPairRDD<Integer, Integer> wor = rdd2.sampleByKey(false, fractions, 1L);
Map<Integer, Long> worCounts = (Map<Integer, Long>) (Object) wor.countByKey();
- Assert.assertTrue(worCounts.size() == 2);
+ Assert.assertEquals(2, worCounts.size());
Assert.assertTrue(worCounts.get(0) > 0);
Assert.assertTrue(worCounts.get(1) > 0);
}
@@ -1641,7 +1638,7 @@ public class JavaAPISuite implements Serializable {
new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer i) {
- return new Tuple2<Integer, Integer>(i % 2, 1);
+ return new Tuple2<>(i % 2, 1);
}
});
Map<Integer, Object> fractions = Maps.newHashMap();
@@ -1649,25 +1646,25 @@ public class JavaAPISuite implements Serializable {
fractions.put(1, 1.0);
JavaPairRDD<Integer, Integer> wrExact = rdd2.sampleByKeyExact(true, fractions, 1L);
Map<Integer, Long> wrExactCounts = (Map<Integer, Long>) (Object) wrExact.countByKey();
- Assert.assertTrue(wrExactCounts.size() == 2);
+ Assert.assertEquals(2, wrExactCounts.size());
Assert.assertTrue(wrExactCounts.get(0) == 2);
Assert.assertTrue(wrExactCounts.get(1) == 4);
JavaPairRDD<Integer, Integer> worExact = rdd2.sampleByKeyExact(false, fractions, 1L);
Map<Integer, Long> worExactCounts = (Map<Integer, Long>) (Object) worExact.countByKey();
- Assert.assertTrue(worExactCounts.size() == 2);
+ Assert.assertEquals(2, worExactCounts.size());
Assert.assertTrue(worExactCounts.get(0) == 2);
Assert.assertTrue(worExactCounts.get(1) == 4);
}
private static class SomeCustomClass implements Serializable {
- public SomeCustomClass() {
+ SomeCustomClass() {
// Intentionally left blank
}
}
@Test
public void collectUnderlyingScalaRDD() {
- List<SomeCustomClass> data = new ArrayList<SomeCustomClass>();
+ List<SomeCustomClass> data = new ArrayList<>();
for (int i = 0; i < 100; i++) {
data.add(new SomeCustomClass());
}
@@ -1679,7 +1676,7 @@ public class JavaAPISuite implements Serializable {
private static final class BuggyMapFunction<T> implements Function<T, T> {
@Override
- public T call(T x) throws Exception {
+ public T call(T x) {
throw new IllegalStateException("Custom exception!");
}
}
@@ -1716,7 +1713,7 @@ public class JavaAPISuite implements Serializable {
JavaFutureAction<Void> future = rdd.foreachAsync(
new VoidFunction<Integer>() {
@Override
- public void call(Integer integer) throws Exception {
+ public void call(Integer integer) {
// intentionally left blank.
}
}
@@ -1745,7 +1742,7 @@ public class JavaAPISuite implements Serializable {
JavaRDD<Integer> rdd = sc.parallelize(data, 1);
JavaFutureAction<Void> future = rdd.foreachAsync(new VoidFunction<Integer>() {
@Override
- public void call(Integer integer) throws Exception {
+ public void call(Integer integer) throws InterruptedException {
Thread.sleep(10000); // To ensure that the job won't finish before it's cancelled.
}
});
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
index 9db07d0507..fbdfbf7e50 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
@@ -75,11 +75,11 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
String[] topic1data = createTopicAndSendData(topic1);
String[] topic2data = createTopicAndSendData(topic2);
- HashSet<String> sent = new HashSet<String>();
+ Set<String> sent = new HashSet<>();
sent.addAll(Arrays.asList(topic1data));
sent.addAll(Arrays.asList(topic2data));
- HashMap<String, String> kafkaParams = new HashMap<String, String>();
+ Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());
kafkaParams.put("auto.offset.reset", "smallest");
@@ -95,17 +95,17 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
// Make sure you can get offset ranges from the rdd
new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
@Override
- public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
+ public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) {
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
offsetRanges.set(offsets);
- Assert.assertEquals(offsets[0].topic(), topic1);
+ Assert.assertEquals(topic1, offsets[0].topic());
return rdd;
}
}
).map(
new Function<Tuple2<String, String>, String>() {
@Override
- public String call(Tuple2<String, String> kv) throws Exception {
+ public String call(Tuple2<String, String> kv) {
return kv._2();
}
}
@@ -119,10 +119,10 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
StringDecoder.class,
String.class,
kafkaParams,
- topicOffsetToMap(topic2, (long) 0),
+ topicOffsetToMap(topic2, 0L),
new Function<MessageAndMetadata<String, String>, String>() {
@Override
- public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception {
+ public String call(MessageAndMetadata<String, String> msgAndMd) {
return msgAndMd.message();
}
}
@@ -133,7 +133,7 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
unifiedStream.foreachRDD(
new Function<JavaRDD<String>, Void>() {
@Override
- public Void call(JavaRDD<String> rdd) throws Exception {
+ public Void call(JavaRDD<String> rdd) {
result.addAll(rdd.collect());
for (OffsetRange o : offsetRanges.get()) {
System.out.println(
@@ -155,14 +155,14 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
ssc.stop();
}
- private HashSet<String> topicToSet(String topic) {
- HashSet<String> topicSet = new HashSet<String>();
+ private static Set<String> topicToSet(String topic) {
+ Set<String> topicSet = new HashSet<>();
topicSet.add(topic);
return topicSet;
}
- private HashMap<TopicAndPartition, Long> topicOffsetToMap(String topic, Long offsetToStart) {
- HashMap<TopicAndPartition, Long> topicMap = new HashMap<TopicAndPartition, Long>();
+ private static Map<TopicAndPartition, Long> topicOffsetToMap(String topic, Long offsetToStart) {
+ Map<TopicAndPartition, Long> topicMap = new HashMap<>();
topicMap.put(new TopicAndPartition(topic, 0), offsetToStart);
return topicMap;
}
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
index a9dc6e5061..afcc6cfccd 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.kafka;
import java.io.Serializable;
import java.util.HashMap;
+import java.util.Map;
import scala.Tuple2;
@@ -66,10 +67,10 @@ public class JavaKafkaRDDSuite implements Serializable {
String topic1 = "topic1";
String topic2 = "topic2";
- String[] topic1data = createTopicAndSendData(topic1);
- String[] topic2data = createTopicAndSendData(topic2);
+ createTopicAndSendData(topic1);
+ createTopicAndSendData(topic2);
- HashMap<String, String> kafkaParams = new HashMap<String, String>();
+ Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());
OffsetRange[] offsetRanges = {
@@ -77,8 +78,8 @@ public class JavaKafkaRDDSuite implements Serializable {
OffsetRange.create(topic2, 0, 0, 1)
};
- HashMap<TopicAndPartition, Broker> emptyLeaders = new HashMap<TopicAndPartition, Broker>();
- HashMap<TopicAndPartition, Broker> leaders = new HashMap<TopicAndPartition, Broker>();
+ Map<TopicAndPartition, Broker> emptyLeaders = new HashMap<>();
+ Map<TopicAndPartition, Broker> leaders = new HashMap<>();
String[] hostAndPort = kafkaTestUtils.brokerAddress().split(":");
Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
leaders.put(new TopicAndPartition(topic1, 0), broker);
@@ -95,7 +96,7 @@ public class JavaKafkaRDDSuite implements Serializable {
).map(
new Function<Tuple2<String, String>, String>() {
@Override
- public String call(Tuple2<String, String> kv) throws Exception {
+ public String call(Tuple2<String, String> kv) {
return kv._2();
}
}
@@ -113,7 +114,7 @@ public class JavaKafkaRDDSuite implements Serializable {
emptyLeaders,
new Function<MessageAndMetadata<String, String>, String>() {
@Override
- public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception {
+ public String call(MessageAndMetadata<String, String> msgAndMd) {
return msgAndMd.message();
}
}
@@ -131,7 +132,7 @@ public class JavaKafkaRDDSuite implements Serializable {
leaders,
new Function<MessageAndMetadata<String, String>, String>() {
@Override
- public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception {
+ public String call(MessageAndMetadata<String, String> msgAndMd) {
return msgAndMd.message();
}
}
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index e4c659215b..1e69de46cd 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -67,10 +67,10 @@ public class JavaKafkaStreamSuite implements Serializable {
@Test
public void testKafkaStream() throws InterruptedException {
String topic = "topic1";
- HashMap<String, Integer> topics = new HashMap<String, Integer>();
+ Map<String, Integer> topics = new HashMap<>();
topics.put(topic, 1);
- HashMap<String, Integer> sent = new HashMap<String, Integer>();
+ Map<String, Integer> sent = new HashMap<>();
sent.put("a", 5);
sent.put("b", 3);
sent.put("c", 10);
@@ -78,7 +78,7 @@ public class JavaKafkaStreamSuite implements Serializable {
kafkaTestUtils.createTopic(topic);
kafkaTestUtils.sendMessages(topic, sent);
- HashMap<String, String> kafkaParams = new HashMap<String, String>();
+ Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("zookeeper.connect", kafkaTestUtils.zkAddress());
kafkaParams.put("group.id", "test-consumer-" + random.nextInt(10000));
kafkaParams.put("auto.offset.reset", "smallest");
@@ -97,7 +97,7 @@ public class JavaKafkaStreamSuite implements Serializable {
JavaDStream<String> words = stream.map(
new Function<Tuple2<String, String>, String>() {
@Override
- public String call(Tuple2<String, String> tuple2) throws Exception {
+ public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
}
@@ -106,7 +106,7 @@ public class JavaKafkaStreamSuite implements Serializable {
words.countByValue().foreachRDD(
new Function<JavaPairRDD<String, Long>, Void>() {
@Override
- public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
+ public Void call(JavaPairRDD<String, Long> rdd) {
List<Tuple2<String, Long>> ret = rdd.collect();
for (Tuple2<String, Long> r : ret) {
if (result.containsKey(r._1())) {
@@ -130,8 +130,8 @@ public class JavaKafkaStreamSuite implements Serializable {
Thread.sleep(200);
}
Assert.assertEquals(sent.size(), result.size());
- for (String k : sent.keySet()) {
- Assert.assertEquals(sent.get(k).intValue(), result.get(k).intValue());
+ for (Map.Entry<String, Integer> e : sent.entrySet()) {
+ Assert.assertEquals(e.getValue().intValue(), result.get(e.getKey()).intValue());
}
}
}
diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
index e46b4e5c75..26ec8af455 100644
--- a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
+++ b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
@@ -17,8 +17,6 @@
package org.apache.spark.streaming.twitter;
-import java.util.Arrays;
-
import org.junit.Test;
import twitter4j.Status;
import twitter4j.auth.Authorization;
@@ -30,7 +28,7 @@ import org.apache.spark.streaming.api.java.JavaDStream;
public class JavaTwitterStreamSuite extends LocalJavaStreamingContext {
@Test
public void testTwitterStream() {
- String[] filters = (String[])Arrays.<String>asList("filter1", "filter2").toArray();
+ String[] filters = { "filter1", "filter2" };
Authorization auth = NullAuthorization.getInstance();
// tests the API, does not actually test data receiving
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
index 729bc0459c..14975265ab 100644
--- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
+++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
@@ -77,7 +77,7 @@ public class Java8APISuite implements Serializable {
public void foreach() {
foreachCalls = 0;
JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
- rdd.foreach((x) -> foreachCalls++);
+ rdd.foreach(x -> foreachCalls++);
Assert.assertEquals(2, foreachCalls);
}
@@ -180,7 +180,7 @@ public class Java8APISuite implements Serializable {
JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>(x, x))
.cache();
pairs.collect();
- JavaRDD<String> strings = rdd.map(x -> x.toString()).cache();
+ JavaRDD<String> strings = rdd.map(Object::toString).cache();
strings.collect();
}
@@ -195,7 +195,9 @@ public class Java8APISuite implements Serializable {
JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> {
List<Tuple2<String, String>> pairs2 = new LinkedList<>();
- for (String word : s.split(" ")) pairs2.add(new Tuple2<>(word, word));
+ for (String word : s.split(" ")) {
+ pairs2.add(new Tuple2<>(word, word));
+ }
return pairs2;
});
@@ -204,11 +206,12 @@ public class Java8APISuite implements Serializable {
JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> {
List<Double> lengths = new LinkedList<>();
- for (String word : s.split(" ")) lengths.add(word.length() * 1.0);
+ for (String word : s.split(" ")) {
+ lengths.add((double) word.length());
+ }
return lengths;
});
- Double x = doubles.first();
Assert.assertEquals(5.0, doubles.first(), 0.01);
Assert.assertEquals(11, pairs.count());
}
@@ -228,7 +231,7 @@ public class Java8APISuite implements Serializable {
swapped.collect();
// There was never a bug here, but it's worth testing:
- pairRDD.map(item -> item.swap()).collect();
+ pairRDD.map(Tuple2::swap).collect();
}
@Test
@@ -282,11 +285,11 @@ public class Java8APISuite implements Serializable {
FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
(Iterator<Integer> i, Iterator<String> s) -> {
int sizeI = 0;
- int sizeS = 0;
while (i.hasNext()) {
sizeI += 1;
i.next();
}
+ int sizeS = 0;
while (s.hasNext()) {
sizeS += 1;
s.next();
@@ -301,30 +304,31 @@ public class Java8APISuite implements Serializable {
public void accumulators() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- final Accumulator<Integer> intAccum = sc.intAccumulator(10);
- rdd.foreach(x -> intAccum.add(x));
+ Accumulator<Integer> intAccum = sc.intAccumulator(10);
+ rdd.foreach(intAccum::add);
Assert.assertEquals((Integer) 25, intAccum.value());
- final Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
+ Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
rdd.foreach(x -> doubleAccum.add((double) x));
Assert.assertEquals((Double) 25.0, doubleAccum.value());
// Try a custom accumulator type
AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() {
+ @Override
public Float addInPlace(Float r, Float t) {
return r + t;
}
-
+ @Override
public Float addAccumulator(Float r, Float t) {
return r + t;
}
-
+ @Override
public Float zero(Float initialValue) {
return 0.0f;
}
};
- final Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
+ Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
rdd.foreach(x -> floatAccum.add((float) x));
Assert.assertEquals((Float) 25.0f, floatAccum.value());
@@ -336,7 +340,7 @@ public class Java8APISuite implements Serializable {
@Test
public void keyBy() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
- List<Tuple2<String, Integer>> s = rdd.keyBy(x -> x.toString()).collect();
+ List<Tuple2<String, Integer>> s = rdd.keyBy(Object::toString).collect();
Assert.assertEquals(new Tuple2<>("1", 1), s.get(0));
Assert.assertEquals(new Tuple2<>("2", 2), s.get(1));
}
@@ -349,7 +353,7 @@ public class Java8APISuite implements Serializable {
JavaPairRDD<Integer, Integer> rdd3 =
rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1()));
Assert.assertEquals(Arrays.asList(
- new Tuple2<Integer, Integer>(1, 1),
+ new Tuple2<>(1, 1),
new Tuple2<>(0, 2),
new Tuple2<>(1, 3),
new Tuple2<>(0, 4)), rdd3.collect());
@@ -361,7 +365,7 @@ public class Java8APISuite implements Serializable {
JavaPairRDD<Integer, Integer> rdd2 =
rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
- List[] parts = rdd1.collectPartitions(new int[]{0});
+ List<Integer>[] parts = rdd1.collectPartitions(new int[]{0});
Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
parts = rdd1.collectPartitions(new int[]{1, 2});
@@ -371,19 +375,19 @@ public class Java8APISuite implements Serializable {
Assert.assertEquals(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)),
rdd2.collectPartitions(new int[]{0})[0]);
- parts = rdd2.collectPartitions(new int[]{1, 2});
- Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts[0]);
+ List<Tuple2<Integer, Integer>>[] parts2 = rdd2.collectPartitions(new int[]{1, 2});
+ Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts2[0]);
Assert.assertEquals(Arrays.asList(new Tuple2<>(5, 1), new Tuple2<>(6, 0), new Tuple2<>(7, 1)),
- parts[1]);
+ parts2[1]);
}
@Test
public void collectAsMapWithIntArrayValues() {
// Regression test for SPARK-1040
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[]{1}));
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
JavaPairRDD<Integer, int[]> pairRDD =
rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x}));
pairRDD.collect(); // Works fine
- Map<Integer, int[]> map = pairRDD.collectAsMap(); // Used to crash with ClassCastException
+ pairRDD.collectAsMap(); // Used to crash with ClassCastException
}
}
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
index bf693c7c39..7b50aad4ad 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
@@ -18,6 +18,7 @@
package test.org.apache.spark.sql;
import java.io.Serializable;
+import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -83,7 +84,7 @@ public class JavaApplySchemaSuite implements Serializable {
@Test
public void applySchema() {
- List<Person> personList = new ArrayList<Person>(2);
+ List<Person> personList = new ArrayList<>(2);
Person person1 = new Person();
person1.setName("Michael");
person1.setAge(29);
@@ -95,12 +96,13 @@ public class JavaApplySchemaSuite implements Serializable {
JavaRDD<Row> rowRDD = javaCtx.parallelize(personList).map(
new Function<Person, Row>() {
+ @Override
public Row call(Person person) throws Exception {
return RowFactory.create(person.getName(), person.getAge());
}
});
- List<StructField> fields = new ArrayList<StructField>(2);
+ List<StructField> fields = new ArrayList<>(2);
fields.add(DataTypes.createStructField("name", DataTypes.StringType, false));
fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, false));
StructType schema = DataTypes.createStructType(fields);
@@ -118,7 +120,7 @@ public class JavaApplySchemaSuite implements Serializable {
@Test
public void dataFrameRDDOperations() {
- List<Person> personList = new ArrayList<Person>(2);
+ List<Person> personList = new ArrayList<>(2);
Person person1 = new Person();
person1.setName("Michael");
person1.setAge(29);
@@ -129,27 +131,28 @@ public class JavaApplySchemaSuite implements Serializable {
personList.add(person2);
JavaRDD<Row> rowRDD = javaCtx.parallelize(personList).map(
- new Function<Person, Row>() {
- public Row call(Person person) throws Exception {
- return RowFactory.create(person.getName(), person.getAge());
- }
- });
-
- List<StructField> fields = new ArrayList<StructField>(2);
- fields.add(DataTypes.createStructField("name", DataTypes.StringType, false));
+ new Function<Person, Row>() {
+ @Override
+ public Row call(Person person) {
+ return RowFactory.create(person.getName(), person.getAge());
+ }
+ });
+
+ List<StructField> fields = new ArrayList<>(2);
+ fields.add(DataTypes.createStructField("", DataTypes.StringType, false));
fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, false));
StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.applySchema(rowRDD, schema);
df.registerTempTable("people");
List<String> actual = sqlContext.sql("SELECT * FROM people").toJavaRDD().map(new Function<Row, String>() {
-
+ @Override
public String call(Row row) {
- return row.getString(0) + "_" + row.get(1).toString();
+ return row.getString(0) + "_" + row.get(1);
}
}).collect();
- List<String> expected = new ArrayList<String>(2);
+ List<String> expected = new ArrayList<>(2);
expected.add("Michael_29");
expected.add("Yin_28");
@@ -165,7 +168,7 @@ public class JavaApplySchemaSuite implements Serializable {
"{\"string\":\"this is another simple string.\", \"integer\":11, \"long\":21474836469, " +
"\"bigInteger\":92233720368547758069, \"double\":1.7976931348623157E305, " +
"\"boolean\":false, \"null\":null}"));
- List<StructField> fields = new ArrayList<StructField>(7);
+ List<StructField> fields = new ArrayList<>(7);
fields.add(DataTypes.createStructField("bigInteger", DataTypes.createDecimalType(20, 0),
true));
fields.add(DataTypes.createStructField("boolean", DataTypes.BooleanType, true));
@@ -175,10 +178,10 @@ public class JavaApplySchemaSuite implements Serializable {
fields.add(DataTypes.createStructField("null", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("string", DataTypes.StringType, true));
StructType expectedSchema = DataTypes.createStructType(fields);
- List<Row> expectedResult = new ArrayList<Row>(2);
+ List<Row> expectedResult = new ArrayList<>(2);
expectedResult.add(
RowFactory.create(
- new java.math.BigDecimal("92233720368547758070"),
+ new BigDecimal("92233720368547758070"),
true,
1.7976931348623157E308,
10,
@@ -187,7 +190,7 @@ public class JavaApplySchemaSuite implements Serializable {
"this is a simple string."));
expectedResult.add(
RowFactory.create(
- new java.math.BigDecimal("92233720368547758069"),
+ new BigDecimal("92233720368547758069"),
false,
1.7976931348623157E305,
11,
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index 4867cebf53..d981ce947f 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -61,7 +61,7 @@ public class JavaDataFrameSuite {
@Test
public void testExecution() {
DataFrame df = context.table("testData").filter("key = 1");
- Assert.assertEquals(df.select("key").collect()[0].get(0), 1);
+ Assert.assertEquals(1, df.select("key").collect()[0].get(0));
}
/**
@@ -119,7 +119,7 @@ public class JavaDataFrameSuite {
public static class Bean implements Serializable {
private double a = 0.0;
- private Integer[] b = new Integer[]{0, 1};
+ private Integer[] b = { 0, 1 };
private Map<String, int[]> c = ImmutableMap.of("hello", new int[] { 1, 2 });
private List<String> d = Arrays.asList("floppy", "disk");
@@ -161,7 +161,7 @@ public class JavaDataFrameSuite {
schema.apply("d"));
Row first = df.select("a", "b", "c", "d").first();
Assert.assertEquals(bean.getA(), first.getDouble(0), 0.0);
- // Now Java lists and maps are converetd to Scala Seq's and Map's. Once we get a Seq below,
+ // Now Java lists and maps are converted to Scala Seq's and Map's. Once we get a Seq below,
// verify that it has the expected length, and contains expected elements.
Seq<Integer> result = first.getAs(1);
Assert.assertEquals(bean.getB().length, result.length());
@@ -180,7 +180,8 @@ public class JavaDataFrameSuite {
}
}
- private static Comparator<Row> CrosstabRowComparator = new Comparator<Row>() {
+ private static final Comparator<Row> crosstabRowComparator = new Comparator<Row>() {
+ @Override
public int compare(Row row1, Row row2) {
String item1 = row1.getString(0);
String item2 = row2.getString(0);
@@ -193,16 +194,16 @@ public class JavaDataFrameSuite {
DataFrame df = context.table("testData2");
DataFrame crosstab = df.stat().crosstab("a", "b");
String[] columnNames = crosstab.schema().fieldNames();
- Assert.assertEquals(columnNames[0], "a_b");
- Assert.assertEquals(columnNames[1], "1");
- Assert.assertEquals(columnNames[2], "2");
+ Assert.assertEquals("a_b", columnNames[0]);
+ Assert.assertEquals("1", columnNames[1]);
+ Assert.assertEquals("2", columnNames[2]);
Row[] rows = crosstab.collect();
- Arrays.sort(rows, CrosstabRowComparator);
+ Arrays.sort(rows, crosstabRowComparator);
Integer count = 1;
for (Row row : rows) {
Assert.assertEquals(row.get(0).toString(), count.toString());
- Assert.assertEquals(row.getLong(1), 1L);
- Assert.assertEquals(row.getLong(2), 1L);
+ Assert.assertEquals(1L, row.getLong(1));
+ Assert.assertEquals(1L, row.getLong(2));
count++;
}
}
@@ -210,7 +211,7 @@ public class JavaDataFrameSuite {
@Test
public void testFrequentItems() {
DataFrame df = context.table("testData2");
- String[] cols = new String[]{"a"};
+ String[] cols = {"a"};
DataFrame results = df.stat().freqItems(cols, 0.2);
Assert.assertTrue(results.collect()[0].getSeq(0).contains(1));
}
@@ -219,14 +220,14 @@ public class JavaDataFrameSuite {
public void testCorrelation() {
DataFrame df = context.table("testData2");
Double pearsonCorr = df.stat().corr("a", "b", "pearson");
- Assert.assertTrue(Math.abs(pearsonCorr) < 1e-6);
+ Assert.assertTrue(Math.abs(pearsonCorr) < 1.0e-6);
}
@Test
public void testCovariance() {
DataFrame df = context.table("testData2");
Double result = df.stat().cov("a", "b");
- Assert.assertTrue(Math.abs(result) < 1e-6);
+ Assert.assertTrue(Math.abs(result) < 1.0e-6);
}
@Test
@@ -234,7 +235,7 @@ public class JavaDataFrameSuite {
DataFrame df = context.range(0, 100, 1, 2).select(col("id").mod(3).as("key"));
DataFrame sampled = df.stat().<Integer>sampleBy("key", ImmutableMap.of(0, 0.1, 1, 0.2), 0L);
Row[] actual = sampled.groupBy("key").count().orderBy("key").collect();
- Row[] expected = new Row[] {RowFactory.create(0, 5), RowFactory.create(1, 8)};
+ Row[] expected = {RowFactory.create(0, 5), RowFactory.create(1, 8)};
Assert.assertArrayEquals(expected, actual);
}
}
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaRowSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaRowSuite.java
index 4ce1d1dddb..3ab4db2a03 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaRowSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaRowSuite.java
@@ -18,6 +18,7 @@
package test.org.apache.spark.sql;
import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.Arrays;
@@ -52,12 +53,12 @@ public class JavaRowSuite {
shortValue = (short)32767;
intValue = 2147483647;
longValue = 9223372036854775807L;
- floatValue = (float)3.4028235E38;
+ floatValue = 3.4028235E38f;
doubleValue = 1.7976931348623157E308;
decimalValue = new BigDecimal("1.7976931348623157E328");
booleanValue = true;
stringValue = "this is a string";
- binaryValue = stringValue.getBytes();
+ binaryValue = stringValue.getBytes(StandardCharsets.UTF_8);
dateValue = Date.valueOf("2014-06-30");
timestampValue = Timestamp.valueOf("2014-06-30 09:20:00.0");
}
@@ -123,8 +124,8 @@ public class JavaRowSuite {
Assert.assertEquals(binaryValue, simpleRow.get(16));
Assert.assertEquals(dateValue, simpleRow.get(17));
Assert.assertEquals(timestampValue, simpleRow.get(18));
- Assert.assertEquals(true, simpleRow.isNullAt(19));
- Assert.assertEquals(null, simpleRow.get(19));
+ Assert.assertTrue(simpleRow.isNullAt(19));
+ Assert.assertNull(simpleRow.get(19));
}
@Test
@@ -134,7 +135,7 @@ public class JavaRowSuite {
stringValue + " (1)", stringValue + " (2)", stringValue + "(3)");
// Simple map
- Map<String, Long> simpleMap = new HashMap<String, Long>();
+ Map<String, Long> simpleMap = new HashMap<>();
simpleMap.put(stringValue + " (1)", longValue);
simpleMap.put(stringValue + " (2)", longValue - 1);
simpleMap.put(stringValue + " (3)", longValue - 2);
@@ -149,7 +150,7 @@ public class JavaRowSuite {
List<Row> arrayOfRows = Arrays.asList(simpleStruct);
// Complex map
- Map<List<Row>, Row> complexMap = new HashMap<List<Row>, Row>();
+ Map<List<Row>, Row> complexMap = new HashMap<>();
complexMap.put(arrayOfRows, simpleStruct);
// Complex struct
@@ -167,7 +168,7 @@ public class JavaRowSuite {
Assert.assertEquals(arrayOfMaps, complexStruct.get(3));
Assert.assertEquals(arrayOfRows, complexStruct.get(4));
Assert.assertEquals(complexMap, complexStruct.get(5));
- Assert.assertEquals(null, complexStruct.get(6));
+ Assert.assertNull(complexStruct.get(6));
// A very complex row
Row complexRow = RowFactory.create(arrayOfMaps, arrayOfRows, complexMap, complexStruct);
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java
index bb02b58cca..4a78dca7fe 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java
@@ -20,6 +20,7 @@ package test.org.apache.spark.sql;
import java.io.Serializable;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -61,13 +62,13 @@ public class JavaUDFSuite implements Serializable {
sqlContext.udf().register("stringLengthTest", new UDF1<String, Integer>() {
@Override
- public Integer call(String str) throws Exception {
+ public Integer call(String str) {
return str.length();
}
}, DataTypes.IntegerType);
Row result = sqlContext.sql("SELECT stringLengthTest('test')").head();
- assert(result.getInt(0) == 4);
+ Assert.assertEquals(4, result.getInt(0));
}
@SuppressWarnings("unchecked")
@@ -81,12 +82,12 @@ public class JavaUDFSuite implements Serializable {
sqlContext.udf().register("stringLengthTest", new UDF2<String, String, Integer>() {
@Override
- public Integer call(String str1, String str2) throws Exception {
+ public Integer call(String str1, String str2) {
return str1.length() + str2.length();
}
}, DataTypes.IntegerType);
Row result = sqlContext.sql("SELECT stringLengthTest('test', 'test2')").head();
- assert(result.getInt(0) == 9);
+ Assert.assertEquals(9, result.getInt(0));
}
}
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java
index 6f9e7f68dc..9e241f2098 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java
@@ -44,7 +44,7 @@ public class JavaSaveLoadSuite {
File path;
DataFrame df;
- private void checkAnswer(DataFrame actual, List<Row> expected) {
+ private static void checkAnswer(DataFrame actual, List<Row> expected) {
String errorMessage = QueryTest$.MODULE$.checkAnswer(actual, expected);
if (errorMessage != null) {
Assert.fail(errorMessage);
@@ -64,7 +64,7 @@ public class JavaSaveLoadSuite {
path.delete();
}
- List<String> jsonObjects = new ArrayList<String>(10);
+ List<String> jsonObjects = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}");
}
@@ -82,7 +82,7 @@ public class JavaSaveLoadSuite {
@Test
public void saveAndLoad() {
- Map<String, String> options = new HashMap<String, String>();
+ Map<String, String> options = new HashMap<>();
options.put("path", path.toString());
df.write().mode(SaveMode.ErrorIfExists).format("json").options(options).save();
DataFrame loadedDF = sqlContext.read().format("json").options(options).load();
@@ -91,11 +91,11 @@ public class JavaSaveLoadSuite {
@Test
public void saveAndLoadWithSchema() {
- Map<String, String> options = new HashMap<String, String>();
+ Map<String, String> options = new HashMap<>();
options.put("path", path.toString());
df.write().format("json").mode(SaveMode.ErrorIfExists).options(options).save();
- List<StructField> fields = new ArrayList<StructField>();
+ List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField("b", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
DataFrame loadedDF = sqlContext.read().format("json").schema(schema).options(options).load();
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
index 019d8a3026..b4bf9eef8f 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
@@ -40,7 +40,7 @@ public class JavaDataFrameSuite {
DataFrame df;
- private void checkAnswer(DataFrame actual, List<Row> expected) {
+ private static void checkAnswer(DataFrame actual, List<Row> expected) {
String errorMessage = QueryTest$.MODULE$.checkAnswer(actual, expected);
if (errorMessage != null) {
Assert.fail(errorMessage);
@@ -52,7 +52,7 @@ public class JavaDataFrameSuite {
hc = TestHive$.MODULE$;
sc = new JavaSparkContext(hc.sparkContext());
- List<String> jsonObjects = new ArrayList<String>(10);
+ List<String> jsonObjects = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
jsonObjects.add("{\"key\":" + i + ", \"value\":\"str" + i + "\"}");
}
@@ -71,7 +71,7 @@ public class JavaDataFrameSuite {
@Test
public void saveTableAndQueryIt() {
checkAnswer(
- df.select(functions.avg("key").over(
+ df.select(avg("key").over(
Window.partitionBy("value").orderBy("key").rowsBetween(-1, 1))),
hc.sql("SELECT avg(key) " +
"OVER (PARTITION BY value " +
@@ -95,7 +95,7 @@ public class JavaDataFrameSuite {
registeredUDAF.apply(col("value")),
callUDF("mydoublesum", col("value")));
- List<Row> expectedResult = new ArrayList<Row>();
+ List<Row> expectedResult = new ArrayList<>();
expectedResult.add(RowFactory.create(4950.0, 9900.0, 9900.0, 9900.0));
checkAnswer(
aggregatedDF,
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
index 4192155975..c8d272794d 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
@@ -53,7 +53,7 @@ public class JavaMetastoreDataSourcesSuite {
FileSystem fs;
DataFrame df;
- private void checkAnswer(DataFrame actual, List<Row> expected) {
+ private static void checkAnswer(DataFrame actual, List<Row> expected) {
String errorMessage = QueryTest$.MODULE$.checkAnswer(actual, expected);
if (errorMessage != null) {
Assert.fail(errorMessage);
@@ -77,7 +77,7 @@ public class JavaMetastoreDataSourcesSuite {
fs.delete(hiveManagedPath, true);
}
- List<String> jsonObjects = new ArrayList<String>(10);
+ List<String> jsonObjects = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}");
}
@@ -97,7 +97,7 @@ public class JavaMetastoreDataSourcesSuite {
@Test
public void saveExternalTableAndQueryIt() {
- Map<String, String> options = new HashMap<String, String>();
+ Map<String, String> options = new HashMap<>();
options.put("path", path.toString());
df.write()
.format("org.apache.spark.sql.json")
@@ -120,7 +120,7 @@ public class JavaMetastoreDataSourcesSuite {
@Test
public void saveExternalTableWithSchemaAndQueryIt() {
- Map<String, String> options = new HashMap<String, String>();
+ Map<String, String> options = new HashMap<>();
options.put("path", path.toString());
df.write()
.format("org.apache.spark.sql.json")
@@ -132,7 +132,7 @@ public class JavaMetastoreDataSourcesSuite {
sqlContext.sql("SELECT * FROM javaSavedTable"),
df.collectAsList());
- List<StructField> fields = new ArrayList<StructField>();
+ List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField("b", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
DataFrame loadedDF =
@@ -148,7 +148,7 @@ public class JavaMetastoreDataSourcesSuite {
@Test
public void saveTableAndQueryIt() {
- Map<String, String> options = new HashMap<String, String>();
+ Map<String, String> options = new HashMap<>();
df.write()
.format("org.apache.spark.sql.json")
.mode(SaveMode.Append)
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index e0718f73aa..c521714922 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -18,24 +18,22 @@
package org.apache.spark.streaming;
import java.io.*;
-import java.lang.Iterable;
import java.nio.charset.Charset;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
+import scala.Tuple2;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import scala.Tuple2;
-
import org.junit.Assert;
-import static org.junit.Assert.*;
import org.junit.Test;
import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.google.common.collect.Sets;
@@ -54,14 +52,14 @@ import org.apache.spark.SparkConf;
// see http://stackoverflow.com/questions/758570/.
public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable {
- public void equalIterator(Iterator<?> a, Iterator<?> b) {
+ public static void equalIterator(Iterator<?> a, Iterator<?> b) {
while (a.hasNext() && b.hasNext()) {
Assert.assertEquals(a.next(), b.next());
}
Assert.assertEquals(a.hasNext(), b.hasNext());
}
- public void equalIterable(Iterable<?> a, Iterable<?> b) {
+ public static void equalIterable(Iterable<?> a, Iterable<?> b) {
equalIterator(a.iterator(), b.iterator());
}
@@ -74,14 +72,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@Test
public void testContextState() {
List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1, 2, 3, 4));
- Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED);
+ Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState());
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaTestUtils.attachTestOutputStream(stream);
- Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED);
+ Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState());
ssc.start();
- Assert.assertTrue(ssc.getState() == StreamingContextState.ACTIVE);
+ Assert.assertEquals(StreamingContextState.ACTIVE, ssc.getState());
ssc.stop();
- Assert.assertTrue(ssc.getState() == StreamingContextState.STOPPED);
+ Assert.assertEquals(StreamingContextState.STOPPED, ssc.getState());
}
@SuppressWarnings("unchecked")
@@ -118,7 +116,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() {
@Override
- public Integer call(String s) throws Exception {
+ public Integer call(String s) {
return s.length();
}
});
@@ -180,7 +178,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
public void testFilter() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("giants", "dodgers"),
- Arrays.asList("yankees", "red socks"));
+ Arrays.asList("yankees", "red sox"));
List<List<String>> expected = Arrays.asList(
Arrays.asList("giants"),
@@ -189,7 +187,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream<String> filtered = stream.filter(new Function<String, Boolean>() {
@Override
- public Boolean call(String s) throws Exception {
+ public Boolean call(String s) {
return s.contains("a");
}
});
@@ -243,11 +241,11 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
public void testGlom() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("giants", "dodgers"),
- Arrays.asList("yankees", "red socks"));
+ Arrays.asList("yankees", "red sox"));
List<List<List<String>>> expected = Arrays.asList(
Arrays.asList(Arrays.asList("giants", "dodgers")),
- Arrays.asList(Arrays.asList("yankees", "red socks")));
+ Arrays.asList(Arrays.asList("yankees", "red sox")));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream<List<String>> glommed = stream.glom();
@@ -262,22 +260,22 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
public void testMapPartitions() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("giants", "dodgers"),
- Arrays.asList("yankees", "red socks"));
+ Arrays.asList("yankees", "red sox"));
List<List<String>> expected = Arrays.asList(
Arrays.asList("GIANTSDODGERS"),
- Arrays.asList("YANKEESRED SOCKS"));
+ Arrays.asList("YANKEESRED SOX"));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream<String> mapped = stream.mapPartitions(
new FlatMapFunction<Iterator<String>, String>() {
@Override
public Iterable<String> call(Iterator<String> in) {
- String out = "";
+ StringBuilder out = new StringBuilder();
while (in.hasNext()) {
- out = out + in.next().toUpperCase();
+ out.append(in.next().toUpperCase(Locale.ENGLISH));
}
- return Lists.newArrayList(out);
+ return Arrays.asList(out.toString());
}
});
JavaTestUtils.attachTestOutputStream(mapped);
@@ -286,16 +284,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
- private class IntegerSum implements Function2<Integer, Integer, Integer> {
+ private static class IntegerSum implements Function2<Integer, Integer, Integer> {
@Override
- public Integer call(Integer i1, Integer i2) throws Exception {
+ public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
}
- private class IntegerDifference implements Function2<Integer, Integer, Integer> {
+ private static class IntegerDifference implements Function2<Integer, Integer, Integer> {
@Override
- public Integer call(Integer i1, Integer i2) throws Exception {
+ public Integer call(Integer i1, Integer i2) {
return i1 - i2;
}
}
@@ -347,13 +345,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList(24));
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> reducedWindowed = null;
+ JavaDStream<Integer> reducedWindowed;
if (withInverse) {
reducedWindowed = stream.reduceByWindow(new IntegerSum(),
- new IntegerDifference(), new Duration(2000), new Duration(1000));
+ new IntegerDifference(), new Duration(2000), new Duration(1000));
} else {
reducedWindowed = stream.reduceByWindow(new IntegerSum(),
- new Duration(2000), new Duration(1000));
+ new Duration(2000), new Duration(1000));
}
JavaTestUtils.attachTestOutputStream(reducedWindowed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
@@ -378,11 +376,11 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList(7,8,9));
JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc());
- JavaRDD<Integer> rdd1 = ssc.sparkContext().parallelize(Arrays.asList(1, 2, 3));
- JavaRDD<Integer> rdd2 = ssc.sparkContext().parallelize(Arrays.asList(4, 5, 6));
- JavaRDD<Integer> rdd3 = ssc.sparkContext().parallelize(Arrays.asList(7,8,9));
+ JavaRDD<Integer> rdd1 = jsc.parallelize(Arrays.asList(1, 2, 3));
+ JavaRDD<Integer> rdd2 = jsc.parallelize(Arrays.asList(4, 5, 6));
+ JavaRDD<Integer> rdd3 = jsc.parallelize(Arrays.asList(7,8,9));
- LinkedList<JavaRDD<Integer>> rdds = Lists.newLinkedList();
+ Queue<JavaRDD<Integer>> rdds = new LinkedList<>();
rdds.add(rdd1);
rdds.add(rdd2);
rdds.add(rdd3);
@@ -410,10 +408,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Integer> transformed = stream.transform(
new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
@Override
- public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
+ public JavaRDD<Integer> call(JavaRDD<Integer> in) {
return in.map(new Function<Integer, Integer>() {
@Override
- public Integer call(Integer i) throws Exception {
+ public Integer call(Integer i) {
return i + 2;
}
});
@@ -435,70 +433,70 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
List<List<Tuple2<String, Integer>>> pairInputData =
- Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+ Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(
JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
- JavaDStream<Integer> transformed1 = stream.transform(
+ stream.transform(
new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
@Override
- public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
+ public JavaRDD<Integer> call(JavaRDD<Integer> in) {
return null;
}
}
);
- JavaDStream<Integer> transformed2 = stream.transform(
+ stream.transform(
new Function2<JavaRDD<Integer>, Time, JavaRDD<Integer>>() {
- @Override public JavaRDD<Integer> call(JavaRDD<Integer> in, Time time) throws Exception {
+ @Override public JavaRDD<Integer> call(JavaRDD<Integer> in, Time time) {
return null;
}
}
);
- JavaPairDStream<String, Integer> transformed3 = stream.transformToPair(
+ stream.transformToPair(
new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() {
- @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) throws Exception {
+ @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) {
return null;
}
}
);
- JavaPairDStream<String, Integer> transformed4 = stream.transformToPair(
+ stream.transformToPair(
new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() {
- @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) throws Exception {
+ @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) {
return null;
}
}
);
- JavaDStream<Integer> pairTransformed1 = pairStream.transform(
+ pairStream.transform(
new Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>>() {
- @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in) throws Exception {
+ @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in) {
return null;
}
}
);
- JavaDStream<Integer> pairTransformed2 = pairStream.transform(
+ pairStream.transform(
new Function2<JavaPairRDD<String, Integer>, Time, JavaRDD<Integer>>() {
- @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in, Time time) throws Exception {
+ @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in, Time time) {
return null;
}
}
);
- JavaPairDStream<String, String> pairTransformed3 = pairStream.transformToPair(
+ pairStream.transformToPair(
new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, String>>() {
- @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) throws Exception {
+ @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) {
return null;
}
}
);
- JavaPairDStream<String, String> pairTransformed4 = pairStream.transformToPair(
+ pairStream.transformToPair(
new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, String>>() {
- @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, Time time) throws Exception {
+ @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, Time time) {
return null;
}
}
@@ -511,32 +509,32 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
public void testTransformWith() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
Arrays.asList(
- new Tuple2<String, String>("california", "dodgers"),
- new Tuple2<String, String>("new york", "yankees")),
+ new Tuple2<>("california", "dodgers"),
+ new Tuple2<>("new york", "yankees")),
Arrays.asList(
- new Tuple2<String, String>("california", "sharks"),
- new Tuple2<String, String>("new york", "rangers")));
+ new Tuple2<>("california", "sharks"),
+ new Tuple2<>("new york", "rangers")));
List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
Arrays.asList(
- new Tuple2<String, String>("california", "giants"),
- new Tuple2<String, String>("new york", "mets")),
+ new Tuple2<>("california", "giants"),
+ new Tuple2<>("new york", "mets")),
Arrays.asList(
- new Tuple2<String, String>("california", "ducks"),
- new Tuple2<String, String>("new york", "islanders")));
+ new Tuple2<>("california", "ducks"),
+ new Tuple2<>("new york", "islanders")));
List<HashSet<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
Sets.newHashSet(
- new Tuple2<String, Tuple2<String, String>>("california",
- new Tuple2<String, String>("dodgers", "giants")),
- new Tuple2<String, Tuple2<String, String>>("new york",
- new Tuple2<String, String>("yankees", "mets"))),
+ new Tuple2<>("california",
+ new Tuple2<>("dodgers", "giants")),
+ new Tuple2<>("new york",
+ new Tuple2<>("yankees", "mets"))),
Sets.newHashSet(
- new Tuple2<String, Tuple2<String, String>>("california",
- new Tuple2<String, String>("sharks", "ducks")),
- new Tuple2<String, Tuple2<String, String>>("new york",
- new Tuple2<String, String>("rangers", "islanders"))));
+ new Tuple2<>("california",
+ new Tuple2<>("sharks", "ducks")),
+ new Tuple2<>("new york",
+ new Tuple2<>("rangers", "islanders"))));
JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
ssc, stringStringKVStream1, 1);
@@ -552,14 +550,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairRDD<String, String>,
JavaPairRDD<String, String>,
Time,
- JavaPairRDD<String, Tuple2<String, String>>
- >() {
+ JavaPairRDD<String, Tuple2<String, String>>>() {
@Override
public JavaPairRDD<String, Tuple2<String, String>> call(
JavaPairRDD<String, String> rdd1,
JavaPairRDD<String, String> rdd2,
- Time time
- ) throws Exception {
+ Time time) {
return rdd1.join(rdd2);
}
}
@@ -567,9 +563,9 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaTestUtils.attachTestOutputStream(joined);
List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
- List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList();
+ List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = new ArrayList<>();
for (List<Tuple2<String, Tuple2<String, String>>> res: result) {
- unorderedResult.add(Sets.newHashSet(res));
+ unorderedResult.add(Sets.newHashSet(res));
}
Assert.assertEquals(expected, unorderedResult);
@@ -587,89 +583,89 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1);
List<List<Tuple2<String, Integer>>> pairInputData1 =
- Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+ Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
List<List<Tuple2<Double, Character>>> pairInputData2 =
- Arrays.asList(Arrays.asList(new Tuple2<Double, Character>(1.0, 'x')));
+ Arrays.asList(Arrays.asList(new Tuple2<>(1.0, 'x')));
JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream(
JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1));
JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream(
JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1));
- JavaDStream<Double> transformed1 = stream1.transformWith(
+ stream1.transformWith(
stream2,
new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() {
@Override
- public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+ public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) {
return null;
}
}
);
- JavaDStream<Double> transformed2 = stream1.transformWith(
+ stream1.transformWith(
pairStream1,
new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() {
@Override
- public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception {
+ public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) {
return null;
}
}
);
- JavaPairDStream<Double, Double> transformed3 = stream1.transformWithToPair(
+ stream1.transformWithToPair(
stream2,
new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
@Override
- public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+ public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) {
return null;
}
}
);
- JavaPairDStream<Double, Double> transformed4 = stream1.transformWithToPair(
+ stream1.transformWithToPair(
pairStream1,
new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaPairRDD<Double, Double>>() {
@Override
- public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception {
+ public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) {
return null;
}
}
);
- JavaDStream<Double> pairTransformed1 = pairStream1.transformWith(
+ pairStream1.transformWith(
stream2,
new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() {
@Override
- public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+ public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) {
return null;
}
}
);
- JavaDStream<Double> pairTransformed2_ = pairStream1.transformWith(
+ pairStream1.transformWith(
pairStream1,
new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() {
@Override
- public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception {
+ public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) {
return null;
}
}
);
- JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWithToPair(
+ pairStream1.transformWithToPair(
stream2,
new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
@Override
- public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+ public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) {
return null;
}
}
);
- JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWithToPair(
+ pairStream1.transformWithToPair(
pairStream2,
new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time, JavaPairRDD<Double, Double>>() {
@Override
- public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<Double, Character> rdd2, Time time) throws Exception {
+ public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<Double, Character> rdd2, Time time) {
return null;
}
}
@@ -690,13 +686,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
);
List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList(
- Arrays.asList(new Tuple2<Integer, String>(1, "x")),
- Arrays.asList(new Tuple2<Integer, String>(2, "y"))
+ Arrays.asList(new Tuple2<>(1, "x")),
+ Arrays.asList(new Tuple2<>(2, "y"))
);
List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(1, new Tuple2<Integer, String>(1, "x"))),
- Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(2, new Tuple2<Integer, String>(2, "y")))
+ Arrays.asList(new Tuple2<>(1, new Tuple2<>(1, "x"))),
+ Arrays.asList(new Tuple2<>(2, new Tuple2<>(2, "y")))
);
JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1);
@@ -707,7 +703,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2);
// This is just to test whether this transform to JavaStream compiles
- JavaDStream<Long> transformed1 = ssc.transform(
+ ssc.transform(
listOfDStreams1,
new Function2<List<JavaRDD<?>>, Time, JavaRDD<Long>>() {
@Override
@@ -733,8 +729,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
PairFunction<Integer, Integer, Integer> mapToTuple = new PairFunction<Integer, Integer, Integer>() {
@Override
- public Tuple2<Integer, Integer> call(Integer i) throws Exception {
- return new Tuple2<Integer, Integer>(i, i);
+ public Tuple2<Integer, Integer> call(Integer i) {
+ return new Tuple2<>(i, i);
}
};
return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
@@ -763,7 +759,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<String> flatMapped = stream.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
- return Lists.newArrayList(x.split("(?!^)"));
+ return Arrays.asList(x.split("(?!^)"));
}
});
JavaTestUtils.attachTestOutputStream(flatMapped);
@@ -782,39 +778,39 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<Integer, String>(6, "g"),
- new Tuple2<Integer, String>(6, "i"),
- new Tuple2<Integer, String>(6, "a"),
- new Tuple2<Integer, String>(6, "n"),
- new Tuple2<Integer, String>(6, "t"),
- new Tuple2<Integer, String>(6, "s")),
+ new Tuple2<>(6, "g"),
+ new Tuple2<>(6, "i"),
+ new Tuple2<>(6, "a"),
+ new Tuple2<>(6, "n"),
+ new Tuple2<>(6, "t"),
+ new Tuple2<>(6, "s")),
Arrays.asList(
- new Tuple2<Integer, String>(7, "d"),
- new Tuple2<Integer, String>(7, "o"),
- new Tuple2<Integer, String>(7, "d"),
- new Tuple2<Integer, String>(7, "g"),
- new Tuple2<Integer, String>(7, "e"),
- new Tuple2<Integer, String>(7, "r"),
- new Tuple2<Integer, String>(7, "s")),
+ new Tuple2<>(7, "d"),
+ new Tuple2<>(7, "o"),
+ new Tuple2<>(7, "d"),
+ new Tuple2<>(7, "g"),
+ new Tuple2<>(7, "e"),
+ new Tuple2<>(7, "r"),
+ new Tuple2<>(7, "s")),
Arrays.asList(
- new Tuple2<Integer, String>(9, "a"),
- new Tuple2<Integer, String>(9, "t"),
- new Tuple2<Integer, String>(9, "h"),
- new Tuple2<Integer, String>(9, "l"),
- new Tuple2<Integer, String>(9, "e"),
- new Tuple2<Integer, String>(9, "t"),
- new Tuple2<Integer, String>(9, "i"),
- new Tuple2<Integer, String>(9, "c"),
- new Tuple2<Integer, String>(9, "s")));
+ new Tuple2<>(9, "a"),
+ new Tuple2<>(9, "t"),
+ new Tuple2<>(9, "h"),
+ new Tuple2<>(9, "l"),
+ new Tuple2<>(9, "e"),
+ new Tuple2<>(9, "t"),
+ new Tuple2<>(9, "i"),
+ new Tuple2<>(9, "c"),
+ new Tuple2<>(9, "s")));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(
new PairFlatMapFunction<String, Integer, String>() {
@Override
- public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
- List<Tuple2<Integer, String>> out = Lists.newArrayList();
+ public Iterable<Tuple2<Integer, String>> call(String in) {
+ List<Tuple2<Integer, String>> out = new ArrayList<>();
for (String letter: in.split("(?!^)")) {
- out.add(new Tuple2<Integer, String>(in.length(), letter));
+ out.add(new Tuple2<>(in.length(), letter));
}
return out;
}
@@ -859,13 +855,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
*/
public static <T> void assertOrderInvariantEquals(
List<List<T>> expected, List<List<T>> actual) {
- List<Set<T>> expectedSets = new ArrayList<Set<T>>();
+ List<Set<T>> expectedSets = new ArrayList<>();
for (List<T> list: expected) {
- expectedSets.add(Collections.unmodifiableSet(new HashSet<T>(list)));
+ expectedSets.add(Collections.unmodifiableSet(new HashSet<>(list)));
}
- List<Set<T>> actualSets = new ArrayList<Set<T>>();
+ List<Set<T>> actualSets = new ArrayList<>();
for (List<T> list: actual) {
- actualSets.add(Collections.unmodifiableSet(new HashSet<T>(list)));
+ actualSets.add(Collections.unmodifiableSet(new HashSet<>(list)));
}
Assert.assertEquals(expectedSets, actualSets);
}
@@ -877,25 +873,25 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
public void testPairFilter() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("giants", "dodgers"),
- Arrays.asList("yankees", "red socks"));
+ Arrays.asList("yankees", "red sox"));
List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<String, Integer>("giants", 6)),
- Arrays.asList(new Tuple2<String, Integer>("yankees", 7)));
+ Arrays.asList(new Tuple2<>("giants", 6)),
+ Arrays.asList(new Tuple2<>("yankees", 7)));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = stream.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
- public Tuple2<String, Integer> call(String in) throws Exception {
- return new Tuple2<String, Integer>(in, in.length());
+ public Tuple2<String, Integer> call(String in) {
+ return new Tuple2<>(in, in.length());
}
});
JavaPairDStream<String, Integer> filtered = pairStream.filter(
new Function<Tuple2<String, Integer>, Boolean>() {
@Override
- public Boolean call(Tuple2<String, Integer> in) throws Exception {
+ public Boolean call(Tuple2<String, Integer> in) {
return in._1().contains("a");
}
});
@@ -906,28 +902,28 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
@SuppressWarnings("unchecked")
- private List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
- Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
- new Tuple2<String, String>("california", "giants"),
- new Tuple2<String, String>("new york", "yankees"),
- new Tuple2<String, String>("new york", "mets")),
- Arrays.asList(new Tuple2<String, String>("california", "sharks"),
- new Tuple2<String, String>("california", "ducks"),
- new Tuple2<String, String>("new york", "rangers"),
- new Tuple2<String, String>("new york", "islanders")));
+ private final List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
+ Arrays.asList(new Tuple2<>("california", "dodgers"),
+ new Tuple2<>("california", "giants"),
+ new Tuple2<>("new york", "yankees"),
+ new Tuple2<>("new york", "mets")),
+ Arrays.asList(new Tuple2<>("california", "sharks"),
+ new Tuple2<>("california", "ducks"),
+ new Tuple2<>("new york", "rangers"),
+ new Tuple2<>("new york", "islanders")));
@SuppressWarnings("unchecked")
- private List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
+ private final List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
Arrays.asList(
- new Tuple2<String, Integer>("california", 1),
- new Tuple2<String, Integer>("california", 3),
- new Tuple2<String, Integer>("new york", 4),
- new Tuple2<String, Integer>("new york", 1)),
+ new Tuple2<>("california", 1),
+ new Tuple2<>("california", 3),
+ new Tuple2<>("new york", 4),
+ new Tuple2<>("new york", 1)),
Arrays.asList(
- new Tuple2<String, Integer>("california", 5),
- new Tuple2<String, Integer>("california", 5),
- new Tuple2<String, Integer>("new york", 3),
- new Tuple2<String, Integer>("new york", 1)));
+ new Tuple2<>("california", 5),
+ new Tuple2<>("california", 5),
+ new Tuple2<>("new york", 3),
+ new Tuple2<>("new york", 1)));
@SuppressWarnings("unchecked")
@Test
@@ -936,22 +932,22 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<Integer, String>(1, "california"),
- new Tuple2<Integer, String>(3, "california"),
- new Tuple2<Integer, String>(4, "new york"),
- new Tuple2<Integer, String>(1, "new york")),
+ new Tuple2<>(1, "california"),
+ new Tuple2<>(3, "california"),
+ new Tuple2<>(4, "new york"),
+ new Tuple2<>(1, "new york")),
Arrays.asList(
- new Tuple2<Integer, String>(5, "california"),
- new Tuple2<Integer, String>(5, "california"),
- new Tuple2<Integer, String>(3, "new york"),
- new Tuple2<Integer, String>(1, "new york")));
+ new Tuple2<>(5, "california"),
+ new Tuple2<>(5, "california"),
+ new Tuple2<>(3, "new york"),
+ new Tuple2<>(1, "new york")));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(
new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
- public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception {
+ public Tuple2<Integer, String> call(Tuple2<String, Integer> in) {
return in.swap();
}
});
@@ -969,23 +965,23 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<Integer, String>(1, "california"),
- new Tuple2<Integer, String>(3, "california"),
- new Tuple2<Integer, String>(4, "new york"),
- new Tuple2<Integer, String>(1, "new york")),
+ new Tuple2<>(1, "california"),
+ new Tuple2<>(3, "california"),
+ new Tuple2<>(4, "new york"),
+ new Tuple2<>(1, "new york")),
Arrays.asList(
- new Tuple2<Integer, String>(5, "california"),
- new Tuple2<Integer, String>(5, "california"),
- new Tuple2<Integer, String>(3, "new york"),
- new Tuple2<Integer, String>(1, "new york")));
+ new Tuple2<>(5, "california"),
+ new Tuple2<>(5, "california"),
+ new Tuple2<>(3, "new york"),
+ new Tuple2<>(1, "new york")));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(
new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() {
@Override
- public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) throws Exception {
- LinkedList<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+ public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) {
+ List<Tuple2<Integer, String>> out = new LinkedList<>();
while (in.hasNext()) {
Tuple2<String, Integer> next = in.next();
out.add(next.swap());
@@ -1014,7 +1010,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Integer> reversed = pairStream.map(
new Function<Tuple2<String, Integer>, Integer>() {
@Override
- public Integer call(Tuple2<String, Integer> in) throws Exception {
+ public Integer call(Tuple2<String, Integer> in) {
return in._2();
}
});
@@ -1030,23 +1026,23 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
Arrays.asList(
- new Tuple2<String, Integer>("hi", 1),
- new Tuple2<String, Integer>("ho", 2)),
+ new Tuple2<>("hi", 1),
+ new Tuple2<>("ho", 2)),
Arrays.asList(
- new Tuple2<String, Integer>("hi", 1),
- new Tuple2<String, Integer>("ho", 2)));
+ new Tuple2<>("hi", 1),
+ new Tuple2<>("ho", 2)));
List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<Integer, String>(1, "h"),
- new Tuple2<Integer, String>(1, "i"),
- new Tuple2<Integer, String>(2, "h"),
- new Tuple2<Integer, String>(2, "o")),
+ new Tuple2<>(1, "h"),
+ new Tuple2<>(1, "i"),
+ new Tuple2<>(2, "h"),
+ new Tuple2<>(2, "o")),
Arrays.asList(
- new Tuple2<Integer, String>(1, "h"),
- new Tuple2<Integer, String>(1, "i"),
- new Tuple2<Integer, String>(2, "h"),
- new Tuple2<Integer, String>(2, "o")));
+ new Tuple2<>(1, "h"),
+ new Tuple2<>(1, "i"),
+ new Tuple2<>(2, "h"),
+ new Tuple2<>(2, "o")));
JavaDStream<Tuple2<String, Integer>> stream =
JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
@@ -1054,10 +1050,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(
new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
- public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) throws Exception {
- List<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+ public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) {
+ List<Tuple2<Integer, String>> out = new LinkedList<>();
for (Character s : in._1().toCharArray()) {
- out.add(new Tuple2<Integer, String>(in._2(), s.toString()));
+ out.add(new Tuple2<>(in._2(), s.toString()));
}
return out;
}
@@ -1075,11 +1071,11 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<String, List<String>>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")),
- new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))),
+ new Tuple2<>("california", Arrays.asList("dodgers", "giants")),
+ new Tuple2<>("new york", Arrays.asList("yankees", "mets"))),
Arrays.asList(
- new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")),
- new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders"))));
+ new Tuple2<>("california", Arrays.asList("sharks", "ducks")),
+ new Tuple2<>("new york", Arrays.asList("rangers", "islanders"))));
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
@@ -1111,11 +1107,11 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<String, Integer>("california", 4),
- new Tuple2<String, Integer>("new york", 5)),
+ new Tuple2<>("california", 4),
+ new Tuple2<>("new york", 5)),
Arrays.asList(
- new Tuple2<String, Integer>("california", 10),
- new Tuple2<String, Integer>("new york", 4)));
+ new Tuple2<>("california", 10),
+ new Tuple2<>("new york", 4)));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
ssc, inputData, 1);
@@ -1136,20 +1132,20 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<String, Integer>("california", 4),
- new Tuple2<String, Integer>("new york", 5)),
+ new Tuple2<>("california", 4),
+ new Tuple2<>("new york", 5)),
Arrays.asList(
- new Tuple2<String, Integer>("california", 10),
- new Tuple2<String, Integer>("new york", 4)));
+ new Tuple2<>("california", 10),
+ new Tuple2<>("new york", 4)));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(
+ JavaPairDStream<String, Integer> combined = pairStream.combineByKey(
new Function<Integer, Integer>() {
@Override
- public Integer call(Integer i) throws Exception {
+ public Integer call(Integer i) {
return i;
}
}, new IntegerSum(), new IntegerSum(), new HashPartitioner(2));
@@ -1170,13 +1166,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<String, Long>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<String, Long>("hello", 1L),
- new Tuple2<String, Long>("world", 1L)),
+ new Tuple2<>("hello", 1L),
+ new Tuple2<>("world", 1L)),
Arrays.asList(
- new Tuple2<String, Long>("hello", 1L),
- new Tuple2<String, Long>("moon", 1L)),
+ new Tuple2<>("hello", 1L),
+ new Tuple2<>("moon", 1L)),
Arrays.asList(
- new Tuple2<String, Long>("hello", 1L)));
+ new Tuple2<>("hello", 1L)));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Long> counted = stream.countByValue();
@@ -1193,16 +1189,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<String, List<Integer>>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3)),
- new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 4))
+ new Tuple2<>("california", Arrays.asList(1, 3)),
+ new Tuple2<>("new york", Arrays.asList(1, 4))
),
Arrays.asList(
- new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3, 5, 5)),
- new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 1, 3, 4))
+ new Tuple2<>("california", Arrays.asList(1, 3, 5, 5)),
+ new Tuple2<>("new york", Arrays.asList(1, 1, 3, 4))
),
Arrays.asList(
- new Tuple2<String, List<Integer>>("california", Arrays.asList(5, 5)),
- new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 3))
+ new Tuple2<>("california", Arrays.asList(5, 5)),
+ new Tuple2<>("new york", Arrays.asList(1, 3))
)
);
@@ -1220,16 +1216,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
}
- private HashSet<Tuple2<String, HashSet<Integer>>> convert(List<Tuple2<String, List<Integer>>> listOfTuples) {
- List<Tuple2<String, HashSet<Integer>>> newListOfTuples = new ArrayList<Tuple2<String, HashSet<Integer>>>();
+ private static Set<Tuple2<String, HashSet<Integer>>> convert(List<Tuple2<String, List<Integer>>> listOfTuples) {
+ List<Tuple2<String, HashSet<Integer>>> newListOfTuples = new ArrayList<>();
for (Tuple2<String, List<Integer>> tuple: listOfTuples) {
newListOfTuples.add(convert(tuple));
}
- return new HashSet<Tuple2<String, HashSet<Integer>>>(newListOfTuples);
+ return new HashSet<>(newListOfTuples);
}
- private Tuple2<String, HashSet<Integer>> convert(Tuple2<String, List<Integer>> tuple) {
- return new Tuple2<String, HashSet<Integer>>(tuple._1(), new HashSet<Integer>(tuple._2()));
+ private static Tuple2<String, HashSet<Integer>> convert(Tuple2<String, List<Integer>> tuple) {
+ return new Tuple2<>(tuple._1(), new HashSet<>(tuple._2()));
}
@SuppressWarnings("unchecked")
@@ -1238,12 +1234,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<String, Integer>("california", 4),
- new Tuple2<String, Integer>("new york", 5)),
- Arrays.asList(new Tuple2<String, Integer>("california", 14),
- new Tuple2<String, Integer>("new york", 9)),
- Arrays.asList(new Tuple2<String, Integer>("california", 10),
- new Tuple2<String, Integer>("new york", 4)));
+ Arrays.asList(new Tuple2<>("california", 4),
+ new Tuple2<>("new york", 5)),
+ Arrays.asList(new Tuple2<>("california", 14),
+ new Tuple2<>("new york", 9)),
+ Arrays.asList(new Tuple2<>("california", 10),
+ new Tuple2<>("new york", 4)));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
@@ -1262,12 +1258,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<String, Integer>("california", 4),
- new Tuple2<String, Integer>("new york", 5)),
- Arrays.asList(new Tuple2<String, Integer>("california", 14),
- new Tuple2<String, Integer>("new york", 9)),
- Arrays.asList(new Tuple2<String, Integer>("california", 14),
- new Tuple2<String, Integer>("new york", 9)));
+ Arrays.asList(new Tuple2<>("california", 4),
+ new Tuple2<>("new york", 5)),
+ Arrays.asList(new Tuple2<>("california", 14),
+ new Tuple2<>("new york", 9)),
+ Arrays.asList(new Tuple2<>("california", 14),
+ new Tuple2<>("new york", 9)));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
@@ -1278,10 +1274,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
int out = 0;
if (state.isPresent()) {
- out = out + state.get();
+ out += state.get();
}
for (Integer v : values) {
- out = out + v;
+ out += v;
}
return Optional.of(out);
}
@@ -1298,19 +1294,19 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
List<Tuple2<String, Integer>> initial = Arrays.asList (
- new Tuple2<String, Integer> ("california", 1),
- new Tuple2<String, Integer> ("new york", 2));
+ new Tuple2<>("california", 1),
+ new Tuple2<>("new york", 2));
JavaRDD<Tuple2<String, Integer>> tmpRDD = ssc.sparkContext().parallelize(initial);
JavaPairRDD<String, Integer> initialRDD = JavaPairRDD.fromJavaRDD (tmpRDD);
List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<String, Integer>("california", 5),
- new Tuple2<String, Integer>("new york", 7)),
- Arrays.asList(new Tuple2<String, Integer>("california", 15),
- new Tuple2<String, Integer>("new york", 11)),
- Arrays.asList(new Tuple2<String, Integer>("california", 15),
- new Tuple2<String, Integer>("new york", 11)));
+ Arrays.asList(new Tuple2<>("california", 5),
+ new Tuple2<>("new york", 7)),
+ Arrays.asList(new Tuple2<>("california", 15),
+ new Tuple2<>("new york", 11)),
+ Arrays.asList(new Tuple2<>("california", 15),
+ new Tuple2<>("new york", 11)));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
@@ -1321,10 +1317,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
int out = 0;
if (state.isPresent()) {
- out = out + state.get();
+ out += state.get();
}
for (Integer v : values) {
- out = out + v;
+ out += v;
}
return Optional.of(out);
}
@@ -1341,19 +1337,19 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<String, Integer>("california", 4),
- new Tuple2<String, Integer>("new york", 5)),
- Arrays.asList(new Tuple2<String, Integer>("california", 14),
- new Tuple2<String, Integer>("new york", 9)),
- Arrays.asList(new Tuple2<String, Integer>("california", 10),
- new Tuple2<String, Integer>("new york", 4)));
+ Arrays.asList(new Tuple2<>("california", 4),
+ new Tuple2<>("new york", 5)),
+ Arrays.asList(new Tuple2<>("california", 14),
+ new Tuple2<>("new york", 9)),
+ Arrays.asList(new Tuple2<>("california", 10),
+ new Tuple2<>("new york", 4)));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> reduceWindowed =
pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(),
- new Duration(2000), new Duration(1000));
+ new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reduceWindowed);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1370,15 +1366,15 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<HashSet<Tuple2<String, Long>>> expected = Arrays.asList(
Sets.newHashSet(
- new Tuple2<String, Long>("hello", 1L),
- new Tuple2<String, Long>("world", 1L)),
+ new Tuple2<>("hello", 1L),
+ new Tuple2<>("world", 1L)),
Sets.newHashSet(
- new Tuple2<String, Long>("hello", 2L),
- new Tuple2<String, Long>("world", 1L),
- new Tuple2<String, Long>("moon", 1L)),
+ new Tuple2<>("hello", 2L),
+ new Tuple2<>("world", 1L),
+ new Tuple2<>("moon", 1L)),
Sets.newHashSet(
- new Tuple2<String, Long>("hello", 2L),
- new Tuple2<String, Long>("moon", 1L)));
+ new Tuple2<>("hello", 2L),
+ new Tuple2<>("moon", 1L)));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(
ssc, inputData, 1);
@@ -1386,7 +1382,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
stream.countByValueAndWindow(new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(counted);
List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
- List<HashSet<Tuple2<String, Long>>> unorderedResult = Lists.newArrayList();
+ List<Set<Tuple2<String, Long>>> unorderedResult = new ArrayList<>();
for (List<Tuple2<String, Long>> res: result) {
unorderedResult.add(Sets.newHashSet(res));
}
@@ -1399,27 +1395,27 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
public void testPairTransform() {
List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
Arrays.asList(
- new Tuple2<Integer, Integer>(3, 5),
- new Tuple2<Integer, Integer>(1, 5),
- new Tuple2<Integer, Integer>(4, 5),
- new Tuple2<Integer, Integer>(2, 5)),
+ new Tuple2<>(3, 5),
+ new Tuple2<>(1, 5),
+ new Tuple2<>(4, 5),
+ new Tuple2<>(2, 5)),
Arrays.asList(
- new Tuple2<Integer, Integer>(2, 5),
- new Tuple2<Integer, Integer>(3, 5),
- new Tuple2<Integer, Integer>(4, 5),
- new Tuple2<Integer, Integer>(1, 5)));
+ new Tuple2<>(2, 5),
+ new Tuple2<>(3, 5),
+ new Tuple2<>(4, 5),
+ new Tuple2<>(1, 5)));
List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<Integer, Integer>(1, 5),
- new Tuple2<Integer, Integer>(2, 5),
- new Tuple2<Integer, Integer>(3, 5),
- new Tuple2<Integer, Integer>(4, 5)),
+ new Tuple2<>(1, 5),
+ new Tuple2<>(2, 5),
+ new Tuple2<>(3, 5),
+ new Tuple2<>(4, 5)),
Arrays.asList(
- new Tuple2<Integer, Integer>(1, 5),
- new Tuple2<Integer, Integer>(2, 5),
- new Tuple2<Integer, Integer>(3, 5),
- new Tuple2<Integer, Integer>(4, 5)));
+ new Tuple2<>(1, 5),
+ new Tuple2<>(2, 5),
+ new Tuple2<>(3, 5),
+ new Tuple2<>(4, 5)));
JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
ssc, inputData, 1);
@@ -1428,7 +1424,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(
new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() {
@Override
- public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception {
+ public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) {
return in.sortByKey();
}
});
@@ -1444,15 +1440,15 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
public void testPairToNormalRDDTransform() {
List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
Arrays.asList(
- new Tuple2<Integer, Integer>(3, 5),
- new Tuple2<Integer, Integer>(1, 5),
- new Tuple2<Integer, Integer>(4, 5),
- new Tuple2<Integer, Integer>(2, 5)),
+ new Tuple2<>(3, 5),
+ new Tuple2<>(1, 5),
+ new Tuple2<>(4, 5),
+ new Tuple2<>(2, 5)),
Arrays.asList(
- new Tuple2<Integer, Integer>(2, 5),
- new Tuple2<Integer, Integer>(3, 5),
- new Tuple2<Integer, Integer>(4, 5),
- new Tuple2<Integer, Integer>(1, 5)));
+ new Tuple2<>(2, 5),
+ new Tuple2<>(3, 5),
+ new Tuple2<>(4, 5),
+ new Tuple2<>(1, 5)));
List<List<Integer>> expected = Arrays.asList(
Arrays.asList(3,1,4,2),
@@ -1465,11 +1461,11 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Integer> firstParts = pairStream.transform(
new Function<JavaPairRDD<Integer, Integer>, JavaRDD<Integer>>() {
@Override
- public JavaRDD<Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception {
+ public JavaRDD<Integer> call(JavaPairRDD<Integer, Integer> in) {
return in.map(new Function<Tuple2<Integer, Integer>, Integer>() {
@Override
- public Integer call(Tuple2<Integer, Integer> in) {
- return in._1();
+ public Integer call(Tuple2<Integer, Integer> in2) {
+ return in2._1();
}
});
}
@@ -1487,14 +1483,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
List<List<Tuple2<String, String>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<String, String>("california", "DODGERS"),
- new Tuple2<String, String>("california", "GIANTS"),
- new Tuple2<String, String>("new york", "YANKEES"),
- new Tuple2<String, String>("new york", "METS")),
- Arrays.asList(new Tuple2<String, String>("california", "SHARKS"),
- new Tuple2<String, String>("california", "DUCKS"),
- new Tuple2<String, String>("new york", "RANGERS"),
- new Tuple2<String, String>("new york", "ISLANDERS")));
+ Arrays.asList(new Tuple2<>("california", "DODGERS"),
+ new Tuple2<>("california", "GIANTS"),
+ new Tuple2<>("new york", "YANKEES"),
+ new Tuple2<>("new york", "METS")),
+ Arrays.asList(new Tuple2<>("california", "SHARKS"),
+ new Tuple2<>("california", "DUCKS"),
+ new Tuple2<>("new york", "RANGERS"),
+ new Tuple2<>("new york", "ISLANDERS")));
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
ssc, inputData, 1);
@@ -1502,8 +1498,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<String, String> mapped = pairStream.mapValues(new Function<String, String>() {
@Override
- public String call(String s) throws Exception {
- return s.toUpperCase();
+ public String call(String s) {
+ return s.toUpperCase(Locale.ENGLISH);
}
});
@@ -1519,22 +1515,22 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
List<List<Tuple2<String, String>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<String, String>("california", "dodgers1"),
- new Tuple2<String, String>("california", "dodgers2"),
- new Tuple2<String, String>("california", "giants1"),
- new Tuple2<String, String>("california", "giants2"),
- new Tuple2<String, String>("new york", "yankees1"),
- new Tuple2<String, String>("new york", "yankees2"),
- new Tuple2<String, String>("new york", "mets1"),
- new Tuple2<String, String>("new york", "mets2")),
- Arrays.asList(new Tuple2<String, String>("california", "sharks1"),
- new Tuple2<String, String>("california", "sharks2"),
- new Tuple2<String, String>("california", "ducks1"),
- new Tuple2<String, String>("california", "ducks2"),
- new Tuple2<String, String>("new york", "rangers1"),
- new Tuple2<String, String>("new york", "rangers2"),
- new Tuple2<String, String>("new york", "islanders1"),
- new Tuple2<String, String>("new york", "islanders2")));
+ Arrays.asList(new Tuple2<>("california", "dodgers1"),
+ new Tuple2<>("california", "dodgers2"),
+ new Tuple2<>("california", "giants1"),
+ new Tuple2<>("california", "giants2"),
+ new Tuple2<>("new york", "yankees1"),
+ new Tuple2<>("new york", "yankees2"),
+ new Tuple2<>("new york", "mets1"),
+ new Tuple2<>("new york", "mets2")),
+ Arrays.asList(new Tuple2<>("california", "sharks1"),
+ new Tuple2<>("california", "sharks2"),
+ new Tuple2<>("california", "ducks1"),
+ new Tuple2<>("california", "ducks2"),
+ new Tuple2<>("new york", "rangers1"),
+ new Tuple2<>("new york", "rangers2"),
+ new Tuple2<>("new york", "islanders1"),
+ new Tuple2<>("new york", "islanders2")));
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
ssc, inputData, 1);
@@ -1545,7 +1541,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
new Function<String, Iterable<String>>() {
@Override
public Iterable<String> call(String in) {
- List<String> out = new ArrayList<String>();
+ List<String> out = new ArrayList<>();
out.add(in + "1");
out.add(in + "2");
return out;
@@ -1562,29 +1558,29 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@Test
public void testCoGroup() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
- Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
- new Tuple2<String, String>("new york", "yankees")),
- Arrays.asList(new Tuple2<String, String>("california", "sharks"),
- new Tuple2<String, String>("new york", "rangers")));
+ Arrays.asList(new Tuple2<>("california", "dodgers"),
+ new Tuple2<>("new york", "yankees")),
+ Arrays.asList(new Tuple2<>("california", "sharks"),
+ new Tuple2<>("new york", "rangers")));
List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
- Arrays.asList(new Tuple2<String, String>("california", "giants"),
- new Tuple2<String, String>("new york", "mets")),
- Arrays.asList(new Tuple2<String, String>("california", "ducks"),
- new Tuple2<String, String>("new york", "islanders")));
+ Arrays.asList(new Tuple2<>("california", "giants"),
+ new Tuple2<>("new york", "mets")),
+ Arrays.asList(new Tuple2<>("california", "ducks"),
+ new Tuple2<>("new york", "islanders")));
List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<String, Tuple2<List<String>, List<String>>>("california",
- new Tuple2<List<String>, List<String>>(Arrays.asList("dodgers"), Arrays.asList("giants"))),
- new Tuple2<String, Tuple2<List<String>, List<String>>>("new york",
- new Tuple2<List<String>, List<String>>(Arrays.asList("yankees"), Arrays.asList("mets")))),
+ new Tuple2<>("california",
+ new Tuple2<>(Arrays.asList("dodgers"), Arrays.asList("giants"))),
+ new Tuple2<>("new york",
+ new Tuple2<>(Arrays.asList("yankees"), Arrays.asList("mets")))),
Arrays.asList(
- new Tuple2<String, Tuple2<List<String>, List<String>>>("california",
- new Tuple2<List<String>, List<String>>(Arrays.asList("sharks"), Arrays.asList("ducks"))),
- new Tuple2<String, Tuple2<List<String>, List<String>>>("new york",
- new Tuple2<List<String>, List<String>>(Arrays.asList("rangers"), Arrays.asList("islanders")))));
+ new Tuple2<>("california",
+ new Tuple2<>(Arrays.asList("sharks"), Arrays.asList("ducks"))),
+ new Tuple2<>("new york",
+ new Tuple2<>(Arrays.asList("rangers"), Arrays.asList("islanders")))));
JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
@@ -1620,29 +1616,29 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@Test
public void testJoin() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
- Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
- new Tuple2<String, String>("new york", "yankees")),
- Arrays.asList(new Tuple2<String, String>("california", "sharks"),
- new Tuple2<String, String>("new york", "rangers")));
+ Arrays.asList(new Tuple2<>("california", "dodgers"),
+ new Tuple2<>("new york", "yankees")),
+ Arrays.asList(new Tuple2<>("california", "sharks"),
+ new Tuple2<>("new york", "rangers")));
List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
- Arrays.asList(new Tuple2<String, String>("california", "giants"),
- new Tuple2<String, String>("new york", "mets")),
- Arrays.asList(new Tuple2<String, String>("california", "ducks"),
- new Tuple2<String, String>("new york", "islanders")));
+ Arrays.asList(new Tuple2<>("california", "giants"),
+ new Tuple2<>("new york", "mets")),
+ Arrays.asList(new Tuple2<>("california", "ducks"),
+ new Tuple2<>("new york", "islanders")));
List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<String, Tuple2<String, String>>("california",
- new Tuple2<String, String>("dodgers", "giants")),
- new Tuple2<String, Tuple2<String, String>>("new york",
- new Tuple2<String, String>("yankees", "mets"))),
+ new Tuple2<>("california",
+ new Tuple2<>("dodgers", "giants")),
+ new Tuple2<>("new york",
+ new Tuple2<>("yankees", "mets"))),
Arrays.asList(
- new Tuple2<String, Tuple2<String, String>>("california",
- new Tuple2<String, String>("sharks", "ducks")),
- new Tuple2<String, Tuple2<String, String>>("new york",
- new Tuple2<String, String>("rangers", "islanders"))));
+ new Tuple2<>("california",
+ new Tuple2<>("sharks", "ducks")),
+ new Tuple2<>("new york",
+ new Tuple2<>("rangers", "islanders"))));
JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
@@ -1664,13 +1660,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@Test
public void testLeftOuterJoin() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
- Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
- new Tuple2<String, String>("new york", "yankees")),
- Arrays.asList(new Tuple2<String, String>("california", "sharks") ));
+ Arrays.asList(new Tuple2<>("california", "dodgers"),
+ new Tuple2<>("new york", "yankees")),
+ Arrays.asList(new Tuple2<>("california", "sharks") ));
List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
- Arrays.asList(new Tuple2<String, String>("california", "giants") ),
- Arrays.asList(new Tuple2<String, String>("new york", "islanders") )
+ Arrays.asList(new Tuple2<>("california", "giants") ),
+ Arrays.asList(new Tuple2<>("new york", "islanders") )
);
@@ -1713,7 +1709,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<String> stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() {
@Override
- public Integer call(String s) throws Exception {
+ public Integer call(String s) {
return s.length();
}
});
@@ -1752,6 +1748,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
// (used to detect the new context)
final AtomicBoolean newContextCreated = new AtomicBoolean(false);
Function0<JavaStreamingContext> creatingFunc = new Function0<JavaStreamingContext>() {
+ @Override
public JavaStreamingContext call() {
newContextCreated.set(true);
return new JavaStreamingContext(conf, Seconds.apply(1));
@@ -1765,20 +1762,20 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
newContextCreated.set(false);
ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc,
- new org.apache.hadoop.conf.Configuration(), true);
+ new Configuration(), true);
Assert.assertTrue("new context not created", newContextCreated.get());
ssc.stop();
newContextCreated.set(false);
ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
- new org.apache.hadoop.conf.Configuration());
+ new Configuration());
Assert.assertTrue("old context not recovered", !newContextCreated.get());
ssc.stop();
newContextCreated.set(false);
JavaSparkContext sc = new JavaSparkContext(conf);
ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
- new org.apache.hadoop.conf.Configuration());
+ new Configuration());
Assert.assertTrue("old context not recovered", !newContextCreated.get());
ssc.stop();
}
@@ -1800,7 +1797,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream letterCount = stream.map(new Function<String, Integer>() {
@Override
- public Integer call(String s) throws Exception {
+ public Integer call(String s) {
return s.length();
}
});
@@ -1818,29 +1815,26 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
// InputStream functionality is deferred to the existing Scala tests.
@Test
public void testSocketTextStream() {
- JavaReceiverInputDStream<String> test = ssc.socketTextStream("localhost", 12345);
+ ssc.socketTextStream("localhost", 12345);
}
@Test
public void testSocketString() {
-
- class Converter implements Function<InputStream, Iterable<String>> {
- public Iterable<String> call(InputStream in) throws IOException {
- BufferedReader reader = new BufferedReader(new InputStreamReader(in));
- List<String> out = new ArrayList<String>();
- while (true) {
- String line = reader.readLine();
- if (line == null) { break; }
- out.add(line);
- }
- return out;
- }
- }
-
- JavaDStream<String> test = ssc.socketStream(
+ ssc.socketStream(
"localhost",
12345,
- new Converter(),
+ new Function<InputStream, Iterable<String>>() {
+ @Override
+ public Iterable<String> call(InputStream in) throws IOException {
+ List<String> out = new ArrayList<>();
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
+ for (String line; (line = reader.readLine()) != null;) {
+ out.add(line);
+ }
+ }
+ return out;
+ }
+ },
StorageLevel.MEMORY_ONLY());
}
@@ -1870,7 +1864,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
TextInputFormat.class,
new Function<Path, Boolean>() {
@Override
- public Boolean call(Path v1) throws Exception {
+ public Boolean call(Path v1) {
return Boolean.TRUE;
}
},
@@ -1879,7 +1873,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<String> test = inputStream.map(
new Function<Tuple2<LongWritable, Text>, String>() {
@Override
- public String call(Tuple2<LongWritable, Text> v1) throws Exception {
+ public String call(Tuple2<LongWritable, Text> v1) {
return v1._2().toString();
}
});
@@ -1892,19 +1886,15 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@Test
public void testRawSocketStream() {
- JavaReceiverInputDStream<String> test = ssc.rawSocketStream("localhost", 12345);
+ ssc.rawSocketStream("localhost", 12345);
}
- private List<List<String>> fileTestPrepare(File testDir) throws IOException {
+ private static List<List<String>> fileTestPrepare(File testDir) throws IOException {
File existingFile = new File(testDir, "0");
Files.write("0\n", existingFile, Charset.forName("UTF-8"));
- assertTrue(existingFile.setLastModified(1000) && existingFile.lastModified() == 1000);
-
- List<List<String>> expected = Arrays.asList(
- Arrays.asList("0")
- );
-
- return expected;
+ Assert.assertTrue(existingFile.setLastModified(1000));
+ Assert.assertEquals(1000, existingFile.lastModified());
+ return Arrays.asList(Arrays.asList("0"));
}
@SuppressWarnings("unchecked")
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
index 1b0787fe69..ec2bffd6a5 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
@@ -36,7 +36,6 @@ import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.ConnectException;
import java.net.Socket;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class JavaReceiverAPISuite implements Serializable {
@@ -64,16 +63,16 @@ public class JavaReceiverAPISuite implements Serializable {
ssc.receiverStream(new JavaSocketReceiver("localhost", server.port()));
JavaDStream<String> mapped = input.map(new Function<String, String>() {
@Override
- public String call(String v1) throws Exception {
+ public String call(String v1) {
return v1 + ".";
}
});
mapped.foreachRDD(new Function<JavaRDD<String>, Void>() {
@Override
- public Void call(JavaRDD<String> rdd) throws Exception {
- long count = rdd.count();
- dataCounter.addAndGet(count);
- return null;
+ public Void call(JavaRDD<String> rdd) {
+ long count = rdd.count();
+ dataCounter.addAndGet(count);
+ return null;
}
});
@@ -83,7 +82,7 @@ public class JavaReceiverAPISuite implements Serializable {
Thread.sleep(200);
for (int i = 0; i < 6; i++) {
- server.send("" + i + "\n"); // \n to make sure these are separate lines
+ server.send(i + "\n"); // \n to make sure these are separate lines
Thread.sleep(100);
}
while (dataCounter.get() == 0 && System.currentTimeMillis() - startTime < timeout) {
@@ -95,50 +94,49 @@ public class JavaReceiverAPISuite implements Serializable {
server.stop();
}
}
-}
-class JavaSocketReceiver extends Receiver<String> {
+ private static class JavaSocketReceiver extends Receiver<String> {
- String host = null;
- int port = -1;
+ String host = null;
+ int port = -1;
- public JavaSocketReceiver(String host_ , int port_) {
- super(StorageLevel.MEMORY_AND_DISK());
- host = host_;
- port = port_;
- }
+ JavaSocketReceiver(String host_ , int port_) {
+ super(StorageLevel.MEMORY_AND_DISK());
+ host = host_;
+ port = port_;
+ }
- @Override
- public void onStart() {
- new Thread() {
- @Override public void run() {
- receive();
- }
- }.start();
- }
+ @Override
+ public void onStart() {
+ new Thread() {
+ @Override public void run() {
+ receive();
+ }
+ }.start();
+ }
- @Override
- public void onStop() {
- }
+ @Override
+ public void onStop() {
+ }
- private void receive() {
- Socket socket = null;
- try {
- socket = new Socket(host, port);
- BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
- String userInput;
- while ((userInput = in.readLine()) != null) {
- store(userInput);
+ private void receive() {
+ try {
+ Socket socket = new Socket(host, port);
+ BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+ String userInput;
+ while ((userInput = in.readLine()) != null) {
+ store(userInput);
+ }
+ in.close();
+ socket.close();
+ } catch(ConnectException ce) {
+ ce.printStackTrace();
+ restart("Could not connect", ce);
+ } catch(Throwable t) {
+ t.printStackTrace();
+ restart("Error receiving data", t);
}
- in.close();
- socket.close();
- } catch(ConnectException ce) {
- ce.printStackTrace();
- restart("Could not connect", ce);
- } catch(Throwable t) {
- t.printStackTrace();
- restart("Error receiving data", t);
}
}
-}
+}