aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2017-02-19 09:42:50 -0800
committerSean Owen <sowen@cloudera.com>2017-02-19 09:42:50 -0800
commit1487c9af20a333ead55955acf4c0aa323bea0d07 (patch)
tree5f47daa77e0f73da1e009cc3dcf0a5c0073246aa /streaming
parentde14d35f77071932963a994fac5aec0e5df838a1 (diff)
downloadspark-1487c9af20a333ead55955acf4c0aa323bea0d07.tar.gz
spark-1487c9af20a333ead55955acf4c0aa323bea0d07.tar.bz2
spark-1487c9af20a333ead55955acf4c0aa323bea0d07.zip
[SPARK-19534][TESTS] Convert Java tests to use lambdas, Java 8 features
## What changes were proposed in this pull request? Convert tests to use Java 8 lambdas, and modest related fixes to surrounding code. ## How was this patch tested? Jenkins tests Author: Sean Owen <sowen@cloudera.com> Closes #16964 from srowen/SPARK-19534.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java81
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java24
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java10
-rw-r--r--streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java21
-rw-r--r--streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java526
5 files changed, 162 insertions, 500 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
index 9b7701003d..cb8ed83e5a 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
@@ -27,9 +27,6 @@ import java.util.Set;
import scala.Tuple2;
import com.google.common.collect.Sets;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.util.ManualClock;
import org.junit.Assert;
@@ -53,18 +50,14 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
JavaPairDStream<String, Integer> wordsDstream = null;
Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>> mappingFunc =
- new Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>>() {
- @Override
- public Optional<Double> call(
- Time time, String word, Optional<Integer> one, State<Boolean> state) {
- // Use all State's methods here
- state.exists();
- state.get();
- state.isTimingOut();
- state.remove();
- state.update(true);
- return Optional.of(2.0);
- }
+ (time, word, one, state) -> {
+ // Use all State's methods here
+ state.exists();
+ state.get();
+ state.isTimingOut();
+ state.remove();
+ state.update(true);
+ return Optional.of(2.0);
};
JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream =
@@ -78,17 +71,14 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
stateDstream.stateSnapshots();
Function3<String, Optional<Integer>, State<Boolean>, Double> mappingFunc2 =
- new Function3<String, Optional<Integer>, State<Boolean>, Double>() {
- @Override
- public Double call(String key, Optional<Integer> one, State<Boolean> state) {
- // Use all State's methods here
- state.exists();
- state.get();
- state.isTimingOut();
- state.remove();
- state.update(true);
- return 2.0;
- }
+ (key, one, state) -> {
+ // Use all State's methods here
+ state.exists();
+ state.get();
+ state.isTimingOut();
+ state.remove();
+ state.update(true);
+ return 2.0;
};
JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 =
@@ -136,13 +126,10 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
);
Function3<String, Optional<Integer>, State<Integer>, Integer> mappingFunc =
- new Function3<String, Optional<Integer>, State<Integer>, Integer>() {
- @Override
- public Integer call(String key, Optional<Integer> value, State<Integer> state) {
- int sum = value.orElse(0) + (state.exists() ? state.get() : 0);
- state.update(sum);
- return sum;
- }
+ (key, value, state) -> {
+ int sum = value.orElse(0) + (state.exists() ? state.get() : 0);
+ state.update(sum);
+ return sum;
};
testOperation(
inputData,
@@ -159,29 +146,15 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
int numBatches = expectedOutputs.size();
JavaDStream<K> inputStream = JavaTestUtils.attachTestInputStream(ssc, input, 2);
JavaMapWithStateDStream<K, Integer, S, T> mapWithStateDStream =
- JavaPairDStream.fromJavaDStream(inputStream.map(new Function<K, Tuple2<K, Integer>>() {
- @Override
- public Tuple2<K, Integer> call(K x) {
- return new Tuple2<>(x, 1);
- }
- })).mapWithState(mapWithStateSpec);
-
- final List<Set<T>> collectedOutputs =
+ JavaPairDStream.fromJavaDStream(inputStream.map(x -> new Tuple2<>(x, 1))).mapWithState(mapWithStateSpec);
+
+ List<Set<T>> collectedOutputs =
Collections.synchronizedList(new ArrayList<Set<T>>());
- mapWithStateDStream.foreachRDD(new VoidFunction<JavaRDD<T>>() {
- @Override
- public void call(JavaRDD<T> rdd) {
- collectedOutputs.add(Sets.newHashSet(rdd.collect()));
- }
- });
- final List<Set<Tuple2<K, S>>> collectedStateSnapshots =
+ mapWithStateDStream.foreachRDD(rdd -> collectedOutputs.add(Sets.newHashSet(rdd.collect())));
+ List<Set<Tuple2<K, S>>> collectedStateSnapshots =
Collections.synchronizedList(new ArrayList<Set<Tuple2<K, S>>>());
- mapWithStateDStream.stateSnapshots().foreachRDD(new VoidFunction<JavaPairRDD<K, S>>() {
- @Override
- public void call(JavaPairRDD<K, S> rdd) {
- collectedStateSnapshots.add(Sets.newHashSet(rdd.collect()));
- }
- });
+ mapWithStateDStream.stateSnapshots().foreachRDD(rdd ->
+ collectedStateSnapshots.add(Sets.newHashSet(rdd.collect())));
BatchCounter batchCounter = new BatchCounter(ssc.ssc());
ssc.start();
((ManualClock) ssc.ssc().scheduler().clock())
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
index 091ccbfd85..9156047244 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
@@ -58,24 +58,16 @@ public class JavaReceiverAPISuite implements Serializable {
TestServer server = new TestServer(0);
server.start();
- final AtomicLong dataCounter = new AtomicLong(0);
+ AtomicLong dataCounter = new AtomicLong(0);
try {
JavaStreamingContext ssc = new JavaStreamingContext("local[2]", "test", new Duration(200));
JavaReceiverInputDStream<String> input =
ssc.receiverStream(new JavaSocketReceiver("localhost", server.port()));
- JavaDStream<String> mapped = input.map(new Function<String, String>() {
- @Override
- public String call(String v1) {
- return v1 + ".";
- }
- });
- mapped.foreachRDD(new VoidFunction<JavaRDD<String>>() {
- @Override
- public void call(JavaRDD<String> rdd) {
- long count = rdd.count();
- dataCounter.addAndGet(count);
- }
+ JavaDStream<String> mapped = input.map((Function<String, String>) v1 -> v1 + ".");
+ mapped.foreachRDD((VoidFunction<JavaRDD<String>>) rdd -> {
+ long count = rdd.count();
+ dataCounter.addAndGet(count);
});
ssc.start();
@@ -110,11 +102,7 @@ public class JavaReceiverAPISuite implements Serializable {
@Override
public void onStart() {
- new Thread() {
- @Override public void run() {
- receive();
- }
- }.start();
+ new Thread(this::receive).start();
}
@Override
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
index f02fa87f61..3f4e6ddb21 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import org.apache.spark.SparkConf;
import org.apache.spark.network.util.JavaUtils;
@@ -81,12 +80,7 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog {
@Override
public Iterator<ByteBuffer> readAll() {
- return Iterators.transform(records.iterator(), new Function<Record,ByteBuffer>() {
- @Override
- public ByteBuffer apply(Record input) {
- return input.buffer;
- }
- });
+ return Iterators.transform(records.iterator(), input -> input.buffer);
}
@Override
@@ -114,7 +108,7 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog {
String data1 = "data1";
WriteAheadLogRecordHandle handle = wal.write(JavaUtils.stringToBytes(data1), 1234);
Assert.assertTrue(handle instanceof JavaWriteAheadLogSuiteHandle);
- Assert.assertEquals(JavaUtils.bytesToString(wal.read(handle)), data1);
+ Assert.assertEquals(data1, JavaUtils.bytesToString(wal.read(handle)));
wal.write(JavaUtils.stringToBytes("data2"), 1235);
wal.write(JavaUtils.stringToBytes("data3"), 1236);
diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
index 646cb97066..9948a4074c 100644
--- a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
@@ -28,7 +28,6 @@ import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.Time;
import scala.Tuple2;
-import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;
@@ -101,7 +100,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
while (in.hasNext()) {
out = out + in.next().toUpperCase();
}
- return Lists.newArrayList(out).iterator();
+ return Arrays.asList(out).iterator();
});
JavaTestUtils.attachTestOutputStream(mapped);
List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -240,7 +239,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
JavaTestUtils.attachTestOutputStream(joined);
List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
- List<Set<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList();
+ List<Set<Tuple2<String, Tuple2<String, String>>>> unorderedResult = new ArrayList<>();
for (List<Tuple2<String, Tuple2<String, String>>> res : result) {
unorderedResult.add(Sets.newHashSet(res));
}
@@ -315,7 +314,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream(
JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1));
- List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2);
+ List<JavaDStream<?>> listOfDStreams1 = Arrays.asList(stream1, stream2);
// This is just to test whether this transform to JavaStream compiles
JavaDStream<Long> transformed1 = ssc.transform(
@@ -325,7 +324,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
});
List<JavaDStream<?>> listOfDStreams2 =
- Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream());
+ Arrays.asList(stream1, stream2, pairStream1.toJavaDStream());
JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
listOfDStreams2, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
@@ -358,7 +357,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream<String> flatMapped = stream.flatMap(
- s -> Lists.newArrayList(s.split("(?!^)")).iterator());
+ s -> Arrays.asList(s.split("(?!^)")).iterator());
JavaTestUtils.attachTestOutputStream(flatMapped);
List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -401,7 +400,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(s -> {
- List<Tuple2<Integer, String>> out = Lists.newArrayList();
+ List<Tuple2<Integer, String>> out = new ArrayList<>();
for (String letter : s.split("(?!^)")) {
out.add(new Tuple2<>(s.length(), letter));
}
@@ -420,7 +419,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
*/
public static <T extends Comparable<T>> void assertOrderInvariantEquals(
List<List<T>> expected, List<List<T>> actual) {
- expected.forEach(list -> Collections.sort(list));
+ expected.forEach(Collections::sort);
List<List<T>> sortedActual = new ArrayList<>();
actual.forEach(list -> {
List<T> sortedList = new ArrayList<>(list);
@@ -491,7 +490,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
JavaDStream<Tuple2<String, Integer>> stream =
JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(x -> x.swap());
+ JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(Tuple2::swap);
JavaTestUtils.attachTestOutputStream(reversed);
List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -543,7 +542,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
JavaDStream<Tuple2<String, Integer>> stream =
JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaDStream<Integer> reversed = pairStream.map(in -> in._2());
+ JavaDStream<Integer> reversed = pairStream.map(Tuple2::_2);
JavaTestUtils.attachTestOutputStream(reversed);
List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -629,7 +628,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(i -> i,
+ JavaPairDStream<String, Integer> combined = pairStream.combineByKey(i -> i,
(x, y) -> x + y, (x, y) -> x + y, new HashPartitioner(2));
JavaTestUtils.attachTestOutputStream(combined);
diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
index 8d24104d78..b966cbdca0 100644
--- a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
@@ -33,7 +33,6 @@ import org.apache.spark.streaming.Time;
import scala.Tuple2;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
@@ -123,12 +122,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList(9,4));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() {
- @Override
- public Integer call(String s) {
- return s.length();
- }
- });
+ JavaDStream<Integer> letterCount = stream.map(String::length);
JavaTestUtils.attachTestOutputStream(letterCount);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -194,12 +188,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList("yankees"));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<String> filtered = stream.filter(new Function<String, Boolean>() {
- @Override
- public Boolean call(String s) {
- return s.contains("a");
- }
- });
+ JavaDStream<String> filtered = stream.filter(s -> s.contains("a"));
JavaTestUtils.attachTestOutputStream(filtered);
List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -276,17 +265,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList("YANKEESRED SOX"));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<String> mapped = stream.mapPartitions(
- new FlatMapFunction<Iterator<String>, String>() {
- @Override
- public Iterator<String> call(Iterator<String> in) {
- StringBuilder out = new StringBuilder();
- while (in.hasNext()) {
- out.append(in.next().toUpperCase(Locale.ENGLISH));
- }
- return Arrays.asList(out.toString()).iterator();
- }
- });
+ JavaDStream<String> mapped = stream.mapPartitions(in -> {
+ StringBuilder out = new StringBuilder();
+ while (in.hasNext()) {
+ out.append(in.next().toUpperCase(Locale.ENGLISH));
+ }
+ return Arrays.asList(out.toString()).iterator();
+ });
JavaTestUtils.attachTestOutputStream(mapped);
List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -416,18 +401,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList(9,10,11));
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> transformed = stream.transform(
- new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
- @Override
- public JavaRDD<Integer> call(JavaRDD<Integer> in) {
- return in.map(new Function<Integer, Integer>() {
- @Override
- public Integer call(Integer i) {
- return i + 2;
- }
- });
- }
- });
+ JavaDStream<Integer> transformed = stream.transform(in -> in.map(i -> i + 2));
JavaTestUtils.attachTestOutputStream(transformed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -448,71 +422,21 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(
JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
- stream.transform(
- new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
- @Override
- public JavaRDD<Integer> call(JavaRDD<Integer> in) {
- return null;
- }
- }
- );
+ stream.transform(in -> null);
- stream.transform(
- new Function2<JavaRDD<Integer>, Time, JavaRDD<Integer>>() {
- @Override public JavaRDD<Integer> call(JavaRDD<Integer> in, Time time) {
- return null;
- }
- }
- );
+ stream.transform((in, time) -> null);
- stream.transformToPair(
- new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() {
- @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) {
- return null;
- }
- }
- );
+ stream.transformToPair(in -> null);
- stream.transformToPair(
- new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() {
- @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) {
- return null;
- }
- }
- );
+ stream.transformToPair((in, time) -> null);
- pairStream.transform(
- new Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>>() {
- @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in) {
- return null;
- }
- }
- );
+ pairStream.transform(in -> null);
- pairStream.transform(
- new Function2<JavaPairRDD<String, Integer>, Time, JavaRDD<Integer>>() {
- @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in, Time time) {
- return null;
- }
- }
- );
+ pairStream.transform((in, time) -> null);
- pairStream.transformToPair(
- new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, String>>() {
- @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) {
- return null;
- }
- }
- );
+ pairStream.transformToPair(in -> null);
- pairStream.transformToPair(
- new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, String>>() {
- @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in,
- Time time) {
- return null;
- }
- }
- );
+ pairStream.transformToPair((in, time) -> null);
}
@@ -558,19 +482,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWithToPair(
pairStream2,
- new Function3<
- JavaPairRDD<String, String>,
- JavaPairRDD<String, String>,
- Time,
- JavaPairRDD<String, Tuple2<String, String>>>() {
- @Override
- public JavaPairRDD<String, Tuple2<String, String>> call(
- JavaPairRDD<String, String> rdd1,
- JavaPairRDD<String, String> rdd2,
- Time time) {
- return rdd1.join(rdd2);
- }
- }
+ (rdd1, rdd2, time) -> rdd1.join(rdd2)
);
JavaTestUtils.attachTestOutputStream(joined);
@@ -603,100 +515,21 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream(
JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1));
- stream1.transformWith(
- stream2,
- new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() {
- @Override
- public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) {
- return null;
- }
- }
- );
+ stream1.transformWith(stream2, (rdd1, rdd2, time) -> null);
- stream1.transformWith(
- pairStream1,
- new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() {
- @Override
- public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2,
- Time time) {
- return null;
- }
- }
- );
+ stream1.transformWith(pairStream1, (rdd1, rdd2, time) -> null);
- stream1.transformWithToPair(
- stream2,
- new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
- @Override
- public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2,
- Time time) {
- return null;
- }
- }
- );
+ stream1.transformWithToPair(stream2, (rdd1, rdd2, time) -> null);
- stream1.transformWithToPair(
- pairStream1,
- new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time,
- JavaPairRDD<Double, Double>>() {
- @Override
- public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1,
- JavaPairRDD<String, Integer> rdd2,
- Time time) {
- return null;
- }
- }
- );
+ stream1.transformWithToPair(pairStream1, (rdd1, rdd2, time) -> null);
- pairStream1.transformWith(
- stream2,
- new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() {
- @Override
- public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2,
- Time time) {
- return null;
- }
- }
- );
+ pairStream1.transformWith(stream2, (rdd1, rdd2, time) -> null);
- pairStream1.transformWith(
- pairStream1,
- new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>, Time,
- JavaRDD<Double>>() {
- @Override
- public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1,
- JavaPairRDD<String, Integer> rdd2,
- Time time) {
- return null;
- }
- }
- );
+ pairStream1.transformWith(pairStream1, (rdd1, rdd2, time) -> null);
- pairStream1.transformWithToPair(
- stream2,
- new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time,
- JavaPairRDD<Double, Double>>() {
- @Override
- public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1,
- JavaRDD<String> rdd2,
- Time time) {
- return null;
- }
- }
- );
+ pairStream1.transformWithToPair(stream2, (rdd1, rdd2, time) -> null);
- pairStream1.transformWithToPair(
- pairStream2,
- new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time,
- JavaPairRDD<Double, Double>>() {
- @Override
- public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1,
- JavaPairRDD<Double, Character> rdd2,
- Time time) {
- return null;
- }
- }
- );
+ pairStream1.transformWithToPair(pairStream2, (rdd1, rdd2, time) -> null);
}
@SuppressWarnings("unchecked")
@@ -727,44 +560,32 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream(
JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1));
- List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2);
+ List<JavaDStream<?>> listOfDStreams1 = Arrays.asList(stream1, stream2);
// This is just to test whether this transform to JavaStream compiles
ssc.transform(
listOfDStreams1,
- new Function2<List<JavaRDD<?>>, Time, JavaRDD<Long>>() {
- @Override
- public JavaRDD<Long> call(List<JavaRDD<?>> listOfRDDs, Time time) {
- Assert.assertEquals(2, listOfRDDs.size());
- return null;
- }
+ (listOfRDDs, time) -> {
+ Assert.assertEquals(2, listOfRDDs.size());
+ return null;
}
);
List<JavaDStream<?>> listOfDStreams2 =
- Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream());
+ Arrays.asList(stream1, stream2, pairStream1.toJavaDStream());
JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
listOfDStreams2,
- new Function2<List<JavaRDD<?>>, Time, JavaPairRDD<Integer, Tuple2<Integer, String>>>() {
- @Override
- public JavaPairRDD<Integer, Tuple2<Integer, String>> call(List<JavaRDD<?>> listOfRDDs,
- Time time) {
- Assert.assertEquals(3, listOfRDDs.size());
- JavaRDD<Integer> rdd1 = (JavaRDD<Integer>)listOfRDDs.get(0);
- JavaRDD<Integer> rdd2 = (JavaRDD<Integer>)listOfRDDs.get(1);
- JavaRDD<Tuple2<Integer, String>> rdd3 =
- (JavaRDD<Tuple2<Integer, String>>)listOfRDDs.get(2);
- JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
- PairFunction<Integer, Integer, Integer> mapToTuple =
- new PairFunction<Integer, Integer, Integer>() {
- @Override
- public Tuple2<Integer, Integer> call(Integer i) {
- return new Tuple2<>(i, i);
- }
- };
- return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
- }
+ (listOfRDDs, time) -> {
+ Assert.assertEquals(3, listOfRDDs.size());
+ JavaRDD<Integer> rdd1 = (JavaRDD<Integer>)listOfRDDs.get(0);
+ JavaRDD<Integer> rdd2 = (JavaRDD<Integer>)listOfRDDs.get(1);
+ JavaRDD<Tuple2<Integer, String>> rdd3 =
+ (JavaRDD<Tuple2<Integer, String>>)listOfRDDs.get(2);
+ JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
+ PairFunction<Integer, Integer, Integer> mapToTuple =
+ (PairFunction<Integer, Integer, Integer>) i -> new Tuple2<>(i, i);
+ return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
}
);
JavaTestUtils.attachTestOutputStream(transformed2);
@@ -787,12 +608,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList("a","t","h","l","e","t","i","c","s"));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<String> flatMapped = stream.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String x) {
- return Arrays.asList(x.split("(?!^)")).iterator();
- }
- });
+ JavaDStream<String> flatMapped = stream.flatMap(x -> Arrays.asList(x.split("(?!^)")).iterator());
JavaTestUtils.attachTestOutputStream(flatMapped);
List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -811,25 +627,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output
- stream.foreachRDD(new VoidFunction<JavaRDD<Integer>>() {
- @Override
- public void call(JavaRDD<Integer> rdd) {
- accumRdd.add(1);
- rdd.foreach(new VoidFunction<Integer>() {
- @Override
- public void call(Integer i) {
- accumEle.add(1);
- }
- });
- }
+ stream.foreachRDD(rdd -> {
+ accumRdd.add(1);
+ rdd.foreach(i -> accumEle.add(1));
});
// This is a test to make sure foreachRDD(VoidFunction2) can be called from Java
- stream.foreachRDD(new VoidFunction2<JavaRDD<Integer>, Time>() {
- @Override
- public void call(JavaRDD<Integer> rdd, Time time) {
- }
- });
+ stream.foreachRDD((rdd, time) -> {});
JavaTestUtils.runStreams(ssc, 2, 2);
@@ -873,16 +677,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
new Tuple2<>(9, "s")));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(
- new PairFlatMapFunction<String, Integer, String>() {
- @Override
- public Iterator<Tuple2<Integer, String>> call(String in) {
- List<Tuple2<Integer, String>> out = new ArrayList<>();
- for (String letter: in.split("(?!^)")) {
- out.add(new Tuple2<>(in.length(), letter));
- }
- return out.iterator();
+ JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(in -> {
+ List<Tuple2<Integer, String>> out = new ArrayList<>();
+ for (String letter : in.split("(?!^)")) {
+ out.add(new Tuple2<>(in.length(), letter));
}
+ return out.iterator();
});
JavaTestUtils.attachTestOutputStream(flatMapped);
List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -949,21 +749,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList(new Tuple2<>("yankees", 7)));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = stream.mapToPair(
- new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String in) {
- return new Tuple2<>(in, in.length());
- }
- });
+ JavaPairDStream<String, Integer> pairStream =
+ stream.mapToPair(in -> new Tuple2<>(in, in.length()));
- JavaPairDStream<String, Integer> filtered = pairStream.filter(
- new Function<Tuple2<String, Integer>, Boolean>() {
- @Override
- public Boolean call(Tuple2<String, Integer> in) {
- return in._1().contains("a");
- }
- });
+ JavaPairDStream<String, Integer> filtered = pairStream.filter(in -> in._1().contains("a"));
JavaTestUtils.attachTestOutputStream(filtered);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1014,13 +803,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Tuple2<String, Integer>> stream =
JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(
- new PairFunction<Tuple2<String, Integer>, Integer, String>() {
- @Override
- public Tuple2<Integer, String> call(Tuple2<String, Integer> in) {
- return in.swap();
- }
- });
+ JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(Tuple2::swap);
JavaTestUtils.attachTestOutputStream(reversed);
List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1048,18 +831,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Tuple2<String, Integer>> stream =
JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(
- new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() {
- @Override
- public Iterator<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) {
- List<Tuple2<Integer, String>> out = new LinkedList<>();
- while (in.hasNext()) {
- Tuple2<String, Integer> next = in.next();
- out.add(next.swap());
- }
- return out.iterator();
- }
- });
+ JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(in -> {
+ List<Tuple2<Integer, String>> out = new LinkedList<>();
+ while (in.hasNext()) {
+ Tuple2<String, Integer> next = in.next();
+ out.add(next.swap());
+ }
+ return out.iterator();
+ });
JavaTestUtils.attachTestOutputStream(reversed);
List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1079,13 +858,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Tuple2<String, Integer>> stream =
JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaDStream<Integer> reversed = pairStream.map(
- new Function<Tuple2<String, Integer>, Integer>() {
- @Override
- public Integer call(Tuple2<String, Integer> in) {
- return in._2();
- }
- });
+ JavaDStream<Integer> reversed = pairStream.map(in -> in._2());
JavaTestUtils.attachTestOutputStream(reversed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1119,17 +892,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Tuple2<String, Integer>> stream =
JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(
- new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() {
- @Override
- public Iterator<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) {
- List<Tuple2<Integer, String>> out = new LinkedList<>();
- for (Character s : in._1().toCharArray()) {
- out.add(new Tuple2<>(in._2(), s.toString()));
- }
- return out.iterator();
- }
- });
+ JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(in -> {
+ List<Tuple2<Integer, String>> out = new LinkedList<>();
+ for (Character s : in._1().toCharArray()) {
+ out.add(new Tuple2<>(in._2(), s.toString()));
+ }
+ return out.iterator();
+ });
JavaTestUtils.attachTestOutputStream(flatMapped);
List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1216,12 +985,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> combined = pairStream.combineByKey(
- new Function<Integer, Integer>() {
- @Override
- public Integer call(Integer i) {
- return i;
- }
- }, new IntegerSum(), new IntegerSum(), new HashPartitioner(2));
+ i -> i, new IntegerSum(), new IntegerSum(), new HashPartitioner(2));
JavaTestUtils.attachTestOutputStream(combined);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1345,20 +1109,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
- new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
- @Override
- public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
- int out = 0;
- if (state.isPresent()) {
- out += state.get();
- }
- for (Integer v : values) {
- out += v;
- }
- return Optional.of(out);
- }
- });
+ JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> {
+ int out = 0;
+ if (state.isPresent()) {
+ out += state.get();
+ }
+ for (Integer v : values) {
+ out += v;
+ }
+ return Optional.of(out);
+ });
JavaTestUtils.attachTestOutputStream(updated);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1389,20 +1149,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
- new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
- @Override
- public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
- int out = 0;
- if (state.isPresent()) {
- out += state.get();
- }
- for (Integer v : values) {
- out += v;
- }
- return Optional.of(out);
- }
- }, new HashPartitioner(1), initialRDD);
+ JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> {
+ int out = 0;
+ if (state.isPresent()) {
+ out += state.get();
+ }
+ for (Integer v : values) {
+ out += v;
+ }
+ return Optional.of(out);
+ }, new HashPartitioner(1), initialRDD);
JavaTestUtils.attachTestOutputStream(updated);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1500,13 +1256,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
ssc, inputData, 1);
JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(
- new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() {
- @Override
- public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) {
- return in.sortByKey();
- }
- });
+ JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(in -> in.sortByKey());
JavaTestUtils.attachTestOutputStream(sorted);
List<List<Tuple2<Integer, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1537,18 +1287,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
ssc, inputData, 1);
JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaDStream<Integer> firstParts = pairStream.transform(
- new Function<JavaPairRDD<Integer, Integer>, JavaRDD<Integer>>() {
- @Override
- public JavaRDD<Integer> call(JavaPairRDD<Integer, Integer> in) {
- return in.map(new Function<Tuple2<Integer, Integer>, Integer>() {
- @Override
- public Integer call(Tuple2<Integer, Integer> in2) {
- return in2._1();
- }
- });
- }
- });
+ JavaDStream<Integer> firstParts = pairStream.transform(in -> in.map(in2 -> in2._1()));
JavaTestUtils.attachTestOutputStream(firstParts);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1575,12 +1314,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
ssc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<String, String> mapped = pairStream.mapValues(new Function<String, String>() {
- @Override
- public String call(String s) {
- return s.toUpperCase(Locale.ENGLISH);
- }
- });
+ JavaPairDStream<String, String> mapped = pairStream.mapValues(s -> s.toUpperCase(Locale.ENGLISH));
JavaTestUtils.attachTestOutputStream(mapped);
List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1616,16 +1350,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues(
- new Function<String, Iterable<String>>() {
- @Override
- public Iterable<String> call(String in) {
- List<String> out = new ArrayList<>();
- out.add(in + "1");
- out.add(in + "2");
- return out;
- }
- });
+ JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues(in -> {
+ List<String> out = new ArrayList<>();
+ out.add(in + "1");
+ out.add(in + "2");
+ return out;
+ });
JavaTestUtils.attachTestOutputStream(flatMapped);
List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1795,12 +1525,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
ssc.checkpoint(tempDir.getAbsolutePath());
JavaDStream<String> stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() {
- @Override
- public Integer call(String s) {
- return s.length();
- }
- });
+ JavaDStream<Integer> letterCount = stream.map(String::length);
JavaCheckpointTestUtils.attachTestOutputStream(letterCount);
List<List<Integer>> initialResult = JavaTestUtils.runStreams(ssc, 1, 1);
@@ -1822,7 +1547,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
public void testContextGetOrCreate() throws InterruptedException {
ssc.stop();
- final SparkConf conf = new SparkConf()
+ SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("test")
.set("newContext", "true");
@@ -1835,13 +1560,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
// Function to create JavaStreamingContext without any output operations
// (used to detect the new context)
- final AtomicBoolean newContextCreated = new AtomicBoolean(false);
- Function0<JavaStreamingContext> creatingFunc = new Function0<JavaStreamingContext>() {
- @Override
- public JavaStreamingContext call() {
- newContextCreated.set(true);
- return new JavaStreamingContext(conf, Seconds.apply(1));
- }
+ AtomicBoolean newContextCreated = new AtomicBoolean(false);
+ Function0<JavaStreamingContext> creatingFunc = () -> {
+ newContextCreated.set(true);
+ return new JavaStreamingContext(conf, Seconds.apply(1));
};
newContextCreated.set(false);
@@ -1912,18 +1634,15 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
ssc.socketStream(
"localhost",
12345,
- new Function<InputStream, Iterable<String>>() {
- @Override
- public Iterable<String> call(InputStream in) throws IOException {
- List<String> out = new ArrayList<>();
- try (BufferedReader reader = new BufferedReader(
- new InputStreamReader(in, StandardCharsets.UTF_8))) {
- for (String line; (line = reader.readLine()) != null;) {
- out.add(line);
- }
+ in -> {
+ List<String> out = new ArrayList<>();
+ try (BufferedReader reader = new BufferedReader(
+ new InputStreamReader(in, StandardCharsets.UTF_8))) {
+ for (String line; (line = reader.readLine()) != null;) {
+ out.add(line);
}
- return out;
}
+ return out;
},
StorageLevel.MEMORY_ONLY());
}
@@ -1952,21 +1671,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
LongWritable.class,
Text.class,
TextInputFormat.class,
- new Function<Path, Boolean>() {
- @Override
- public Boolean call(Path v1) {
- return Boolean.TRUE;
- }
- },
+ v1 -> Boolean.TRUE,
true);
- JavaDStream<String> test = inputStream.map(
- new Function<Tuple2<LongWritable, Text>, String>() {
- @Override
- public String call(Tuple2<LongWritable, Text> v1) {
- return v1._2().toString();
- }
- });
+ JavaDStream<String> test = inputStream.map(v1 -> v1._2().toString());
JavaTestUtils.attachTestOutputStream(test);
List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1);