about summary refs log tree commit diff
path: root/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java')
-rw-r--r-- core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java | 65
1 file changed, 26 insertions(+), 39 deletions(-)
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index 088b68132d..24a55df84a 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -34,8 +34,6 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import org.apache.spark.HashPartitioner;
import org.apache.spark.ShuffleDependency;
@@ -119,9 +117,7 @@ public class UnsafeShuffleWriterSuite {
any(File.class),
any(SerializerInstance.class),
anyInt(),
- any(ShuffleWriteMetrics.class))).thenAnswer(new Answer<DiskBlockObjectWriter>() {
- @Override
- public DiskBlockObjectWriter answer(InvocationOnMock invocationOnMock) throws Throwable {
+ any(ShuffleWriteMetrics.class))).thenAnswer(invocationOnMock -> {
Object[] args = invocationOnMock.getArguments();
return new DiskBlockObjectWriter(
(File) args[1],
@@ -132,33 +128,24 @@ public class UnsafeShuffleWriterSuite {
(ShuffleWriteMetrics) args[4],
(BlockId) args[0]
);
- }
- });
+ });
when(shuffleBlockResolver.getDataFile(anyInt(), anyInt())).thenReturn(mergedOutputFile);
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
- partitionSizesInMergedFile = (long[]) invocationOnMock.getArguments()[2];
- File tmp = (File) invocationOnMock.getArguments()[3];
- mergedOutputFile.delete();
- tmp.renameTo(mergedOutputFile);
- return null;
- }
+ doAnswer(invocationOnMock -> {
+ partitionSizesInMergedFile = (long[]) invocationOnMock.getArguments()[2];
+ File tmp = (File) invocationOnMock.getArguments()[3];
+ mergedOutputFile.delete();
+ tmp.renameTo(mergedOutputFile);
+ return null;
}).when(shuffleBlockResolver)
.writeIndexFileAndCommit(anyInt(), anyInt(), any(long[].class), any(File.class));
- when(diskBlockManager.createTempShuffleBlock()).thenAnswer(
- new Answer<Tuple2<TempShuffleBlockId, File>>() {
- @Override
- public Tuple2<TempShuffleBlockId, File> answer(
- InvocationOnMock invocationOnMock) throws Throwable {
- TempShuffleBlockId blockId = new TempShuffleBlockId(UUID.randomUUID());
- File file = File.createTempFile("spillFile", ".spill", tempDir);
- spillFilesCreated.add(file);
- return Tuple2$.MODULE$.apply(blockId, file);
- }
- });
+ when(diskBlockManager.createTempShuffleBlock()).thenAnswer(invocationOnMock -> {
+ TempShuffleBlockId blockId = new TempShuffleBlockId(UUID.randomUUID());
+ File file = File.createTempFile("spillFile", ".spill", tempDir);
+ spillFilesCreated.add(file);
+ return Tuple2$.MODULE$.apply(blockId, file);
+ });
when(taskContext.taskMetrics()).thenReturn(taskMetrics);
when(shuffleDep.serializer()).thenReturn(serializer);
@@ -243,7 +230,7 @@ public class UnsafeShuffleWriterSuite {
@Test
public void writeEmptyIterator() throws Exception {
final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
- writer.write(Iterators.<Product2<Object, Object>>emptyIterator());
+ writer.write(Iterators.emptyIterator());
final Option<MapStatus> mapStatus = writer.stop(true);
assertTrue(mapStatus.isDefined());
assertTrue(mergedOutputFile.exists());
@@ -259,7 +246,7 @@ public class UnsafeShuffleWriterSuite {
// In this example, each partition should have exactly one record:
final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
for (int i = 0; i < NUM_PARTITITONS; i++) {
- dataToWrite.add(new Tuple2<Object, Object>(i, i));
+ dataToWrite.add(new Tuple2<>(i, i));
}
final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
writer.write(dataToWrite.iterator());
@@ -315,7 +302,7 @@ public class UnsafeShuffleWriterSuite {
final UnsafeShuffleWriter<Object, Object> writer = createWriter(transferToEnabled);
final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
for (int i : new int[] { 1, 2, 3, 4, 4, 2 }) {
- dataToWrite.add(new Tuple2<Object, Object>(i, i));
+ dataToWrite.add(new Tuple2<>(i, i));
}
writer.insertRecordIntoSorter(dataToWrite.get(0));
writer.insertRecordIntoSorter(dataToWrite.get(1));
@@ -424,7 +411,7 @@ public class UnsafeShuffleWriterSuite {
final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
final byte[] bigByteArray = new byte[PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES / 10];
for (int i = 0; i < 10 + 1; i++) {
- dataToWrite.add(new Tuple2<Object, Object>(i, bigByteArray));
+ dataToWrite.add(new Tuple2<>(i, bigByteArray));
}
writer.write(dataToWrite.iterator());
assertEquals(2, spillFilesCreated.size());
@@ -458,7 +445,7 @@ public class UnsafeShuffleWriterSuite {
final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
for (int i = 0; i < UnsafeShuffleWriter.DEFAULT_INITIAL_SORT_BUFFER_SIZE + 1; i++) {
- dataToWrite.add(new Tuple2<Object, Object>(i, i));
+ dataToWrite.add(new Tuple2<>(i, i));
}
writer.write(dataToWrite.iterator());
writer.stop(true);
@@ -478,7 +465,7 @@ public class UnsafeShuffleWriterSuite {
final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
final byte[] bytes = new byte[(int) (ShuffleExternalSorter.DISK_WRITE_BUFFER_SIZE * 2.5)];
new Random(42).nextBytes(bytes);
- dataToWrite.add(new Tuple2<Object, Object>(1, ByteBuffer.wrap(bytes)));
+ dataToWrite.add(new Tuple2<>(1, ByteBuffer.wrap(bytes)));
writer.write(dataToWrite.iterator());
writer.stop(true);
assertEquals(
@@ -491,15 +478,15 @@ public class UnsafeShuffleWriterSuite {
public void writeRecordsThatAreBiggerThanMaxRecordSize() throws Exception {
final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
- dataToWrite.add(new Tuple2<Object, Object>(1, ByteBuffer.wrap(new byte[1])));
+ dataToWrite.add(new Tuple2<>(1, ByteBuffer.wrap(new byte[1])));
// We should be able to write a record that's right _at_ the max record size
final byte[] atMaxRecordSize = new byte[(int) taskMemoryManager.pageSizeBytes() - 4];
new Random(42).nextBytes(atMaxRecordSize);
- dataToWrite.add(new Tuple2<Object, Object>(2, ByteBuffer.wrap(atMaxRecordSize)));
+ dataToWrite.add(new Tuple2<>(2, ByteBuffer.wrap(atMaxRecordSize)));
// Inserting a record that's larger than the max record size
final byte[] exceedsMaxRecordSize = new byte[(int) taskMemoryManager.pageSizeBytes()];
new Random(42).nextBytes(exceedsMaxRecordSize);
- dataToWrite.add(new Tuple2<Object, Object>(3, ByteBuffer.wrap(exceedsMaxRecordSize)));
+ dataToWrite.add(new Tuple2<>(3, ByteBuffer.wrap(exceedsMaxRecordSize)));
writer.write(dataToWrite.iterator());
writer.stop(true);
assertEquals(
@@ -511,10 +498,10 @@ public class UnsafeShuffleWriterSuite {
@Test
public void spillFilesAreDeletedWhenStoppingAfterError() throws IOException {
final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
- writer.insertRecordIntoSorter(new Tuple2<Object, Object>(1, 1));
- writer.insertRecordIntoSorter(new Tuple2<Object, Object>(2, 2));
+ writer.insertRecordIntoSorter(new Tuple2<>(1, 1));
+ writer.insertRecordIntoSorter(new Tuple2<>(2, 2));
writer.forceSorterToSpill();
- writer.insertRecordIntoSorter(new Tuple2<Object, Object>(2, 2));
+ writer.insertRecordIntoSorter(new Tuple2<>(2, 2));
writer.stop(false);
assertSpillFilesWereCleanedUp();
}