diff options
Diffstat (limited to 'core/src/test')
-rw-r--r-- | core/src/test/java/org/apache/spark/shuffle/unsafe/PackedRecordPointerSuite.java | 8 | ||||
-rw-r--r-- | core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java | 129 |
2 files changed, 102 insertions, 35 deletions
diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/PackedRecordPointerSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/PackedRecordPointerSuite.java index db9e827590..934b7e0305 100644 --- a/core/src/test/java/org/apache/spark/shuffle/unsafe/PackedRecordPointerSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/PackedRecordPointerSuite.java @@ -32,8 +32,8 @@ public class PackedRecordPointerSuite { public void heap() { final TaskMemoryManager memoryManager = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP)); - final MemoryBlock page0 = memoryManager.allocatePage(100); - final MemoryBlock page1 = memoryManager.allocatePage(100); + final MemoryBlock page0 = memoryManager.allocatePage(128); + final MemoryBlock page1 = memoryManager.allocatePage(128); final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1, page1.getBaseOffset() + 42); PackedRecordPointer packedPointer = new PackedRecordPointer(); @@ -50,8 +50,8 @@ public class PackedRecordPointerSuite { public void offHeap() { final TaskMemoryManager memoryManager = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.UNSAFE)); - final MemoryBlock page0 = memoryManager.allocatePage(100); - final MemoryBlock page1 = memoryManager.allocatePage(100); + final MemoryBlock page0 = memoryManager.allocatePage(128); + final MemoryBlock page1 = memoryManager.allocatePage(128); final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1, page1.getBaseOffset() + 42); PackedRecordPointer packedPointer = new PackedRecordPointer(); diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java index c11949d57a..968185bde7 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java @@ -18,8 +18,10 @@ package org.apache.spark.util.collection.unsafe.sort; import java.io.File; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Arrays; import java.util.LinkedList; import java.util.UUID; @@ -34,6 +36,7 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.*; import static org.mockito.AdditionalAnswers.returnsSecondArg; import static org.mockito.Answers.RETURNS_SMART_NULLS; @@ -77,12 +80,13 @@ public class UnsafeExternalSorterSuite { } }; + SparkConf sparkConf; + File tempDir; ShuffleMemoryManager shuffleMemoryManager; @Mock(answer = RETURNS_SMART_NULLS) BlockManager blockManager; @Mock(answer = RETURNS_SMART_NULLS) DiskBlockManager diskBlockManager; @Mock(answer = RETURNS_SMART_NULLS) TaskContext taskContext; - File tempDir; private final long pageSizeBytes = new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "64m"); @@ -96,6 +100,7 @@ public class UnsafeExternalSorterSuite { @Before public void setUp() { MockitoAnnotations.initMocks(this); + sparkConf = new SparkConf(); tempDir = new File(Utils.createTempDir$default$1()); shuffleMemoryManager = new ShuffleMemoryManager(Long.MAX_VALUE); spillFilesCreated.clear(); @@ -155,14 +160,19 @@ public class UnsafeExternalSorterSuite { } private static void insertNumber(UnsafeExternalSorter sorter, int value) throws Exception { - final int[] arr = new int[] { value }; + final int[] arr = new int[]{ value }; sorter.insertRecord(arr, PlatformDependent.INT_ARRAY_OFFSET, 4, value); } - @Test - public void testSortingOnlyByPrefix() throws Exception { + private static void insertRecord( + UnsafeExternalSorter sorter, + int[] record, + long prefix) throws IOException { + sorter.insertRecord(record, PlatformDependent.INT_ARRAY_OFFSET, record.length * 4, prefix); + } - final UnsafeExternalSorter sorter = UnsafeExternalSorter.create( + private UnsafeExternalSorter newSorter() throws IOException { + return UnsafeExternalSorter.create( taskMemoryManager, shuffleMemoryManager, blockManager, @@ -171,7 +181,11 @@ public class UnsafeExternalSorterSuite { prefixComparator, /* initialSize */ 1024, pageSizeBytes); + } + @Test + public void testSortingOnlyByPrefix() throws Exception { + final UnsafeExternalSorter sorter = newSorter(); insertNumber(sorter, 5); insertNumber(sorter, 1); insertNumber(sorter, 3); @@ -186,26 +200,16 @@ public class UnsafeExternalSorterSuite { iter.loadNext(); assertEquals(i, iter.getKeyPrefix()); assertEquals(4, iter.getRecordLength()); - // TODO: read rest of value. + assertEquals(i, PlatformDependent.UNSAFE.getInt(iter.getBaseObject(), iter.getBaseOffset())); } - sorter.freeMemory(); + sorter.cleanupResources(); assertSpillFilesWereCleanedUp(); } @Test public void testSortingEmptyArrays() throws Exception { - - final UnsafeExternalSorter sorter = UnsafeExternalSorter.create( - taskMemoryManager, - shuffleMemoryManager, - blockManager, - taskContext, - recordComparator, - prefixComparator, - /* initialSize */ 1024, - pageSizeBytes); - + final UnsafeExternalSorter sorter = newSorter(); sorter.insertRecord(null, 0, 0, 0); sorter.insertRecord(null, 0, 0, 0); sorter.spill(); @@ -222,28 +226,89 @@ public class UnsafeExternalSorterSuite { assertEquals(0, iter.getRecordLength()); } - sorter.freeMemory(); + sorter.cleanupResources(); assertSpillFilesWereCleanedUp(); } @Test - public void testFillingPage() throws Exception { + public void spillingOccursInResponseToMemoryPressure() throws Exception { + shuffleMemoryManager = new ShuffleMemoryManager(pageSizeBytes * 2); + final UnsafeExternalSorter sorter = newSorter(); + final int numRecords = 100000; + for (int i = 0; i <= numRecords; i++) { + insertNumber(sorter, numRecords - i); + } + // Ensure that spill files were created + assertThat(tempDir.listFiles().length, greaterThanOrEqualTo(1)); + // Read back the sorted data: + UnsafeSorterIterator iter = sorter.getSortedIterator(); - final UnsafeExternalSorter sorter = UnsafeExternalSorter.create( - taskMemoryManager, - shuffleMemoryManager, - blockManager, - taskContext, - recordComparator, - prefixComparator, - /* initialSize */ 1024, - pageSizeBytes); + int i = 0; + while (iter.hasNext()) { + iter.loadNext(); + assertEquals(i, iter.getKeyPrefix()); + assertEquals(4, iter.getRecordLength()); + assertEquals(i, PlatformDependent.UNSAFE.getInt(iter.getBaseObject(), iter.getBaseOffset())); + i++; + } + sorter.cleanupResources(); + assertSpillFilesWereCleanedUp(); + } + @Test + public void testFillingPage() throws Exception { + final UnsafeExternalSorter sorter = newSorter(); byte[] record = new byte[16]; while (sorter.getNumberOfAllocatedPages() < 2) { sorter.insertRecord(record, PlatformDependent.BYTE_ARRAY_OFFSET, record.length, 0); } - sorter.freeMemory(); + sorter.cleanupResources(); + assertSpillFilesWereCleanedUp(); + } + + @Test + public void sortingRecordsThatExceedPageSize() throws Exception { + final UnsafeExternalSorter sorter = newSorter(); + final int[] largeRecord = new int[(int) pageSizeBytes + 16]; + Arrays.fill(largeRecord, 456); + final int[] smallRecord = new int[100]; + Arrays.fill(smallRecord, 123); + + insertRecord(sorter, largeRecord, 456); + sorter.spill(); + insertRecord(sorter, smallRecord, 123); + sorter.spill(); + insertRecord(sorter, smallRecord, 123); + insertRecord(sorter, largeRecord, 456); + + UnsafeSorterIterator iter = sorter.getSortedIterator(); + // Small record + assertTrue(iter.hasNext()); + iter.loadNext(); + assertEquals(123, iter.getKeyPrefix()); + assertEquals(smallRecord.length * 4, iter.getRecordLength()); + assertEquals(123, PlatformDependent.UNSAFE.getInt(iter.getBaseObject(), iter.getBaseOffset())); + // Small record + assertTrue(iter.hasNext()); + iter.loadNext(); + assertEquals(123, iter.getKeyPrefix()); + assertEquals(smallRecord.length * 4, iter.getRecordLength()); + assertEquals(123, PlatformDependent.UNSAFE.getInt(iter.getBaseObject(), iter.getBaseOffset())); + // Large record + assertTrue(iter.hasNext()); + iter.loadNext(); + assertEquals(456, iter.getKeyPrefix()); + assertEquals(largeRecord.length * 4, iter.getRecordLength()); + assertEquals(456, PlatformDependent.UNSAFE.getInt(iter.getBaseObject(), iter.getBaseOffset())); + // Large record + assertTrue(iter.hasNext()); + iter.loadNext(); + assertEquals(456, iter.getKeyPrefix()); + assertEquals(largeRecord.length * 4, iter.getRecordLength()); + assertEquals(456, PlatformDependent.UNSAFE.getInt(iter.getBaseObject(), iter.getBaseOffset())); + + assertFalse(iter.hasNext()); + sorter.cleanupResources(); assertSpillFilesWereCleanedUp(); } @@ -289,8 +354,10 @@ public class UnsafeExternalSorterSuite { newPeakMemory = sorter.getPeakMemoryUsedBytes(); assertEquals(previousPeakMemory, newPeakMemory); } finally { - sorter.freeMemory(); + sorter.cleanupResources(); + assertSpillFilesWereCleanedUp(); } } } + |