aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/java/org/apache/spark/shuffle/unsafe/PackedRecordPointerSuite.java8
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java129
2 files changed, 102 insertions, 35 deletions
diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/PackedRecordPointerSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/PackedRecordPointerSuite.java
index db9e827590..934b7e0305 100644
--- a/core/src/test/java/org/apache/spark/shuffle/unsafe/PackedRecordPointerSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/PackedRecordPointerSuite.java
@@ -32,8 +32,8 @@ public class PackedRecordPointerSuite {
public void heap() {
final TaskMemoryManager memoryManager =
new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP));
- final MemoryBlock page0 = memoryManager.allocatePage(100);
- final MemoryBlock page1 = memoryManager.allocatePage(100);
+ final MemoryBlock page0 = memoryManager.allocatePage(128);
+ final MemoryBlock page1 = memoryManager.allocatePage(128);
final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1,
page1.getBaseOffset() + 42);
PackedRecordPointer packedPointer = new PackedRecordPointer();
@@ -50,8 +50,8 @@ public class PackedRecordPointerSuite {
public void offHeap() {
final TaskMemoryManager memoryManager =
new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.UNSAFE));
- final MemoryBlock page0 = memoryManager.allocatePage(100);
- final MemoryBlock page1 = memoryManager.allocatePage(100);
+ final MemoryBlock page0 = memoryManager.allocatePage(128);
+ final MemoryBlock page1 = memoryManager.allocatePage(128);
final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1,
page1.getBaseOffset() + 42);
PackedRecordPointer packedPointer = new PackedRecordPointer();
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index c11949d57a..968185bde7 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -18,8 +18,10 @@
package org.apache.spark.util.collection.unsafe.sort;
import java.io.File;
+import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Arrays;
import java.util.LinkedList;
import java.util.UUID;
@@ -34,6 +36,7 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.*;
import static org.mockito.AdditionalAnswers.returnsSecondArg;
import static org.mockito.Answers.RETURNS_SMART_NULLS;
@@ -77,12 +80,13 @@ public class UnsafeExternalSorterSuite {
}
};
+ SparkConf sparkConf;
+ File tempDir;
ShuffleMemoryManager shuffleMemoryManager;
@Mock(answer = RETURNS_SMART_NULLS) BlockManager blockManager;
@Mock(answer = RETURNS_SMART_NULLS) DiskBlockManager diskBlockManager;
@Mock(answer = RETURNS_SMART_NULLS) TaskContext taskContext;
- File tempDir;
private final long pageSizeBytes = new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "64m");
@@ -96,6 +100,7 @@ public class UnsafeExternalSorterSuite {
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
+ sparkConf = new SparkConf();
tempDir = new File(Utils.createTempDir$default$1());
shuffleMemoryManager = new ShuffleMemoryManager(Long.MAX_VALUE);
spillFilesCreated.clear();
@@ -155,14 +160,19 @@ public class UnsafeExternalSorterSuite {
}
private static void insertNumber(UnsafeExternalSorter sorter, int value) throws Exception {
- final int[] arr = new int[] { value };
+ final int[] arr = new int[]{ value };
sorter.insertRecord(arr, PlatformDependent.INT_ARRAY_OFFSET, 4, value);
}
- @Test
- public void testSortingOnlyByPrefix() throws Exception {
+ private static void insertRecord(
+ UnsafeExternalSorter sorter,
+ int[] record,
+ long prefix) throws IOException {
+ sorter.insertRecord(record, PlatformDependent.INT_ARRAY_OFFSET, record.length * 4, prefix);
+ }
- final UnsafeExternalSorter sorter = UnsafeExternalSorter.create(
+ private UnsafeExternalSorter newSorter() throws IOException {
+ return UnsafeExternalSorter.create(
taskMemoryManager,
shuffleMemoryManager,
blockManager,
@@ -171,7 +181,11 @@ public class UnsafeExternalSorterSuite {
prefixComparator,
/* initialSize */ 1024,
pageSizeBytes);
+ }
+ @Test
+ public void testSortingOnlyByPrefix() throws Exception {
+ final UnsafeExternalSorter sorter = newSorter();
insertNumber(sorter, 5);
insertNumber(sorter, 1);
insertNumber(sorter, 3);
@@ -186,26 +200,16 @@ public class UnsafeExternalSorterSuite {
iter.loadNext();
assertEquals(i, iter.getKeyPrefix());
assertEquals(4, iter.getRecordLength());
- // TODO: read rest of value.
+ assertEquals(i, PlatformDependent.UNSAFE.getInt(iter.getBaseObject(), iter.getBaseOffset()));
}
- sorter.freeMemory();
+ sorter.cleanupResources();
assertSpillFilesWereCleanedUp();
}
@Test
public void testSortingEmptyArrays() throws Exception {
-
- final UnsafeExternalSorter sorter = UnsafeExternalSorter.create(
- taskMemoryManager,
- shuffleMemoryManager,
- blockManager,
- taskContext,
- recordComparator,
- prefixComparator,
- /* initialSize */ 1024,
- pageSizeBytes);
-
+ final UnsafeExternalSorter sorter = newSorter();
sorter.insertRecord(null, 0, 0, 0);
sorter.insertRecord(null, 0, 0, 0);
sorter.spill();
@@ -222,28 +226,89 @@ public class UnsafeExternalSorterSuite {
assertEquals(0, iter.getRecordLength());
}
- sorter.freeMemory();
+ sorter.cleanupResources();
assertSpillFilesWereCleanedUp();
}
@Test
- public void testFillingPage() throws Exception {
+ public void spillingOccursInResponseToMemoryPressure() throws Exception {
+ shuffleMemoryManager = new ShuffleMemoryManager(pageSizeBytes * 2);
+ final UnsafeExternalSorter sorter = newSorter();
+ final int numRecords = 100000;
+ for (int i = 0; i <= numRecords; i++) {
+ insertNumber(sorter, numRecords - i);
+ }
+ // Ensure that spill files were created
+ assertThat(tempDir.listFiles().length, greaterThanOrEqualTo(1));
+ // Read back the sorted data:
+ UnsafeSorterIterator iter = sorter.getSortedIterator();
- final UnsafeExternalSorter sorter = UnsafeExternalSorter.create(
- taskMemoryManager,
- shuffleMemoryManager,
- blockManager,
- taskContext,
- recordComparator,
- prefixComparator,
- /* initialSize */ 1024,
- pageSizeBytes);
+ int i = 0;
+ while (iter.hasNext()) {
+ iter.loadNext();
+ assertEquals(i, iter.getKeyPrefix());
+ assertEquals(4, iter.getRecordLength());
+ assertEquals(i, PlatformDependent.UNSAFE.getInt(iter.getBaseObject(), iter.getBaseOffset()));
+ i++;
+ }
+ sorter.cleanupResources();
+ assertSpillFilesWereCleanedUp();
+ }
+ @Test
+ public void testFillingPage() throws Exception {
+ final UnsafeExternalSorter sorter = newSorter();
byte[] record = new byte[16];
while (sorter.getNumberOfAllocatedPages() < 2) {
sorter.insertRecord(record, PlatformDependent.BYTE_ARRAY_OFFSET, record.length, 0);
}
- sorter.freeMemory();
+ sorter.cleanupResources();
+ assertSpillFilesWereCleanedUp();
+ }
+
+ @Test
+ public void sortingRecordsThatExceedPageSize() throws Exception {
+ final UnsafeExternalSorter sorter = newSorter();
+ final int[] largeRecord = new int[(int) pageSizeBytes + 16];
+ Arrays.fill(largeRecord, 456);
+ final int[] smallRecord = new int[100];
+ Arrays.fill(smallRecord, 123);
+
+ insertRecord(sorter, largeRecord, 456);
+ sorter.spill();
+ insertRecord(sorter, smallRecord, 123);
+ sorter.spill();
+ insertRecord(sorter, smallRecord, 123);
+ insertRecord(sorter, largeRecord, 456);
+
+ UnsafeSorterIterator iter = sorter.getSortedIterator();
+ // Small record
+ assertTrue(iter.hasNext());
+ iter.loadNext();
+ assertEquals(123, iter.getKeyPrefix());
+ assertEquals(smallRecord.length * 4, iter.getRecordLength());
+ assertEquals(123, PlatformDependent.UNSAFE.getInt(iter.getBaseObject(), iter.getBaseOffset()));
+ // Small record
+ assertTrue(iter.hasNext());
+ iter.loadNext();
+ assertEquals(123, iter.getKeyPrefix());
+ assertEquals(smallRecord.length * 4, iter.getRecordLength());
+ assertEquals(123, PlatformDependent.UNSAFE.getInt(iter.getBaseObject(), iter.getBaseOffset()));
+ // Large record
+ assertTrue(iter.hasNext());
+ iter.loadNext();
+ assertEquals(456, iter.getKeyPrefix());
+ assertEquals(largeRecord.length * 4, iter.getRecordLength());
+ assertEquals(456, PlatformDependent.UNSAFE.getInt(iter.getBaseObject(), iter.getBaseOffset()));
+ // Large record
+ assertTrue(iter.hasNext());
+ iter.loadNext();
+ assertEquals(456, iter.getKeyPrefix());
+ assertEquals(largeRecord.length * 4, iter.getRecordLength());
+ assertEquals(456, PlatformDependent.UNSAFE.getInt(iter.getBaseObject(), iter.getBaseOffset()));
+
+ assertFalse(iter.hasNext());
+ sorter.cleanupResources();
assertSpillFilesWereCleanedUp();
}
@@ -289,8 +354,10 @@ public class UnsafeExternalSorterSuite {
newPeakMemory = sorter.getPeakMemoryUsedBytes();
assertEquals(previousPeakMemory, newPeakMemory);
} finally {
- sorter.freeMemory();
+ sorter.cleanupResources();
+ assertSpillFilesWereCleanedUp();
}
}
}
+