From 0e2405490f2056728d1353abbac6f3ea177ae533 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Thu, 16 Feb 2017 12:32:45 +0000 Subject: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support - Move external/java8-tests tests into core, streaming, sql and remove - Remove MaxPermGen and related options - Fix some reflection / TODOs around Java 8+ methods - Update doc references to 1.7/1.8 differences - Remove Java 7/8 related build profiles - Update some plugins for better Java 8 compatibility - Fix a few Java-related warnings For the future: - Update Java 8 examples to fully use Java 8 - Update Java tests to use lambdas for simplicity - Update Java internal implementations to use lambdas ## How was this patch tested? Existing tests Author: Sean Owen Closes #16871 from srowen/SPARK-19493. --- external/java8-tests/README.md | 22 - external/java8-tests/pom.xml | 132 --- .../org/apache/spark/java8/Java8RDDAPISuite.java | 356 --------- .../apache/spark/java8/dstream/Java8APISuite.java | 882 --------------------- .../java8/sql/Java8DatasetAggregatorSuite.java | 62 -- .../src/test/resources/log4j.properties | 27 - .../org/apache/spark/java8/JDK8ScalaSuite.scala | 30 - .../apache/spark/sql/kafka010/KafkaSource.scala | 3 +- .../apache/spark/streaming/kafka010/KafkaRDD.scala | 7 +- 9 files changed, 2 insertions(+), 1519 deletions(-) delete mode 100644 external/java8-tests/README.md delete mode 100644 external/java8-tests/pom.xml delete mode 100644 external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java delete mode 100644 external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java delete mode 100644 external/java8-tests/src/test/java/test/org/apache/spark/java8/sql/Java8DatasetAggregatorSuite.java delete mode 100644 external/java8-tests/src/test/resources/log4j.properties delete mode 100644 external/java8-tests/src/test/scala/test/org/apache/spark/java8/JDK8ScalaSuite.scala (limited to 'external') diff --git a/external/java8-tests/README.md b/external/java8-tests/README.md deleted file mode 100644 index aa87901695..0000000000 --- a/external/java8-tests/README.md +++ /dev/null @@ -1,22 +0,0 @@ -# Java 8 Test Suites - -These tests require having Java 8 installed and are isolated from the main Spark build. -If Java 8 is not your system's default Java version, you will need to point Spark's build -to your Java location. The set-up depends a bit on the build system: - -* Sbt users can either set JAVA_HOME to the location of a Java 8 JDK or explicitly pass - `-java-home` to the sbt launch script. If a Java 8 JDK is detected sbt will automatically - include the Java 8 test project. - - `$ JAVA_HOME=/opt/jdk1.8.0/ build/sbt clean java8-tests/test - -* For Maven users, - - Maven users can also refer to their Java 8 directory using JAVA_HOME. - - `$ JAVA_HOME=/opt/jdk1.8.0/ mvn clean install -DskipTests` - `$ JAVA_HOME=/opt/jdk1.8.0/ mvn -pl :java8-tests_2.11 test` - - Note that the above command can only be run from project root directory since this module - depends on core and the test-jars of core and streaming. This means an install step is - required to make the test dependencies visible to the Java 8 sub-project. diff --git a/external/java8-tests/pom.xml b/external/java8-tests/pom.xml deleted file mode 100644 index 8fc46d7af2..0000000000 --- a/external/java8-tests/pom.xml +++ /dev/null @@ -1,132 +0,0 @@ - - - - 4.0.0 - - org.apache.spark - spark-parent_2.11 - 2.2.0-SNAPSHOT - ../../pom.xml - - - java8-tests_2.11 - pom - Spark Project Java 8 Tests - - - java8-tests - - - - - org.apache.spark - spark-core_${scala.binary.version} - ${project.version} - - - org.apache.spark - spark-core_${scala.binary.version} - ${project.version} - test-jar - test - - - org.apache.spark - spark-streaming_${scala.binary.version} - ${project.version} - - - org.apache.spark - spark-streaming_${scala.binary.version} - ${project.version} - test-jar - test - - - org.apache.spark - spark-sql_${scala.binary.version} - ${project.version} - - - org.apache.spark - spark-sql_${scala.binary.version} - ${project.version} - test-jar - test - - - org.apache.spark - spark-tags_${scala.binary.version} - - - - - org.apache.spark - spark-tags_${scala.binary.version} - test-jar - test - - - - - - - - org.apache.maven.plugins - maven-deploy-plugin - - true - - - - org.apache.maven.plugins - maven-install-plugin - - true - - - - org.apache.maven.plugins - maven-compiler-plugin - - true - 1.8 - 1.8 - 1.8 - - - - net.alchim31.maven - scala-maven-plugin - - ${useZincForJdk8} - - -source - 1.8 - -target - 1.8 - -Xlint:all,-serial,-path - - - - - - diff --git a/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java b/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java deleted file mode 100644 index fa3a66e73c..0000000000 --- a/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java +++ /dev/null @@ -1,356 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package test.org.apache.spark.java8; - -import java.io.File; -import java.io.Serializable; -import java.util.*; - -import scala.Tuple2; - -import com.google.common.collect.Iterables; -import com.google.common.io.Files; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.SequenceFileOutputFormat; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import org.apache.spark.api.java.JavaDoubleRDD; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.Optional; -import org.apache.spark.api.java.function.*; -import org.apache.spark.util.Utils; - -/** - * Most of these tests replicate org.apache.spark.JavaAPISuite using java 8 - * lambda syntax. - */ -public class Java8RDDAPISuite implements Serializable { - private static int foreachCalls = 0; - private transient JavaSparkContext sc; - - @Before - public void setUp() { - sc = new JavaSparkContext("local", "JavaAPISuite"); - } - - @After - public void tearDown() { - sc.stop(); - sc = null; - } - - @Test - public void foreachWithAnonymousClass() { - foreachCalls = 0; - JavaRDD rdd = sc.parallelize(Arrays.asList("Hello", "World")); - rdd.foreach(new VoidFunction() { - @Override - public void call(String s) { - foreachCalls++; - } - }); - Assert.assertEquals(2, foreachCalls); - } - - @Test - public void foreach() { - foreachCalls = 0; - JavaRDD rdd = sc.parallelize(Arrays.asList("Hello", "World")); - rdd.foreach(x -> foreachCalls++); - Assert.assertEquals(2, foreachCalls); - } - - @Test - public void groupBy() { - JavaRDD rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); - Function isOdd = x -> x % 2 == 0; - JavaPairRDD> oddsAndEvens = rdd.groupBy(isOdd); - Assert.assertEquals(2, oddsAndEvens.count()); - Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens - Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds - - oddsAndEvens = rdd.groupBy(isOdd, 1); - Assert.assertEquals(2, oddsAndEvens.count()); - Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens - Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds - } - - @Test - public void leftOuterJoin() { - JavaPairRDD rdd1 = sc.parallelizePairs(Arrays.asList( - new Tuple2<>(1, 1), - new Tuple2<>(1, 2), - new Tuple2<>(2, 1), - new Tuple2<>(3, 1) - )); - JavaPairRDD rdd2 = sc.parallelizePairs(Arrays.asList( - new Tuple2<>(1, 'x'), - new Tuple2<>(2, 'y'), - new Tuple2<>(2, 'z'), - new Tuple2<>(4, 'w') - )); - List>>> joined = - rdd1.leftOuterJoin(rdd2).collect(); - Assert.assertEquals(5, joined.size()); - Tuple2>> firstUnmatched = - rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first(); - Assert.assertEquals(3, firstUnmatched._1().intValue()); - } - - @Test - public void foldReduce() { - JavaRDD rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); - Function2 add = (a, b) -> a + b; - - int sum = rdd.fold(0, add); - Assert.assertEquals(33, sum); - - sum = rdd.reduce(add); - Assert.assertEquals(33, sum); - } - - @Test - public void foldByKey() { - List> pairs = Arrays.asList( - new Tuple2<>(2, 1), - new Tuple2<>(2, 1), - new Tuple2<>(1, 1), - new Tuple2<>(3, 2), - new Tuple2<>(3, 1) - ); - JavaPairRDD rdd = sc.parallelizePairs(pairs); - JavaPairRDD sums = rdd.foldByKey(0, (a, b) -> a + b); - Assert.assertEquals(1, sums.lookup(1).get(0).intValue()); - Assert.assertEquals(2, sums.lookup(2).get(0).intValue()); - Assert.assertEquals(3, sums.lookup(3).get(0).intValue()); - } - - @Test - public void reduceByKey() { - List> pairs = Arrays.asList( - new Tuple2<>(2, 1), - new Tuple2<>(2, 1), - new Tuple2<>(1, 1), - new Tuple2<>(3, 2), - new Tuple2<>(3, 1) - ); - JavaPairRDD rdd = sc.parallelizePairs(pairs); - JavaPairRDD counts = rdd.reduceByKey((a, b) -> a + b); - Assert.assertEquals(1, counts.lookup(1).get(0).intValue()); - Assert.assertEquals(2, counts.lookup(2).get(0).intValue()); - Assert.assertEquals(3, counts.lookup(3).get(0).intValue()); - - Map localCounts = counts.collectAsMap(); - Assert.assertEquals(1, localCounts.get(1).intValue()); - Assert.assertEquals(2, localCounts.get(2).intValue()); - Assert.assertEquals(3, localCounts.get(3).intValue()); - - localCounts = rdd.reduceByKeyLocally((a, b) -> a + b); - Assert.assertEquals(1, localCounts.get(1).intValue()); - Assert.assertEquals(2, localCounts.get(2).intValue()); - Assert.assertEquals(3, localCounts.get(3).intValue()); - } - - @Test - public void map() { - JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x).cache(); - doubles.collect(); - JavaPairRDD pairs = rdd.mapToPair(x -> new Tuple2<>(x, x)) - .cache(); - pairs.collect(); - JavaRDD strings = rdd.map(Object::toString).cache(); - strings.collect(); - } - - @Test - public void flatMap() { - JavaRDD rdd = sc.parallelize(Arrays.asList("Hello World!", - "The quick brown fox jumps over the lazy dog.")); - JavaRDD words = rdd.flatMap(x -> Arrays.asList(x.split(" ")).iterator()); - - Assert.assertEquals("Hello", words.first()); - Assert.assertEquals(11, words.count()); - - JavaPairRDD pairs = rdd.flatMapToPair(s -> { - List> pairs2 = new LinkedList<>(); - for (String word : s.split(" ")) { - pairs2.add(new Tuple2<>(word, word)); - } - return pairs2.iterator(); - }); - - Assert.assertEquals(new Tuple2<>("Hello", "Hello"), pairs.first()); - Assert.assertEquals(11, pairs.count()); - - JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> { - List lengths = new LinkedList<>(); - for (String word : s.split(" ")) { - lengths.add((double) word.length()); - } - return lengths.iterator(); - }); - - Assert.assertEquals(5.0, doubles.first(), 0.01); - Assert.assertEquals(11, pairs.count()); - } - - @Test - public void mapsFromPairsToPairs() { - List> pairs = Arrays.asList( - new Tuple2<>(1, "a"), - new Tuple2<>(2, "aa"), - new Tuple2<>(3, "aaa") - ); - JavaPairRDD pairRDD = sc.parallelizePairs(pairs); - - // Regression test for SPARK-668: - JavaPairRDD swapped = - pairRDD.flatMapToPair(x -> Collections.singletonList(x.swap()).iterator()); - swapped.collect(); - - // There was never a bug here, but it's worth testing: - pairRDD.map(Tuple2::swap).collect(); - } - - @Test - public void mapPartitions() { - JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2); - JavaRDD partitionSums = rdd.mapPartitions(iter -> { - int sum = 0; - while (iter.hasNext()) { - sum += iter.next(); - } - return Collections.singletonList(sum).iterator(); - }); - - Assert.assertEquals("[3, 7]", partitionSums.collect().toString()); - } - - @Test - public void sequenceFile() { - File tempDir = Files.createTempDir(); - tempDir.deleteOnExit(); - String outputDir = new File(tempDir, "output").getAbsolutePath(); - List> pairs = Arrays.asList( - new Tuple2<>(1, "a"), - new Tuple2<>(2, "aa"), - new Tuple2<>(3, "aaa") - ); - JavaPairRDD rdd = sc.parallelizePairs(pairs); - - rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()))) - .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); - - // Try reading the output back as an object file - JavaPairRDD readRDD = sc.sequenceFile(outputDir, IntWritable.class, Text.class) - .mapToPair(pair -> new Tuple2<>(pair._1().get(), pair._2().toString())); - Assert.assertEquals(pairs, readRDD.collect()); - Utils.deleteRecursively(tempDir); - } - - @Test - public void zip() { - JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x); - JavaPairRDD zipped = rdd.zip(doubles); - zipped.count(); - } - - @Test - public void zipPartitions() { - JavaRDD rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2); - JavaRDD rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2); - FlatMapFunction2, Iterator, Integer> sizesFn = - (Iterator i, Iterator s) -> { - int sizeI = 0; - while (i.hasNext()) { - sizeI += 1; - i.next(); - } - int sizeS = 0; - while (s.hasNext()) { - sizeS += 1; - s.next(); - } - return Arrays.asList(sizeI, sizeS).iterator(); - }; - JavaRDD sizes = rdd1.zipPartitions(rdd2, sizesFn); - Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString()); - } - - @Test - public void keyBy() { - JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2)); - List> s = rdd.keyBy(Object::toString).collect(); - Assert.assertEquals(new Tuple2<>("1", 1), s.get(0)); - Assert.assertEquals(new Tuple2<>("2", 2), s.get(1)); - } - - @Test - public void mapOnPairRDD() { - JavaRDD rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4)); - JavaPairRDD rdd2 = - rdd1.mapToPair(i -> new Tuple2<>(i, i % 2)); - JavaPairRDD rdd3 = - rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1())); - Assert.assertEquals(Arrays.asList( - new Tuple2<>(1, 1), - new Tuple2<>(0, 2), - new Tuple2<>(1, 3), - new Tuple2<>(0, 4)), rdd3.collect()); - } - - @Test - public void collectPartitions() { - JavaRDD rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3); - - JavaPairRDD rdd2 = - rdd1.mapToPair(i -> new Tuple2<>(i, i % 2)); - List[] parts = rdd1.collectPartitions(new int[]{0}); - Assert.assertEquals(Arrays.asList(1, 2), parts[0]); - - parts = rdd1.collectPartitions(new int[]{1, 2}); - Assert.assertEquals(Arrays.asList(3, 4), parts[0]); - Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]); - - Assert.assertEquals(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)), - rdd2.collectPartitions(new int[]{0})[0]); - - List>[] parts2 = rdd2.collectPartitions(new int[]{1, 2}); - Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts2[0]); - Assert.assertEquals(Arrays.asList(new Tuple2<>(5, 1), new Tuple2<>(6, 0), new Tuple2<>(7, 1)), - parts2[1]); - } - - @Test - public void collectAsMapWithIntArrayValues() { - // Regression test for SPARK-1040 - JavaRDD rdd = sc.parallelize(Arrays.asList(1)); - JavaPairRDD pairRDD = - rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x})); - pairRDD.collect(); // Works fine - pairRDD.collectAsMap(); // Used to crash with ClassCastException - } -} diff --git a/external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java b/external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java deleted file mode 100644 index 338ca54ab8..0000000000 --- a/external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java +++ /dev/null @@ -1,882 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package test.org.apache.spark.java8.dstream; - -import java.io.Serializable; -import java.util.*; - -import scala.Tuple2; - -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import org.junit.Assert; -import org.junit.Test; - -import org.apache.spark.HashPartitioner; -import org.apache.spark.api.java.Optional; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.streaming.*; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.JavaPairDStream; -import org.apache.spark.streaming.api.java.JavaMapWithStateDStream; - -/** - * Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using java 8 - * lambda syntax. - */ -@SuppressWarnings("unchecked") -public class Java8APISuite extends LocalJavaStreamingContext implements Serializable { - - @Test - public void testMap() { - List> inputData = Arrays.asList( - Arrays.asList("hello", "world"), - Arrays.asList("goodnight", "moon")); - - List> expected = Arrays.asList( - Arrays.asList(5, 5), - Arrays.asList(9, 4)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream letterCount = stream.map(String::length); - JavaTestUtils.attachTestOutputStream(letterCount); - List> result = JavaTestUtils.runStreams(ssc, 2, 2); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testFilter() { - List> inputData = Arrays.asList( - Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red sox")); - - List> expected = Arrays.asList( - Arrays.asList("giants"), - Arrays.asList("yankees")); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream filtered = stream.filter(s -> s.contains("a")); - JavaTestUtils.attachTestOutputStream(filtered); - List> result = JavaTestUtils.runStreams(ssc, 2, 2); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testMapPartitions() { - List> inputData = Arrays.asList( - Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red sox")); - - List> expected = Arrays.asList( - Arrays.asList("GIANTSDODGERS"), - Arrays.asList("YANKEESRED SOX")); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream mapped = stream.mapPartitions(in -> { - String out = ""; - while (in.hasNext()) { - out = out + in.next().toUpperCase(); - } - return Lists.newArrayList(out).iterator(); - }); - JavaTestUtils.attachTestOutputStream(mapped); - List> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testReduce() { - List> inputData = Arrays.asList( - Arrays.asList(1, 2, 3), - Arrays.asList(4, 5, 6), - Arrays.asList(7, 8, 9)); - - List> expected = Arrays.asList( - Arrays.asList(6), - Arrays.asList(15), - Arrays.asList(24)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream reduced = stream.reduce((x, y) -> x + y); - JavaTestUtils.attachTestOutputStream(reduced); - List> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testReduceByWindow() { - List> inputData = Arrays.asList( - Arrays.asList(1, 2, 3), - Arrays.asList(4, 5, 6), - Arrays.asList(7, 8, 9)); - - List> expected = Arrays.asList( - Arrays.asList(6), - Arrays.asList(21), - Arrays.asList(39), - Arrays.asList(24)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream reducedWindowed = stream.reduceByWindow((x, y) -> x + y, - (x, y) -> x - y, new Duration(2000), new Duration(1000)); - JavaTestUtils.attachTestOutputStream(reducedWindowed); - List> result = JavaTestUtils.runStreams(ssc, 4, 4); - - Assert.assertEquals(expected, result); - } - - @Test - public void testTransform() { - List> inputData = Arrays.asList( - Arrays.asList(1, 2, 3), - Arrays.asList(4, 5, 6), - Arrays.asList(7, 8, 9)); - - List> expected = Arrays.asList( - Arrays.asList(3, 4, 5), - Arrays.asList(6, 7, 8), - Arrays.asList(9, 10, 11)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream transformed = stream.transform(in -> in.map(i -> i + 2)); - - JavaTestUtils.attachTestOutputStream(transformed); - List> result = JavaTestUtils.runStreams(ssc, 3, 3); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testVariousTransform() { - // tests whether all variations of transform can be called from Java - - List> inputData = Arrays.asList(Arrays.asList(1)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - - List>> pairInputData = - Arrays.asList(Arrays.asList(new Tuple2<>("x", 1))); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream( - JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1)); - - JavaDStream transformed1 = stream.transform(in -> null); - JavaDStream transformed2 = stream.transform((x, time) -> null); - JavaPairDStream transformed3 = stream.transformToPair(x -> null); - JavaPairDStream transformed4 = stream.transformToPair((x, time) -> null); - JavaDStream pairTransformed1 = pairStream.transform(x -> null); - JavaDStream pairTransformed2 = pairStream.transform((x, time) -> null); - JavaPairDStream pairTransformed3 = pairStream.transformToPair(x -> null); - JavaPairDStream pairTransformed4 = - pairStream.transformToPair((x, time) -> null); - - } - - @Test - public void testTransformWith() { - List>> stringStringKVStream1 = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", "dodgers"), - new Tuple2<>("new york", "yankees")), - Arrays.asList( - new Tuple2<>("california", "sharks"), - new Tuple2<>("new york", "rangers"))); - - List>> stringStringKVStream2 = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", "giants"), - new Tuple2<>("new york", "mets")), - Arrays.asList( - new Tuple2<>("california", "ducks"), - new Tuple2<>("new york", "islanders"))); - - - List>>> expected = Arrays.asList( - Sets.newHashSet( - new Tuple2<>("california", - new Tuple2<>("dodgers", "giants")), - new Tuple2<>("new york", - new Tuple2<>("yankees", "mets"))), - Sets.newHashSet( - new Tuple2<>("california", - new Tuple2<>("sharks", "ducks")), - new Tuple2<>("new york", - new Tuple2<>("rangers", "islanders")))); - - JavaDStream> stream1 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream1, 1); - JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream(stream1); - - JavaDStream> stream2 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream2, 1); - JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream(stream2); - - JavaPairDStream> joined = - pairStream1.transformWithToPair(pairStream2,(x, y, z) -> x.join(y)); - - JavaTestUtils.attachTestOutputStream(joined); - List>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - List>>> unorderedResult = Lists.newArrayList(); - for (List>> res : result) { - unorderedResult.add(Sets.newHashSet(res)); - } - - Assert.assertEquals(expected, unorderedResult); - } - - - @Test - public void testVariousTransformWith() { - // tests whether all variations of transformWith can be called from Java - - List> inputData1 = Arrays.asList(Arrays.asList(1)); - List> inputData2 = Arrays.asList(Arrays.asList("x")); - JavaDStream stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1); - JavaDStream stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1); - - List>> pairInputData1 = - Arrays.asList(Arrays.asList(new Tuple2<>("x", 1))); - List>> pairInputData2 = - Arrays.asList(Arrays.asList(new Tuple2<>(1.0, 'x'))); - JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream( - JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1)); - JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream( - JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1)); - - JavaDStream transformed1 = stream1.transformWith(stream2, (x, y, z) -> null); - JavaDStream transformed2 = stream1.transformWith(pairStream1,(x, y, z) -> null); - - JavaPairDStream transformed3 = - stream1.transformWithToPair(stream2,(x, y, z) -> null); - - JavaPairDStream transformed4 = - stream1.transformWithToPair(pairStream1,(x, y, z) -> null); - - JavaDStream pairTransformed1 = pairStream1.transformWith(stream2,(x, y, z) -> null); - - JavaDStream pairTransformed2_ = - pairStream1.transformWith(pairStream1,(x, y, z) -> null); - - JavaPairDStream pairTransformed3 = - pairStream1.transformWithToPair(stream2,(x, y, z) -> null); - - JavaPairDStream pairTransformed4 = - pairStream1.transformWithToPair(pairStream2,(x, y, z) -> null); - } - - @Test - public void testStreamingContextTransform() { - List> stream1input = Arrays.asList( - Arrays.asList(1), - Arrays.asList(2) - ); - - List> stream2input = Arrays.asList( - Arrays.asList(3), - Arrays.asList(4) - ); - - List>> pairStream1input = Arrays.asList( - Arrays.asList(new Tuple2<>(1, "x")), - Arrays.asList(new Tuple2<>(2, "y")) - ); - - List>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>(1, new Tuple2<>(1, "x"))), - Arrays.asList(new Tuple2<>(2, new Tuple2<>(2, "y"))) - ); - - JavaDStream stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1); - JavaDStream stream2 = JavaTestUtils.attachTestInputStream(ssc, stream2input, 1); - JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream( - JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1)); - - List> listOfDStreams1 = Arrays.>asList(stream1, stream2); - - // This is just to test whether this transform to JavaStream compiles - JavaDStream transformed1 = ssc.transform( - listOfDStreams1, (List> listOfRDDs, Time time) -> { - Assert.assertEquals(2, listOfRDDs.size()); - return null; - }); - - List> listOfDStreams2 = - Arrays.>asList(stream1, stream2, pairStream1.toJavaDStream()); - - JavaPairDStream> transformed2 = ssc.transformToPair( - listOfDStreams2, (List> listOfRDDs, Time time) -> { - Assert.assertEquals(3, listOfRDDs.size()); - JavaRDD rdd1 = (JavaRDD) listOfRDDs.get(0); - JavaRDD rdd2 = (JavaRDD) listOfRDDs.get(1); - JavaRDD> rdd3 = (JavaRDD>) listOfRDDs.get(2); - JavaPairRDD prdd3 = JavaPairRDD.fromJavaRDD(rdd3); - PairFunction mapToTuple = - (Integer i) -> new Tuple2<>(i, i); - return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3); - }); - JavaTestUtils.attachTestOutputStream(transformed2); - List>>> result = - JavaTestUtils.runStreams(ssc, 2, 2); - Assert.assertEquals(expected, result); - } - - @Test - public void testFlatMap() { - List> inputData = Arrays.asList( - Arrays.asList("go", "giants"), - Arrays.asList("boo", "dodgers"), - Arrays.asList("athletics")); - - List> expected = Arrays.asList( - Arrays.asList("g", "o", "g", "i", "a", "n", "t", "s"), - Arrays.asList("b", "o", "o", "d", "o", "d", "g", "e", "r", "s"), - Arrays.asList("a", "t", "h", "l", "e", "t", "i", "c", "s")); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream flatMapped = stream.flatMap( - s -> Lists.newArrayList(s.split("(?!^)")).iterator()); - JavaTestUtils.attachTestOutputStream(flatMapped); - List> result = JavaTestUtils.runStreams(ssc, 3, 3); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testPairFlatMap() { - List> inputData = Arrays.asList( - Arrays.asList("giants"), - Arrays.asList("dodgers"), - Arrays.asList("athletics")); - - List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>(6, "g"), - new Tuple2<>(6, "i"), - new Tuple2<>(6, "a"), - new Tuple2<>(6, "n"), - new Tuple2<>(6, "t"), - new Tuple2<>(6, "s")), - Arrays.asList( - new Tuple2<>(7, "d"), - new Tuple2<>(7, "o"), - new Tuple2<>(7, "d"), - new Tuple2<>(7, "g"), - new Tuple2<>(7, "e"), - new Tuple2<>(7, "r"), - new Tuple2<>(7, "s")), - Arrays.asList( - new Tuple2<>(9, "a"), - new Tuple2<>(9, "t"), - new Tuple2<>(9, "h"), - new Tuple2<>(9, "l"), - new Tuple2<>(9, "e"), - new Tuple2<>(9, "t"), - new Tuple2<>(9, "i"), - new Tuple2<>(9, "c"), - new Tuple2<>(9, "s"))); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream flatMapped = stream.flatMapToPair(s -> { - List> out = Lists.newArrayList(); - for (String letter : s.split("(?!^)")) { - out.add(new Tuple2<>(s.length(), letter)); - } - return out.iterator(); - }); - - JavaTestUtils.attachTestOutputStream(flatMapped); - List>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - /* - * Performs an order-invariant comparison of lists representing two RDD streams. This allows - * us to account for ordering variation within individual RDD's which occurs during windowing. - */ - public static > void assertOrderInvariantEquals( - List> expected, List> actual) { - expected.forEach(list -> Collections.sort(list)); - List> sortedActual = new ArrayList<>(); - actual.forEach(list -> { - List sortedList = new ArrayList<>(list); - Collections.sort(sortedList); - sortedActual.add(sortedList); - }); - Assert.assertEquals(expected, sortedActual); - } - - @Test - public void testPairFilter() { - List> inputData = Arrays.asList( - Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red sox")); - - List>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("giants", 6)), - Arrays.asList(new Tuple2<>("yankees", 7))); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = - stream.mapToPair(x -> new Tuple2<>(x, x.length())); - JavaPairDStream filtered = pairStream.filter(x -> x._1().contains("a")); - JavaTestUtils.attachTestOutputStream(filtered); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - List>> stringStringKVStream = Arrays.asList( - Arrays.asList(new Tuple2<>("california", "dodgers"), - new Tuple2<>("california", "giants"), - new Tuple2<>("new york", "yankees"), - new Tuple2<>("new york", "mets")), - Arrays.asList(new Tuple2<>("california", "sharks"), - new Tuple2<>("california", "ducks"), - new Tuple2<>("new york", "rangers"), - new Tuple2<>("new york", "islanders"))); - - List>> stringIntKVStream = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", 1), - new Tuple2<>("california", 3), - new Tuple2<>("new york", 4), - new Tuple2<>("new york", 1)), - Arrays.asList( - new Tuple2<>("california", 5), - new Tuple2<>("california", 5), - new Tuple2<>("new york", 3), - new Tuple2<>("new york", 1))); - - @Test - public void testPairMap() { // Maps pair -> pair of different type - List>> inputData = stringIntKVStream; - - List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>(1, "california"), - new Tuple2<>(3, "california"), - new Tuple2<>(4, "new york"), - new Tuple2<>(1, "new york")), - Arrays.asList( - new Tuple2<>(5, "california"), - new Tuple2<>(5, "california"), - new Tuple2<>(3, "new york"), - new Tuple2<>(1, "new york"))); - - JavaDStream> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream reversed = pairStream.mapToPair(x -> x.swap()); - JavaTestUtils.attachTestOutputStream(reversed); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testPairMapPartitions() { // Maps pair -> pair of different type - List>> inputData = stringIntKVStream; - - List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>(1, "california"), - new Tuple2<>(3, "california"), - new Tuple2<>(4, "new york"), - new Tuple2<>(1, "new york")), - Arrays.asList( - new Tuple2<>(5, "california"), - new Tuple2<>(5, "california"), - new Tuple2<>(3, "new york"), - new Tuple2<>(1, "new york"))); - - JavaDStream> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream reversed = pairStream.mapPartitionsToPair(in -> { - LinkedList> out = new LinkedList<>(); - while (in.hasNext()) { - Tuple2 next = in.next(); - out.add(next.swap()); - } - return out.iterator(); - }); - - JavaTestUtils.attachTestOutputStream(reversed); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testPairMap2() { // Maps pair -> single - List>> inputData = stringIntKVStream; - - List> expected = Arrays.asList( - Arrays.asList(1, 3, 4, 1), - Arrays.asList(5, 5, 3, 1)); - - JavaDStream> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaDStream reversed = pairStream.map(in -> in._2()); - JavaTestUtils.attachTestOutputStream(reversed); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair - List>> inputData = Arrays.asList( - Arrays.asList( - new Tuple2<>("hi", 1), - new Tuple2<>("ho", 2)), - Arrays.asList( - new Tuple2<>("hi", 1), - new Tuple2<>("ho", 2))); - - List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>(1, "h"), - new Tuple2<>(1, "i"), - new Tuple2<>(2, "h"), - new Tuple2<>(2, "o")), - Arrays.asList( - new Tuple2<>(1, "h"), - new Tuple2<>(1, "i"), - new Tuple2<>(2, "h"), - new Tuple2<>(2, "o"))); - - JavaDStream> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream flatMapped = pairStream.flatMapToPair(in -> { - List> out = new LinkedList<>(); - for (Character s : in._1().toCharArray()) { - out.add(new Tuple2<>(in._2(), s.toString())); - } - return out.iterator(); - }); - - JavaTestUtils.attachTestOutputStream(flatMapped); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testPairReduceByKey() { - List>> inputData = stringIntKVStream; - - List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", 4), - new Tuple2<>("new york", 5)), - Arrays.asList( - new Tuple2<>("california", 10), - new Tuple2<>("new york", 4))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream reduced = pairStream.reduceByKey((x, y) -> x + y); - - JavaTestUtils.attachTestOutputStream(reduced); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testCombineByKey() { - List>> inputData = stringIntKVStream; - - List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", 4), - new Tuple2<>("new york", 5)), - Arrays.asList( - new Tuple2<>("california", 10), - new Tuple2<>("new york", 4))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream combined = pairStream.combineByKey(i -> i, - (x, y) -> x + y, (x, y) -> x + y, new HashPartitioner(2)); - - JavaTestUtils.attachTestOutputStream(combined); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testReduceByKeyAndWindow() { - List>> inputData = stringIntKVStream; - - List>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("california", 4), - new Tuple2<>("new york", 5)), - Arrays.asList(new Tuple2<>("california", 14), - new Tuple2<>("new york", 9)), - Arrays.asList(new Tuple2<>("california", 10), - new Tuple2<>("new york", 4))); - - JavaDStream> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream reduceWindowed = - pairStream.reduceByKeyAndWindow((x, y) -> x + y, new Duration(2000), new Duration(1000)); - JavaTestUtils.attachTestOutputStream(reduceWindowed); - List>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testUpdateStateByKey() { - List>> inputData = stringIntKVStream; - - List>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("california", 4), - new Tuple2<>("new york", 5)), - Arrays.asList(new Tuple2<>("california", 14), - new Tuple2<>("new york", 9)), - Arrays.asList(new Tuple2<>("california", 14), - new Tuple2<>("new york", 9))); - - JavaDStream> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream updated = pairStream.updateStateByKey((values, state) -> { - int out = 0; - if (state.isPresent()) { - out = out + state.get(); - } - for (Integer v : values) { - out = out + v; - } - return Optional.of(out); - }); - - JavaTestUtils.attachTestOutputStream(updated); - List>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testReduceByKeyAndWindowWithInverse() { - List>> inputData = stringIntKVStream; - - List>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("california", 4), - new Tuple2<>("new york", 5)), - Arrays.asList(new Tuple2<>("california", 14), - new Tuple2<>("new york", 9)), - Arrays.asList(new Tuple2<>("california", 10), - new Tuple2<>("new york", 4))); - - JavaDStream> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream reduceWindowed = - pairStream.reduceByKeyAndWindow((x, y) -> x + y, (x, y) -> x - y, new Duration(2000), - new Duration(1000)); - JavaTestUtils.attachTestOutputStream(reduceWindowed); - List>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testPairTransform() { - List>> inputData = Arrays.asList( - Arrays.asList( - new Tuple2<>(3, 5), - new Tuple2<>(1, 5), - new Tuple2<>(4, 5), - new Tuple2<>(2, 5)), - Arrays.asList( - new Tuple2<>(2, 5), - new Tuple2<>(3, 5), - new Tuple2<>(4, 5), - new Tuple2<>(1, 5))); - - List>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>(1, 5), - new Tuple2<>(2, 5), - new Tuple2<>(3, 5), - new Tuple2<>(4, 5)), - Arrays.asList( - new Tuple2<>(1, 5), - new Tuple2<>(2, 5), - new Tuple2<>(3, 5), - new Tuple2<>(4, 5))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream sorted = pairStream.transformToPair(in -> in.sortByKey()); - - JavaTestUtils.attachTestOutputStream(sorted); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testPairToNormalRDDTransform() { - List>> inputData = Arrays.asList( - Arrays.asList( - new Tuple2<>(3, 5), - new Tuple2<>(1, 5), - new Tuple2<>(4, 5), - new Tuple2<>(2, 5)), - Arrays.asList( - new Tuple2<>(2, 5), - new Tuple2<>(3, 5), - new Tuple2<>(4, 5), - new Tuple2<>(1, 5))); - - List> expected = Arrays.asList( - Arrays.asList(3, 1, 4, 2), - Arrays.asList(2, 3, 4, 1)); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaDStream firstParts = pairStream.transform(in -> in.map(x -> x._1())); - JavaTestUtils.attachTestOutputStream(firstParts); - List> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testMapValues() { - List>> inputData = stringStringKVStream; - - List>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("california", "DODGERS"), - new Tuple2<>("california", "GIANTS"), - new Tuple2<>("new york", "YANKEES"), - new Tuple2<>("new york", "METS")), - Arrays.asList(new Tuple2<>("california", "SHARKS"), - new Tuple2<>("california", "DUCKS"), - new Tuple2<>("new york", "RANGERS"), - new Tuple2<>("new york", "ISLANDERS"))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream mapped = pairStream.mapValues(String::toUpperCase); - JavaTestUtils.attachTestOutputStream(mapped); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testFlatMapValues() { - List>> inputData = stringStringKVStream; - - List>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("california", "dodgers1"), - new Tuple2<>("california", "dodgers2"), - new Tuple2<>("california", "giants1"), - new Tuple2<>("california", "giants2"), - new Tuple2<>("new york", "yankees1"), - new Tuple2<>("new york", "yankees2"), - new Tuple2<>("new york", "mets1"), - new Tuple2<>("new york", "mets2")), - Arrays.asList(new Tuple2<>("california", "sharks1"), - new Tuple2<>("california", "sharks2"), - new Tuple2<>("california", "ducks1"), - new Tuple2<>("california", "ducks2"), - new Tuple2<>("new york", "rangers1"), - new Tuple2<>("new york", "rangers2"), - new Tuple2<>("new york", "islanders1"), - new Tuple2<>("new york", "islanders2"))); - - JavaDStream> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream flatMapped = - pairStream.flatMapValues(in -> Arrays.asList(in + "1", in + "2")); - JavaTestUtils.attachTestOutputStream(flatMapped); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); - Assert.assertEquals(expected, result); - } - - /** - * This test is only for testing the APIs. It's not necessary to run it. - */ - public void testMapWithStateAPI() { - JavaPairRDD initialRDD = null; - JavaPairDStream wordsDstream = null; - - JavaMapWithStateDStream stateDstream = - wordsDstream.mapWithState( - StateSpec.function((time, key, value, state) -> { - // Use all State's methods here - state.exists(); - state.get(); - state.isTimingOut(); - state.remove(); - state.update(true); - return Optional.of(2.0); - }).initialState(initialRDD) - .numPartitions(10) - .partitioner(new HashPartitioner(10)) - .timeout(Durations.seconds(10))); - - JavaPairDStream emittedRecords = stateDstream.stateSnapshots(); - - JavaMapWithStateDStream stateDstream2 = - wordsDstream.mapWithState( - StateSpec.function((key, value, state) -> { - state.exists(); - state.get(); - state.isTimingOut(); - state.remove(); - state.update(true); - return 2.0; - }).initialState(initialRDD) - .numPartitions(10) - .partitioner(new HashPartitioner(10)) - .timeout(Durations.seconds(10))); - - JavaPairDStream mappedDStream = stateDstream2.stateSnapshots(); - } -} diff --git a/external/java8-tests/src/test/java/test/org/apache/spark/java8/sql/Java8DatasetAggregatorSuite.java b/external/java8-tests/src/test/java/test/org/apache/spark/java8/sql/Java8DatasetAggregatorSuite.java deleted file mode 100644 index 10d25fa445..0000000000 --- a/external/java8-tests/src/test/java/test/org/apache/spark/java8/sql/Java8DatasetAggregatorSuite.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package test.org.apache.spark.java8.sql; - -import java.util.Arrays; - -import org.junit.Assert; -import org.junit.Test; -import scala.Tuple2; - -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.KeyValueGroupedDataset; -import org.apache.spark.sql.expressions.javalang.typed; -import test.org.apache.spark.sql.JavaDatasetAggregatorSuiteBase; - -/** - * Suite that replicates tests in JavaDatasetAggregatorSuite using lambda syntax. - */ -public class Java8DatasetAggregatorSuite extends JavaDatasetAggregatorSuiteBase { - @Test - public void testTypedAggregationAverage() { - KeyValueGroupedDataset> grouped = generateGroupedDataset(); - Dataset> agged = grouped.agg(typed.avg(v -> (double)(v._2() * 2))); - Assert.assertEquals(Arrays.asList(tuple2("a", 3.0), tuple2("b", 6.0)), agged.collectAsList()); - } - - @Test - public void testTypedAggregationCount() { - KeyValueGroupedDataset> grouped = generateGroupedDataset(); - Dataset> agged = grouped.agg(typed.count(v -> v)); - Assert.assertEquals(Arrays.asList(tuple2("a", 2L), tuple2("b", 1L)), agged.collectAsList()); - } - - @Test - public void testTypedAggregationSumDouble() { - KeyValueGroupedDataset> grouped = generateGroupedDataset(); - Dataset> agged = grouped.agg(typed.sum(v -> (double)v._2())); - Assert.assertEquals(Arrays.asList(tuple2("a", 3.0), tuple2("b", 3.0)), agged.collectAsList()); - } - - @Test - public void testTypedAggregationSumLong() { - KeyValueGroupedDataset> grouped = generateGroupedDataset(); - Dataset> agged = grouped.agg(typed.sumLong(v -> (long)v._2())); - Assert.assertEquals(Arrays.asList(tuple2("a", 3L), tuple2("b", 3L)), agged.collectAsList()); - } -} diff --git a/external/java8-tests/src/test/resources/log4j.properties b/external/java8-tests/src/test/resources/log4j.properties deleted file mode 100644 index 3706a6e361..0000000000 --- a/external/java8-tests/src/test/resources/log4j.properties +++ /dev/null @@ -1,27 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Set everything to be logged to the file target/unit-tests.log -log4j.rootCategory=INFO, file -log4j.appender.file=org.apache.log4j.FileAppender -log4j.appender.file.append=true -log4j.appender.file.file=target/unit-tests.log -log4j.appender.file.layout=org.apache.log4j.PatternLayout -log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n - -# Ignore messages below warning level from Jetty, because it's a bit verbose -log4j.logger.org.spark_project.jetty=WARN diff --git a/external/java8-tests/src/test/scala/test/org/apache/spark/java8/JDK8ScalaSuite.scala b/external/java8-tests/src/test/scala/test/org/apache/spark/java8/JDK8ScalaSuite.scala deleted file mode 100644 index c4042e47e8..0000000000 --- a/external/java8-tests/src/test/scala/test/org/apache/spark/java8/JDK8ScalaSuite.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package test.org.apache.spark.java8 - -import org.apache.spark.SharedSparkContext -import org.apache.spark.SparkFunSuite - -/** - * Test cases where JDK8-compiled Scala user code is used with Spark. - */ -class JDK8ScalaSuite extends SparkFunSuite with SharedSparkContext { - test("basic RDD closure test (SPARK-6152)") { - sc.parallelize(1 to 1000).map(x => x * x).count() - } -} diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 02b23111af..9c5dceca2d 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -259,7 +259,7 @@ private[kafka010] class KafkaSource( val preferredLoc = if (numExecutors > 0) { // This allows cached KafkaConsumers in the executors to be re-used to read the same // partition in every batch. - Some(sortedExecutors(floorMod(tp.hashCode, numExecutors))) + Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors))) } else None KafkaSourceRDDOffsetRange(tp, fromOffset, untilOffset, preferredLoc) }.filter { range => @@ -347,5 +347,4 @@ private[kafka010] object KafkaSource { if (a.host == b.host) { a.executorId > b.executorId } else { a.host > b.host } } - def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b } diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala index bf8adbe42f..4c6e2ce87e 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala @@ -145,11 +145,6 @@ private[spark] class KafkaRDD[K, V]( a.host > b.host } - /** - * Non-negative modulus, from java 8 math - */ - private def floorMod(a: Int, b: Int): Int = ((a % b) + b) % b - override def getPreferredLocations(thePart: Partition): Seq[String] = { // The intention is best-effort consistent executor for a given topicpartition, // so that caching consumers can be effective. @@ -164,7 +159,7 @@ private[spark] class KafkaRDD[K, V]( Seq() } else { // execs is sorted, tp.hashCode depends only on topic and partition, so consistent index - val index = this.floorMod(tp.hashCode, execs.length) + val index = Math.floorMod(tp.hashCode, execs.length) val chosen = execs(index) Seq(chosen.toString) } -- cgit v1.2.3