diff options
Diffstat (limited to 'streaming/src/test')
3 files changed, 33 insertions, 49 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 9722c60bba..ddc56fc869 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -772,8 +772,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa @SuppressWarnings("unchecked") @Test public void testForeachRDD() { - final Accumulator<Integer> accumRdd = ssc.sc().accumulator(0); - final Accumulator<Integer> accumEle = ssc.sc().accumulator(0); + final Accumulator<Integer> accumRdd = ssc.sparkContext().accumulator(0); + final Accumulator<Integer> accumEle = ssc.sparkContext().accumulator(0); List<List<Integer>> inputData = Arrays.asList( Arrays.asList(1,1,1), Arrays.asList(1,1,1)); diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java index bc4bc2eb42..20e2a1c3d5 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java @@ -18,6 +18,7 @@ package org.apache.spark.streaming; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -26,10 +27,10 @@ import java.util.Set; import scala.Tuple2; import com.google.common.base.Optional; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.util.ManualClock; import org.junit.Assert; @@ -51,10 +52,8 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements JavaPairRDD<String, Boolean> initialRDD = null; JavaPairDStream<String, Integer> wordsDstream = null; - final Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>> - mappingFunc = + Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>> mappingFunc = new Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>>() { - @Override public Optional<Double> call( Time time, String word, Optional<Integer> one, State<Boolean> state) { @@ -76,11 +75,10 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements .partitioner(new HashPartitioner(10)) .timeout(Durations.seconds(10))); - JavaPairDStream<String, Boolean> stateSnapshots = stateDstream.stateSnapshots(); + stateDstream.stateSnapshots(); - final Function3<String, Optional<Integer>, State<Boolean>, Double> mappingFunc2 = + Function3<String, Optional<Integer>, State<Boolean>, Double> mappingFunc2 = new Function3<String, Optional<Integer>, State<Boolean>, Double>() { - @Override public Double call(String key, Optional<Integer> one, State<Boolean> state) { // Use all State's methods here @@ -95,13 +93,13 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 = wordsDstream.mapWithState( - StateSpec.<String, Integer, Boolean, Double>function(mappingFunc2) + StateSpec.function(mappingFunc2) .initialState(initialRDD) .numPartitions(10) .partitioner(new HashPartitioner(10)) .timeout(Durations.seconds(10))); - JavaPairDStream<String, Boolean> stateSnapshots2 = stateDstream2.stateSnapshots(); + stateDstream2.stateSnapshots(); } @Test @@ -126,33 +124,21 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements Collections.<Integer>emptySet() ); + @SuppressWarnings("unchecked") List<Set<Tuple2<String, Integer>>> stateData = Arrays.asList( Collections.<Tuple2<String, Integer>>emptySet(), - Sets.newHashSet(new Tuple2<String, Integer>("a", 1)), - Sets.newHashSet(new Tuple2<String, Integer>("a", 2), new Tuple2<String, Integer>("b", 1)), - Sets.newHashSet( - new Tuple2<String, Integer>("a", 3), - new Tuple2<String, Integer>("b", 2), - new Tuple2<String, Integer>("c", 1)), - Sets.newHashSet( - new Tuple2<String, Integer>("a", 4), - new Tuple2<String, Integer>("b", 3), - new Tuple2<String, Integer>("c", 1)), - Sets.newHashSet( - new Tuple2<String, Integer>("a", 5), - new Tuple2<String, Integer>("b", 3), - new Tuple2<String, Integer>("c", 1)), - Sets.newHashSet( - new Tuple2<String, Integer>("a", 5), - new Tuple2<String, Integer>("b", 3), - new Tuple2<String, Integer>("c", 1)) + Sets.newHashSet(new Tuple2<>("a", 1)), + Sets.newHashSet(new Tuple2<>("a", 2), new Tuple2<>("b", 1)), + Sets.newHashSet(new Tuple2<>("a", 3), new Tuple2<>("b", 2), new Tuple2<>("c", 1)), + Sets.newHashSet(new Tuple2<>("a", 4), new Tuple2<>("b", 3), new Tuple2<>("c", 1)), + Sets.newHashSet(new Tuple2<>("a", 5), new Tuple2<>("b", 3), new Tuple2<>("c", 1)), + Sets.newHashSet(new Tuple2<>("a", 5), new Tuple2<>("b", 3), new Tuple2<>("c", 1)) ); Function3<String, Optional<Integer>, State<Integer>, Integer> mappingFunc = new Function3<String, Optional<Integer>, State<Integer>, Integer>() { - @Override - public Integer call(String key, Optional<Integer> value, State<Integer> state) throws Exception { + public Integer call(String key, Optional<Integer> value, State<Integer> state) { int sum = value.or(0) + (state.exists() ? state.get() : 0); state.update(sum); return sum; @@ -160,7 +146,7 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements }; testOperation( inputData, - StateSpec.<String, Integer, Integer, Integer>function(mappingFunc), + StateSpec.function(mappingFunc), outputData, stateData); } @@ -175,27 +161,25 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements JavaMapWithStateDStream<K, Integer, S, T> mapWithStateDStream = JavaPairDStream.fromJavaDStream(inputStream.map(new Function<K, Tuple2<K, Integer>>() { @Override - public Tuple2<K, Integer> call(K x) throws Exception { - return new Tuple2<K, Integer>(x, 1); + public Tuple2<K, Integer> call(K x) { + return new Tuple2<>(x, 1); } })).mapWithState(mapWithStateSpec); final List<Set<T>> collectedOutputs = - Collections.synchronizedList(Lists.<Set<T>>newArrayList()); - mapWithStateDStream.foreachRDD(new Function<JavaRDD<T>, Void>() { + Collections.synchronizedList(new ArrayList<Set<T>>()); + mapWithStateDStream.foreachRDD(new VoidFunction<JavaRDD<T>>() { @Override - public Void call(JavaRDD<T> rdd) throws Exception { + public void call(JavaRDD<T> rdd) { collectedOutputs.add(Sets.newHashSet(rdd.collect())); - return null; } }); final List<Set<Tuple2<K, S>>> collectedStateSnapshots = - Collections.synchronizedList(Lists.<Set<Tuple2<K, S>>>newArrayList()); - mapWithStateDStream.stateSnapshots().foreachRDD(new Function<JavaPairRDD<K, S>, Void>() { + Collections.synchronizedList(new ArrayList<Set<Tuple2<K, S>>>()); + mapWithStateDStream.stateSnapshots().foreachRDD(new VoidFunction<JavaPairRDD<K, S>>() { @Override - public Void call(JavaPairRDD<K, S> rdd) throws Exception { + public void call(JavaPairRDD<K, S> rdd) { collectedStateSnapshots.add(Sets.newHashSet(rdd.collect())); - return null; } }); BatchCounter batchCounter = new BatchCounter(ssc.ssc()); diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java index 7a8ef9d147..d09258e0e4 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java @@ -18,13 +18,14 @@ package org.apache.spark.streaming; import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; -import static org.junit.Assert.*; import com.google.common.io.Closeables; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -68,12 +69,11 @@ public class JavaReceiverAPISuite implements Serializable { return v1 + "."; } }); - mapped.foreachRDD(new Function<JavaRDD<String>, Void>() { + mapped.foreachRDD(new VoidFunction<JavaRDD<String>>() { @Override - public Void call(JavaRDD<String> rdd) { + public void call(JavaRDD<String> rdd) { long count = rdd.count(); dataCounter.addAndGet(count); - return null; } }); @@ -90,7 +90,7 @@ public class JavaReceiverAPISuite implements Serializable { Thread.sleep(100); } ssc.stop(); - assertTrue(dataCounter.get() > 0); + Assert.assertTrue(dataCounter.get() > 0); } finally { server.stop(); } @@ -98,8 +98,8 @@ public class JavaReceiverAPISuite implements Serializable { private static class JavaSocketReceiver extends Receiver<String> { - String host = null; - int port = -1; + private String host = null; + private int port = -1; JavaSocketReceiver(String host_ , int port_) { super(StorageLevel.MEMORY_AND_DISK()); |