From 1487c9af20a333ead55955acf4c0aa323bea0d07 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Sun, 19 Feb 2017 09:42:50 -0800
Subject: [SPARK-19534][TESTS] Convert Java tests to use lambdas, Java 8
 features

## What changes were proposed in this pull request?

Convert tests to use Java 8 lambdas, and make modest related fixes to the surrounding code.

## How was this patch tested?

Jenkins tests

Author: Sean Owen

Closes #16964 from srowen/SPARK-19534.
---
 .../unsafe/sort/UnsafeExternalSorter.java          |   1 -
 .../java/org/apache/spark/JavaJdbcRDDSuite.java    |  26 +-
 .../shuffle/sort/UnsafeShuffleWriterSuite.java     |  65 ++-
 .../unsafe/map/AbstractBytesToBytesMapSuite.java   |  25 +-
 .../unsafe/sort/UnsafeExternalSorterSuite.java     |  25 +-
 .../test/org/apache/spark/Java8RDDAPISuite.java    |   7 +-
 .../java/test/org/apache/spark/JavaAPISuite.java   | 492 +++++----------------
 7 files changed, 144 insertions(+), 497 deletions(-)
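Note (illustrative, not part of the patch): the change applied throughout is mechanical.
Every org.apache.spark.api.java.function interface has a single abstract call(...)
method, so an anonymous inner class collapses to a lambda or a method reference.
A minimal self-contained sketch of the before/after, assuming only a local-mode context:

  import java.util.Arrays;
  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.spark.api.java.function.Function2;

  public class LambdaConversionSketch {
    public static void main(String[] args) {
      JavaSparkContext sc = new JavaSparkContext("local", "LambdaConversionSketch");
      JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));

      // Before: one anonymous class per function object
      int oldStyle = rdd.reduce(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer a, Integer b) {
          return a + b;
        }
      });

      // After: the same Function2, written as a lambda
      int newStyle = rdd.reduce((a, b) -> a + b);

      assert oldStyle == newStyle; // both are 10
      sc.stop();
    }
  }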
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 189d607fa6..29aca04a3d 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -37,7 +37,6 @@ import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.UnsafeAlignedOffset;
 import org.apache.spark.unsafe.array.LongArray;
 import org.apache.spark.unsafe.memory.MemoryBlock;
-import org.apache.spark.util.TaskCompletionListener;
 import org.apache.spark.util.Utils;
 
 /**
diff --git a/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java b/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java
index 7fe452a48d..a6589d2898 100644
--- a/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java
+++ b/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java
@@ -20,14 +20,11 @@ import java.io.Serializable;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.rdd.JdbcRDD;
 import org.junit.After;
 import org.junit.Assert;
@@ -89,30 +86,13 @@ public class JavaJdbcRDDSuite implements Serializable {
   public void testJavaJdbcRDD() throws Exception {
     JavaRDD<Integer> rdd = JdbcRDD.create(
       sc,
-      new JdbcRDD.ConnectionFactory() {
-        @Override
-        public Connection getConnection() throws SQLException {
-          return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
-        }
-      },
+      () -> DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb"),
       "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
       1, 100, 1,
-      new Function<ResultSet, Integer>() {
-        @Override
-        public Integer call(ResultSet r) throws Exception {
-          return r.getInt(1);
-        }
-      }
+      r -> r.getInt(1)
     ).cache();
 
     Assert.assertEquals(100, rdd.count());
-    Assert.assertEquals(
-      Integer.valueOf(10100),
-      rdd.reduce(new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-      }));
+    Assert.assertEquals(Integer.valueOf(10100), rdd.reduce((i1, i2) -> i1 + i2));
   }
 }
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index 088b68132d..24a55df84a 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -34,8 +34,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.ShuffleDependency;
@@ -119,9 +117,7 @@ public class UnsafeShuffleWriterSuite {
         any(File.class),
         any(SerializerInstance.class),
         anyInt(),
-        any(ShuffleWriteMetrics.class))).thenAnswer(new Answer<DiskBlockObjectWriter>() {
-      @Override
-      public DiskBlockObjectWriter answer(InvocationOnMock invocationOnMock) throws Throwable {
+        any(ShuffleWriteMetrics.class))).thenAnswer(invocationOnMock -> {
         Object[] args = invocationOnMock.getArguments();
         return new DiskBlockObjectWriter(
           (File) args[1],
@@ -132,33 +128,24 @@ public class UnsafeShuffleWriterSuite {
           (ShuffleWriteMetrics) args[4],
           (BlockId) args[0]
         );
-      }
-    });
+      });
 
     when(shuffleBlockResolver.getDataFile(anyInt(), anyInt())).thenReturn(mergedOutputFile);
-    doAnswer(new Answer<Void>() {
-      @Override
-      public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
-        partitionSizesInMergedFile = (long[]) invocationOnMock.getArguments()[2];
-        File tmp = (File) invocationOnMock.getArguments()[3];
-        mergedOutputFile.delete();
-        tmp.renameTo(mergedOutputFile);
-        return null;
-      }
+    doAnswer(invocationOnMock -> {
+      partitionSizesInMergedFile = (long[]) invocationOnMock.getArguments()[2];
+      File tmp = (File) invocationOnMock.getArguments()[3];
+      mergedOutputFile.delete();
+      tmp.renameTo(mergedOutputFile);
+      return null;
     }).when(shuffleBlockResolver)
       .writeIndexFileAndCommit(anyInt(), anyInt(), any(long[].class), any(File.class));
 
-    when(diskBlockManager.createTempShuffleBlock()).thenAnswer(
-      new Answer<Tuple2<TempShuffleBlockId, File>>() {
-        @Override
-        public Tuple2<TempShuffleBlockId, File> answer(
-            InvocationOnMock invocationOnMock) throws Throwable {
-          TempShuffleBlockId blockId = new TempShuffleBlockId(UUID.randomUUID());
-          File file = File.createTempFile("spillFile", ".spill", tempDir);
-          spillFilesCreated.add(file);
-          return Tuple2$.MODULE$.apply(blockId, file);
-        }
-      });
+    when(diskBlockManager.createTempShuffleBlock()).thenAnswer(invocationOnMock -> {
+      TempShuffleBlockId blockId = new TempShuffleBlockId(UUID.randomUUID());
+      File file = File.createTempFile("spillFile", ".spill", tempDir);
+      spillFilesCreated.add(file);
+      return Tuple2$.MODULE$.apply(blockId, file);
+    });
 
     when(taskContext.taskMetrics()).thenReturn(taskMetrics);
     when(shuffleDep.serializer()).thenReturn(serializer);
@@ -243,7 +230,7 @@ public class UnsafeShuffleWriterSuite {
   @Test
   public void writeEmptyIterator() throws Exception {
     final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
-    writer.write(Iterators.<Product2<Object, Object>>emptyIterator());
+    writer.write(Iterators.emptyIterator());
     final Option<MapStatus> mapStatus = writer.stop(true);
     assertTrue(mapStatus.isDefined());
     assertTrue(mergedOutputFile.exists());
@@ -259,7 +246,7 @@ public class UnsafeShuffleWriterSuite {
     // In this example, each partition should have exactly one record:
     final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
     for (int i = 0; i < NUM_PARTITITONS; i++) {
-      dataToWrite.add(new Tuple2<Object, Object>(i, i));
+      dataToWrite.add(new Tuple2<>(i, i));
     }
     final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
     writer.write(dataToWrite.iterator());
@@ -315,7 +302,7 @@ public class UnsafeShuffleWriterSuite {
     final UnsafeShuffleWriter<Object, Object> writer = createWriter(transferToEnabled);
     final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
     for (int i : new int[] { 1, 2, 3, 4, 4, 2 }) {
-      dataToWrite.add(new Tuple2<Object, Object>(i, i));
+      dataToWrite.add(new Tuple2<>(i, i));
     }
     writer.insertRecordIntoSorter(dataToWrite.get(0));
     writer.insertRecordIntoSorter(dataToWrite.get(1));
@@ -424,7 +411,7 @@ public class UnsafeShuffleWriterSuite {
     final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
     final byte[] bigByteArray = new byte[PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES / 10];
     for (int i = 0; i < 10 + 1; i++) {
-      dataToWrite.add(new Tuple2<Object, Object>(i, bigByteArray));
+      dataToWrite.add(new Tuple2<>(i, bigByteArray));
     }
     writer.write(dataToWrite.iterator());
     assertEquals(2, spillFilesCreated.size());
@@ -458,7 +445,7 @@ public class UnsafeShuffleWriterSuite {
     final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
     final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
     for (int i = 0; i < UnsafeShuffleWriter.DEFAULT_INITIAL_SORT_BUFFER_SIZE + 1; i++) {
-      dataToWrite.add(new Tuple2<Object, Object>(i, i));
+      dataToWrite.add(new Tuple2<>(i, i));
     }
     writer.write(dataToWrite.iterator());
     writer.stop(true);
@@ -478,7 +465,7 @@ public class UnsafeShuffleWriterSuite {
     final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
     final byte[] bytes = new byte[(int) (ShuffleExternalSorter.DISK_WRITE_BUFFER_SIZE * 2.5)];
     new Random(42).nextBytes(bytes);
-    dataToWrite.add(new Tuple2<Object, Object>(1, ByteBuffer.wrap(bytes)));
+    dataToWrite.add(new Tuple2<>(1, ByteBuffer.wrap(bytes)));
     writer.write(dataToWrite.iterator());
     writer.stop(true);
     assertEquals(
@@ -491,15 +478,15 @@ public class UnsafeShuffleWriterSuite {
   public void writeRecordsThatAreBiggerThanMaxRecordSize() throws Exception {
     final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
     final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
-    dataToWrite.add(new Tuple2<Object, Object>(1, ByteBuffer.wrap(new byte[1])));
+    dataToWrite.add(new Tuple2<>(1, ByteBuffer.wrap(new byte[1])));
     // We should be able to write a record that's right _at_ the max record size
     final byte[] atMaxRecordSize = new byte[(int) taskMemoryManager.pageSizeBytes() - 4];
     new Random(42).nextBytes(atMaxRecordSize);
-    dataToWrite.add(new Tuple2<Object, Object>(2, ByteBuffer.wrap(atMaxRecordSize)));
+    dataToWrite.add(new Tuple2<>(2, ByteBuffer.wrap(atMaxRecordSize)));
     // Inserting a record that's larger than the max record size
     final byte[] exceedsMaxRecordSize = new byte[(int) taskMemoryManager.pageSizeBytes()];
     new Random(42).nextBytes(exceedsMaxRecordSize);
-    dataToWrite.add(new Tuple2<Object, Object>(3, ByteBuffer.wrap(exceedsMaxRecordSize)));
+    dataToWrite.add(new Tuple2<>(3, ByteBuffer.wrap(exceedsMaxRecordSize)));
     writer.write(dataToWrite.iterator());
     writer.stop(true);
     assertEquals(
@@ -511,10 +498,10 @@ public class UnsafeShuffleWriterSuite {
   @Test
   public void spillFilesAreDeletedWhenStoppingAfterError() throws IOException {
     final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
-    writer.insertRecordIntoSorter(new Tuple2<Object, Object>(1, 1));
-    writer.insertRecordIntoSorter(new Tuple2<Object, Object>(2, 2));
+    writer.insertRecordIntoSorter(new Tuple2<>(1, 1));
+    writer.insertRecordIntoSorter(new Tuple2<>(2, 2));
     writer.forceSorterToSpill();
-    writer.insertRecordIntoSorter(new Tuple2<Object, Object>(2, 2));
+    writer.insertRecordIntoSorter(new Tuple2<>(2, 2));
     writer.stop(false);
     assertSpillFilesWereCleanedUp();
   }
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 26568146bf..03cec8ed81 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import scala.Tuple2;
 import scala.Tuple2$;
 
 import org.junit.After;
@@ -31,8 +30,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.executor.ShuffleWriteMetrics;
@@ -88,25 +85,18 @@ public abstract class AbstractBytesToBytesMapSuite {
     spillFilesCreated.clear();
     MockitoAnnotations.initMocks(this);
     when(blockManager.diskBlockManager()).thenReturn(diskBlockManager);
-    when(diskBlockManager.createTempLocalBlock()).thenAnswer(
-      new Answer<Tuple2<TempLocalBlockId, File>>() {
-        @Override
-        public Tuple2<TempLocalBlockId, File> answer(InvocationOnMock invocationOnMock)
-            throws Throwable {
-          TempLocalBlockId blockId = new TempLocalBlockId(UUID.randomUUID());
-          File file = File.createTempFile("spillFile", ".spill", tempDir);
-          spillFilesCreated.add(file);
-          return Tuple2$.MODULE$.apply(blockId, file);
-        }
+    when(diskBlockManager.createTempLocalBlock()).thenAnswer(invocationOnMock -> {
+      TempLocalBlockId blockId = new TempLocalBlockId(UUID.randomUUID());
+      File file = File.createTempFile("spillFile", ".spill", tempDir);
+      spillFilesCreated.add(file);
+      return Tuple2$.MODULE$.apply(blockId, file);
     });
     when(blockManager.getDiskWriter(
         any(BlockId.class),
         any(File.class),
         any(SerializerInstance.class),
         anyInt(),
-        any(ShuffleWriteMetrics.class))).thenAnswer(new Answer<DiskBlockObjectWriter>() {
-      @Override
-      public DiskBlockObjectWriter answer(InvocationOnMock invocationOnMock) throws Throwable {
+        any(ShuffleWriteMetrics.class))).thenAnswer(invocationOnMock -> {
 
         Object[] args = invocationOnMock.getArguments();
 
         return new DiskBlockObjectWriter(
@@ -118,8 +108,7 @@ public abstract class AbstractBytesToBytesMapSuite {
           (ShuffleWriteMetrics) args[4],
           (BlockId) args[0]
         );
-      }
-    });
+    });
   }
 
   @After
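Note (illustrative, not part of the patch): Mockito's Answer also has a single
abstract method, which is what lets the stubs above pass a lambda straight to
thenAnswer. A minimal sketch of the same pattern against a hypothetical
Repository interface:

  import static org.mockito.Mockito.*;

  public class AnswerLambdaSketch {
    interface Repository {
      String lookup(int id);
    }

    public static void main(String[] args) {
      Repository repo = mock(Repository.class);
      // Before: thenAnswer(new Answer<String>() { public String answer(InvocationOnMock inv) ... })
      // After: the Answer written as a lambda over the invocation
      when(repo.lookup(anyInt())).thenAnswer(invocation -> {
        Object[] callArgs = invocation.getArguments();
        return "record-" + callArgs[0];
      });
      System.out.println(repo.lookup(7)); // prints record-7
    }
  }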
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index fbbe530a13..771d39016c 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.UUID;
 
-import scala.Tuple2;
 import scala.Tuple2$;
 
 import org.junit.After;
@@ -31,8 +30,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.TaskContext;
@@ -96,25 +93,18 @@ public class UnsafeExternalSorterSuite {
     taskContext = mock(TaskContext.class);
     when(taskContext.taskMetrics()).thenReturn(new TaskMetrics());
     when(blockManager.diskBlockManager()).thenReturn(diskBlockManager);
-    when(diskBlockManager.createTempLocalBlock()).thenAnswer(
-      new Answer<Tuple2<TempLocalBlockId, File>>() {
-        @Override
-        public Tuple2<TempLocalBlockId, File> answer(InvocationOnMock invocationOnMock)
-            throws Throwable {
-          TempLocalBlockId blockId = new TempLocalBlockId(UUID.randomUUID());
-          File file = File.createTempFile("spillFile", ".spill", tempDir);
-          spillFilesCreated.add(file);
-          return Tuple2$.MODULE$.apply(blockId, file);
-        }
+    when(diskBlockManager.createTempLocalBlock()).thenAnswer(invocationOnMock -> {
+      TempLocalBlockId blockId = new TempLocalBlockId(UUID.randomUUID());
+      File file = File.createTempFile("spillFile", ".spill", tempDir);
+      spillFilesCreated.add(file);
+      return Tuple2$.MODULE$.apply(blockId, file);
    });
     when(blockManager.getDiskWriter(
         any(BlockId.class),
         any(File.class),
         any(SerializerInstance.class),
         anyInt(),
-        any(ShuffleWriteMetrics.class))).thenAnswer(new Answer<DiskBlockObjectWriter>() {
-      @Override
-      public DiskBlockObjectWriter answer(InvocationOnMock invocationOnMock) throws Throwable {
+        any(ShuffleWriteMetrics.class))).thenAnswer(invocationOnMock -> {
 
         Object[] args = invocationOnMock.getArguments();
 
         return new DiskBlockObjectWriter(
@@ -126,8 +116,7 @@ public class UnsafeExternalSorterSuite {
           (ShuffleWriteMetrics) args[4],
           (BlockId) args[0]
         );
-      }
-    });
+    });
   }
 
   @After
diff --git a/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java b/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java
index e22ad89c1d..1d2b05ebc2 100644
--- a/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java
+++ b/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java
@@ -64,12 +64,7 @@ public class Java8RDDAPISuite implements Serializable {
   public void foreachWithAnonymousClass() {
     foreachCalls = 0;
     JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
-    rdd.foreach(new VoidFunction<String>() {
-      @Override
-      public void call(String s) {
-        foreachCalls++;
-      }
-    });
+    rdd.foreach(s -> foreachCalls++);
     Assert.assertEquals(2, foreachCalls);
   }
 
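Note (illustrative, not part of the patch): rdd.foreach(s -> foreachCalls++) only
counts correctly because this suite runs in local mode, where the closure executes
in the same JVM; on a cluster each executor would increment its own deserialized
copy of the field. The JavaAPISuite tests below count with an accumulator instead,
which is the cluster-safe form:

  import java.util.Arrays;
  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.spark.util.LongAccumulator;

  public class ForeachAccumulatorSketch {
    public static void main(String[] args) {
      JavaSparkContext sc = new JavaSparkContext("local", "ForeachAccumulatorSketch");
      // An accumulator aggregates updates from executors back to the driver
      LongAccumulator accum = sc.sc().longAccumulator();
      JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
      rdd.foreach(s -> accum.add(1));
      System.out.println(accum.value()); // 2
      sc.stop();
    }
  }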
diff --git a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
index 80aab100ac..512149127d 100644
--- a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
@@ -31,7 +31,6 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.*;
 
 import org.apache.spark.Accumulator;
@@ -208,7 +207,7 @@ public class JavaAPISuite implements Serializable {
     assertEquals(new Tuple2<>(3, 2), sortedPairs.get(2));
 
     // Custom comparator
-    sortedRDD = rdd.sortByKey(Collections.<Integer>reverseOrder(), false);
+    sortedRDD = rdd.sortByKey(Collections.reverseOrder(), false);
     assertEquals(new Tuple2<>(-1, 1), sortedRDD.first());
     sortedPairs = sortedRDD.collect();
     assertEquals(new Tuple2<>(0, 4), sortedPairs.get(1));
@@ -266,13 +265,7 @@ public class JavaAPISuite implements Serializable {
     JavaRDD<Tuple2<Integer, Integer>> rdd = sc.parallelize(pairs);
 
     // compare on first value
-    JavaRDD<Tuple2<Integer, Integer>> sortedRDD =
-      rdd.sortBy(new Function<Tuple2<Integer, Integer>, Integer>() {
-        @Override
-        public Integer call(Tuple2<Integer, Integer> t) {
-          return t._1();
-        }
-      }, true, 2);
+    JavaRDD<Tuple2<Integer, Integer>> sortedRDD = rdd.sortBy(Tuple2::_1, true, 2);
 
     assertEquals(new Tuple2<>(-1, 1), sortedRDD.first());
     List<Tuple2<Integer, Integer>> sortedPairs = sortedRDD.collect();
@@ -280,12 +273,7 @@ public class JavaAPISuite implements Serializable {
     assertEquals(new Tuple2<>(3, 2), sortedPairs.get(2));
 
     // compare on second value
-    sortedRDD = rdd.sortBy(new Function<Tuple2<Integer, Integer>, Integer>() {
-      @Override
-      public Integer call(Tuple2<Integer, Integer> t) {
-        return t._2();
-      }
-    }, true, 2);
+    sortedRDD = rdd.sortBy(Tuple2::_2, true, 2);
     assertEquals(new Tuple2<>(-1, 1), sortedRDD.first());
     sortedPairs = sortedRDD.collect();
     assertEquals(new Tuple2<>(3, 2), sortedPairs.get(1));
@@ -294,28 +282,20 @@ public class JavaAPISuite implements Serializable {
 
   @Test
   public void foreach() {
-    final LongAccumulator accum = sc.sc().longAccumulator();
+    LongAccumulator accum = sc.sc().longAccumulator();
     JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
-    rdd.foreach(new VoidFunction<String>() {
-      @Override
-      public void call(String s) {
-        accum.add(1);
-      }
-    });
+    rdd.foreach(s -> accum.add(1));
     assertEquals(2, accum.value().intValue());
   }
 
   @Test
   public void foreachPartition() {
-    final LongAccumulator accum = sc.sc().longAccumulator();
+    LongAccumulator accum = sc.sc().longAccumulator();
     JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
-    rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
-      @Override
-      public void call(Iterator<String> iter) {
-        while (iter.hasNext()) {
-          iter.next();
-          accum.add(1);
-        }
+    rdd.foreachPartition(iter -> {
+      while (iter.hasNext()) {
+        iter.next();
+        accum.add(1);
       }
     });
     assertEquals(2, accum.value().intValue());
@@ -361,12 +341,7 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void groupBy() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
-    Function<Integer, Boolean> isOdd = new Function<Integer, Boolean>() {
-      @Override
-      public Boolean call(Integer x) {
-        return x % 2 == 0;
-      }
-    };
+    Function<Integer, Boolean> isOdd = x -> x % 2 == 0;
     JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isOdd);
     assertEquals(2, oddsAndEvens.count());
     assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0)));  // Evens
@@ -383,12 +358,7 @@ public class JavaAPISuite implements Serializable {
     // Regression test for SPARK-4459
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
     Function<Tuple2<Integer, Integer>, Boolean> areOdd =
-      new Function<Tuple2<Integer, Integer>, Boolean>() {
-        @Override
-        public Boolean call(Tuple2<Integer, Integer> x) {
-          return (x._1() % 2 == 0) && (x._2() % 2 == 0);
-        }
-      };
+      x -> (x._1() % 2 == 0) && (x._2() % 2 == 0);
     JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd);
     JavaPairRDD<Boolean, Iterable<Tuple2<Integer, Integer>>> oddsAndEvens = pairRDD.groupBy(areOdd);
     assertEquals(2, oddsAndEvens.count());
@@ -406,13 +376,7 @@ public class JavaAPISuite implements Serializable {
   public void keyByOnPairRDD() {
     // Regression test for SPARK-4459
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
-    Function<Tuple2<Integer, Integer>, String> sumToString =
-      new Function<Tuple2<Integer, Integer>, String>() {
-        @Override
-        public String call(Tuple2<Integer, Integer> x) {
-          return String.valueOf(x._1() + x._2());
-        }
-      };
+    Function<Tuple2<Integer, Integer>, String> sumToString = x -> String.valueOf(x._1() + x._2());
     JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd);
     JavaPairRDD<String, Tuple2<Integer, Integer>> keyed = pairRDD.keyBy(sumToString);
     assertEquals(7, keyed.count());
@@ -516,25 +480,14 @@ public class JavaAPISuite implements Serializable {
       rdd1.leftOuterJoin(rdd2).collect();
     assertEquals(5, joined.size());
     Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
-      rdd1.leftOuterJoin(rdd2).filter(
-        new Function<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>, Boolean>() {
-          @Override
-          public Boolean call(Tuple2<Integer, Tuple2<Integer, Optional<Character>>> tup) {
-            return !tup._2()._2().isPresent();
-          }
-        }).first();
+      rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first();
     assertEquals(3, firstUnmatched._1().intValue());
   }
 
   @Test
   public void foldReduce() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
-    Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() {
-      @Override
-      public Integer call(Integer a, Integer b) {
-        return a + b;
-      }
-    };
+    Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
 
     int sum = rdd.fold(0, add);
     assertEquals(33, sum);
@@ -546,12 +499,7 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void treeReduce() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(-5, -4, -3, -2, -1, 1, 2, 3, 4), 10);
-    Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() {
-      @Override
-      public Integer call(Integer a, Integer b) {
-        return a + b;
-      }
-    };
+    Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
     for (int depth = 1; depth <= 10; depth++) {
       int sum = rdd.treeReduce(add, depth);
       assertEquals(-5, sum);
@@ -561,12 +509,7 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void treeAggregate() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(-5, -4, -3, -2, -1, 1, 2, 3, 4), 10);
-    Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() {
-      @Override
-      public Integer call(Integer a, Integer b) {
-        return a + b;
-      }
-    };
+    Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
     for (int depth = 1; depth <= 10; depth++) {
       int sum = rdd.treeAggregate(0, add, add, depth);
       assertEquals(-5, sum);
@@ -584,21 +527,15 @@ public class JavaAPISuite implements Serializable {
         new Tuple2<>(5, 1),
         new Tuple2<>(5, 3)), 2);
 
-    Map<Integer, Set<Integer>> sets = pairs.aggregateByKey(new HashSet<Integer>(),
-      new Function2<Set<Integer>, Integer, Set<Integer>>() {
-        @Override
-        public Set<Integer> call(Set<Integer> a, Integer b) {
-          a.add(b);
-          return a;
-        }
-      },
-      new Function2<Set<Integer>, Set<Integer>, Set<Integer>>() {
-        @Override
-        public Set<Integer> call(Set<Integer> a, Set<Integer> b) {
-          a.addAll(b);
-          return a;
-        }
-      }).collectAsMap();
+    Map<Integer, HashSet<Integer>> sets = pairs.aggregateByKey(new HashSet<Integer>(),
+      (a, b) -> {
+        a.add(b);
+        return a;
+      },
+      (a, b) -> {
+        a.addAll(b);
+        return a;
+      }).collectAsMap();
     assertEquals(3, sets.size());
     assertEquals(new HashSet<>(Arrays.asList(1)), sets.get(1));
     assertEquals(new HashSet<>(Arrays.asList(2)), sets.get(3));
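Note (illustrative, not part of the patch): the aggregateByKey test above leans on
the fact that the zero value is serialized and a fresh copy is produced per key, so
the two lambdas may mutate and return it. A usage sketch mirroring the test,
collecting the distinct values seen for each key:

  import java.util.Arrays;
  import java.util.HashSet;
  import java.util.Map;
  import org.apache.spark.api.java.JavaPairRDD;
  import org.apache.spark.api.java.JavaSparkContext;
  import scala.Tuple2;

  public class AggregateByKeySketch {
    public static void main(String[] args) {
      JavaSparkContext sc = new JavaSparkContext("local", "AggregateByKeySketch");
      JavaPairRDD<Integer, Integer> pairs = sc.parallelizePairs(Arrays.asList(
        new Tuple2<>(1, 1), new Tuple2<>(1, 1), new Tuple2<>(3, 2),
        new Tuple2<>(5, 1), new Tuple2<>(5, 3)), 2);

      // seq op folds one value into the per-key set; comb op merges two sets
      Map<Integer, HashSet<Integer>> sets = pairs.aggregateByKey(new HashSet<Integer>(),
        (set, v) -> { set.add(v); return set; },
        (s1, s2) -> { s1.addAll(s2); return s1; }).collectAsMap();

      System.out.println(sets); // {1=[1], 3=[2], 5=[1, 3]}
      sc.stop();
    }
  }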
@@ -616,13 +553,7 @@ public class JavaAPISuite implements Serializable {
       new Tuple2<>(3, 1)
     );
     JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
-    JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0,
-      new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer a, Integer b) {
-          return a + b;
-        }
-      });
+    JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b);
     assertEquals(1, sums.lookup(1).get(0).intValue());
     assertEquals(2, sums.lookup(2).get(0).intValue());
     assertEquals(3, sums.lookup(3).get(0).intValue());
@@ -639,13 +570,7 @@ public class JavaAPISuite implements Serializable {
       new Tuple2<>(3, 1)
     );
     JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
-    JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey(
-      new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer a, Integer b) {
-          return a + b;
-        }
-      });
+    JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
     assertEquals(1, counts.lookup(1).get(0).intValue());
     assertEquals(2, counts.lookup(2).get(0).intValue());
     assertEquals(3, counts.lookup(3).get(0).intValue());
@@ -655,12 +580,7 @@ public class JavaAPISuite implements Serializable {
     assertEquals(2, localCounts.get(2).intValue());
     assertEquals(3, localCounts.get(3).intValue());
 
-    localCounts = rdd.reduceByKeyLocally(new Function2<Integer, Integer, Integer>() {
-      @Override
-      public Integer call(Integer a, Integer b) {
-        return a + b;
-      }
-    });
+    localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
     assertEquals(1, localCounts.get(1).intValue());
     assertEquals(2, localCounts.get(2).intValue());
     assertEquals(3, localCounts.get(3).intValue());
@@ -692,20 +612,8 @@ public class JavaAPISuite implements Serializable {
     assertTrue(sc.emptyRDD().isEmpty());
     assertTrue(sc.parallelize(new ArrayList<Integer>()).isEmpty());
     assertFalse(sc.parallelize(Arrays.asList(1)).isEmpty());
-    assertTrue(sc.parallelize(Arrays.asList(1, 2, 3), 3).filter(
-      new Function<Integer, Boolean>() {
-        @Override
-        public Boolean call(Integer i) {
-          return i < 0;
-        }
-      }).isEmpty());
-    assertFalse(sc.parallelize(Arrays.asList(1, 2, 3)).filter(
-      new Function<Integer, Boolean>() {
-        @Override
-        public Boolean call(Integer i) {
-          return i > 1;
-        }
-      }).isEmpty());
+    assertTrue(sc.parallelize(Arrays.asList(1, 2, 3), 3).filter(i -> i < 0).isEmpty());
+    assertFalse(sc.parallelize(Arrays.asList(1, 2, 3)).filter(i -> i > 1).isEmpty());
   }
 
   @Test
@@ -721,12 +629,7 @@ public class JavaAPISuite implements Serializable {
     JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
     JavaDoubleRDD distinct = rdd.distinct();
     assertEquals(5, distinct.count());
-    JavaDoubleRDD filter = rdd.filter(new Function<Double, Boolean>() {
-      @Override
-      public Boolean call(Double x) {
-        return x > 2.0;
-      }
-    });
+    JavaDoubleRDD filter = rdd.filter(x -> x > 2.0);
    assertEquals(3, filter.count());
     JavaDoubleRDD union = rdd.union(rdd);
     assertEquals(12, union.count());
@@ -764,7 +667,7 @@ public class JavaAPISuite implements Serializable {
     // SPARK-5744
     assertArrayEquals(
         new long[] {0},
-        sc.parallelizeDoubles(new ArrayList<Double>(0), 1).histogram(new double[]{0.0, 1.0}));
+        sc.parallelizeDoubles(new ArrayList<>(0), 1).histogram(new double[]{0.0, 1.0}));
   }
 
   private static class DoubleComparator implements Comparator<Double>, Serializable {
@@ -833,12 +736,7 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void reduceOnJavaDoubleRDD() {
     JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
-    double sum = rdd.reduce(new Function2<Double, Double, Double>() {
-      @Override
-      public Double call(Double v1, Double v2) {
-        return v1 + v2;
-      }
-    });
+    double sum = rdd.reduce((v1, v2) -> v1 + v2);
     assertEquals(10.0, sum, 0.001);
   }
 
@@ -859,27 +757,11 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void map() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-    JavaDoubleRDD doubles = rdd.mapToDouble(new DoubleFunction<Integer>() {
-      @Override
-      public double call(Integer x) {
-        return x.doubleValue();
-      }
-    }).cache();
+    JavaDoubleRDD doubles = rdd.mapToDouble(Integer::doubleValue).cache();
     doubles.collect();
-    JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(
-      new PairFunction<Integer, Integer, Integer>() {
-        @Override
-        public Tuple2<Integer, Integer> call(Integer x) {
-          return new Tuple2<>(x, x);
-        }
-      }).cache();
+    JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>(x, x)).cache();
     pairs.collect();
-    JavaRDD<String> strings = rdd.map(new Function<Integer, String>() {
-      @Override
-      public String call(Integer x) {
-        return x.toString();
-      }
-    }).cache();
+    JavaRDD<String> strings = rdd.map(Object::toString).cache();
     strings.collect();
   }
 
@@ -887,39 +769,27 @@ public class JavaAPISuite implements Serializable {
   public void flatMap() {
     JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
       "The quick brown fox jumps over the lazy dog."));
-    JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> call(String x) {
-        return Arrays.asList(x.split(" ")).iterator();
-      }
-    });
+    JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
     assertEquals("Hello", words.first());
     assertEquals(11, words.count());
 
-    JavaPairRDD<String, String> pairsRDD = rdd.flatMapToPair(
-      new PairFlatMapFunction<String, String, String>() {
-        @Override
-        public Iterator<Tuple2<String, String>> call(String s) {
-          List<Tuple2<String, String>> pairs = new LinkedList<>();
-          for (String word : s.split(" ")) {
-            pairs.add(new Tuple2<>(word, word));
-          }
-          return pairs.iterator();
+    JavaPairRDD<String, String> pairsRDD = rdd.flatMapToPair(s -> {
+        List<Tuple2<String, String>> pairs = new LinkedList<>();
+        for (String word : s.split(" ")) {
+          pairs.add(new Tuple2<>(word, word));
         }
+        return pairs.iterator();
       }
     );
     assertEquals(new Tuple2<>("Hello", "Hello"), pairsRDD.first());
     assertEquals(11, pairsRDD.count());
 
-    JavaDoubleRDD doubles = rdd.flatMapToDouble(new DoubleFlatMapFunction<String>() {
-      @Override
-      public Iterator<Double> call(String s) {
-        List<Double> lengths = new LinkedList<>();
-        for (String word : s.split(" ")) {
-          lengths.add((double) word.length());
-        }
-        return lengths.iterator();
+    JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> {
+      List<Double> lengths = new LinkedList<>();
+      for (String word : s.split(" ")) {
+        lengths.add((double) word.length());
       }
+      return lengths.iterator();
     });
     assertEquals(5.0, doubles.first(), 0.01);
     assertEquals(11, pairsRDD.count());
@@ -937,37 +807,23 @@ public class JavaAPISuite implements Serializable {
 
     // Regression test for SPARK-668:
     JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair(
-      new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
-        @Override
-        public Iterator<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) {
-          return Collections.singletonList(item.swap()).iterator();
-        }
-      });
+      item -> Collections.singletonList(item.swap()).iterator());
     swapped.collect();
 
     // There was never a bug here, but it's worth testing:
-    pairRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
-      @Override
-      public Tuple2<String, Integer> call(Tuple2<Integer, String> item) {
-        return item.swap();
-      }
-    }).collect();
+    pairRDD.mapToPair(Tuple2::swap).collect();
   }
 
   @Test
   public void mapPartitions() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
-    JavaRDD<Integer> partitionSums = rdd.mapPartitions(
-      new FlatMapFunction<Iterator<Integer>, Integer>() {
-        @Override
-        public Iterator<Integer> call(Iterator<Integer> iter) {
-          int sum = 0;
-          while (iter.hasNext()) {
-            sum += iter.next();
-          }
-          return Collections.singletonList(sum).iterator();
+    JavaRDD<Integer> partitionSums = rdd.mapPartitions(iter -> {
+        int sum = 0;
+        while (iter.hasNext()) {
+          sum += iter.next();
         }
-      });
+        return Collections.singletonList(sum).iterator();
+    });
     assertEquals("[3, 7]", partitionSums.collect().toString());
   }
 
@@ -975,17 +831,13 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void mapPartitionsWithIndex() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
-    JavaRDD<Integer> partitionSums = rdd.mapPartitionsWithIndex(
-      new Function2<Integer, Iterator<Integer>, Iterator<Integer>>() {
-        @Override
-        public Iterator<Integer> call(Integer index, Iterator<Integer> iter) {
-          int sum = 0;
-          while (iter.hasNext()) {
-            sum += iter.next();
-          }
-          return Collections.singletonList(sum).iterator();
+    JavaRDD<Integer> partitionSums = rdd.mapPartitionsWithIndex((index, iter) -> {
+        int sum = 0;
+        while (iter.hasNext()) {
+          sum += iter.next();
         }
-      }, false);
+        return Collections.singletonList(sum).iterator();
+      }, false);
     assertEquals("[3, 7]", partitionSums.collect().toString());
   }
 
@@ -1124,21 +976,12 @@ public class JavaAPISuite implements Serializable {
     );
     JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
 
-    rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
-      @Override
-      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
-        return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()));
-      }
-    }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+    rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
+      .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
 
     // Try reading the output back as an object file
     JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class,
-      Text.class).mapToPair(new PairFunction<Tuple2<IntWritable, Text>, Integer, String>() {
-      @Override
-      public Tuple2<Integer, String> call(Tuple2<IntWritable, Text> pair) {
-        return new Tuple2<>(pair._1().get(), pair._2().toString());
-      }
-    });
+      Text.class).mapToPair(pair -> new Tuple2<>(pair._1().get(), pair._2().toString()));
     assertEquals(pairs, readRDD.collect());
   }
 
@@ -1179,12 +1022,7 @@ public class JavaAPISuite implements Serializable {
     channel1.close();
 
     JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName).cache();
-    readRDD.foreach(new VoidFunction<Tuple2<String, PortableDataStream>>() {
-      @Override
-      public void call(Tuple2<String, PortableDataStream> pair) {
-        pair._2().toArray(); // force the file to read
-      }
-    });
+    readRDD.foreach(pair -> pair._2().toArray()); // force the file to read
 
     List<Tuple2<String, PortableDataStream>> result = readRDD.collect();
     for (Tuple2<String, PortableDataStream> res : result) {
@@ -1229,23 +1067,13 @@ public class JavaAPISuite implements Serializable {
     );
     JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
 
-    rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
-      @Override
-      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
-        return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()));
-      }
-    }).saveAsNewAPIHadoopFile(
-        outputDir, IntWritable.class, Text.class,
+    rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
+      .saveAsNewAPIHadoopFile(outputDir, IntWritable.class, Text.class,
         org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
 
     JavaPairRDD<IntWritable, Text> output =
         sc.sequenceFile(outputDir, IntWritable.class, Text.class);
-    assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() {
-      @Override
-      public String call(Tuple2<IntWritable, Text> x) {
-        return x.toString();
-      }
-    }).collect().toString());
+    assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
   }
 
   @SuppressWarnings("unchecked")
@@ -1259,22 +1087,13 @@ public class JavaAPISuite implements Serializable {
     );
     JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
 
-    rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
-      @Override
-      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
-        return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()));
-      }
-    }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+    rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
+      .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
 
     JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(outputDir,
         org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
         IntWritable.class, Text.class, Job.getInstance().getConfiguration());
-    assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() {
-      @Override
-      public String call(Tuple2<IntWritable, Text> x) {
-        return x.toString();
-      }
-    }).collect().toString());
+    assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
   }
 
   @Test
@@ -1315,21 +1134,12 @@ public class JavaAPISuite implements Serializable {
     );
     JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
 
-    rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
-      @Override
-      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
-        return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()));
-      }
-    }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+    rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
+      .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
 
     JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
         SequenceFileInputFormat.class, IntWritable.class, Text.class);
-    assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() {
-      @Override
-      public String call(Tuple2<IntWritable, Text> x) {
-        return x.toString();
-      }
-    }).collect().toString());
+    assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
   }
 
   @SuppressWarnings("unchecked")
@@ -1343,34 +1153,19 @@ public class JavaAPISuite implements Serializable {
     );
     JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
 
-    rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
-      @Override
-      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
-        return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()));
-      }
-    }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class,
-      DefaultCodec.class);
+    rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
+      .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class,
+        DefaultCodec.class);
 
     JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
         SequenceFileInputFormat.class, IntWritable.class, Text.class);
-    assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() {
-      @Override
-      public String call(Tuple2<IntWritable, Text> x) {
-        return x.toString();
-      }
-    }).collect().toString());
+    assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
   }
 
   @Test
   public void zip() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-    JavaDoubleRDD doubles = rdd.mapToDouble(new DoubleFunction<Integer>() {
-      @Override
-      public double call(Integer x) {
-        return x.doubleValue();
-      }
-    });
+    JavaDoubleRDD doubles = rdd.mapToDouble(Integer::doubleValue);
     JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
     zipped.count();
   }
@@ -1380,12 +1175,7 @@ public class JavaAPISuite implements Serializable {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
     JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2);
     FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
-      new FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer>() {
-        @Override
-        public Iterator<Integer> call(Iterator<Integer> i, Iterator<String> s) {
-          return Arrays.asList(Iterators.size(i), Iterators.size(s)).iterator();
-        }
-      };
+      (i, s) -> Arrays.asList(Iterators.size(i), Iterators.size(s)).iterator();
 
     JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
     assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
@@ -1396,22 +1186,12 @@ public class JavaAPISuite implements Serializable {
   public void accumulators() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
 
-    final Accumulator<Integer> intAccum = sc.intAccumulator(10);
-    rdd.foreach(new VoidFunction<Integer>() {
-      @Override
-      public void call(Integer x) {
-        intAccum.add(x);
-      }
-    });
+    Accumulator<Integer> intAccum = sc.intAccumulator(10);
+    rdd.foreach(intAccum::add);
     assertEquals((Integer) 25, intAccum.value());
 
-    final Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
-    rdd.foreach(new VoidFunction<Integer>() {
-      @Override
-      public void call(Integer x) {
-        doubleAccum.add((double) x);
-      }
-    });
+    Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
+    rdd.foreach(x -> doubleAccum.add((double) x));
     assertEquals((Double) 25.0, doubleAccum.value());
 
     // Try a custom accumulator type
@@ -1432,13 +1212,8 @@ public class JavaAPISuite implements Serializable {
       }
     };
 
-    final Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
-    rdd.foreach(new VoidFunction<Integer>() {
-      @Override
-      public void call(Integer x) {
-        floatAccum.add((float) x);
-      }
-    });
+    Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
+    rdd.foreach(x -> floatAccum.add((float) x));
     assertEquals((Float) 25.0f, floatAccum.value());
 
     // Test the setValue method
@@ -1449,12 +1224,7 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void keyBy() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
-    List<Tuple2<String, Integer>> s = rdd.keyBy(new Function<Integer, String>() {
-      @Override
-      public String call(Integer t) {
-        return t.toString();
-      }
-    }).collect();
+    List<Tuple2<String, Integer>> s = rdd.keyBy(Object::toString).collect();
     assertEquals(new Tuple2<>("1", 1), s.get(0));
     assertEquals(new Tuple2<>("2", 2), s.get(1));
   }
@@ -1487,26 +1257,10 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void combineByKey() {
     JavaRDD<Integer> originalRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));
-    Function<Integer, Integer> keyFunction = new Function<Integer, Integer>() {
-      @Override
-      public Integer call(Integer v1) {
-        return v1 % 3;
-      }
-    };
-    Function<Integer, Integer> createCombinerFunction = new Function<Integer, Integer>() {
-      @Override
-      public Integer call(Integer v1) {
-        return v1;
-      }
-    };
+    Function<Integer, Integer> keyFunction = v1 -> v1 % 3;
+    Function<Integer, Integer> createCombinerFunction = v1 -> v1;
 
-    Function2<Integer, Integer, Integer> mergeValueFunction =
-      new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer v1, Integer v2) {
-          return v1 + v2;
-        }
-      };
+    Function2<Integer, Integer, Integer> mergeValueFunction = (v1, v2) -> v1 + v2;
 
     JavaPairRDD<Integer, Integer> combinedRDD = originalRDD.keyBy(keyFunction)
         .combineByKey(createCombinerFunction, mergeValueFunction, mergeValueFunction);
@@ -1534,20 +1288,8 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void mapOnPairRDD() {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4));
-    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(
-      new PairFunction<Integer, Integer, Integer>() {
-        @Override
-        public Tuple2<Integer, Integer> call(Integer i) {
-          return new Tuple2<>(i, i % 2);
-        }
-      });
-    JavaPairRDD<Integer, Integer> rdd3 = rdd2.mapToPair(
-      new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
-        @Override
-        public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) {
-          return new Tuple2<>(in._2(), in._1());
-        }
-      });
+    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
+    JavaPairRDD<Integer, Integer> rdd3 = rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1()));
     assertEquals(Arrays.asList(
         new Tuple2<>(1, 1),
         new Tuple2<>(0, 2),
@@ -1561,13 +1303,7 @@ public class JavaAPISuite implements Serializable {
   public void collectPartitions() {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
 
-    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(
-      new PairFunction<Integer, Integer, Integer>() {
-        @Override
-        public Tuple2<Integer, Integer> call(Integer i) {
-          return new Tuple2<>(i, i % 2);
-        }
-      });
+    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
     List<Integer>[] parts = rdd1.collectPartitions(new int[] {0});
     assertEquals(Arrays.asList(1, 2), parts[0]);
@@ -1623,13 +1359,7 @@ public class JavaAPISuite implements Serializable {
   public void collectAsMapWithIntArrayValues() {
     // Regression test for SPARK-1040
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
-    JavaPairRDD<Integer, int[]> pairRDD = rdd.mapToPair(
-      new PairFunction<Integer, Integer, int[]>() {
-        @Override
-        public Tuple2<Integer, int[]> call(Integer x) {
-          return new Tuple2<>(x, new int[]{x});
-        }
-      });
+    JavaPairRDD<Integer, int[]> pairRDD = rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x}));
     pairRDD.collect();  // Works fine
     pairRDD.collectAsMap();  // Used to crash with ClassCastException
   }
@@ -1651,13 +1381,7 @@ public class JavaAPISuite implements Serializable {
   @SuppressWarnings("unchecked")
   public void sampleByKey() {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
-    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(
-      new PairFunction<Integer, Integer, Integer>() {
-        @Override
-        public Tuple2<Integer, Integer> call(Integer i) {
-          return new Tuple2<>(i % 2, 1);
-        }
-      });
+    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i % 2, 1));
     Map<Integer, Double> fractions = new HashMap<>();
     fractions.put(0, 0.5);
     fractions.put(1, 1.0);
@@ -1677,13 +1401,7 @@ public class JavaAPISuite implements Serializable {
   @SuppressWarnings("unchecked")
   public void sampleByKeyExact() {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
-    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(
-      new PairFunction<Integer, Integer, Integer>() {
-        @Override
-        public Tuple2<Integer, Integer> call(Integer i) {
-          return new Tuple2<>(i % 2, 1);
-        }
-      });
+    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i % 2, 1));
     Map<Integer, Double> fractions = new HashMap<>();
     fractions.put(0, 0.5);
     fractions.put(1, 1.0);
@@ -1754,14 +1472,7 @@ public class JavaAPISuite implements Serializable {
   public void foreachAsync() throws Exception {
     List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
     JavaRDD<Integer> rdd = sc.parallelize(data, 1);
-    JavaFutureAction<Void> future = rdd.foreachAsync(
-      new VoidFunction<Integer>() {
-        @Override
-        public void call(Integer integer) {
-          // intentionally left blank.
-        }
-      }
-    );
+    JavaFutureAction<Void> future = rdd.foreachAsync(integer -> {});
     future.get();
     assertFalse(future.isCancelled());
     assertTrue(future.isDone());
@@ -1784,11 +1495,8 @@ public class JavaAPISuite implements Serializable {
   public void testAsyncActionCancellation() throws Exception {
     List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
     JavaRDD<Integer> rdd = sc.parallelize(data, 1);
-    JavaFutureAction<Void> future = rdd.foreachAsync(new VoidFunction<Integer>() {
-      @Override
-      public void call(Integer integer) throws InterruptedException {
-        Thread.sleep(10000);  // To ensure that the job won't finish before it's cancelled.
-      }
+    JavaFutureAction<Void> future = rdd.foreachAsync(integer -> {
+      Thread.sleep(10000);  // To ensure that the job won't finish before it's cancelled.
     });
     future.cancel(true);
     assertTrue(future.isCancelled());
@@ -1805,7 +1513,7 @@ public class JavaAPISuite implements Serializable {
   public void testAsyncActionErrorWrapping() throws Exception {
     List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
     JavaRDD<Integer> rdd = sc.parallelize(data, 1);
-    JavaFutureAction<Long> future = rdd.map(new BuggyMapFunction<Integer>()).countAsync();
+    JavaFutureAction<Long> future = rdd.map(new BuggyMapFunction<>()).countAsync();
     try {
       future.get(2, TimeUnit.SECONDS);
       fail("Expected future.get() for failed job to throw ExcecutionException");
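Note (illustrative, not part of the patch): the error-wrapping contract exercised by
the last test is that a failure inside the job surfaces as the cause of the
ExecutionException thrown by future.get(). A sketch with a hypothetical
always-failing function standing in for BuggyMapFunction:

  import java.util.Arrays;
  import java.util.concurrent.ExecutionException;
  import org.apache.spark.api.java.JavaFutureAction;
  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.spark.api.java.function.Function;

  public class AsyncErrorSketch {
    public static void main(String[] args) throws Exception {
      JavaSparkContext sc = new JavaSparkContext("local", "AsyncErrorSketch");
      JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3), 1);
      Function<Integer, Object> alwaysFails = i -> {
        throw new RuntimeException("boom"); // fails every task
      };
      JavaFutureAction<Long> future = rdd.map(alwaysFails).countAsync();
      try {
        future.get();
      } catch (ExecutionException e) {
        System.out.println("job failure wrapped as: " + e.getCause());
      } finally {
        sc.stop();
      }
    }
  }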