aboutsummaryrefslogtreecommitdiff
path: root/extras
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2014-03-03 22:31:30 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-03-03 22:31:30 -0800
commit181ec5030792a10f3ce77e997d0e2eda9bcd6139 (patch)
tree9b88504e5a3eca8177e4ebe1257ea9ce56120c13 /extras
parentb14ede789abfabe25144385e8dc2fb96691aba81 (diff)
downloadspark-181ec5030792a10f3ce77e997d0e2eda9bcd6139.tar.gz
spark-181ec5030792a10f3ce77e997d0e2eda9bcd6139.tar.bz2
spark-181ec5030792a10f3ce77e997d0e2eda9bcd6139.zip
[java8API] SPARK-964 Investigate the potential for using JDK 8 lambda expressions for the Java/Scala APIs
Author: Prashant Sharma <prashant.s@imaginea.com> Author: Patrick Wendell <pwendell@gmail.com> Closes #17 from ScrapCodes/java8-lambdas and squashes the following commits: 95850e6 [Patrick Wendell] Some doc improvements and build changes to the Java 8 patch. 85a954e [Prashant Sharma] Nit. import orderings. 673f7ac [Prashant Sharma] Added support for -java-home as well 80a13e8 [Prashant Sharma] Used fake class tag syntax 26eb3f6 [Prashant Sharma] Patrick's comments on PR. 35d8d79 [Prashant Sharma] Specified java 8 building in the docs 31d4cd6 [Prashant Sharma] Maven build to support -Pjava8-tests flag. 4ab87d3 [Prashant Sharma] Review feedback on the pr c33dc2c [Prashant Sharma] SPARK-964, Java 8 API Support.
Diffstat (limited to 'extras')
-rw-r--r--extras/README.md1
-rw-r--r--extras/java8-tests/README.md24
-rw-r--r--extras/java8-tests/pom.xml151
-rw-r--r--extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java391
-rw-r--r--extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java841
-rw-r--r--extras/java8-tests/src/test/resources/log4j.properties28
6 files changed, 1436 insertions, 0 deletions
diff --git a/extras/README.md b/extras/README.md
new file mode 100644
index 0000000000..1b4174b7d5
--- /dev/null
+++ b/extras/README.md
@@ -0,0 +1 @@
+This directory contains build components not included by default in Spark's build.
diff --git a/extras/java8-tests/README.md b/extras/java8-tests/README.md
new file mode 100644
index 0000000000..e95b73ac77
--- /dev/null
+++ b/extras/java8-tests/README.md
@@ -0,0 +1,24 @@
+# Java 8 Test Suites
+
+These tests require having Java 8 installed and are isolated from the main Spark build.
+If Java 8 is not your system's default Java version, you will need to point Spark's build
+to your Java location. The set-up depends a bit on the build system:
+
+* Sbt users can either set JAVA_HOME to the location of a Java 8 JDK or explicitly pass
+ `-java-home` to the sbt launch script. If a Java 8 JDK is detected sbt will automatically
+ include the Java 8 test project.
+
+ `$ JAVA_HOME=/opt/jdk1.8.0/ sbt/sbt clean "test-only org.apache.spark.Java8APISuite"`
+
+* For Maven users,
+
+ Maven users can also refer to their Java 8 directory using JAVA_HOME. However, Maven will not
+ automatically detect the presence of a Java 8 JDK, so a special build profile `-Pjava8-tests`
+ must be used.
+
+ `$ JAVA_HOME=/opt/jdk1.8.0/ mvn clean install -DskipTests`
+ `$ JAVA_HOME=/opt/jdk1.8.0/ mvn test -Pjava8-tests -DwildcardSuites=org.apache.spark.Java8APISuite`
+
+ Note that the above command can only be run from project root directory since this module
+ depends on core and the test-jars of core and streaming. This means an install step is
+ required to make the test dependencies visible to the Java 8 sub-project.
diff --git a/extras/java8-tests/pom.xml b/extras/java8-tests/pom.xml
new file mode 100644
index 0000000000..602f66f9c5
--- /dev/null
+++ b/extras/java8-tests/pom.xml
@@ -0,0 +1,151 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements. See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License. You may obtain a copy of the License at
+~
+~ http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>java8-tests_2.10</artifactId>
+ <packaging>pom</packaging>
+ <name>Spark Project Java8 Tests POM</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>com.novocode</groupId>
+ <artifactId>junit-interface</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <profiles>
+ <profile>
+ <id>java8-tests</id>
+ </profile>
+ </profiles>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>test</id>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <systemPropertyVariables>
+ <!-- For some reason surefire isn't setting this log4j file on the
+ test classpath automatically. So we add it manually. -->
+ <log4j.configuration>
+ file:src/test/resources/log4j.properties
+ </log4j.configuration>
+ </systemPropertyVariables>
+ <skipTests>false</skipTests>
+ <includes>
+ <include>**/Suite*.java</include>
+ <include>**/*Suite.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>test-compile-first</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <fork>true</fork>
+ <verbose>true</verbose>
+ <forceJavacCompilerUse>true</forceJavacCompilerUse>
+ <source>1.8</source>
+ <compilerVersion>1.8</compilerVersion>
+ <target>1.8</target>
+ <encoding>UTF-8</encoding>
+ <maxmem>1024m</maxmem>
+ </configuration>
+ </plugin>
+ <plugin>
+ <!-- disabled -->
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>none</phase>
+ </execution>
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>none</phase>
+ </execution>
+ <execution>
+ <id>scala-test-compile-first</id>
+ <phase>none</phase>
+ </execution>
+ <execution>
+ <id>attach-scaladocs</id>
+ <phase>none</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>test</id>
+ <phase>none</phase>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
new file mode 100644
index 0000000000..f67251217e
--- /dev/null
+++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.*;
+
+import scala.Tuple2;
+
+import com.google.common.base.Optional;
+import com.google.common.io.Files;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.JavaDoubleRDD;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.*;
+
+/**
+ * Most of these tests replicate org.apache.spark.JavaAPISuite using java 8
+ * lambda syntax.
+ */
+public class Java8APISuite implements Serializable {
+ static int foreachCalls = 0;
+ private transient JavaSparkContext sc;
+
+ @Before
+ public void setUp() {
+ sc = new JavaSparkContext("local", "JavaAPISuite");
+ }
+
+ @After
+ public void tearDown() {
+ sc.stop();
+ sc = null;
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.driver.port");
+ }
+
+ @Test
+ public void foreachWithAnonymousClass() {
+ foreachCalls = 0;
+ JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
+ rdd.foreach(new VoidFunction<String>() {
+ @Override
+ public void call(String s) {
+ foreachCalls++;
+ }
+ });
+ Assert.assertEquals(2, foreachCalls);
+ }
+
+ @Test
+ public void foreach() {
+ foreachCalls = 0;
+ JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
+ rdd.foreach((x) -> foreachCalls++);
+ Assert.assertEquals(2, foreachCalls);
+ }
+
+ @Test
+ public void groupBy() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+ Function<Integer, Boolean> isOdd = x -> x % 2 == 0;
+ JavaPairRDD<Boolean, List<Integer>> oddsAndEvens = rdd.groupBy(isOdd);
+ Assert.assertEquals(2, oddsAndEvens.count());
+ Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size()); // Evens
+ Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
+
+ oddsAndEvens = rdd.groupBy(isOdd, 1);
+ Assert.assertEquals(2, oddsAndEvens.count());
+ Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size()); // Evens
+ Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
+ }
+
+ @Test
+ public void leftOuterJoin() {
+ JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<Integer, Integer>(1, 1),
+ new Tuple2<Integer, Integer>(1, 2),
+ new Tuple2<Integer, Integer>(2, 1),
+ new Tuple2<Integer, Integer>(3, 1)
+ ));
+ JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<Integer, Character>(1, 'x'),
+ new Tuple2<Integer, Character>(2, 'y'),
+ new Tuple2<Integer, Character>(2, 'z'),
+ new Tuple2<Integer, Character>(4, 'w')
+ ));
+ List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined =
+ rdd1.leftOuterJoin(rdd2).collect();
+ Assert.assertEquals(5, joined.size());
+ Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
+ rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first();
+ Assert.assertEquals(3, firstUnmatched._1().intValue());
+ }
+
+ @Test
+ public void foldReduce() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+ Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
+
+ int sum = rdd.fold(0, add);
+ Assert.assertEquals(33, sum);
+
+ sum = rdd.reduce(add);
+ Assert.assertEquals(33, sum);
+ }
+
+ @Test
+ public void foldByKey() {
+ List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
+ new Tuple2<Integer, Integer>(2, 1),
+ new Tuple2<Integer, Integer>(2, 1),
+ new Tuple2<Integer, Integer>(1, 1),
+ new Tuple2<Integer, Integer>(3, 2),
+ new Tuple2<Integer, Integer>(3, 1)
+ );
+ JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
+ JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b);
+ Assert.assertEquals(1, sums.lookup(1).get(0).intValue());
+ Assert.assertEquals(2, sums.lookup(2).get(0).intValue());
+ Assert.assertEquals(3, sums.lookup(3).get(0).intValue());
+ }
+
+ @Test
+ public void reduceByKey() {
+ List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
+ new Tuple2<Integer, Integer>(2, 1),
+ new Tuple2<Integer, Integer>(2, 1),
+ new Tuple2<Integer, Integer>(1, 1),
+ new Tuple2<Integer, Integer>(3, 2),
+ new Tuple2<Integer, Integer>(3, 1)
+ );
+ JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
+ JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
+ Assert.assertEquals(1, counts.lookup(1).get(0).intValue());
+ Assert.assertEquals(2, counts.lookup(2).get(0).intValue());
+ Assert.assertEquals(3, counts.lookup(3).get(0).intValue());
+
+ Map<Integer, Integer> localCounts = counts.collectAsMap();
+ Assert.assertEquals(1, localCounts.get(1).intValue());
+ Assert.assertEquals(2, localCounts.get(2).intValue());
+ Assert.assertEquals(3, localCounts.get(3).intValue());
+
+ localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
+ Assert.assertEquals(1, localCounts.get(1).intValue());
+ Assert.assertEquals(2, localCounts.get(2).intValue());
+ Assert.assertEquals(3, localCounts.get(3).intValue());
+ }
+
+ @Test
+ public void map() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+ JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x).cache();
+ doubles.collect();
+ JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<Integer, Integer>(x, x))
+ .cache();
+ pairs.collect();
+ JavaRDD<String> strings = rdd.map(x -> x.toString()).cache();
+ strings.collect();
+ }
+
+ @Test
+ public void flatMap() {
+ JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
+ "The quick brown fox jumps over the lazy dog."));
+ JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" ")));
+
+ Assert.assertEquals("Hello", words.first());
+ Assert.assertEquals(11, words.count());
+
+ JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> {
+ List<Tuple2<String, String>> pairs2 = new LinkedList<Tuple2<String, String>>();
+ for (String word : s.split(" ")) pairs2.add(new Tuple2<String, String>(word, word));
+ return pairs2;
+ });
+
+ Assert.assertEquals(new Tuple2<String, String>("Hello", "Hello"), pairs.first());
+ Assert.assertEquals(11, pairs.count());
+
+ JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> {
+ List<Double> lengths = new LinkedList<Double>();
+ for (String word : s.split(" ")) lengths.add(word.length() * 1.0);
+ return lengths;
+ });
+
+ Double x = doubles.first();
+ Assert.assertEquals(5.0, doubles.first().doubleValue(), 0.01);
+ Assert.assertEquals(11, pairs.count());
+ }
+
+ @Test
+ public void mapsFromPairsToPairs() {
+ List<Tuple2<Integer, String>> pairs = Arrays.asList(
+ new Tuple2<Integer, String>(1, "a"),
+ new Tuple2<Integer, String>(2, "aa"),
+ new Tuple2<Integer, String>(3, "aaa")
+ );
+ JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
+
+ // Regression test for SPARK-668:
+ JavaPairRDD<String, Integer> swapped =
+ pairRDD.flatMapToPair(x -> Collections.singletonList(x.swap()));
+ swapped.collect();
+
+ // There was never a bug here, but it's worth testing:
+ pairRDD.map(item -> item.swap()).collect();
+ }
+
+ @Test
+ public void mapPartitions() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
+ JavaRDD<Integer> partitionSums = rdd.mapPartitions(iter -> {
+ int sum = 0;
+ while (iter.hasNext()) {
+ sum += iter.next();
+ }
+ return Collections.singletonList(sum);
+ });
+
+ Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
+ }
+
+ @Test
+ public void sequenceFile() {
+ File tempDir = Files.createTempDir();
+ String outputDir = new File(tempDir, "output").getAbsolutePath();
+ List<Tuple2<Integer, String>> pairs = Arrays.asList(
+ new Tuple2<Integer, String>(1, "a"),
+ new Tuple2<Integer, String>(2, "aa"),
+ new Tuple2<Integer, String>(3, "aaa")
+ );
+ JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
+
+ rdd.mapToPair(pair ->
+ new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2())))
+ .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+
+ // Try reading the output back as an object file
+ JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class, Text.class)
+ .mapToPair(pair -> new Tuple2<Integer, String>(pair._1().get(), pair._2().toString()));
+ Assert.assertEquals(pairs, readRDD.collect());
+ }
+
+ @Test
+ public void zip() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+ JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x);
+ JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
+ zipped.count();
+ }
+
+ @Test
+ public void zipPartitions() {
+ JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
+ JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2);
+ FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
+ (Iterator<Integer> i, Iterator<String> s) -> {
+ int sizeI = 0;
+ int sizeS = 0;
+ while (i.hasNext()) {
+ sizeI += 1;
+ i.next();
+ }
+ while (s.hasNext()) {
+ sizeS += 1;
+ s.next();
+ }
+ return Arrays.asList(sizeI, sizeS);
+ };
+ JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
+ Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
+ }
+
+ @Test
+ public void accumulators() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+
+ final Accumulator<Integer> intAccum = sc.intAccumulator(10);
+ rdd.foreach(x -> intAccum.add(x));
+ Assert.assertEquals((Integer) 25, intAccum.value());
+
+ final Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
+ rdd.foreach(x -> doubleAccum.add((double) x));
+ Assert.assertEquals((Double) 25.0, doubleAccum.value());
+
+ // Try a custom accumulator type
+ AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() {
+ public Float addInPlace(Float r, Float t) {
+ return r + t;
+ }
+
+ public Float addAccumulator(Float r, Float t) {
+ return r + t;
+ }
+
+ public Float zero(Float initialValue) {
+ return 0.0f;
+ }
+ };
+
+ final Accumulator<Float> floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam);
+ rdd.foreach(x -> floatAccum.add((float) x));
+ Assert.assertEquals((Float) 25.0f, floatAccum.value());
+
+ // Test the setValue method
+ floatAccum.setValue(5.0f);
+ Assert.assertEquals((Float) 5.0f, floatAccum.value());
+ }
+
+ @Test
+ public void keyBy() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
+ List<Tuple2<String, Integer>> s = rdd.keyBy(x -> x.toString()).collect();
+ Assert.assertEquals(new Tuple2<String, Integer>("1", 1), s.get(0));
+ Assert.assertEquals(new Tuple2<String, Integer>("2", 2), s.get(1));
+ }
+
+ @Test
+ public void mapOnPairRDD() {
+ JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
+ JavaPairRDD<Integer, Integer> rdd2 =
+ rdd1.mapToPair(i -> new Tuple2<Integer, Integer>(i, i % 2));
+ JavaPairRDD<Integer, Integer> rdd3 =
+ rdd2.mapToPair(in -> new Tuple2<Integer, Integer>(in._2(), in._1()));
+ Assert.assertEquals(Arrays.asList(
+ new Tuple2<Integer, Integer>(1, 1),
+ new Tuple2<Integer, Integer>(0, 2),
+ new Tuple2<Integer, Integer>(1, 3),
+ new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());
+ }
+
+ @Test
+ public void collectPartitions() {
+ JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
+
+ JavaPairRDD<Integer, Integer> rdd2 =
+ rdd1.mapToPair(i -> new Tuple2<Integer, Integer>(i, i % 2));
+ List[] parts = rdd1.collectPartitions(new int[]{0});
+ Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
+
+ parts = rdd1.collectPartitions(new int[]{1, 2});
+ Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
+ Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);
+
+ Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1),
+ new Tuple2<Integer, Integer>(2, 0)),
+ rdd2.collectPartitions(new int[]{0})[0]);
+
+ parts = rdd2.collectPartitions(new int[]{1, 2});
+ Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
+ new Tuple2<Integer, Integer>(4, 0)), parts[0]);
+ Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
+ new Tuple2<Integer, Integer>(6, 0),
+ new Tuple2<Integer, Integer>(7, 1)), parts[1]);
+ }
+
+ @Test
+ public void collectAsMapWithIntArrayValues() {
+ // Regression test for SPARK-1040
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[]{1}));
+ JavaPairRDD<Integer, int[]> pairRDD =
+ rdd.mapToPair(x -> new Tuple2<Integer, int[]>(x, new int[]{x}));
+ pairRDD.collect(); // Works fine
+ Map<Integer, int[]> map = pairRDD.collectAsMap(); // Used to crash with ClassCastException
+ }
+}
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
new file mode 100644
index 0000000000..43df0dea61
--- /dev/null
+++ b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
@@ -0,0 +1,841 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import java.io.Serializable;
+import java.util.*;
+
+import scala.Tuple2;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+
+/**
+ * Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using java 8
+ * lambda syntax.
+ */
+public class Java8APISuite extends LocalJavaStreamingContext implements Serializable {
+
+ @Test
+ public void testMap() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("hello", "world"),
+ Arrays.asList("goodnight", "moon"));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(5, 5),
+ Arrays.asList(9, 4));
+
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> letterCount = stream.map(s -> s.length());
+ JavaTestUtils.attachTestOutputStream(letterCount);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testFilter() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants", "dodgers"),
+ Arrays.asList("yankees", "red socks"));
+
+ List<List<String>> expected = Arrays.asList(
+ Arrays.asList("giants"),
+ Arrays.asList("yankees"));
+
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<String> filtered = stream.filter(s -> s.contains("a"));
+ JavaTestUtils.attachTestOutputStream(filtered);
+ List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testMapPartitions() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants", "dodgers"),
+ Arrays.asList("yankees", "red socks"));
+
+ List<List<String>> expected = Arrays.asList(
+ Arrays.asList("GIANTSDODGERS"),
+ Arrays.asList("YANKEESRED SOCKS"));
+
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<String> mapped = stream.mapPartitions(in -> {
+ String out = "";
+ while (in.hasNext()) {
+ out = out + in.next().toUpperCase();
+ }
+ return Lists.newArrayList(out);
+ });
+ JavaTestUtils.attachTestOutputStream(mapped);
+ List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testReduce() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1, 2, 3),
+ Arrays.asList(4, 5, 6),
+ Arrays.asList(7, 8, 9));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(6),
+ Arrays.asList(15),
+ Arrays.asList(24));
+
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> reduced = stream.reduce((x, y) -> x + y);
+ JavaTestUtils.attachTestOutputStream(reduced);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testReduceByWindow() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1, 2, 3),
+ Arrays.asList(4, 5, 6),
+ Arrays.asList(7, 8, 9));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(6),
+ Arrays.asList(21),
+ Arrays.asList(39),
+ Arrays.asList(24));
+
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> reducedWindowed = stream.reduceByWindow((x, y) -> x + y,
+ (x, y) -> x - y, new Duration(2000), new Duration(1000));
+ JavaTestUtils.attachTestOutputStream(reducedWindowed);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testTransform() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1, 2, 3),
+ Arrays.asList(4, 5, 6),
+ Arrays.asList(7, 8, 9));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(3, 4, 5),
+ Arrays.asList(6, 7, 8),
+ Arrays.asList(9, 10, 11));
+
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> transformed = stream.transform(in -> in.map(i -> i + 2));
+
+ JavaTestUtils.attachTestOutputStream(transformed);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testVariousTransform() {
+ // tests whether all variations of transform can be called from Java
+
+ List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1));
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+
+ List<List<Tuple2<String, Integer>>> pairInputData =
+ Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(
+ JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
+
+ JavaDStream<Integer> transformed1 = stream.transform(in -> null);
+ JavaDStream<Integer> transformed2 = stream.transform((x, time) -> null);
+ JavaPairDStream<String, Integer> transformed3 = stream.transformToPair(x -> null);
+ JavaPairDStream<String, Integer> transformed4 = stream.transformToPair((x, time) -> null);
+ JavaDStream<Integer> pairTransformed1 = pairStream.transform(x -> null);
+ JavaDStream<Integer> pairTransformed2 = pairStream.transform((x, time) -> null);
+ JavaPairDStream<String, String> pairTransformed3 = pairStream.transformToPair(x -> null);
+ JavaPairDStream<String, String> pairTransformed4 =
+ pairStream.transformToPair((x, time) -> null);
+
+ }
+
+ @Test
+ public void testTransformWith() {
+ List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, String>("california", "dodgers"),
+ new Tuple2<String, String>("new york", "yankees")),
+ Arrays.asList(
+ new Tuple2<String, String>("california", "sharks"),
+ new Tuple2<String, String>("new york", "rangers")));
+
+ List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, String>("california", "giants"),
+ new Tuple2<String, String>("new york", "mets")),
+ Arrays.asList(
+ new Tuple2<String, String>("california", "ducks"),
+ new Tuple2<String, String>("new york", "islanders")));
+
+
+ List<HashSet<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
+ Sets.newHashSet(
+ new Tuple2<String, Tuple2<String, String>>("california",
+ new Tuple2<String, String>("dodgers", "giants")),
+ new Tuple2<String, Tuple2<String, String>>("new york",
+ new Tuple2<String, String>("yankees", "mets"))),
+ Sets.newHashSet(
+ new Tuple2<String, Tuple2<String, String>>("california",
+ new Tuple2<String, String>("sharks", "ducks")),
+ new Tuple2<String, Tuple2<String, String>>("new york",
+ new Tuple2<String, String>("rangers", "islanders"))));
+
+ JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
+ ssc, stringStringKVStream1, 1);
+ JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
+
+ JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
+ ssc, stringStringKVStream2, 1);
+ JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
+
+ JavaPairDStream<String, Tuple2<String, String>> joined =
+ pairStream1.transformWithToPair(pairStream2,(x, y, z) -> x.join(y));
+
+ JavaTestUtils.attachTestOutputStream(joined);
+ List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList();
+ for (List<Tuple2<String, Tuple2<String, String>>> res : result) {
+ unorderedResult.add(Sets.newHashSet(res));
+ }
+
+ Assert.assertEquals(expected, unorderedResult);
+ }
+
+
+ @Test
+ public void testVariousTransformWith() {
+ // tests whether all variations of transformWith can be called from Java
+
+ List<List<Integer>> inputData1 = Arrays.asList(Arrays.asList(1));
+ List<List<String>> inputData2 = Arrays.asList(Arrays.asList("x"));
+ JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1);
+ JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1);
+
+ List<List<Tuple2<String, Integer>>> pairInputData1 =
+ Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+ List<List<Tuple2<Double, Character>>> pairInputData2 =
+ Arrays.asList(Arrays.asList(new Tuple2<Double, Character>(1.0, 'x')));
+ JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream(
+ JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1));
+ JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream(
+ JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1));
+
+ JavaDStream<Double> transformed1 = stream1.transformWith(stream2, (x, y, z) -> null);
+ JavaDStream<Double> transformed2 = stream1.transformWith(pairStream1,(x, y, z) -> null);
+
+ JavaPairDStream<Double, Double> transformed3 =
+ stream1.transformWithToPair(stream2,(x, y, z) -> null);
+
+ JavaPairDStream<Double, Double> transformed4 =
+ stream1.transformWithToPair(pairStream1,(x, y, z) -> null);
+
+ JavaDStream<Double> pairTransformed1 = pairStream1.transformWith(stream2,(x, y, z) -> null);
+
+ JavaDStream<Double> pairTransformed2_ =
+ pairStream1.transformWith(pairStream1,(x, y, z) -> null);
+
+ JavaPairDStream<Double, Double> pairTransformed3 =
+ pairStream1.transformWithToPair(stream2,(x, y, z) -> null);
+
+ JavaPairDStream<Double, Double> pairTransformed4 =
+ pairStream1.transformWithToPair(pairStream2,(x, y, z) -> null);
+ }
+
+ @Test
+ public void testStreamingContextTransform() {
+ List<List<Integer>> stream1input = Arrays.asList(
+ Arrays.asList(1),
+ Arrays.asList(2)
+ );
+
+ List<List<Integer>> stream2input = Arrays.asList(
+ Arrays.asList(3),
+ Arrays.asList(4)
+ );
+
+ List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList(
+ Arrays.asList(new Tuple2<Integer, String>(1, "x")),
+ Arrays.asList(new Tuple2<Integer, String>(2, "y"))
+ );
+
+ List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(1, new Tuple2<Integer, String>(1, "x"))),
+ Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(2, new Tuple2<Integer, String>(2, "y")))
+ );
+
+ JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1);
+ JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, stream2input, 1);
+ JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream(
+ JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1));
+
+ List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2);
+
+ // This is just to test whether this transform to JavaStream compiles
+ JavaDStream<Long> transformed1 = ssc.transform(
+ listOfDStreams1, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
+ assert (listOfRDDs.size() == 2);
+ return null;
+ });
+
+ List<JavaDStream<?>> listOfDStreams2 =
+ Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream());
+
+ JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
+ listOfDStreams2, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
+ assert (listOfRDDs.size() == 3);
+ JavaRDD<Integer> rdd1 = (JavaRDD<Integer>) listOfRDDs.get(0);
+ JavaRDD<Integer> rdd2 = (JavaRDD<Integer>) listOfRDDs.get(1);
+ JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>) listOfRDDs.get(2);
+ JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
+ PairFunction<Integer, Integer, Integer> mapToTuple =
+ (Integer i) -> new Tuple2<Integer, Integer>(i, i);
+ return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
+ });
+ JavaTestUtils.attachTestOutputStream(transformed2);
+ List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result =
+ JavaTestUtils.runStreams(ssc, 2, 2);
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testFlatMap() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("go", "giants"),
+ Arrays.asList("boo", "dodgers"),
+ Arrays.asList("athletics"));
+
+ List<List<String>> expected = Arrays.asList(
+ Arrays.asList("g", "o", "g", "i", "a", "n", "t", "s"),
+ Arrays.asList("b", "o", "o", "d", "o", "d", "g", "e", "r", "s"),
+ Arrays.asList("a", "t", "h", "l", "e", "t", "i", "c", "s"));
+
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<String> flatMapped = stream.flatMap(s -> Lists.newArrayList(s.split("(?!^)")));
+ JavaTestUtils.attachTestOutputStream(flatMapped);
+ List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testPairFlatMap() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants"),
+ Arrays.asList("dodgers"),
+ Arrays.asList("athletics"));
+
+ List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, String>(6, "g"),
+ new Tuple2<Integer, String>(6, "i"),
+ new Tuple2<Integer, String>(6, "a"),
+ new Tuple2<Integer, String>(6, "n"),
+ new Tuple2<Integer, String>(6, "t"),
+ new Tuple2<Integer, String>(6, "s")),
+ Arrays.asList(
+ new Tuple2<Integer, String>(7, "d"),
+ new Tuple2<Integer, String>(7, "o"),
+ new Tuple2<Integer, String>(7, "d"),
+ new Tuple2<Integer, String>(7, "g"),
+ new Tuple2<Integer, String>(7, "e"),
+ new Tuple2<Integer, String>(7, "r"),
+ new Tuple2<Integer, String>(7, "s")),
+ Arrays.asList(
+ new Tuple2<Integer, String>(9, "a"),
+ new Tuple2<Integer, String>(9, "t"),
+ new Tuple2<Integer, String>(9, "h"),
+ new Tuple2<Integer, String>(9, "l"),
+ new Tuple2<Integer, String>(9, "e"),
+ new Tuple2<Integer, String>(9, "t"),
+ new Tuple2<Integer, String>(9, "i"),
+ new Tuple2<Integer, String>(9, "c"),
+ new Tuple2<Integer, String>(9, "s")));
+
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(s -> {
+ List<Tuple2<Integer, String>> out = Lists.newArrayList();
+ for (String letter : s.split("(?!^)")) {
+ out.add(new Tuple2<Integer, String>(s.length(), letter));
+ }
+ return out;
+ });
+
+ JavaTestUtils.attachTestOutputStream(flatMapped);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ /*
+ * Performs an order-invariant comparison of lists representing two RDD streams. This allows
+ * us to account for ordering variation within individual RDD's which occurs during windowing.
+ */
+ public static <T extends Comparable<T>> void assertOrderInvariantEquals(
+ List<List<T>> expected, List<List<T>> actual) {
+ for (List<T> list : expected) {
+ Collections.sort(list);
+ }
+ for (List<T> list : actual) {
+ Collections.sort(list);
+ }
+ Assert.assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testPairFilter() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants", "dodgers"),
+ Arrays.asList("yankees", "red socks"));
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, Integer>("giants", 6)),
+ Arrays.asList(new Tuple2<String, Integer>("yankees", 7)));
+
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream =
+ stream.mapToPair(x -> new Tuple2<>(x, x.length()));
+ JavaPairDStream<String, Integer> filtered = pairStream.filter(x -> x._1().contains("a"));
+ JavaTestUtils.attachTestOutputStream(filtered);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
+ new Tuple2<String, String>("california", "giants"),
+ new Tuple2<String, String>("new york", "yankees"),
+ new Tuple2<String, String>("new york", "mets")),
+ Arrays.asList(new Tuple2<String, String>("california", "sharks"),
+ new Tuple2<String, String>("california", "ducks"),
+ new Tuple2<String, String>("new york", "rangers"),
+ new Tuple2<String, String>("new york", "islanders")));
+
+ List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 1),
+ new Tuple2<String, Integer>("california", 3),
+ new Tuple2<String, Integer>("new york", 4),
+ new Tuple2<String, Integer>("new york", 1)),
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 5),
+ new Tuple2<String, Integer>("california", 5),
+ new Tuple2<String, Integer>("new york", 3),
+ new Tuple2<String, Integer>("new york", 1)));
+
+ @Test
+ public void testPairMap() { // Maps pair -> pair of different type
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, String>(1, "california"),
+ new Tuple2<Integer, String>(3, "california"),
+ new Tuple2<Integer, String>(4, "new york"),
+ new Tuple2<Integer, String>(1, "new york")),
+ Arrays.asList(
+ new Tuple2<Integer, String>(5, "california"),
+ new Tuple2<Integer, String>(5, "california"),
+ new Tuple2<Integer, String>(3, "new york"),
+ new Tuple2<Integer, String>(1, "new york")));
+
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(x -> x.swap());
+ JavaTestUtils.attachTestOutputStream(reversed);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairMapPartitions() { // Maps pair -> pair of different type
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, String>(1, "california"),
+ new Tuple2<Integer, String>(3, "california"),
+ new Tuple2<Integer, String>(4, "new york"),
+ new Tuple2<Integer, String>(1, "new york")),
+ Arrays.asList(
+ new Tuple2<Integer, String>(5, "california"),
+ new Tuple2<Integer, String>(5, "california"),
+ new Tuple2<Integer, String>(3, "new york"),
+ new Tuple2<Integer, String>(1, "new york")));
+
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(in -> {
+ LinkedList<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+ while (in.hasNext()) {
+ Tuple2<String, Integer> next = in.next();
+ out.add(next.swap());
+ }
+ return out;
+ });
+
+ JavaTestUtils.attachTestOutputStream(reversed);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairMap2() { // Maps pair -> single
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(1, 3, 4, 1),
+ Arrays.asList(5, 5, 3, 1));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaDStream<Integer> reversed = pairStream.map(in -> in._2());
+ JavaTestUtils.attachTestOutputStream(reversed);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
+ List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Integer>("hi", 1),
+ new Tuple2<String, Integer>("ho", 2)),
+ Arrays.asList(
+ new Tuple2<String, Integer>("hi", 1),
+ new Tuple2<String, Integer>("ho", 2)));
+
+ List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, String>(1, "h"),
+ new Tuple2<Integer, String>(1, "i"),
+ new Tuple2<Integer, String>(2, "h"),
+ new Tuple2<Integer, String>(2, "o")),
+ Arrays.asList(
+ new Tuple2<Integer, String>(1, "h"),
+ new Tuple2<Integer, String>(1, "i"),
+ new Tuple2<Integer, String>(2, "h"),
+ new Tuple2<Integer, String>(2, "o")));
+
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(in -> {
+ List<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+ for (Character s : in._1().toCharArray()) {
+ out.add(new Tuple2<Integer, String>(in._2(), s.toString()));
+ }
+ return out;
+ });
+
+ JavaTestUtils.attachTestOutputStream(flatMapped);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairReduceByKey() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 4),
+ new Tuple2<String, Integer>("new york", 5)),
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 10),
+ new Tuple2<String, Integer>("new york", 4)));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey((x, y) -> x + y);
+
+ JavaTestUtils.attachTestOutputStream(reduced);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testCombineByKey() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 4),
+ new Tuple2<String, Integer>("new york", 5)),
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 10),
+ new Tuple2<String, Integer>("new york", 4)));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(i -> i,
+ (x, y) -> x + y, (x, y) -> x + y, new HashPartitioner(2));
+
+ JavaTestUtils.attachTestOutputStream(combined);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testReduceByKeyAndWindow() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, Integer>("california", 4),
+ new Tuple2<String, Integer>("new york", 5)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 14),
+ new Tuple2<String, Integer>("new york", 9)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 10),
+ new Tuple2<String, Integer>("new york", 4)));
+
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Integer> reduceWindowed =
+ pairStream.reduceByKeyAndWindow((x, y) -> x + y, new Duration(2000), new Duration(1000));
+ JavaTestUtils.attachTestOutputStream(reduceWindowed);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testUpdateStateByKey() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, Integer>("california", 4),
+ new Tuple2<String, Integer>("new york", 5)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 14),
+ new Tuple2<String, Integer>("new york", 9)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 14),
+ new Tuple2<String, Integer>("new york", 9)));
+
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> {
+ int out = 0;
+ if (state.isPresent()) {
+ out = out + state.get();
+ }
+ for (Integer v : values) {
+ out = out + v;
+ }
+ return Optional.of(out);
+ });
+
+ JavaTestUtils.attachTestOutputStream(updated);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testReduceByKeyAndWindowWithInverse() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, Integer>("california", 4),
+ new Tuple2<String, Integer>("new york", 5)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 14),
+ new Tuple2<String, Integer>("new york", 9)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 10),
+ new Tuple2<String, Integer>("new york", 4)));
+
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Integer> reduceWindowed =
+ pairStream.reduceByKeyAndWindow((x, y) -> x + y, (x, y) -> x - y, new Duration(2000),
+ new Duration(1000));
+ JavaTestUtils.attachTestOutputStream(reduceWindowed);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairTransform() {
+ List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(1, 5),
+ new Tuple2<Integer, Integer>(4, 5),
+ new Tuple2<Integer, Integer>(2, 5)),
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(2, 5),
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(4, 5),
+ new Tuple2<Integer, Integer>(1, 5)));
+
+ List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(1, 5),
+ new Tuple2<Integer, Integer>(2, 5),
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(4, 5)),
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(1, 5),
+ new Tuple2<Integer, Integer>(2, 5),
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(4, 5)));
+
+ JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(in -> in.sortByKey());
+
+ JavaTestUtils.attachTestOutputStream(sorted);
+ List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairToNormalRDDTransform() {
+ List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(1, 5),
+ new Tuple2<Integer, Integer>(4, 5),
+ new Tuple2<Integer, Integer>(2, 5)),
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(2, 5),
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(4, 5),
+ new Tuple2<Integer, Integer>(1, 5)));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(3, 1, 4, 2),
+ Arrays.asList(2, 3, 4, 1));
+
+ JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaDStream<Integer> firstParts = pairStream.transform(in -> in.map(x -> x._1()));
+ JavaTestUtils.attachTestOutputStream(firstParts);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testMapValues() {
+ List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+ List<List<Tuple2<String, String>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "DODGERS"),
+ new Tuple2<String, String>("california", "GIANTS"),
+ new Tuple2<String, String>("new york", "YANKEES"),
+ new Tuple2<String, String>("new york", "METS")),
+ Arrays.asList(new Tuple2<String, String>("california", "SHARKS"),
+ new Tuple2<String, String>("california", "DUCKS"),
+ new Tuple2<String, String>("new york", "RANGERS"),
+ new Tuple2<String, String>("new york", "ISLANDERS")));
+
+ JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, String> mapped = pairStream.mapValues(s -> s.toUpperCase());
+ JavaTestUtils.attachTestOutputStream(mapped);
+ List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testFlatMapValues() {
+ List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+ List<List<Tuple2<String, String>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "dodgers1"),
+ new Tuple2<String, String>("california", "dodgers2"),
+ new Tuple2<String, String>("california", "giants1"),
+ new Tuple2<String, String>("california", "giants2"),
+ new Tuple2<String, String>("new york", "yankees1"),
+ new Tuple2<String, String>("new york", "yankees2"),
+ new Tuple2<String, String>("new york", "mets1"),
+ new Tuple2<String, String>("new york", "mets2")),
+ Arrays.asList(new Tuple2<String, String>("california", "sharks1"),
+ new Tuple2<String, String>("california", "sharks2"),
+ new Tuple2<String, String>("california", "ducks1"),
+ new Tuple2<String, String>("california", "ducks2"),
+ new Tuple2<String, String>("new york", "rangers1"),
+ new Tuple2<String, String>("new york", "rangers2"),
+ new Tuple2<String, String>("new york", "islanders1"),
+ new Tuple2<String, String>("new york", "islanders2")));
+
+ JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+
+ JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues(in -> {
+ List<String> out = new ArrayList<String>();
+ out.add(in + "1");
+ out.add(in + "2");
+ return out;
+ });
+ JavaTestUtils.attachTestOutputStream(flatMapped);
+ List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ Assert.assertEquals(expected, result);
+ }
+
+}
diff --git a/extras/java8-tests/src/test/resources/log4j.properties b/extras/java8-tests/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..180beaa8cc
--- /dev/null
+++ b/extras/java8-tests/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+org.eclipse.jetty.LEVEL=WARN