author     Sean Owen <sowen@cloudera.com>    2016-01-08 17:47:44 +0000
committer  Sean Owen <sowen@cloudera.com>    2016-01-08 17:47:44 +0000
commit     b9c835337880f57fe8b953962913bcc524162348 (patch)
tree       5dc476b1a65d513210d1124db144aaa3c5f66679 /streaming/src/test/java/org/apache
parent     794ea553bd0fcfece15b610b47ee86d6644134c9 (diff)
[SPARK-12618][CORE][STREAMING][SQL] Clean up build warnings: 2.0.0 edition
Fix most build warnings: mostly deprecated API usages. I'll annotate some of the changes below. CC rxin who is leading the charge to remove the deprecated APIs.

Author: Sean Owen <sowen@cloudera.com>

Closes #10570 from srowen/SPARK-12618.
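As an illustration of the most common deprecation fix in this patch (not code from the patch itself): a minimal, self-contained sketch of switching from the deprecated foreachRDD(Function<JavaRDD<T>, Void>) overload to foreachRDD(VoidFunction<JavaRDD<T>>). The class name ForeachRDDSketch, the local master, and the queueStream input are assumptions made only so the example runs on its own.

import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class ForeachRDDSketch {
  public static void main(String[] args) throws InterruptedException {
    JavaStreamingContext ssc = new JavaStreamingContext(
        new SparkConf().setMaster("local[2]").setAppName("foreachRDD-sketch"),
        Durations.seconds(1));

    // A one-batch queue stream, just to have something to iterate over.
    Queue<JavaRDD<Integer>> queue = new LinkedList<>();
    queue.add(ssc.sparkContext().parallelize(Arrays.asList(1, 2, 3)));
    JavaDStream<Integer> stream = ssc.queueStream(queue);

    // VoidFunction replaces the deprecated Function<JavaRDD<T>, Void> overload,
    // so the callback body no longer has to end with "return null;".
    stream.foreachRDD(new VoidFunction<JavaRDD<Integer>>() {
      @Override
      public void call(JavaRDD<Integer> rdd) {
        System.out.println("batch size = " + rdd.count());
      }
    });

    ssc.start();
    ssc.awaitTerminationOrTimeout(3000);
    ssc.stop();
  }
}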
Diffstat (limited to 'streaming/src/test/java/org/apache')
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java          |  4
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java | 64
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java  | 14
3 files changed, 33 insertions(+), 49 deletions(-)
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 9722c60bba..ddc56fc869 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -772,8 +772,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@SuppressWarnings("unchecked")
@Test
public void testForeachRDD() {
- final Accumulator<Integer> accumRdd = ssc.sc().accumulator(0);
- final Accumulator<Integer> accumEle = ssc.sc().accumulator(0);
+ final Accumulator<Integer> accumRdd = ssc.sparkContext().accumulator(0);
+ final Accumulator<Integer> accumEle = ssc.sparkContext().accumulator(0);
List<List<Integer>> inputData = Arrays.asList(
Arrays.asList(1,1,1),
Arrays.asList(1,1,1));
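For context on the JavaAPISuite change above, a hedged sketch of the non-deprecated accessor: sc() and sparkContext() return the same JavaSparkContext, but only sc() carries a deprecation warning. The class name AccumulatorSketch and the throwaway local context are assumptions for illustration only.

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class AccumulatorSketch {
  public static void main(String[] args) {
    JavaStreamingContext ssc = new JavaStreamingContext(
        new SparkConf().setMaster("local[2]").setAppName("accumulator-sketch"),
        Durations.seconds(1));
    // sparkContext() is the non-deprecated accessor for the underlying JavaSparkContext.
    Accumulator<Integer> counter = ssc.sparkContext().accumulator(0);
    counter.add(5);
    System.out.println("counter = " + counter.value());
    ssc.stop();
  }
}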
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
index bc4bc2eb42..20e2a1c3d5 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
@@ -18,6 +18,7 @@
package org.apache.spark.streaming;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -26,10 +27,10 @@ import java.util.Set;
import scala.Tuple2;
import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.util.ManualClock;
import org.junit.Assert;
@@ -51,10 +52,8 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
JavaPairRDD<String, Boolean> initialRDD = null;
JavaPairDStream<String, Integer> wordsDstream = null;
- final Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>>
- mappingFunc =
+ Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>> mappingFunc =
new Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>>() {
-
@Override
public Optional<Double> call(
Time time, String word, Optional<Integer> one, State<Boolean> state) {
@@ -76,11 +75,10 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
.partitioner(new HashPartitioner(10))
.timeout(Durations.seconds(10)));
- JavaPairDStream<String, Boolean> stateSnapshots = stateDstream.stateSnapshots();
+ stateDstream.stateSnapshots();
- final Function3<String, Optional<Integer>, State<Boolean>, Double> mappingFunc2 =
+ Function3<String, Optional<Integer>, State<Boolean>, Double> mappingFunc2 =
new Function3<String, Optional<Integer>, State<Boolean>, Double>() {
-
@Override
public Double call(String key, Optional<Integer> one, State<Boolean> state) {
// Use all State's methods here
@@ -95,13 +93,13 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 =
wordsDstream.mapWithState(
- StateSpec.<String, Integer, Boolean, Double>function(mappingFunc2)
+ StateSpec.function(mappingFunc2)
.initialState(initialRDD)
.numPartitions(10)
.partitioner(new HashPartitioner(10))
.timeout(Durations.seconds(10)));
- JavaPairDStream<String, Boolean> stateSnapshots2 = stateDstream2.stateSnapshots();
+ stateDstream2.stateSnapshots();
}
@Test
@@ -126,33 +124,21 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
Collections.<Integer>emptySet()
);
+ @SuppressWarnings("unchecked")
List<Set<Tuple2<String, Integer>>> stateData = Arrays.asList(
Collections.<Tuple2<String, Integer>>emptySet(),
- Sets.newHashSet(new Tuple2<String, Integer>("a", 1)),
- Sets.newHashSet(new Tuple2<String, Integer>("a", 2), new Tuple2<String, Integer>("b", 1)),
- Sets.newHashSet(
- new Tuple2<String, Integer>("a", 3),
- new Tuple2<String, Integer>("b", 2),
- new Tuple2<String, Integer>("c", 1)),
- Sets.newHashSet(
- new Tuple2<String, Integer>("a", 4),
- new Tuple2<String, Integer>("b", 3),
- new Tuple2<String, Integer>("c", 1)),
- Sets.newHashSet(
- new Tuple2<String, Integer>("a", 5),
- new Tuple2<String, Integer>("b", 3),
- new Tuple2<String, Integer>("c", 1)),
- Sets.newHashSet(
- new Tuple2<String, Integer>("a", 5),
- new Tuple2<String, Integer>("b", 3),
- new Tuple2<String, Integer>("c", 1))
+ Sets.newHashSet(new Tuple2<>("a", 1)),
+ Sets.newHashSet(new Tuple2<>("a", 2), new Tuple2<>("b", 1)),
+ Sets.newHashSet(new Tuple2<>("a", 3), new Tuple2<>("b", 2), new Tuple2<>("c", 1)),
+ Sets.newHashSet(new Tuple2<>("a", 4), new Tuple2<>("b", 3), new Tuple2<>("c", 1)),
+ Sets.newHashSet(new Tuple2<>("a", 5), new Tuple2<>("b", 3), new Tuple2<>("c", 1)),
+ Sets.newHashSet(new Tuple2<>("a", 5), new Tuple2<>("b", 3), new Tuple2<>("c", 1))
);
Function3<String, Optional<Integer>, State<Integer>, Integer> mappingFunc =
new Function3<String, Optional<Integer>, State<Integer>, Integer>() {
-
@Override
- public Integer call(String key, Optional<Integer> value, State<Integer> state) throws Exception {
+ public Integer call(String key, Optional<Integer> value, State<Integer> state) {
int sum = value.or(0) + (state.exists() ? state.get() : 0);
state.update(sum);
return sum;
@@ -160,7 +146,7 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
};
testOperation(
inputData,
- StateSpec.<String, Integer, Integer, Integer>function(mappingFunc),
+ StateSpec.function(mappingFunc),
outputData,
stateData);
}
@@ -175,27 +161,25 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
JavaMapWithStateDStream<K, Integer, S, T> mapWithStateDStream =
JavaPairDStream.fromJavaDStream(inputStream.map(new Function<K, Tuple2<K, Integer>>() {
@Override
- public Tuple2<K, Integer> call(K x) throws Exception {
- return new Tuple2<K, Integer>(x, 1);
+ public Tuple2<K, Integer> call(K x) {
+ return new Tuple2<>(x, 1);
}
})).mapWithState(mapWithStateSpec);
final List<Set<T>> collectedOutputs =
- Collections.synchronizedList(Lists.<Set<T>>newArrayList());
- mapWithStateDStream.foreachRDD(new Function<JavaRDD<T>, Void>() {
+ Collections.synchronizedList(new ArrayList<Set<T>>());
+ mapWithStateDStream.foreachRDD(new VoidFunction<JavaRDD<T>>() {
@Override
- public Void call(JavaRDD<T> rdd) throws Exception {
+ public void call(JavaRDD<T> rdd) {
collectedOutputs.add(Sets.newHashSet(rdd.collect()));
- return null;
}
});
final List<Set<Tuple2<K, S>>> collectedStateSnapshots =
- Collections.synchronizedList(Lists.<Set<Tuple2<K, S>>>newArrayList());
- mapWithStateDStream.stateSnapshots().foreachRDD(new Function<JavaPairRDD<K, S>, Void>() {
+ Collections.synchronizedList(new ArrayList<Set<Tuple2<K, S>>>());
+ mapWithStateDStream.stateSnapshots().foreachRDD(new VoidFunction<JavaPairRDD<K, S>>() {
@Override
- public Void call(JavaPairRDD<K, S> rdd) throws Exception {
+ public void call(JavaPairRDD<K, S> rdd) {
collectedStateSnapshots.add(Sets.newHashSet(rdd.collect()));
- return null;
}
});
BatchCounter batchCounter = new BatchCounter(ssc.ssc());
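For context on the JavaMapWithStateSuite changes above, a hedged, self-contained sketch of mapWithState after the cleanup, assuming the Guava Optional these tests still import at this point: the explicit type witness on StateSpec.function is dropped because javac infers the key/value/state/mapped types from the mapping function, and Tuple2 uses the diamond operator. The class name, checkpoint path, and queueStream input are assumptions made only so the example runs.

import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;

import scala.Tuple2;

import com.google.common.base.Optional;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class MapWithStateSketch {
  public static void main(String[] args) throws InterruptedException {
    JavaStreamingContext ssc = new JavaStreamingContext(
        new SparkConf().setMaster("local[2]").setAppName("mapWithState-sketch"),
        Durations.seconds(1));
    ssc.checkpoint("/tmp/mapWithState-sketch");  // mapWithState requires a checkpoint dir

    // A one-batch queue stream of words, just to feed the stateful operation.
    Queue<JavaRDD<String>> queue = new LinkedList<>();
    queue.add(ssc.sparkContext().parallelize(Arrays.asList("a", "b", "a")));
    JavaDStream<String> words = ssc.queueStream(queue);

    JavaPairDStream<String, Integer> ones = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String w) {
            return new Tuple2<>(w, 1);  // diamond operator instead of new Tuple2<String, Integer>(...)
          }
        });

    // Running count per key, kept in State<Integer>.
    Function3<String, Optional<Integer>, State<Integer>, Integer> mappingFunc =
        new Function3<String, Optional<Integer>, State<Integer>, Integer>() {
          @Override
          public Integer call(String key, Optional<Integer> value, State<Integer> state) {
            int sum = value.or(0) + (state.exists() ? state.get() : 0);
            state.update(sum);
            return sum;
          }
        };

    // No StateSpec.<String, Integer, Integer, Integer>function(...) witness needed.
    ones.mapWithState(StateSpec.function(mappingFunc)).print();

    ssc.start();
    ssc.awaitTerminationOrTimeout(3000);
    ssc.stop();
  }
}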
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
index 7a8ef9d147..d09258e0e4 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
@@ -18,13 +18,14 @@
package org.apache.spark.streaming;
import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import static org.junit.Assert.*;
import com.google.common.io.Closeables;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -68,12 +69,11 @@ public class JavaReceiverAPISuite implements Serializable {
return v1 + ".";
}
});
- mapped.foreachRDD(new Function<JavaRDD<String>, Void>() {
+ mapped.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
- public Void call(JavaRDD<String> rdd) {
+ public void call(JavaRDD<String> rdd) {
long count = rdd.count();
dataCounter.addAndGet(count);
- return null;
}
});
@@ -90,7 +90,7 @@ public class JavaReceiverAPISuite implements Serializable {
Thread.sleep(100);
}
ssc.stop();
- assertTrue(dataCounter.get() > 0);
+ Assert.assertTrue(dataCounter.get() > 0);
} finally {
server.stop();
}
@@ -98,8 +98,8 @@ public class JavaReceiverAPISuite implements Serializable {
private static class JavaSocketReceiver extends Receiver<String> {
- String host = null;
- int port = -1;
+ private String host = null;
+ private int port = -1;
JavaSocketReceiver(String host_ , int port_) {
super(StorageLevel.MEMORY_AND_DISK());