author     Sean Owen <sowen@cloudera.com>  2017-02-19 09:42:50 -0800
committer  Sean Owen <sowen@cloudera.com>  2017-02-19 09:42:50 -0800
commit     1487c9af20a333ead55955acf4c0aa323bea0d07 (patch)
tree       5f47daa77e0f73da1e009cc3dcf0a5c0073246aa /core/src
parent     de14d35f77071932963a994fac5aec0e5df838a1 (diff)
[SPARK-19534][TESTS] Convert Java tests to use lambdas, Java 8 features
## What changes were proposed in this pull request?

Convert tests to use Java 8 lambdas, and modest related fixes to surrounding code.

## How was this patch tested?

Jenkins tests

Author: Sean Owen <sowen@cloudera.com>

Closes #16964 from srowen/SPARK-19534.
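As a minimal, self-contained sketch of the conversion pattern this commit applies throughout the tests (not code from the diff below; the class and variable names here are illustrative, and `java.util.function.BinaryOperator` stands in for Spark's own functional interfaces such as `Function2`):

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class LambdaConversionSketch {
  public static void main(String[] args) {
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4);

    // Before (Java 7 style): anonymous inner class implementing a
    // single-method interface.
    BinaryOperator<Integer> addOld = new BinaryOperator<Integer>() {
      @Override
      public Integer apply(Integer a, Integer b) {
        return a + b;
      }
    };

    // After (Java 8 style): a lambda with identical behavior. Lambdas that
    // merely forward to an existing method become method references in the
    // diff below, e.g. x -> x.toString() becomes Object::toString.
    BinaryOperator<Integer> addNew = (a, b) -> a + b;

    int sumOld = numbers.stream().reduce(0, addOld);
    int sumNew = numbers.stream().reduce(0, addNew);
    System.out.println(sumOld + " == " + sumNew); // prints: 10 == 10
  }
}
```

This works because Spark's Java function interfaces (like JDK functional interfaces) each declare a single abstract method, so a lambda or method reference can be supplied wherever the anonymous class was.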
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java | 1
-rw-r--r--  core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java | 26
-rw-r--r--  core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java | 65
-rw-r--r--  core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java | 25
-rw-r--r--  core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java | 25
-rw-r--r--  core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java | 7
-rw-r--r--  core/src/test/java/test/org/apache/spark/JavaAPISuite.java | 492
7 files changed, 144 insertions(+), 497 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 189d607fa6..29aca04a3d 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -37,7 +37,6 @@ import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.UnsafeAlignedOffset;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.MemoryBlock;
-import org.apache.spark.util.TaskCompletionListener;
import org.apache.spark.util.Utils;
/**
diff --git a/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java b/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java
index 7fe452a48d..a6589d2898 100644
--- a/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java
+++ b/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java
@@ -20,14 +20,11 @@ import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
-import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
import org.apache.spark.rdd.JdbcRDD;
import org.junit.After;
import org.junit.Assert;
@@ -89,30 +86,13 @@ public class JavaJdbcRDDSuite implements Serializable {
public void testJavaJdbcRDD() throws Exception {
JavaRDD<Integer> rdd = JdbcRDD.create(
sc,
- new JdbcRDD.ConnectionFactory() {
- @Override
- public Connection getConnection() throws SQLException {
- return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
- }
- },
+ () -> DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb"),
"SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
1, 100, 1,
- new Function<ResultSet, Integer>() {
- @Override
- public Integer call(ResultSet r) throws Exception {
- return r.getInt(1);
- }
- }
+ r -> r.getInt(1)
).cache();
Assert.assertEquals(100, rdd.count());
- Assert.assertEquals(
- Integer.valueOf(10100),
- rdd.reduce(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
- }
- }));
+ Assert.assertEquals(Integer.valueOf(10100), rdd.reduce((i1, i2) -> i1 + i2));
}
}
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index 088b68132d..24a55df84a 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -34,8 +34,6 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import org.apache.spark.HashPartitioner;
import org.apache.spark.ShuffleDependency;
@@ -119,9 +117,7 @@ public class UnsafeShuffleWriterSuite {
any(File.class),
any(SerializerInstance.class),
anyInt(),
- any(ShuffleWriteMetrics.class))).thenAnswer(new Answer<DiskBlockObjectWriter>() {
- @Override
- public DiskBlockObjectWriter answer(InvocationOnMock invocationOnMock) throws Throwable {
+ any(ShuffleWriteMetrics.class))).thenAnswer(invocationOnMock -> {
Object[] args = invocationOnMock.getArguments();
return new DiskBlockObjectWriter(
(File) args[1],
@@ -132,33 +128,24 @@ public class UnsafeShuffleWriterSuite {
(ShuffleWriteMetrics) args[4],
(BlockId) args[0]
);
- }
- });
+ });
when(shuffleBlockResolver.getDataFile(anyInt(), anyInt())).thenReturn(mergedOutputFile);
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
- partitionSizesInMergedFile = (long[]) invocationOnMock.getArguments()[2];
- File tmp = (File) invocationOnMock.getArguments()[3];
- mergedOutputFile.delete();
- tmp.renameTo(mergedOutputFile);
- return null;
- }
+ doAnswer(invocationOnMock -> {
+ partitionSizesInMergedFile = (long[]) invocationOnMock.getArguments()[2];
+ File tmp = (File) invocationOnMock.getArguments()[3];
+ mergedOutputFile.delete();
+ tmp.renameTo(mergedOutputFile);
+ return null;
}).when(shuffleBlockResolver)
.writeIndexFileAndCommit(anyInt(), anyInt(), any(long[].class), any(File.class));
- when(diskBlockManager.createTempShuffleBlock()).thenAnswer(
- new Answer<Tuple2<TempShuffleBlockId, File>>() {
- @Override
- public Tuple2<TempShuffleBlockId, File> answer(
- InvocationOnMock invocationOnMock) throws Throwable {
- TempShuffleBlockId blockId = new TempShuffleBlockId(UUID.randomUUID());
- File file = File.createTempFile("spillFile", ".spill", tempDir);
- spillFilesCreated.add(file);
- return Tuple2$.MODULE$.apply(blockId, file);
- }
- });
+ when(diskBlockManager.createTempShuffleBlock()).thenAnswer(invocationOnMock -> {
+ TempShuffleBlockId blockId = new TempShuffleBlockId(UUID.randomUUID());
+ File file = File.createTempFile("spillFile", ".spill", tempDir);
+ spillFilesCreated.add(file);
+ return Tuple2$.MODULE$.apply(blockId, file);
+ });
when(taskContext.taskMetrics()).thenReturn(taskMetrics);
when(shuffleDep.serializer()).thenReturn(serializer);
@@ -243,7 +230,7 @@ public class UnsafeShuffleWriterSuite {
@Test
public void writeEmptyIterator() throws Exception {
final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
- writer.write(Iterators.<Product2<Object, Object>>emptyIterator());
+ writer.write(Iterators.emptyIterator());
final Option<MapStatus> mapStatus = writer.stop(true);
assertTrue(mapStatus.isDefined());
assertTrue(mergedOutputFile.exists());
@@ -259,7 +246,7 @@ public class UnsafeShuffleWriterSuite {
// In this example, each partition should have exactly one record:
final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
for (int i = 0; i < NUM_PARTITITONS; i++) {
- dataToWrite.add(new Tuple2<Object, Object>(i, i));
+ dataToWrite.add(new Tuple2<>(i, i));
}
final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
writer.write(dataToWrite.iterator());
@@ -315,7 +302,7 @@ public class UnsafeShuffleWriterSuite {
final UnsafeShuffleWriter<Object, Object> writer = createWriter(transferToEnabled);
final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
for (int i : new int[] { 1, 2, 3, 4, 4, 2 }) {
- dataToWrite.add(new Tuple2<Object, Object>(i, i));
+ dataToWrite.add(new Tuple2<>(i, i));
}
writer.insertRecordIntoSorter(dataToWrite.get(0));
writer.insertRecordIntoSorter(dataToWrite.get(1));
@@ -424,7 +411,7 @@ public class UnsafeShuffleWriterSuite {
final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
final byte[] bigByteArray = new byte[PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES / 10];
for (int i = 0; i < 10 + 1; i++) {
- dataToWrite.add(new Tuple2<Object, Object>(i, bigByteArray));
+ dataToWrite.add(new Tuple2<>(i, bigByteArray));
}
writer.write(dataToWrite.iterator());
assertEquals(2, spillFilesCreated.size());
@@ -458,7 +445,7 @@ public class UnsafeShuffleWriterSuite {
final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
for (int i = 0; i < UnsafeShuffleWriter.DEFAULT_INITIAL_SORT_BUFFER_SIZE + 1; i++) {
- dataToWrite.add(new Tuple2<Object, Object>(i, i));
+ dataToWrite.add(new Tuple2<>(i, i));
}
writer.write(dataToWrite.iterator());
writer.stop(true);
@@ -478,7 +465,7 @@ public class UnsafeShuffleWriterSuite {
final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
final byte[] bytes = new byte[(int) (ShuffleExternalSorter.DISK_WRITE_BUFFER_SIZE * 2.5)];
new Random(42).nextBytes(bytes);
- dataToWrite.add(new Tuple2<Object, Object>(1, ByteBuffer.wrap(bytes)));
+ dataToWrite.add(new Tuple2<>(1, ByteBuffer.wrap(bytes)));
writer.write(dataToWrite.iterator());
writer.stop(true);
assertEquals(
@@ -491,15 +478,15 @@ public class UnsafeShuffleWriterSuite {
public void writeRecordsThatAreBiggerThanMaxRecordSize() throws Exception {
final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
- dataToWrite.add(new Tuple2<Object, Object>(1, ByteBuffer.wrap(new byte[1])));
+ dataToWrite.add(new Tuple2<>(1, ByteBuffer.wrap(new byte[1])));
// We should be able to write a record that's right _at_ the max record size
final byte[] atMaxRecordSize = new byte[(int) taskMemoryManager.pageSizeBytes() - 4];
new Random(42).nextBytes(atMaxRecordSize);
- dataToWrite.add(new Tuple2<Object, Object>(2, ByteBuffer.wrap(atMaxRecordSize)));
+ dataToWrite.add(new Tuple2<>(2, ByteBuffer.wrap(atMaxRecordSize)));
// Inserting a record that's larger than the max record size
final byte[] exceedsMaxRecordSize = new byte[(int) taskMemoryManager.pageSizeBytes()];
new Random(42).nextBytes(exceedsMaxRecordSize);
- dataToWrite.add(new Tuple2<Object, Object>(3, ByteBuffer.wrap(exceedsMaxRecordSize)));
+ dataToWrite.add(new Tuple2<>(3, ByteBuffer.wrap(exceedsMaxRecordSize)));
writer.write(dataToWrite.iterator());
writer.stop(true);
assertEquals(
@@ -511,10 +498,10 @@ public class UnsafeShuffleWriterSuite {
@Test
public void spillFilesAreDeletedWhenStoppingAfterError() throws IOException {
final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
- writer.insertRecordIntoSorter(new Tuple2<Object, Object>(1, 1));
- writer.insertRecordIntoSorter(new Tuple2<Object, Object>(2, 2));
+ writer.insertRecordIntoSorter(new Tuple2<>(1, 1));
+ writer.insertRecordIntoSorter(new Tuple2<>(2, 2));
writer.forceSorterToSpill();
- writer.insertRecordIntoSorter(new Tuple2<Object, Object>(2, 2));
+ writer.insertRecordIntoSorter(new Tuple2<>(2, 2));
writer.stop(false);
assertSpillFilesWereCleanedUp();
}
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 26568146bf..03cec8ed81 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
-import scala.Tuple2;
import scala.Tuple2$;
import org.junit.After;
@@ -31,8 +30,6 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import org.apache.spark.SparkConf;
import org.apache.spark.executor.ShuffleWriteMetrics;
@@ -88,25 +85,18 @@ public abstract class AbstractBytesToBytesMapSuite {
spillFilesCreated.clear();
MockitoAnnotations.initMocks(this);
when(blockManager.diskBlockManager()).thenReturn(diskBlockManager);
- when(diskBlockManager.createTempLocalBlock()).thenAnswer(
- new Answer<Tuple2<TempLocalBlockId, File>>() {
- @Override
- public Tuple2<TempLocalBlockId, File> answer(InvocationOnMock invocationOnMock)
- throws Throwable {
- TempLocalBlockId blockId = new TempLocalBlockId(UUID.randomUUID());
- File file = File.createTempFile("spillFile", ".spill", tempDir);
- spillFilesCreated.add(file);
- return Tuple2$.MODULE$.apply(blockId, file);
- }
+ when(diskBlockManager.createTempLocalBlock()).thenAnswer(invocationOnMock -> {
+ TempLocalBlockId blockId = new TempLocalBlockId(UUID.randomUUID());
+ File file = File.createTempFile("spillFile", ".spill", tempDir);
+ spillFilesCreated.add(file);
+ return Tuple2$.MODULE$.apply(blockId, file);
});
when(blockManager.getDiskWriter(
any(BlockId.class),
any(File.class),
any(SerializerInstance.class),
anyInt(),
- any(ShuffleWriteMetrics.class))).thenAnswer(new Answer<DiskBlockObjectWriter>() {
- @Override
- public DiskBlockObjectWriter answer(InvocationOnMock invocationOnMock) throws Throwable {
+ any(ShuffleWriteMetrics.class))).thenAnswer(invocationOnMock -> {
Object[] args = invocationOnMock.getArguments();
return new DiskBlockObjectWriter(
@@ -118,8 +108,7 @@ public abstract class AbstractBytesToBytesMapSuite {
(ShuffleWriteMetrics) args[4],
(BlockId) args[0]
);
- }
- });
+ });
}
@After
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index fbbe530a13..771d39016c 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
import java.util.LinkedList;
import java.util.UUID;
-import scala.Tuple2;
import scala.Tuple2$;
import org.junit.After;
@@ -31,8 +30,6 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
@@ -96,25 +93,18 @@ public class UnsafeExternalSorterSuite {
taskContext = mock(TaskContext.class);
when(taskContext.taskMetrics()).thenReturn(new TaskMetrics());
when(blockManager.diskBlockManager()).thenReturn(diskBlockManager);
- when(diskBlockManager.createTempLocalBlock()).thenAnswer(
- new Answer<Tuple2<TempLocalBlockId, File>>() {
- @Override
- public Tuple2<TempLocalBlockId, File> answer(InvocationOnMock invocationOnMock)
- throws Throwable {
- TempLocalBlockId blockId = new TempLocalBlockId(UUID.randomUUID());
- File file = File.createTempFile("spillFile", ".spill", tempDir);
- spillFilesCreated.add(file);
- return Tuple2$.MODULE$.apply(blockId, file);
- }
+ when(diskBlockManager.createTempLocalBlock()).thenAnswer(invocationOnMock -> {
+ TempLocalBlockId blockId = new TempLocalBlockId(UUID.randomUUID());
+ File file = File.createTempFile("spillFile", ".spill", tempDir);
+ spillFilesCreated.add(file);
+ return Tuple2$.MODULE$.apply(blockId, file);
});
when(blockManager.getDiskWriter(
any(BlockId.class),
any(File.class),
any(SerializerInstance.class),
anyInt(),
- any(ShuffleWriteMetrics.class))).thenAnswer(new Answer<DiskBlockObjectWriter>() {
- @Override
- public DiskBlockObjectWriter answer(InvocationOnMock invocationOnMock) throws Throwable {
+ any(ShuffleWriteMetrics.class))).thenAnswer(invocationOnMock -> {
Object[] args = invocationOnMock.getArguments();
return new DiskBlockObjectWriter(
@@ -126,8 +116,7 @@ public class UnsafeExternalSorterSuite {
(ShuffleWriteMetrics) args[4],
(BlockId) args[0]
);
- }
- });
+ });
}
@After
diff --git a/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java b/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java
index e22ad89c1d..1d2b05ebc2 100644
--- a/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java
+++ b/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java
@@ -64,12 +64,7 @@ public class Java8RDDAPISuite implements Serializable {
public void foreachWithAnonymousClass() {
foreachCalls = 0;
JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
- rdd.foreach(new VoidFunction<String>() {
- @Override
- public void call(String s) {
- foreachCalls++;
- }
- });
+ rdd.foreach(s -> foreachCalls++);
Assert.assertEquals(2, foreachCalls);
}
diff --git a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
index 80aab100ac..512149127d 100644
--- a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
@@ -31,7 +31,6 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.*;
import org.apache.spark.Accumulator;
@@ -208,7 +207,7 @@ public class JavaAPISuite implements Serializable {
assertEquals(new Tuple2<>(3, 2), sortedPairs.get(2));
// Custom comparator
- sortedRDD = rdd.sortByKey(Collections.<Integer>reverseOrder(), false);
+ sortedRDD = rdd.sortByKey(Collections.reverseOrder(), false);
assertEquals(new Tuple2<>(-1, 1), sortedRDD.first());
sortedPairs = sortedRDD.collect();
assertEquals(new Tuple2<>(0, 4), sortedPairs.get(1));
@@ -266,13 +265,7 @@ public class JavaAPISuite implements Serializable {
JavaRDD<Tuple2<Integer, Integer>> rdd = sc.parallelize(pairs);
// compare on first value
- JavaRDD<Tuple2<Integer, Integer>> sortedRDD =
- rdd.sortBy(new Function<Tuple2<Integer, Integer>, Integer>() {
- @Override
- public Integer call(Tuple2<Integer, Integer> t) {
- return t._1();
- }
- }, true, 2);
+ JavaRDD<Tuple2<Integer, Integer>> sortedRDD = rdd.sortBy(Tuple2::_1, true, 2);
assertEquals(new Tuple2<>(-1, 1), sortedRDD.first());
List<Tuple2<Integer, Integer>> sortedPairs = sortedRDD.collect();
@@ -280,12 +273,7 @@ public class JavaAPISuite implements Serializable {
assertEquals(new Tuple2<>(3, 2), sortedPairs.get(2));
// compare on second value
- sortedRDD = rdd.sortBy(new Function<Tuple2<Integer, Integer>, Integer>() {
- @Override
- public Integer call(Tuple2<Integer, Integer> t) {
- return t._2();
- }
- }, true, 2);
+ sortedRDD = rdd.sortBy(Tuple2::_2, true, 2);
assertEquals(new Tuple2<>(-1, 1), sortedRDD.first());
sortedPairs = sortedRDD.collect();
assertEquals(new Tuple2<>(3, 2), sortedPairs.get(1));
@@ -294,28 +282,20 @@ public class JavaAPISuite implements Serializable {
@Test
public void foreach() {
- final LongAccumulator accum = sc.sc().longAccumulator();
+ LongAccumulator accum = sc.sc().longAccumulator();
JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
- rdd.foreach(new VoidFunction<String>() {
- @Override
- public void call(String s) {
- accum.add(1);
- }
- });
+ rdd.foreach(s -> accum.add(1));
assertEquals(2, accum.value().intValue());
}
@Test
public void foreachPartition() {
- final LongAccumulator accum = sc.sc().longAccumulator();
+ LongAccumulator accum = sc.sc().longAccumulator();
JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
- rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
- @Override
- public void call(Iterator<String> iter) {
- while (iter.hasNext()) {
- iter.next();
- accum.add(1);
- }
+ rdd.foreachPartition(iter -> {
+ while (iter.hasNext()) {
+ iter.next();
+ accum.add(1);
}
});
assertEquals(2, accum.value().intValue());
@@ -361,12 +341,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void groupBy() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
- Function<Integer, Boolean> isOdd = new Function<Integer, Boolean>() {
- @Override
- public Boolean call(Integer x) {
- return x % 2 == 0;
- }
- };
+ Function<Integer, Boolean> isOdd = x -> x % 2 == 0;
JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isOdd);
assertEquals(2, oddsAndEvens.count());
assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
@@ -383,12 +358,7 @@ public class JavaAPISuite implements Serializable {
// Regression test for SPARK-4459
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
Function<Tuple2<Integer, Integer>, Boolean> areOdd =
- new Function<Tuple2<Integer, Integer>, Boolean>() {
- @Override
- public Boolean call(Tuple2<Integer, Integer> x) {
- return (x._1() % 2 == 0) && (x._2() % 2 == 0);
- }
- };
+ x -> (x._1() % 2 == 0) && (x._2() % 2 == 0);
JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd);
JavaPairRDD<Boolean, Iterable<Tuple2<Integer, Integer>>> oddsAndEvens = pairRDD.groupBy(areOdd);
assertEquals(2, oddsAndEvens.count());
@@ -406,13 +376,7 @@ public class JavaAPISuite implements Serializable {
public void keyByOnPairRDD() {
// Regression test for SPARK-4459
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
- Function<Tuple2<Integer, Integer>, String> sumToString =
- new Function<Tuple2<Integer, Integer>, String>() {
- @Override
- public String call(Tuple2<Integer, Integer> x) {
- return String.valueOf(x._1() + x._2());
- }
- };
+ Function<Tuple2<Integer, Integer>, String> sumToString = x -> String.valueOf(x._1() + x._2());
JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd);
JavaPairRDD<String, Tuple2<Integer, Integer>> keyed = pairRDD.keyBy(sumToString);
assertEquals(7, keyed.count());
@@ -516,25 +480,14 @@ public class JavaAPISuite implements Serializable {
rdd1.leftOuterJoin(rdd2).collect();
assertEquals(5, joined.size());
Tuple2<Integer,Tuple2<Integer,Optional<Character>>> firstUnmatched =
- rdd1.leftOuterJoin(rdd2).filter(
- new Function<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>, Boolean>() {
- @Override
- public Boolean call(Tuple2<Integer, Tuple2<Integer, Optional<Character>>> tup) {
- return !tup._2()._2().isPresent();
- }
- }).first();
+ rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first();
assertEquals(3, firstUnmatched._1().intValue());
}
@Test
public void foldReduce() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
- Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer a, Integer b) {
- return a + b;
- }
- };
+ Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
int sum = rdd.fold(0, add);
assertEquals(33, sum);
@@ -546,12 +499,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void treeReduce() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(-5, -4, -3, -2, -1, 1, 2, 3, 4), 10);
- Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer a, Integer b) {
- return a + b;
- }
- };
+ Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
for (int depth = 1; depth <= 10; depth++) {
int sum = rdd.treeReduce(add, depth);
assertEquals(-5, sum);
@@ -561,12 +509,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void treeAggregate() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(-5, -4, -3, -2, -1, 1, 2, 3, 4), 10);
- Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer a, Integer b) {
- return a + b;
- }
- };
+ Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
for (int depth = 1; depth <= 10; depth++) {
int sum = rdd.treeAggregate(0, add, add, depth);
assertEquals(-5, sum);
@@ -584,21 +527,15 @@ public class JavaAPISuite implements Serializable {
new Tuple2<>(5, 1),
new Tuple2<>(5, 3)), 2);
- Map<Integer, Set<Integer>> sets = pairs.aggregateByKey(new HashSet<Integer>(),
- new Function2<Set<Integer>, Integer, Set<Integer>>() {
- @Override
- public Set<Integer> call(Set<Integer> a, Integer b) {
- a.add(b);
- return a;
- }
- },
- new Function2<Set<Integer>, Set<Integer>, Set<Integer>>() {
- @Override
- public Set<Integer> call(Set<Integer> a, Set<Integer> b) {
- a.addAll(b);
- return a;
- }
- }).collectAsMap();
+ Map<Integer, HashSet<Integer>> sets = pairs.aggregateByKey(new HashSet<Integer>(),
+ (a, b) -> {
+ a.add(b);
+ return a;
+ },
+ (a, b) -> {
+ a.addAll(b);
+ return a;
+ }).collectAsMap();
assertEquals(3, sets.size());
assertEquals(new HashSet<>(Arrays.asList(1)), sets.get(1));
assertEquals(new HashSet<>(Arrays.asList(2)), sets.get(3));
@@ -616,13 +553,7 @@ public class JavaAPISuite implements Serializable {
new Tuple2<>(3, 1)
);
JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
- JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0,
- new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer a, Integer b) {
- return a + b;
- }
- });
+ JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b);
assertEquals(1, sums.lookup(1).get(0).intValue());
assertEquals(2, sums.lookup(2).get(0).intValue());
assertEquals(3, sums.lookup(3).get(0).intValue());
@@ -639,13 +570,7 @@ public class JavaAPISuite implements Serializable {
new Tuple2<>(3, 1)
);
JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
- JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey(
- new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer a, Integer b) {
- return a + b;
- }
- });
+ JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
assertEquals(1, counts.lookup(1).get(0).intValue());
assertEquals(2, counts.lookup(2).get(0).intValue());
assertEquals(3, counts.lookup(3).get(0).intValue());
@@ -655,12 +580,7 @@ public class JavaAPISuite implements Serializable {
assertEquals(2, localCounts.get(2).intValue());
assertEquals(3, localCounts.get(3).intValue());
- localCounts = rdd.reduceByKeyLocally(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer a, Integer b) {
- return a + b;
- }
- });
+ localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
assertEquals(1, localCounts.get(1).intValue());
assertEquals(2, localCounts.get(2).intValue());
assertEquals(3, localCounts.get(3).intValue());
@@ -692,20 +612,8 @@ public class JavaAPISuite implements Serializable {
assertTrue(sc.emptyRDD().isEmpty());
assertTrue(sc.parallelize(new ArrayList<Integer>()).isEmpty());
assertFalse(sc.parallelize(Arrays.asList(1)).isEmpty());
- assertTrue(sc.parallelize(Arrays.asList(1, 2, 3), 3).filter(
- new Function<Integer,Boolean>() {
- @Override
- public Boolean call(Integer i) {
- return i < 0;
- }
- }).isEmpty());
- assertFalse(sc.parallelize(Arrays.asList(1, 2, 3)).filter(
- new Function<Integer, Boolean>() {
- @Override
- public Boolean call(Integer i) {
- return i > 1;
- }
- }).isEmpty());
+ assertTrue(sc.parallelize(Arrays.asList(1, 2, 3), 3).filter(i -> i < 0).isEmpty());
+ assertFalse(sc.parallelize(Arrays.asList(1, 2, 3)).filter(i -> i > 1).isEmpty());
}
@Test
@@ -721,12 +629,7 @@ public class JavaAPISuite implements Serializable {
JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
JavaDoubleRDD distinct = rdd.distinct();
assertEquals(5, distinct.count());
- JavaDoubleRDD filter = rdd.filter(new Function<Double, Boolean>() {
- @Override
- public Boolean call(Double x) {
- return x > 2.0;
- }
- });
+ JavaDoubleRDD filter = rdd.filter(x -> x > 2.0);
assertEquals(3, filter.count());
JavaDoubleRDD union = rdd.union(rdd);
assertEquals(12, union.count());
@@ -764,7 +667,7 @@ public class JavaAPISuite implements Serializable {
// SPARK-5744
assertArrayEquals(
new long[] {0},
- sc.parallelizeDoubles(new ArrayList<Double>(0), 1).histogram(new double[]{0.0, 1.0}));
+ sc.parallelizeDoubles(new ArrayList<>(0), 1).histogram(new double[]{0.0, 1.0}));
}
private static class DoubleComparator implements Comparator<Double>, Serializable {
@@ -833,12 +736,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void reduceOnJavaDoubleRDD() {
JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
- double sum = rdd.reduce(new Function2<Double, Double, Double>() {
- @Override
- public Double call(Double v1, Double v2) {
- return v1 + v2;
- }
- });
+ double sum = rdd.reduce((v1, v2) -> v1 + v2);
assertEquals(10.0, sum, 0.001);
}
@@ -859,27 +757,11 @@ public class JavaAPISuite implements Serializable {
@Test
public void map() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- JavaDoubleRDD doubles = rdd.mapToDouble(new DoubleFunction<Integer>() {
- @Override
- public double call(Integer x) {
- return x.doubleValue();
- }
- }).cache();
+ JavaDoubleRDD doubles = rdd.mapToDouble(Integer::doubleValue).cache();
doubles.collect();
- JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(
- new PairFunction<Integer, Integer, Integer>() {
- @Override
- public Tuple2<Integer, Integer> call(Integer x) {
- return new Tuple2<>(x, x);
- }
- }).cache();
+ JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>(x, x)).cache();
pairs.collect();
- JavaRDD<String> strings = rdd.map(new Function<Integer, String>() {
- @Override
- public String call(Integer x) {
- return x.toString();
- }
- }).cache();
+ JavaRDD<String> strings = rdd.map(Object::toString).cache();
strings.collect();
}
@@ -887,39 +769,27 @@ public class JavaAPISuite implements Serializable {
public void flatMap() {
JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
"The quick brown fox jumps over the lazy dog."));
- JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String x) {
- return Arrays.asList(x.split(" ")).iterator();
- }
- });
+ JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
assertEquals("Hello", words.first());
assertEquals(11, words.count());
- JavaPairRDD<String, String> pairsRDD = rdd.flatMapToPair(
- new PairFlatMapFunction<String, String, String>() {
- @Override
- public Iterator<Tuple2<String, String>> call(String s) {
- List<Tuple2<String, String>> pairs = new LinkedList<>();
- for (String word : s.split(" ")) {
- pairs.add(new Tuple2<>(word, word));
- }
- return pairs.iterator();
+ JavaPairRDD<String, String> pairsRDD = rdd.flatMapToPair(s -> {
+ List<Tuple2<String, String>> pairs = new LinkedList<>();
+ for (String word : s.split(" ")) {
+ pairs.add(new Tuple2<>(word, word));
}
+ return pairs.iterator();
}
);
assertEquals(new Tuple2<>("Hello", "Hello"), pairsRDD.first());
assertEquals(11, pairsRDD.count());
- JavaDoubleRDD doubles = rdd.flatMapToDouble(new DoubleFlatMapFunction<String>() {
- @Override
- public Iterator<Double> call(String s) {
- List<Double> lengths = new LinkedList<>();
- for (String word : s.split(" ")) {
- lengths.add((double) word.length());
- }
- return lengths.iterator();
+ JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> {
+ List<Double> lengths = new LinkedList<>();
+ for (String word : s.split(" ")) {
+ lengths.add((double) word.length());
}
+ return lengths.iterator();
});
assertEquals(5.0, doubles.first(), 0.01);
assertEquals(11, pairsRDD.count());
@@ -937,37 +807,23 @@ public class JavaAPISuite implements Serializable {
// Regression test for SPARK-668:
JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair(
- new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
- @Override
- public Iterator<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) {
- return Collections.singletonList(item.swap()).iterator();
- }
- });
+ item -> Collections.singletonList(item.swap()).iterator());
swapped.collect();
// There was never a bug here, but it's worth testing:
- pairRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(Tuple2<Integer, String> item) {
- return item.swap();
- }
- }).collect();
+ pairRDD.mapToPair(Tuple2::swap).collect();
}
@Test
public void mapPartitions() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
- JavaRDD<Integer> partitionSums = rdd.mapPartitions(
- new FlatMapFunction<Iterator<Integer>, Integer>() {
- @Override
- public Iterator<Integer> call(Iterator<Integer> iter) {
- int sum = 0;
- while (iter.hasNext()) {
- sum += iter.next();
- }
- return Collections.singletonList(sum).iterator();
+ JavaRDD<Integer> partitionSums = rdd.mapPartitions(iter -> {
+ int sum = 0;
+ while (iter.hasNext()) {
+ sum += iter.next();
}
- });
+ return Collections.singletonList(sum).iterator();
+ });
assertEquals("[3, 7]", partitionSums.collect().toString());
}
@@ -975,17 +831,13 @@ public class JavaAPISuite implements Serializable {
@Test
public void mapPartitionsWithIndex() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
- JavaRDD<Integer> partitionSums = rdd.mapPartitionsWithIndex(
- new Function2<Integer, Iterator<Integer>, Iterator<Integer>>() {
- @Override
- public Iterator<Integer> call(Integer index, Iterator<Integer> iter) {
- int sum = 0;
- while (iter.hasNext()) {
- sum += iter.next();
- }
- return Collections.singletonList(sum).iterator();
+ JavaRDD<Integer> partitionSums = rdd.mapPartitionsWithIndex((index, iter) -> {
+ int sum = 0;
+ while (iter.hasNext()) {
+ sum += iter.next();
}
- }, false);
+ return Collections.singletonList(sum).iterator();
+ }, false);
assertEquals("[3, 7]", partitionSums.collect().toString());
}
@@ -1124,21 +976,12 @@ public class JavaAPISuite implements Serializable {
);
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
- rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
- @Override
- public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
- return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()));
- }
- }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+ rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
+ .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
// Try reading the output back as an object file
JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class,
- Text.class).mapToPair(new PairFunction<Tuple2<IntWritable, Text>, Integer, String>() {
- @Override
- public Tuple2<Integer, String> call(Tuple2<IntWritable, Text> pair) {
- return new Tuple2<>(pair._1().get(), pair._2().toString());
- }
- });
+ Text.class).mapToPair(pair -> new Tuple2<>(pair._1().get(), pair._2().toString()));
assertEquals(pairs, readRDD.collect());
}
@@ -1179,12 +1022,7 @@ public class JavaAPISuite implements Serializable {
channel1.close();
JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName).cache();
- readRDD.foreach(new VoidFunction<Tuple2<String,PortableDataStream>>() {
- @Override
- public void call(Tuple2<String, PortableDataStream> pair) {
- pair._2().toArray(); // force the file to read
- }
- });
+ readRDD.foreach(pair -> pair._2().toArray()); // force the file to read
List<Tuple2<String, PortableDataStream>> result = readRDD.collect();
for (Tuple2<String, PortableDataStream> res : result) {
@@ -1229,23 +1067,13 @@ public class JavaAPISuite implements Serializable {
);
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
- rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
- @Override
- public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
- return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()));
- }
- }).saveAsNewAPIHadoopFile(
- outputDir, IntWritable.class, Text.class,
+ rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
+ .saveAsNewAPIHadoopFile(outputDir, IntWritable.class, Text.class,
org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
JavaPairRDD<IntWritable, Text> output =
sc.sequenceFile(outputDir, IntWritable.class, Text.class);
- assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() {
- @Override
- public String call(Tuple2<IntWritable, Text> x) {
- return x.toString();
- }
- }).collect().toString());
+ assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
}
@SuppressWarnings("unchecked")
@@ -1259,22 +1087,13 @@ public class JavaAPISuite implements Serializable {
);
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
- rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
- @Override
- public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
- return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()));
- }
- }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+ rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
+ .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(outputDir,
org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
IntWritable.class, Text.class, Job.getInstance().getConfiguration());
- assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() {
- @Override
- public String call(Tuple2<IntWritable, Text> x) {
- return x.toString();
- }
- }).collect().toString());
+ assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
}
@Test
@@ -1315,21 +1134,12 @@ public class JavaAPISuite implements Serializable {
);
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
- rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
- @Override
- public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
- return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()));
- }
- }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+ rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
+ .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
SequenceFileInputFormat.class, IntWritable.class, Text.class);
- assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() {
- @Override
- public String call(Tuple2<IntWritable, Text> x) {
- return x.toString();
- }
- }).collect().toString());
+ assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
}
@SuppressWarnings("unchecked")
@@ -1343,34 +1153,19 @@ public class JavaAPISuite implements Serializable {
);
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
- rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
- @Override
- public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
- return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()));
- }
- }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class,
- DefaultCodec.class);
+ rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
+ .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class, DefaultCodec.class);
JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
SequenceFileInputFormat.class, IntWritable.class, Text.class);
- assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() {
- @Override
- public String call(Tuple2<IntWritable, Text> x) {
- return x.toString();
- }
- }).collect().toString());
+ assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
}
@Test
public void zip() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- JavaDoubleRDD doubles = rdd.mapToDouble(new DoubleFunction<Integer>() {
- @Override
- public double call(Integer x) {
- return x.doubleValue();
- }
- });
+ JavaDoubleRDD doubles = rdd.mapToDouble(Integer::doubleValue);
JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
zipped.count();
}
@@ -1380,12 +1175,7 @@ public class JavaAPISuite implements Serializable {
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2);
FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
- new FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer>() {
- @Override
- public Iterator<Integer> call(Iterator<Integer> i, Iterator<String> s) {
- return Arrays.asList(Iterators.size(i), Iterators.size(s)).iterator();
- }
- };
+ (i, s) -> Arrays.asList(Iterators.size(i), Iterators.size(s)).iterator();
JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
@@ -1396,22 +1186,12 @@ public class JavaAPISuite implements Serializable {
public void accumulators() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- final Accumulator<Integer> intAccum = sc.intAccumulator(10);
- rdd.foreach(new VoidFunction<Integer>() {
- @Override
- public void call(Integer x) {
- intAccum.add(x);
- }
- });
+ Accumulator<Integer> intAccum = sc.intAccumulator(10);
+ rdd.foreach(intAccum::add);
assertEquals((Integer) 25, intAccum.value());
- final Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
- rdd.foreach(new VoidFunction<Integer>() {
- @Override
- public void call(Integer x) {
- doubleAccum.add((double) x);
- }
- });
+ Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
+ rdd.foreach(x -> doubleAccum.add((double) x));
assertEquals((Double) 25.0, doubleAccum.value());
// Try a custom accumulator type
@@ -1432,13 +1212,8 @@ public class JavaAPISuite implements Serializable {
}
};
- final Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
- rdd.foreach(new VoidFunction<Integer>() {
- @Override
- public void call(Integer x) {
- floatAccum.add((float) x);
- }
- });
+ Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
+ rdd.foreach(x -> floatAccum.add((float) x));
assertEquals((Float) 25.0f, floatAccum.value());
// Test the setValue method
@@ -1449,12 +1224,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void keyBy() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
- List<Tuple2<String, Integer>> s = rdd.keyBy(new Function<Integer, String>() {
- @Override
- public String call(Integer t) {
- return t.toString();
- }
- }).collect();
+ List<Tuple2<String, Integer>> s = rdd.keyBy(Object::toString).collect();
assertEquals(new Tuple2<>("1", 1), s.get(0));
assertEquals(new Tuple2<>("2", 2), s.get(1));
}
@@ -1487,26 +1257,10 @@ public class JavaAPISuite implements Serializable {
@Test
public void combineByKey() {
JavaRDD<Integer> originalRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));
- Function<Integer, Integer> keyFunction = new Function<Integer, Integer>() {
- @Override
- public Integer call(Integer v1) {
- return v1 % 3;
- }
- };
- Function<Integer, Integer> createCombinerFunction = new Function<Integer, Integer>() {
- @Override
- public Integer call(Integer v1) {
- return v1;
- }
- };
+ Function<Integer, Integer> keyFunction = v1 -> v1 % 3;
+ Function<Integer, Integer> createCombinerFunction = v1 -> v1;
- Function2<Integer, Integer, Integer> mergeValueFunction =
- new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer v1, Integer v2) {
- return v1 + v2;
- }
- };
+ Function2<Integer, Integer, Integer> mergeValueFunction = (v1, v2) -> v1 + v2;
JavaPairRDD<Integer, Integer> combinedRDD = originalRDD.keyBy(keyFunction)
.combineByKey(createCombinerFunction, mergeValueFunction, mergeValueFunction);
@@ -1534,20 +1288,8 @@ public class JavaAPISuite implements Serializable {
@Test
public void mapOnPairRDD() {
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4));
- JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(
- new PairFunction<Integer, Integer, Integer>() {
- @Override
- public Tuple2<Integer, Integer> call(Integer i) {
- return new Tuple2<>(i, i % 2);
- }
- });
- JavaPairRDD<Integer, Integer> rdd3 = rdd2.mapToPair(
- new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
- @Override
- public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) {
- return new Tuple2<>(in._2(), in._1());
- }
- });
+ JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
+ JavaPairRDD<Integer, Integer> rdd3 = rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1()));
assertEquals(Arrays.asList(
new Tuple2<>(1, 1),
new Tuple2<>(0, 2),
@@ -1561,13 +1303,7 @@ public class JavaAPISuite implements Serializable {
public void collectPartitions() {
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
- JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(
- new PairFunction<Integer, Integer, Integer>() {
- @Override
- public Tuple2<Integer, Integer> call(Integer i) {
- return new Tuple2<>(i, i % 2);
- }
- });
+ JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
List<Integer>[] parts = rdd1.collectPartitions(new int[] {0});
assertEquals(Arrays.asList(1, 2), parts[0]);
@@ -1623,13 +1359,7 @@ public class JavaAPISuite implements Serializable {
public void collectAsMapWithIntArrayValues() {
// Regression test for SPARK-1040
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
- JavaPairRDD<Integer, int[]> pairRDD = rdd.mapToPair(
- new PairFunction<Integer, Integer, int[]>() {
- @Override
- public Tuple2<Integer, int[]> call(Integer x) {
- return new Tuple2<>(x, new int[]{x});
- }
- });
+ JavaPairRDD<Integer, int[]> pairRDD = rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x}));
pairRDD.collect(); // Works fine
pairRDD.collectAsMap(); // Used to crash with ClassCastException
}
@@ -1651,13 +1381,7 @@ public class JavaAPISuite implements Serializable {
@SuppressWarnings("unchecked")
public void sampleByKey() {
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
- JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(
- new PairFunction<Integer, Integer, Integer>() {
- @Override
- public Tuple2<Integer, Integer> call(Integer i) {
- return new Tuple2<>(i % 2, 1);
- }
- });
+ JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i % 2, 1));
Map<Integer, Double> fractions = new HashMap<>();
fractions.put(0, 0.5);
fractions.put(1, 1.0);
@@ -1677,13 +1401,7 @@ public class JavaAPISuite implements Serializable {
@SuppressWarnings("unchecked")
public void sampleByKeyExact() {
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
- JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(
- new PairFunction<Integer, Integer, Integer>() {
- @Override
- public Tuple2<Integer, Integer> call(Integer i) {
- return new Tuple2<>(i % 2, 1);
- }
- });
+ JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i % 2, 1));
Map<Integer, Double> fractions = new HashMap<>();
fractions.put(0, 0.5);
fractions.put(1, 1.0);
@@ -1754,14 +1472,7 @@ public class JavaAPISuite implements Serializable {
public void foreachAsync() throws Exception {
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> rdd = sc.parallelize(data, 1);
- JavaFutureAction<Void> future = rdd.foreachAsync(
- new VoidFunction<Integer>() {
- @Override
- public void call(Integer integer) {
- // intentionally left blank.
- }
- }
- );
+ JavaFutureAction<Void> future = rdd.foreachAsync(integer -> {});
future.get();
assertFalse(future.isCancelled());
assertTrue(future.isDone());
@@ -1784,11 +1495,8 @@ public class JavaAPISuite implements Serializable {
public void testAsyncActionCancellation() throws Exception {
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> rdd = sc.parallelize(data, 1);
- JavaFutureAction<Void> future = rdd.foreachAsync(new VoidFunction<Integer>() {
- @Override
- public void call(Integer integer) throws InterruptedException {
- Thread.sleep(10000); // To ensure that the job won't finish before it's cancelled.
- }
+ JavaFutureAction<Void> future = rdd.foreachAsync(integer -> {
+ Thread.sleep(10000); // To ensure that the job won't finish before it's cancelled.
});
future.cancel(true);
assertTrue(future.isCancelled());
@@ -1805,7 +1513,7 @@ public class JavaAPISuite implements Serializable {
public void testAsyncActionErrorWrapping() throws Exception {
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> rdd = sc.parallelize(data, 1);
- JavaFutureAction<Long> future = rdd.map(new BuggyMapFunction<Integer>()).countAsync();
+ JavaFutureAction<Long> future = rdd.map(new BuggyMapFunction<>()).countAsync();
try {
future.get(2, TimeUnit.SECONDS);
fail("Expected future.get() for failed job to throw ExcecutionException");