From 0e2405490f2056728d1353abbac6f3ea177ae533 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Thu, 16 Feb 2017 12:32:45 +0000 Subject: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support - Move external/java8-tests tests into core, streaming, sql and remove - Remove MaxPermGen and related options - Fix some reflection / TODOs around Java 8+ methods - Update doc references to 1.7/1.8 differences - Remove Java 7/8 related build profiles - Update some plugins for better Java 8 compatibility - Fix a few Java-related warnings For the future: - Update Java 8 examples to fully use Java 8 - Update Java tests to use lambdas for simplicity - Update Java internal implementations to use lambdas ## How was this patch tested? Existing tests Author: Sean Owen Closes #16871 from srowen/SPARK-19493. --- .../org/apache/spark/streaming/JavaAPISuite.java | 2000 -------------------- 1 file changed, 2000 deletions(-) delete mode 100644 streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java (limited to 'streaming/src/test/java/org/apache') diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java deleted file mode 100644 index 648a5abe0b..0000000000 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ /dev/null @@ -1,2000 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming; - -import java.io.*; -import java.nio.charset.StandardCharsets; -import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; - -import scala.Tuple2; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; - -import org.junit.Assert; -import org.junit.Test; - -import com.google.common.io.Files; -import com.google.common.collect.Sets; - -import org.apache.spark.HashPartitioner; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.Optional; -import org.apache.spark.api.java.function.*; -import org.apache.spark.storage.StorageLevel; -import org.apache.spark.streaming.api.java.*; -import org.apache.spark.util.LongAccumulator; -import org.apache.spark.util.Utils; - -// The test suite itself is Serializable so that anonymous Function implementations can be -// serialized, as an alternative to converting these anonymous classes to static inner classes; -// see http://stackoverflow.com/questions/758570/. -public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable { - - public static void equalIterator(Iterator a, Iterator b) { - while (a.hasNext() && b.hasNext()) { - Assert.assertEquals(a.next(), b.next()); - } - Assert.assertEquals(a.hasNext(), b.hasNext()); - } - - public static void equalIterable(Iterable a, Iterable b) { - equalIterator(a.iterator(), b.iterator()); - } - - @Test - public void testInitialization() { - Assert.assertNotNull(ssc.sparkContext()); - } - - @SuppressWarnings("unchecked") - @Test - public void testContextState() { - List> inputData = Arrays.asList(Arrays.asList(1, 2, 3, 4)); - Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState()); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaTestUtils.attachTestOutputStream(stream); - Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState()); - ssc.start(); - Assert.assertEquals(StreamingContextState.ACTIVE, ssc.getState()); - ssc.stop(); - Assert.assertEquals(StreamingContextState.STOPPED, ssc.getState()); - } - - @SuppressWarnings("unchecked") - @Test - public void testCount() { - List> inputData = Arrays.asList( - Arrays.asList(1,2,3,4), - Arrays.asList(3,4,5), - Arrays.asList(3)); - - List> expected = Arrays.asList( - Arrays.asList(4L), - Arrays.asList(3L), - Arrays.asList(1L)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream count = stream.count(); - JavaTestUtils.attachTestOutputStream(count); - List> result = JavaTestUtils.runStreams(ssc, 3, 3); - assertOrderInvariantEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testMap() { - List> inputData = Arrays.asList( - Arrays.asList("hello", "world"), - Arrays.asList("goodnight", "moon")); - - List> expected = Arrays.asList( - Arrays.asList(5,5), - Arrays.asList(9,4)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream letterCount = stream.map(new Function() { - @Override - public Integer call(String s) { - return s.length(); - } - }); - JavaTestUtils.attachTestOutputStream(letterCount); - List> result = JavaTestUtils.runStreams(ssc, 2, 2); - - assertOrderInvariantEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testWindow() { - List> inputData = Arrays.asList( - Arrays.asList(1,2,3), - Arrays.asList(4,5,6), - Arrays.asList(7,8,9)); - - List> expected = Arrays.asList( - Arrays.asList(1,2,3), - Arrays.asList(4,5,6,1,2,3), - Arrays.asList(7,8,9,4,5,6), - Arrays.asList(7,8,9)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream windowed = stream.window(new Duration(2000)); - JavaTestUtils.attachTestOutputStream(windowed); - List> result = JavaTestUtils.runStreams(ssc, 4, 4); - - assertOrderInvariantEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testWindowWithSlideDuration() { - List> inputData = Arrays.asList( - Arrays.asList(1,2,3), - Arrays.asList(4,5,6), - Arrays.asList(7,8,9), - Arrays.asList(10,11,12), - Arrays.asList(13,14,15), - Arrays.asList(16,17,18)); - - List> expected = Arrays.asList( - Arrays.asList(1,2,3,4,5,6), - Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12), - Arrays.asList(7,8,9,10,11,12,13,14,15,16,17,18), - Arrays.asList(13,14,15,16,17,18)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream windowed = stream.window(new Duration(4000), new Duration(2000)); - JavaTestUtils.attachTestOutputStream(windowed); - List> result = JavaTestUtils.runStreams(ssc, 8, 4); - - assertOrderInvariantEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testFilter() { - List> inputData = Arrays.asList( - Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red sox")); - - List> expected = Arrays.asList( - Arrays.asList("giants"), - Arrays.asList("yankees")); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream filtered = stream.filter(new Function() { - @Override - public Boolean call(String s) { - return s.contains("a"); - } - }); - JavaTestUtils.attachTestOutputStream(filtered); - List> result = JavaTestUtils.runStreams(ssc, 2, 2); - - assertOrderInvariantEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testRepartitionMorePartitions() { - List> inputData = Arrays.asList( - Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), - Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); - JavaDStream stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 2); - JavaDStreamLike,JavaRDD> repartitioned = - stream.repartition(4); - JavaTestUtils.attachTestOutputStream(repartitioned); - List>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2); - Assert.assertEquals(2, result.size()); - for (List> rdd : result) { - Assert.assertEquals(4, rdd.size()); - Assert.assertEquals( - 10, rdd.get(0).size() + rdd.get(1).size() + rdd.get(2).size() + rdd.get(3).size()); - } - } - - @SuppressWarnings("unchecked") - @Test - public void testRepartitionFewerPartitions() { - List> inputData = Arrays.asList( - Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), - Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); - JavaDStream stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 4); - JavaDStreamLike,JavaRDD> repartitioned = - stream.repartition(2); - JavaTestUtils.attachTestOutputStream(repartitioned); - List>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2); - Assert.assertEquals(2, result.size()); - for (List> rdd : result) { - Assert.assertEquals(2, rdd.size()); - Assert.assertEquals(10, rdd.get(0).size() + rdd.get(1).size()); - } - } - - @SuppressWarnings("unchecked") - @Test - public void testGlom() { - List> inputData = Arrays.asList( - Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red sox")); - - List>> expected = Arrays.asList( - Arrays.asList(Arrays.asList("giants", "dodgers")), - Arrays.asList(Arrays.asList("yankees", "red sox"))); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream> glommed = stream.glom(); - JavaTestUtils.attachTestOutputStream(glommed); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testMapPartitions() { - List> inputData = Arrays.asList( - Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red sox")); - - List> expected = Arrays.asList( - Arrays.asList("GIANTSDODGERS"), - Arrays.asList("YANKEESRED SOX")); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream mapped = stream.mapPartitions( - new FlatMapFunction, String>() { - @Override - public Iterator call(Iterator in) { - StringBuilder out = new StringBuilder(); - while (in.hasNext()) { - out.append(in.next().toUpperCase(Locale.ENGLISH)); - } - return Arrays.asList(out.toString()).iterator(); - } - }); - JavaTestUtils.attachTestOutputStream(mapped); - List> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - private static class IntegerSum implements Function2 { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - } - - private static class IntegerDifference implements Function2 { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 - i2; - } - } - - @SuppressWarnings("unchecked") - @Test - public void testReduce() { - List> inputData = Arrays.asList( - Arrays.asList(1,2,3), - Arrays.asList(4,5,6), - Arrays.asList(7,8,9)); - - List> expected = Arrays.asList( - Arrays.asList(6), - Arrays.asList(15), - Arrays.asList(24)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream reduced = stream.reduce(new IntegerSum()); - JavaTestUtils.attachTestOutputStream(reduced); - List> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testReduceByWindowWithInverse() { - testReduceByWindow(true); - } - - @SuppressWarnings("unchecked") - @Test - public void testReduceByWindowWithoutInverse() { - testReduceByWindow(false); - } - - @SuppressWarnings("unchecked") - private void testReduceByWindow(boolean withInverse) { - List> inputData = Arrays.asList( - Arrays.asList(1,2,3), - Arrays.asList(4,5,6), - Arrays.asList(7,8,9)); - - List> expected = Arrays.asList( - Arrays.asList(6), - Arrays.asList(21), - Arrays.asList(39), - Arrays.asList(24)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream reducedWindowed; - if (withInverse) { - reducedWindowed = stream.reduceByWindow(new IntegerSum(), - new IntegerDifference(), - new Duration(2000), - new Duration(1000)); - } else { - reducedWindowed = stream.reduceByWindow(new IntegerSum(), - new Duration(2000), new Duration(1000)); - } - JavaTestUtils.attachTestOutputStream(reducedWindowed); - List> result = JavaTestUtils.runStreams(ssc, 4, 4); - - Assert.assertEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testQueueStream() { - ssc.stop(); - // Create a new JavaStreamingContext without checkpointing - SparkConf conf = new SparkConf() - .setMaster("local[2]") - .setAppName("test") - .set("spark.streaming.clock", "org.apache.spark.util.ManualClock"); - ssc = new JavaStreamingContext(conf, new Duration(1000)); - - List> expected = Arrays.asList( - Arrays.asList(1,2,3), - Arrays.asList(4,5,6), - Arrays.asList(7,8,9)); - - JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc()); - JavaRDD rdd1 = jsc.parallelize(Arrays.asList(1, 2, 3)); - JavaRDD rdd2 = jsc.parallelize(Arrays.asList(4, 5, 6)); - JavaRDD rdd3 = jsc.parallelize(Arrays.asList(7,8,9)); - - Queue> rdds = new LinkedList<>(); - rdds.add(rdd1); - rdds.add(rdd2); - rdds.add(rdd3); - - JavaDStream stream = ssc.queueStream(rdds); - JavaTestUtils.attachTestOutputStream(stream); - List> result = JavaTestUtils.runStreams(ssc, 3, 3); - Assert.assertEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testTransform() { - List> inputData = Arrays.asList( - Arrays.asList(1,2,3), - Arrays.asList(4,5,6), - Arrays.asList(7,8,9)); - - List> expected = Arrays.asList( - Arrays.asList(3,4,5), - Arrays.asList(6,7,8), - Arrays.asList(9,10,11)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream transformed = stream.transform( - new Function, JavaRDD>() { - @Override - public JavaRDD call(JavaRDD in) { - return in.map(new Function() { - @Override - public Integer call(Integer i) { - return i + 2; - } - }); - } - }); - - JavaTestUtils.attachTestOutputStream(transformed); - List> result = JavaTestUtils.runStreams(ssc, 3, 3); - - assertOrderInvariantEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testVariousTransform() { - // tests whether all variations of transform can be called from Java - - List> inputData = Arrays.asList(Arrays.asList(1)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - - List>> pairInputData = - Arrays.asList(Arrays.asList(new Tuple2<>("x", 1))); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream( - JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1)); - - stream.transform( - new Function, JavaRDD>() { - @Override - public JavaRDD call(JavaRDD in) { - return null; - } - } - ); - - stream.transform( - new Function2, Time, JavaRDD>() { - @Override public JavaRDD call(JavaRDD in, Time time) { - return null; - } - } - ); - - stream.transformToPair( - new Function, JavaPairRDD>() { - @Override public JavaPairRDD call(JavaRDD in) { - return null; - } - } - ); - - stream.transformToPair( - new Function2, Time, JavaPairRDD>() { - @Override public JavaPairRDD call(JavaRDD in, Time time) { - return null; - } - } - ); - - pairStream.transform( - new Function, JavaRDD>() { - @Override public JavaRDD call(JavaPairRDD in) { - return null; - } - } - ); - - pairStream.transform( - new Function2, Time, JavaRDD>() { - @Override public JavaRDD call(JavaPairRDD in, Time time) { - return null; - } - } - ); - - pairStream.transformToPair( - new Function, JavaPairRDD>() { - @Override public JavaPairRDD call(JavaPairRDD in) { - return null; - } - } - ); - - pairStream.transformToPair( - new Function2, Time, JavaPairRDD>() { - @Override public JavaPairRDD call(JavaPairRDD in, - Time time) { - return null; - } - } - ); - - } - - @SuppressWarnings("unchecked") - @Test - public void testTransformWith() { - List>> stringStringKVStream1 = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", "dodgers"), - new Tuple2<>("new york", "yankees")), - Arrays.asList( - new Tuple2<>("california", "sharks"), - new Tuple2<>("new york", "rangers"))); - - List>> stringStringKVStream2 = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", "giants"), - new Tuple2<>("new york", "mets")), - Arrays.asList( - new Tuple2<>("california", "ducks"), - new Tuple2<>("new york", "islanders"))); - - - List>>> expected = Arrays.asList( - Sets.newHashSet( - new Tuple2<>("california", - new Tuple2<>("dodgers", "giants")), - new Tuple2<>("new york", - new Tuple2<>("yankees", "mets"))), - Sets.newHashSet( - new Tuple2<>("california", - new Tuple2<>("sharks", "ducks")), - new Tuple2<>("new york", - new Tuple2<>("rangers", "islanders")))); - - JavaDStream> stream1 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream1, 1); - JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream(stream1); - - JavaDStream> stream2 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream2, 1); - JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream(stream2); - - JavaPairDStream> joined = pairStream1.transformWithToPair( - pairStream2, - new Function3< - JavaPairRDD, - JavaPairRDD, - Time, - JavaPairRDD>>() { - @Override - public JavaPairRDD> call( - JavaPairRDD rdd1, - JavaPairRDD rdd2, - Time time) { - return rdd1.join(rdd2); - } - } - ); - - JavaTestUtils.attachTestOutputStream(joined); - List>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - List>>> unorderedResult = new ArrayList<>(); - for (List>> res: result) { - unorderedResult.add(Sets.newHashSet(res)); - } - - Assert.assertEquals(expected, unorderedResult); - } - - - @SuppressWarnings("unchecked") - @Test - public void testVariousTransformWith() { - // tests whether all variations of transformWith can be called from Java - - List> inputData1 = Arrays.asList(Arrays.asList(1)); - List> inputData2 = Arrays.asList(Arrays.asList("x")); - JavaDStream stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1); - JavaDStream stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1); - - List>> pairInputData1 = - Arrays.asList(Arrays.asList(new Tuple2<>("x", 1))); - List>> pairInputData2 = - Arrays.asList(Arrays.asList(new Tuple2<>(1.0, 'x'))); - JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream( - JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1)); - JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream( - JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1)); - - stream1.transformWith( - stream2, - new Function3, JavaRDD, Time, JavaRDD>() { - @Override - public JavaRDD call(JavaRDD rdd1, JavaRDD rdd2, Time time) { - return null; - } - } - ); - - stream1.transformWith( - pairStream1, - new Function3, JavaPairRDD, Time, JavaRDD>() { - @Override - public JavaRDD call(JavaRDD rdd1, JavaPairRDD rdd2, - Time time) { - return null; - } - } - ); - - stream1.transformWithToPair( - stream2, - new Function3, JavaRDD, Time, JavaPairRDD>() { - @Override - public JavaPairRDD call(JavaRDD rdd1, JavaRDD rdd2, - Time time) { - return null; - } - } - ); - - stream1.transformWithToPair( - pairStream1, - new Function3, JavaPairRDD, Time, - JavaPairRDD>() { - @Override - public JavaPairRDD call(JavaRDD rdd1, - JavaPairRDD rdd2, - Time time) { - return null; - } - } - ); - - pairStream1.transformWith( - stream2, - new Function3, JavaRDD, Time, JavaRDD>() { - @Override - public JavaRDD call(JavaPairRDD rdd1, JavaRDD rdd2, - Time time) { - return null; - } - } - ); - - pairStream1.transformWith( - pairStream1, - new Function3, JavaPairRDD, Time, - JavaRDD>() { - @Override - public JavaRDD call(JavaPairRDD rdd1, - JavaPairRDD rdd2, - Time time) { - return null; - } - } - ); - - pairStream1.transformWithToPair( - stream2, - new Function3, JavaRDD, Time, - JavaPairRDD>() { - @Override - public JavaPairRDD call(JavaPairRDD rdd1, - JavaRDD rdd2, - Time time) { - return null; - } - } - ); - - pairStream1.transformWithToPair( - pairStream2, - new Function3, JavaPairRDD, Time, - JavaPairRDD>() { - @Override - public JavaPairRDD call(JavaPairRDD rdd1, - JavaPairRDD rdd2, - Time time) { - return null; - } - } - ); - } - - @SuppressWarnings("unchecked") - @Test - public void testStreamingContextTransform(){ - List> stream1input = Arrays.asList( - Arrays.asList(1), - Arrays.asList(2) - ); - - List> stream2input = Arrays.asList( - Arrays.asList(3), - Arrays.asList(4) - ); - - List>> pairStream1input = Arrays.asList( - Arrays.asList(new Tuple2<>(1, "x")), - Arrays.asList(new Tuple2<>(2, "y")) - ); - - List>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>(1, new Tuple2<>(1, "x"))), - Arrays.asList(new Tuple2<>(2, new Tuple2<>(2, "y"))) - ); - - JavaDStream stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1); - JavaDStream stream2 = JavaTestUtils.attachTestInputStream(ssc, stream2input, 1); - JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream( - JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1)); - - List> listOfDStreams1 = Arrays.>asList(stream1, stream2); - - // This is just to test whether this transform to JavaStream compiles - ssc.transform( - listOfDStreams1, - new Function2>, Time, JavaRDD>() { - @Override - public JavaRDD call(List> listOfRDDs, Time time) { - Assert.assertEquals(2, listOfRDDs.size()); - return null; - } - } - ); - - List> listOfDStreams2 = - Arrays.>asList(stream1, stream2, pairStream1.toJavaDStream()); - - JavaPairDStream> transformed2 = ssc.transformToPair( - listOfDStreams2, - new Function2>, Time, JavaPairRDD>>() { - @Override - public JavaPairRDD> call(List> listOfRDDs, - Time time) { - Assert.assertEquals(3, listOfRDDs.size()); - JavaRDD rdd1 = (JavaRDD)listOfRDDs.get(0); - JavaRDD rdd2 = (JavaRDD)listOfRDDs.get(1); - JavaRDD> rdd3 = - (JavaRDD>)listOfRDDs.get(2); - JavaPairRDD prdd3 = JavaPairRDD.fromJavaRDD(rdd3); - PairFunction mapToTuple = - new PairFunction() { - @Override - public Tuple2 call(Integer i) { - return new Tuple2<>(i, i); - } - }; - return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3); - } - } - ); - JavaTestUtils.attachTestOutputStream(transformed2); - List>>> result = - JavaTestUtils.runStreams(ssc, 2, 2); - Assert.assertEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testFlatMap() { - List> inputData = Arrays.asList( - Arrays.asList("go", "giants"), - Arrays.asList("boo", "dodgers"), - Arrays.asList("athletics")); - - List> expected = Arrays.asList( - Arrays.asList("g","o","g","i","a","n","t","s"), - Arrays.asList("b", "o", "o", "d","o","d","g","e","r","s"), - Arrays.asList("a","t","h","l","e","t","i","c","s")); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream flatMapped = stream.flatMap(new FlatMapFunction() { - @Override - public Iterator call(String x) { - return Arrays.asList(x.split("(?!^)")).iterator(); - } - }); - JavaTestUtils.attachTestOutputStream(flatMapped); - List> result = JavaTestUtils.runStreams(ssc, 3, 3); - - assertOrderInvariantEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testForeachRDD() { - final LongAccumulator accumRdd = ssc.sparkContext().sc().longAccumulator(); - final LongAccumulator accumEle = ssc.sparkContext().sc().longAccumulator(); - List> inputData = Arrays.asList( - Arrays.asList(1,1,1), - Arrays.asList(1,1,1)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output - - stream.foreachRDD(new VoidFunction>() { - @Override - public void call(JavaRDD rdd) { - accumRdd.add(1); - rdd.foreach(new VoidFunction() { - @Override - public void call(Integer i) { - accumEle.add(1); - } - }); - } - }); - - // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java - stream.foreachRDD(new VoidFunction2, Time>() { - @Override - public void call(JavaRDD rdd, Time time) { - } - }); - - JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(2, accumRdd.value().intValue()); - Assert.assertEquals(6, accumEle.value().intValue()); - } - - @SuppressWarnings("unchecked") - @Test - public void testPairFlatMap() { - List> inputData = Arrays.asList( - Arrays.asList("giants"), - Arrays.asList("dodgers"), - Arrays.asList("athletics")); - - List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>(6, "g"), - new Tuple2<>(6, "i"), - new Tuple2<>(6, "a"), - new Tuple2<>(6, "n"), - new Tuple2<>(6, "t"), - new Tuple2<>(6, "s")), - Arrays.asList( - new Tuple2<>(7, "d"), - new Tuple2<>(7, "o"), - new Tuple2<>(7, "d"), - new Tuple2<>(7, "g"), - new Tuple2<>(7, "e"), - new Tuple2<>(7, "r"), - new Tuple2<>(7, "s")), - Arrays.asList( - new Tuple2<>(9, "a"), - new Tuple2<>(9, "t"), - new Tuple2<>(9, "h"), - new Tuple2<>(9, "l"), - new Tuple2<>(9, "e"), - new Tuple2<>(9, "t"), - new Tuple2<>(9, "i"), - new Tuple2<>(9, "c"), - new Tuple2<>(9, "s"))); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream flatMapped = stream.flatMapToPair( - new PairFlatMapFunction() { - @Override - public Iterator> call(String in) { - List> out = new ArrayList<>(); - for (String letter: in.split("(?!^)")) { - out.add(new Tuple2<>(in.length(), letter)); - } - return out.iterator(); - } - }); - JavaTestUtils.attachTestOutputStream(flatMapped); - List>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testUnion() { - List> inputData1 = Arrays.asList( - Arrays.asList(1,1), - Arrays.asList(2,2), - Arrays.asList(3,3)); - - List> inputData2 = Arrays.asList( - Arrays.asList(4,4), - Arrays.asList(5,5), - Arrays.asList(6,6)); - - List> expected = Arrays.asList( - Arrays.asList(1,1,4,4), - Arrays.asList(2,2,5,5), - Arrays.asList(3,3,6,6)); - - JavaDStream stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2); - JavaDStream stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2); - - JavaDStream unioned = stream1.union(stream2); - JavaTestUtils.attachTestOutputStream(unioned); - List> result = JavaTestUtils.runStreams(ssc, 3, 3); - - assertOrderInvariantEquals(expected, result); - } - - /* - * Performs an order-invariant comparison of lists representing two RDD streams. This allows - * us to account for ordering variation within individual RDD's which occurs during windowing. - */ - public static void assertOrderInvariantEquals( - List> expected, List> actual) { - List> expectedSets = new ArrayList<>(); - for (List list: expected) { - expectedSets.add(Collections.unmodifiableSet(new HashSet<>(list))); - } - List> actualSets = new ArrayList<>(); - for (List list: actual) { - actualSets.add(Collections.unmodifiableSet(new HashSet<>(list))); - } - Assert.assertEquals(expectedSets, actualSets); - } - - - // PairDStream Functions - @SuppressWarnings("unchecked") - @Test - public void testPairFilter() { - List> inputData = Arrays.asList( - Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red sox")); - - List>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("giants", 6)), - Arrays.asList(new Tuple2<>("yankees", 7))); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = stream.mapToPair( - new PairFunction() { - @Override - public Tuple2 call(String in) { - return new Tuple2<>(in, in.length()); - } - }); - - JavaPairDStream filtered = pairStream.filter( - new Function, Boolean>() { - @Override - public Boolean call(Tuple2 in) { - return in._1().contains("a"); - } - }); - JavaTestUtils.attachTestOutputStream(filtered); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @SuppressWarnings("unchecked") - private final List>> stringStringKVStream = Arrays.asList( - Arrays.asList(new Tuple2<>("california", "dodgers"), - new Tuple2<>("california", "giants"), - new Tuple2<>("new york", "yankees"), - new Tuple2<>("new york", "mets")), - Arrays.asList(new Tuple2<>("california", "sharks"), - new Tuple2<>("california", "ducks"), - new Tuple2<>("new york", "rangers"), - new Tuple2<>("new york", "islanders"))); - - @SuppressWarnings("unchecked") - private final List>> stringIntKVStream = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", 1), - new Tuple2<>("california", 3), - new Tuple2<>("new york", 4), - new Tuple2<>("new york", 1)), - Arrays.asList( - new Tuple2<>("california", 5), - new Tuple2<>("california", 5), - new Tuple2<>("new york", 3), - new Tuple2<>("new york", 1))); - - @SuppressWarnings("unchecked") - @Test - public void testPairMap() { // Maps pair -> pair of different type - List>> inputData = stringIntKVStream; - - List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>(1, "california"), - new Tuple2<>(3, "california"), - new Tuple2<>(4, "new york"), - new Tuple2<>(1, "new york")), - Arrays.asList( - new Tuple2<>(5, "california"), - new Tuple2<>(5, "california"), - new Tuple2<>(3, "new york"), - new Tuple2<>(1, "new york"))); - - JavaDStream> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream reversed = pairStream.mapToPair( - new PairFunction, Integer, String>() { - @Override - public Tuple2 call(Tuple2 in) { - return in.swap(); - } - }); - - JavaTestUtils.attachTestOutputStream(reversed); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testPairMapPartitions() { // Maps pair -> pair of different type - List>> inputData = stringIntKVStream; - - List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>(1, "california"), - new Tuple2<>(3, "california"), - new Tuple2<>(4, "new york"), - new Tuple2<>(1, "new york")), - Arrays.asList( - new Tuple2<>(5, "california"), - new Tuple2<>(5, "california"), - new Tuple2<>(3, "new york"), - new Tuple2<>(1, "new york"))); - - JavaDStream> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream reversed = pairStream.mapPartitionsToPair( - new PairFlatMapFunction>, Integer, String>() { - @Override - public Iterator> call(Iterator> in) { - List> out = new LinkedList<>(); - while (in.hasNext()) { - Tuple2 next = in.next(); - out.add(next.swap()); - } - return out.iterator(); - } - }); - - JavaTestUtils.attachTestOutputStream(reversed); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testPairMap2() { // Maps pair -> single - List>> inputData = stringIntKVStream; - - List> expected = Arrays.asList( - Arrays.asList(1, 3, 4, 1), - Arrays.asList(5, 5, 3, 1)); - - JavaDStream> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaDStream reversed = pairStream.map( - new Function, Integer>() { - @Override - public Integer call(Tuple2 in) { - return in._2(); - } - }); - - JavaTestUtils.attachTestOutputStream(reversed); - List> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair - List>> inputData = Arrays.asList( - Arrays.asList( - new Tuple2<>("hi", 1), - new Tuple2<>("ho", 2)), - Arrays.asList( - new Tuple2<>("hi", 1), - new Tuple2<>("ho", 2))); - - List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>(1, "h"), - new Tuple2<>(1, "i"), - new Tuple2<>(2, "h"), - new Tuple2<>(2, "o")), - Arrays.asList( - new Tuple2<>(1, "h"), - new Tuple2<>(1, "i"), - new Tuple2<>(2, "h"), - new Tuple2<>(2, "o"))); - - JavaDStream> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream flatMapped = pairStream.flatMapToPair( - new PairFlatMapFunction, Integer, String>() { - @Override - public Iterator> call(Tuple2 in) { - List> out = new LinkedList<>(); - for (Character s : in._1().toCharArray()) { - out.add(new Tuple2<>(in._2(), s.toString())); - } - return out.iterator(); - } - }); - JavaTestUtils.attachTestOutputStream(flatMapped); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testPairGroupByKey() { - List>> inputData = stringStringKVStream; - - List>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", Arrays.asList("dodgers", "giants")), - new Tuple2<>("new york", Arrays.asList("yankees", "mets"))), - Arrays.asList( - new Tuple2<>("california", Arrays.asList("sharks", "ducks")), - new Tuple2<>("new york", Arrays.asList("rangers", "islanders")))); - - JavaDStream> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream> grouped = pairStream.groupByKey(); - JavaTestUtils.attachTestOutputStream(grouped); - List>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected.size(), result.size()); - Iterator>>> resultItr = result.iterator(); - Iterator>>> expectedItr = expected.iterator(); - while (resultItr.hasNext() && expectedItr.hasNext()) { - Iterator>> resultElements = resultItr.next().iterator(); - Iterator>> expectedElements = expectedItr.next().iterator(); - while (resultElements.hasNext() && expectedElements.hasNext()) { - Tuple2> resultElement = resultElements.next(); - Tuple2> expectedElement = expectedElements.next(); - Assert.assertEquals(expectedElement._1(), resultElement._1()); - equalIterable(expectedElement._2(), resultElement._2()); - } - Assert.assertEquals(resultElements.hasNext(), expectedElements.hasNext()); - } - } - - @SuppressWarnings("unchecked") - @Test - public void testPairReduceByKey() { - List>> inputData = stringIntKVStream; - - List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", 4), - new Tuple2<>("new york", 5)), - Arrays.asList( - new Tuple2<>("california", 10), - new Tuple2<>("new york", 4))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream reduced = pairStream.reduceByKey(new IntegerSum()); - - JavaTestUtils.attachTestOutputStream(reduced); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testCombineByKey() { - List>> inputData = stringIntKVStream; - - List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", 4), - new Tuple2<>("new york", 5)), - Arrays.asList( - new Tuple2<>("california", 10), - new Tuple2<>("new york", 4))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream combined = pairStream.combineByKey( - new Function() { - @Override - public Integer call(Integer i) { - return i; - } - }, new IntegerSum(), new IntegerSum(), new HashPartitioner(2)); - - JavaTestUtils.attachTestOutputStream(combined); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testCountByValue() { - List> inputData = Arrays.asList( - Arrays.asList("hello", "world"), - Arrays.asList("hello", "moon"), - Arrays.asList("hello")); - - List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>("hello", 1L), - new Tuple2<>("world", 1L)), - Arrays.asList( - new Tuple2<>("hello", 1L), - new Tuple2<>("moon", 1L)), - Arrays.asList( - new Tuple2<>("hello", 1L))); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream counted = stream.countByValue(); - JavaTestUtils.attachTestOutputStream(counted); - List>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testGroupByKeyAndWindow() { - List>> inputData = stringIntKVStream; - - List>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", Arrays.asList(1, 3)), - new Tuple2<>("new york", Arrays.asList(1, 4)) - ), - Arrays.asList( - new Tuple2<>("california", Arrays.asList(1, 3, 5, 5)), - new Tuple2<>("new york", Arrays.asList(1, 1, 3, 4)) - ), - Arrays.asList( - new Tuple2<>("california", Arrays.asList(5, 5)), - new Tuple2<>("new york", Arrays.asList(1, 3)) - ) - ); - - JavaDStream> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream> groupWindowed = - pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000)); - JavaTestUtils.attachTestOutputStream(groupWindowed); - List>>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected.size(), result.size()); - for (int i = 0; i < result.size(); i++) { - Assert.assertEquals(convert(expected.get(i)), convert(result.get(i))); - } - } - - private static Set>> - convert(List>> listOfTuples) { - List>> newListOfTuples = new ArrayList<>(); - for (Tuple2> tuple: listOfTuples) { - newListOfTuples.add(convert(tuple)); - } - return new HashSet<>(newListOfTuples); - } - - private static Tuple2> convert(Tuple2> tuple) { - return new Tuple2<>(tuple._1(), new HashSet<>(tuple._2())); - } - - @SuppressWarnings("unchecked") - @Test - public void testReduceByKeyAndWindow() { - List>> inputData = stringIntKVStream; - - List>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("california", 4), - new Tuple2<>("new york", 5)), - Arrays.asList(new Tuple2<>("california", 14), - new Tuple2<>("new york", 9)), - Arrays.asList(new Tuple2<>("california", 10), - new Tuple2<>("new york", 4))); - - JavaDStream> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream reduceWindowed = - pairStream.reduceByKeyAndWindow(new IntegerSum(), new Duration(2000), new Duration(1000)); - JavaTestUtils.attachTestOutputStream(reduceWindowed); - List>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testUpdateStateByKey() { - List>> inputData = stringIntKVStream; - - List>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("california", 4), - new Tuple2<>("new york", 5)), - Arrays.asList(new Tuple2<>("california", 14), - new Tuple2<>("new york", 9)), - Arrays.asList(new Tuple2<>("california", 14), - new Tuple2<>("new york", 9))); - - JavaDStream> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream updated = pairStream.updateStateByKey( - new Function2, Optional, Optional>() { - @Override - public Optional call(List values, Optional state) { - int out = 0; - if (state.isPresent()) { - out += state.get(); - } - for (Integer v : values) { - out += v; - } - return Optional.of(out); - } - }); - JavaTestUtils.attachTestOutputStream(updated); - List>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testUpdateStateByKeyWithInitial() { - List>> inputData = stringIntKVStream; - - List> initial = Arrays.asList( - new Tuple2<>("california", 1), - new Tuple2<>("new york", 2)); - - JavaRDD> tmpRDD = ssc.sparkContext().parallelize(initial); - JavaPairRDD initialRDD = JavaPairRDD.fromJavaRDD(tmpRDD); - - List>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("california", 5), - new Tuple2<>("new york", 7)), - Arrays.asList(new Tuple2<>("california", 15), - new Tuple2<>("new york", 11)), - Arrays.asList(new Tuple2<>("california", 15), - new Tuple2<>("new york", 11))); - - JavaDStream> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream updated = pairStream.updateStateByKey( - new Function2, Optional, Optional>() { - @Override - public Optional call(List values, Optional state) { - int out = 0; - if (state.isPresent()) { - out += state.get(); - } - for (Integer v : values) { - out += v; - } - return Optional.of(out); - } - }, new HashPartitioner(1), initialRDD); - JavaTestUtils.attachTestOutputStream(updated); - List>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - assertOrderInvariantEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testReduceByKeyAndWindowWithInverse() { - List>> inputData = stringIntKVStream; - - List>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("california", 4), - new Tuple2<>("new york", 5)), - Arrays.asList(new Tuple2<>("california", 14), - new Tuple2<>("new york", 9)), - Arrays.asList(new Tuple2<>("california", 10), - new Tuple2<>("new york", 4))); - - JavaDStream> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream reduceWindowed = - pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), - new Duration(2000), new Duration(1000)); - JavaTestUtils.attachTestOutputStream(reduceWindowed); - List>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testCountByValueAndWindow() { - List> inputData = Arrays.asList( - Arrays.asList("hello", "world"), - Arrays.asList("hello", "moon"), - Arrays.asList("hello")); - - List>> expected = Arrays.asList( - Sets.newHashSet( - new Tuple2<>("hello", 1L), - new Tuple2<>("world", 1L)), - Sets.newHashSet( - new Tuple2<>("hello", 2L), - new Tuple2<>("world", 1L), - new Tuple2<>("moon", 1L)), - Sets.newHashSet( - new Tuple2<>("hello", 2L), - new Tuple2<>("moon", 1L))); - - JavaDStream stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream counted = - stream.countByValueAndWindow(new Duration(2000), new Duration(1000)); - JavaTestUtils.attachTestOutputStream(counted); - List>> result = JavaTestUtils.runStreams(ssc, 3, 3); - List>> unorderedResult = new ArrayList<>(); - for (List> res: result) { - unorderedResult.add(Sets.newHashSet(res)); - } - - Assert.assertEquals(expected, unorderedResult); - } - - @SuppressWarnings("unchecked") - @Test - public void testPairTransform() { - List>> inputData = Arrays.asList( - Arrays.asList( - new Tuple2<>(3, 5), - new Tuple2<>(1, 5), - new Tuple2<>(4, 5), - new Tuple2<>(2, 5)), - Arrays.asList( - new Tuple2<>(2, 5), - new Tuple2<>(3, 5), - new Tuple2<>(4, 5), - new Tuple2<>(1, 5))); - - List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>(1, 5), - new Tuple2<>(2, 5), - new Tuple2<>(3, 5), - new Tuple2<>(4, 5)), - Arrays.asList( - new Tuple2<>(1, 5), - new Tuple2<>(2, 5), - new Tuple2<>(3, 5), - new Tuple2<>(4, 5))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream sorted = pairStream.transformToPair( - new Function, JavaPairRDD>() { - @Override - public JavaPairRDD call(JavaPairRDD in) { - return in.sortByKey(); - } - }); - - JavaTestUtils.attachTestOutputStream(sorted); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testPairToNormalRDDTransform() { - List>> inputData = Arrays.asList( - Arrays.asList( - new Tuple2<>(3, 5), - new Tuple2<>(1, 5), - new Tuple2<>(4, 5), - new Tuple2<>(2, 5)), - Arrays.asList( - new Tuple2<>(2, 5), - new Tuple2<>(3, 5), - new Tuple2<>(4, 5), - new Tuple2<>(1, 5))); - - List> expected = Arrays.asList( - Arrays.asList(3,1,4,2), - Arrays.asList(2,3,4,1)); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaDStream firstParts = pairStream.transform( - new Function, JavaRDD>() { - @Override - public JavaRDD call(JavaPairRDD in) { - return in.map(new Function, Integer>() { - @Override - public Integer call(Tuple2 in2) { - return in2._1(); - } - }); - } - }); - - JavaTestUtils.attachTestOutputStream(firstParts); - List> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testMapValues() { - List>> inputData = stringStringKVStream; - - List>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("california", "DODGERS"), - new Tuple2<>("california", "GIANTS"), - new Tuple2<>("new york", "YANKEES"), - new Tuple2<>("new york", "METS")), - Arrays.asList(new Tuple2<>("california", "SHARKS"), - new Tuple2<>("california", "DUCKS"), - new Tuple2<>("new york", "RANGERS"), - new Tuple2<>("new york", "ISLANDERS"))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream mapped = pairStream.mapValues(new Function() { - @Override - public String call(String s) { - return s.toUpperCase(Locale.ENGLISH); - } - }); - - JavaTestUtils.attachTestOutputStream(mapped); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testFlatMapValues() { - List>> inputData = stringStringKVStream; - - List>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("california", "dodgers1"), - new Tuple2<>("california", "dodgers2"), - new Tuple2<>("california", "giants1"), - new Tuple2<>("california", "giants2"), - new Tuple2<>("new york", "yankees1"), - new Tuple2<>("new york", "yankees2"), - new Tuple2<>("new york", "mets1"), - new Tuple2<>("new york", "mets2")), - Arrays.asList(new Tuple2<>("california", "sharks1"), - new Tuple2<>("california", "sharks2"), - new Tuple2<>("california", "ducks1"), - new Tuple2<>("california", "ducks2"), - new Tuple2<>("new york", "rangers1"), - new Tuple2<>("new york", "rangers2"), - new Tuple2<>("new york", "islanders1"), - new Tuple2<>("new york", "islanders2"))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - - JavaPairDStream flatMapped = pairStream.flatMapValues( - new Function>() { - @Override - public Iterable call(String in) { - List out = new ArrayList<>(); - out.add(in + "1"); - out.add(in + "2"); - return out; - } - }); - - JavaTestUtils.attachTestOutputStream(flatMapped); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testCoGroup() { - List>> stringStringKVStream1 = Arrays.asList( - Arrays.asList(new Tuple2<>("california", "dodgers"), - new Tuple2<>("new york", "yankees")), - Arrays.asList(new Tuple2<>("california", "sharks"), - new Tuple2<>("new york", "rangers"))); - - List>> stringStringKVStream2 = Arrays.asList( - Arrays.asList(new Tuple2<>("california", "giants"), - new Tuple2<>("new york", "mets")), - Arrays.asList(new Tuple2<>("california", "ducks"), - new Tuple2<>("new york", "islanders"))); - - - List, List>>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", - new Tuple2<>(Arrays.asList("dodgers"), Arrays.asList("giants"))), - new Tuple2<>("new york", - new Tuple2<>(Arrays.asList("yankees"), Arrays.asList("mets")))), - Arrays.asList( - new Tuple2<>("california", - new Tuple2<>(Arrays.asList("sharks"), Arrays.asList("ducks"))), - new Tuple2<>("new york", - new Tuple2<>(Arrays.asList("rangers"), Arrays.asList("islanders"))))); - - - JavaDStream> stream1 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream1, 1); - JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream(stream1); - - JavaDStream> stream2 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream2, 1); - JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream(stream2); - - JavaPairDStream, Iterable>> grouped = - pairStream1.cogroup(pairStream2); - JavaTestUtils.attachTestOutputStream(grouped); - List, Iterable>>>> result = - JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected.size(), result.size()); - Iterator, Iterable>>>> resultItr = - result.iterator(); - Iterator, List>>>> expectedItr = - expected.iterator(); - while (resultItr.hasNext() && expectedItr.hasNext()) { - Iterator, Iterable>>> resultElements = - resultItr.next().iterator(); - Iterator, List>>> expectedElements = - expectedItr.next().iterator(); - while (resultElements.hasNext() && expectedElements.hasNext()) { - Tuple2, Iterable>> resultElement = - resultElements.next(); - Tuple2, List>> expectedElement = - expectedElements.next(); - Assert.assertEquals(expectedElement._1(), resultElement._1()); - equalIterable(expectedElement._2()._1(), resultElement._2()._1()); - equalIterable(expectedElement._2()._2(), resultElement._2()._2()); - } - Assert.assertEquals(resultElements.hasNext(), expectedElements.hasNext()); - } - } - - @SuppressWarnings("unchecked") - @Test - public void testJoin() { - List>> stringStringKVStream1 = Arrays.asList( - Arrays.asList(new Tuple2<>("california", "dodgers"), - new Tuple2<>("new york", "yankees")), - Arrays.asList(new Tuple2<>("california", "sharks"), - new Tuple2<>("new york", "rangers"))); - - List>> stringStringKVStream2 = Arrays.asList( - Arrays.asList(new Tuple2<>("california", "giants"), - new Tuple2<>("new york", "mets")), - Arrays.asList(new Tuple2<>("california", "ducks"), - new Tuple2<>("new york", "islanders"))); - - - List>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", - new Tuple2<>("dodgers", "giants")), - new Tuple2<>("new york", - new Tuple2<>("yankees", "mets"))), - Arrays.asList( - new Tuple2<>("california", - new Tuple2<>("sharks", "ducks")), - new Tuple2<>("new york", - new Tuple2<>("rangers", "islanders")))); - - - JavaDStream> stream1 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream1, 1); - JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream(stream1); - - JavaDStream> stream2 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream2, 1); - JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream(stream2); - - JavaPairDStream> joined = pairStream1.join(pairStream2); - JavaTestUtils.attachTestOutputStream(joined); - List>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testLeftOuterJoin() { - List>> stringStringKVStream1 = Arrays.asList( - Arrays.asList(new Tuple2<>("california", "dodgers"), - new Tuple2<>("new york", "yankees")), - Arrays.asList(new Tuple2<>("california", "sharks") )); - - List>> stringStringKVStream2 = Arrays.asList( - Arrays.asList(new Tuple2<>("california", "giants") ), - Arrays.asList(new Tuple2<>("new york", "islanders") ) - - ); - - List> expected = Arrays.asList(Arrays.asList(2L), Arrays.asList(1L)); - - JavaDStream> stream1 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream1, 1); - JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream(stream1); - - JavaDStream> stream2 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream2, 1); - JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream(stream2); - - JavaPairDStream>> joined = - pairStream1.leftOuterJoin(pairStream2); - JavaDStream counted = joined.count(); - JavaTestUtils.attachTestOutputStream(counted); - List> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testCheckpointMasterRecovery() throws InterruptedException { - List> inputData = Arrays.asList( - Arrays.asList("this", "is"), - Arrays.asList("a", "test"), - Arrays.asList("counting", "letters")); - - List> expectedInitial = Arrays.asList( - Arrays.asList(4,2)); - List> expectedFinal = Arrays.asList( - Arrays.asList(1,4), - Arrays.asList(8,7)); - - File tempDir = Files.createTempDir(); - tempDir.deleteOnExit(); - ssc.checkpoint(tempDir.getAbsolutePath()); - - JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream letterCount = stream.map(new Function() { - @Override - public Integer call(String s) { - return s.length(); - } - }); - JavaCheckpointTestUtils.attachTestOutputStream(letterCount); - List> initialResult = JavaTestUtils.runStreams(ssc, 1, 1); - - assertOrderInvariantEquals(expectedInitial, initialResult); - Thread.sleep(1000); - ssc.stop(); - - ssc = new JavaStreamingContext(tempDir.getAbsolutePath()); - // Tweak to take into consideration that the last batch before failure - // will be re-processed after recovery - List> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 3); - assertOrderInvariantEquals(expectedFinal, finalResult.subList(1, 3)); - ssc.stop(); - Utils.deleteRecursively(tempDir); - } - - @SuppressWarnings("unchecked") - @Test - public void testContextGetOrCreate() throws InterruptedException { - ssc.stop(); - - final SparkConf conf = new SparkConf() - .setMaster("local[2]") - .setAppName("test") - .set("newContext", "true"); - - File emptyDir = Files.createTempDir(); - emptyDir.deleteOnExit(); - StreamingContextSuite contextSuite = new StreamingContextSuite(); - String corruptedCheckpointDir = contextSuite.createCorruptedCheckpoint(); - String checkpointDir = contextSuite.createValidCheckpoint(); - - // Function to create JavaStreamingContext without any output operations - // (used to detect the new context) - final AtomicBoolean newContextCreated = new AtomicBoolean(false); - Function0 creatingFunc = new Function0() { - @Override - public JavaStreamingContext call() { - newContextCreated.set(true); - return new JavaStreamingContext(conf, Seconds.apply(1)); - } - }; - - newContextCreated.set(false); - ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc); - Assert.assertTrue("new context not created", newContextCreated.get()); - ssc.stop(); - - newContextCreated.set(false); - ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc, - new Configuration(), true); - Assert.assertTrue("new context not created", newContextCreated.get()); - ssc.stop(); - - newContextCreated.set(false); - ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc, - new Configuration()); - Assert.assertTrue("old context not recovered", !newContextCreated.get()); - ssc.stop(); - - newContextCreated.set(false); - JavaSparkContext sc = new JavaSparkContext(conf); - ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc, - new Configuration()); - Assert.assertTrue("old context not recovered", !newContextCreated.get()); - ssc.stop(); - } - - /* TEST DISABLED: Pending a discussion about checkpoint() semantics with TD - @SuppressWarnings("unchecked") - @Test - public void testCheckpointofIndividualStream() throws InterruptedException { - List> inputData = Arrays.asList( - Arrays.asList("this", "is"), - Arrays.asList("a", "test"), - Arrays.asList("counting", "letters")); - - List> expected = Arrays.asList( - Arrays.asList(4,2), - Arrays.asList(1,4), - Arrays.asList(8,7)); - - JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream letterCount = stream.map(new Function() { - @Override - public Integer call(String s) { - return s.length(); - } - }); - JavaCheckpointTestUtils.attachTestOutputStream(letterCount); - - letterCount.checkpoint(new Duration(1000)); - - List> result1 = JavaCheckpointTestUtils.runStreams(ssc, 3, 3); - assertOrderInvariantEquals(expected, result1); - } - */ - - // Input stream tests. These mostly just test that we can instantiate a given InputStream with - // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the - // InputStream functionality is deferred to the existing Scala tests. - @Test - public void testSocketTextStream() { - ssc.socketTextStream("localhost", 12345); - } - - @Test - public void testSocketString() { - ssc.socketStream( - "localhost", - 12345, - new Function>() { - @Override - public Iterable call(InputStream in) throws IOException { - List out = new ArrayList<>(); - try (BufferedReader reader = new BufferedReader( - new InputStreamReader(in, StandardCharsets.UTF_8))) { - for (String line; (line = reader.readLine()) != null;) { - out.add(line); - } - } - return out; - } - }, - StorageLevel.MEMORY_ONLY()); - } - - @SuppressWarnings("unchecked") - @Test - public void testTextFileStream() throws IOException { - File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark"); - List> expected = fileTestPrepare(testDir); - - JavaDStream input = ssc.textFileStream(testDir.toString()); - JavaTestUtils.attachTestOutputStream(input); - List> result = JavaTestUtils.runStreams(ssc, 1, 1); - - assertOrderInvariantEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testFileStream() throws IOException { - File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark"); - List> expected = fileTestPrepare(testDir); - - JavaPairInputDStream inputStream = ssc.fileStream( - testDir.toString(), - LongWritable.class, - Text.class, - TextInputFormat.class, - new Function() { - @Override - public Boolean call(Path v1) { - return Boolean.TRUE; - } - }, - true); - - JavaDStream test = inputStream.map( - new Function, String>() { - @Override - public String call(Tuple2 v1) { - return v1._2().toString(); - } - }); - - JavaTestUtils.attachTestOutputStream(test); - List> result = JavaTestUtils.runStreams(ssc, 1, 1); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testRawSocketStream() { - ssc.rawSocketStream("localhost", 12345); - } - - private static List> fileTestPrepare(File testDir) throws IOException { - File existingFile = new File(testDir, "0"); - Files.write("0\n", existingFile, StandardCharsets.UTF_8); - Assert.assertTrue(existingFile.setLastModified(1000)); - Assert.assertEquals(1000, existingFile.lastModified()); - return Arrays.asList(Arrays.asList("0")); - } - - @SuppressWarnings("unchecked") - // SPARK-5795: no logic assertions, just testing that intended API invocations compile - private void compileSaveAsJavaAPI(JavaPairDStream pds) { - pds.saveAsNewAPIHadoopFiles( - "", "", LongWritable.class, Text.class, - org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class); - pds.saveAsHadoopFiles( - "", "", LongWritable.class, Text.class, - org.apache.hadoop.mapred.SequenceFileOutputFormat.class); - // Checks that a previous common workaround for this API still compiles - pds.saveAsNewAPIHadoopFiles( - "", "", LongWritable.class, Text.class, - (Class) org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class); - pds.saveAsHadoopFiles( - "", "", LongWritable.class, Text.class, - (Class) org.apache.hadoop.mapred.SequenceFileOutputFormat.class); - } - -} -- cgit v1.2.3