diff options
author | Josh Rosen <joshrosen@databricks.com> | 2015-08-04 14:42:11 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-08-04 14:42:11 -0700 |
commit | ab8ee1a3b93286a62949569615086ef5030e9fae (patch) | |
tree | 88aa364451320f2a303bbeeb4857bcba57896c84 /core | |
parent | f4b1ac08a1327e6d0ddc317cdf3997a0f68dec72 (diff) | |
download | spark-ab8ee1a3b93286a62949569615086ef5030e9fae.tar.gz spark-ab8ee1a3b93286a62949569615086ef5030e9fae.tar.bz2 spark-ab8ee1a3b93286a62949569615086ef5030e9fae.zip |
[SPARK-9452] [SQL] Support records larger than page size in UnsafeExternalSorter
This patch extends UnsafeExternalSorter to support records larger than the page size. The basic strategy is the same as in #7762: store large records in their own overflow pages.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #7891 from JoshRosen/large-records-in-sql-sorter and squashes the following commits:
967580b [Josh Rosen] Merge remote-tracking branch 'origin/master' into large-records-in-sql-sorter
948c344 [Josh Rosen] Add large records tests for KV sorter.
3c17288 [Josh Rosen] Combine memory and disk cleanup into general cleanupResources() method
380f217 [Josh Rosen] Merge remote-tracking branch 'origin/master' into large-records-in-sql-sorter
27eafa0 [Josh Rosen] Fix page size in PackedRecordPointerSuite
a49baef [Josh Rosen] Address initial round of review comments
3edb931 [Josh Rosen] Remove accidentally-committed debug statements.
2b164e2 [Josh Rosen] Support large records in UnsafeExternalSorter.
Diffstat (limited to 'core')
3 files changed, 223 insertions, 87 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index dec7fcfa0d..e6ddd08e5f 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -34,6 +34,7 @@ import org.apache.spark.TaskContext; import org.apache.spark.executor.ShuffleWriteMetrics; import org.apache.spark.shuffle.ShuffleMemoryManager; import org.apache.spark.storage.BlockManager; +import org.apache.spark.unsafe.array.ByteArrayMethods; import org.apache.spark.unsafe.PlatformDependent; import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.unsafe.memory.TaskMemoryManager; @@ -143,8 +144,7 @@ public final class UnsafeExternalSorter { taskContext.addOnCompleteCallback(new AbstractFunction0<BoxedUnit>() { @Override public BoxedUnit apply() { - deleteSpillFiles(); - freeMemory(); + cleanupResources(); return null; } }); @@ -249,7 +249,7 @@ public final class UnsafeExternalSorter { * * @return the number of bytes freed. */ - public long freeMemory() { + private long freeMemory() { updatePeakMemoryUsed(); long memoryFreed = 0; for (MemoryBlock block : allocatedPages) { @@ -275,44 +275,32 @@ public final class UnsafeExternalSorter { /** * Deletes any spill files created by this sorter. */ - public void deleteSpillFiles() { + private void deleteSpillFiles() { for (UnsafeSorterSpillWriter spill : spillWriters) { File file = spill.getFile(); if (file != null && file.exists()) { if (!file.delete()) { logger.error("Was unable to delete spill file {}", file.getAbsolutePath()); - }; + } } } } /** - * Checks whether there is enough space to insert a new record into the sorter. - * - * @param requiredSpace the required space in the data page, in bytes, including space for storing - * the record size. - - * @return true if the record can be inserted without requiring more allocations, false otherwise. + * Frees this sorter's in-memory data structures and cleans up its spill files. */ - private boolean haveSpaceForRecord(int requiredSpace) { - assert(requiredSpace > 0); - assert(inMemSorter != null); - return (inMemSorter.hasSpaceForAnotherRecord() && (requiredSpace <= freeSpaceInCurrentPage)); + public void cleanupResources() { + deleteSpillFiles(); + freeMemory(); } /** - * Allocates more memory in order to insert an additional record. This will request additional - * memory from the {@link ShuffleMemoryManager} and spill if the requested memory can not be - * obtained. - * - * @param requiredSpace the required space in the data page, in bytes, including space for storing - * the record size. + * Checks whether there is enough space to insert an additional record in to the sort pointer + * array and grows the array if additional space is required. If the required space cannot be + * obtained, then the in-memory data will be spilled to disk. */ - private void allocateSpaceForRecord(int requiredSpace) throws IOException { + private void growPointerArrayIfNecessary() throws IOException { assert(inMemSorter != null); - // TODO: merge these steps to first calculate total memory requirements for this insert, - // then try to acquire; no point in acquiring sort buffer only to spill due to no space in the - // data page. if (!inMemSorter.hasSpaceForAnotherRecord()) { logger.debug("Attempting to expand sort pointer array"); final long oldPointerArrayMemoryUsage = inMemSorter.getMemoryUsage(); @@ -326,7 +314,20 @@ public final class UnsafeExternalSorter { shuffleMemoryManager.release(oldPointerArrayMemoryUsage); } } + } + /** + * Allocates more memory in order to insert an additional record. This will request additional + * memory from the {@link ShuffleMemoryManager} and spill if the requested memory can not be + * obtained. + * + * @param requiredSpace the required space in the data page, in bytes, including space for storing + * the record size. This must be less than or equal to the page size (records + * that exceed the page size are handled via a different code path which uses + * special overflow pages). + */ + private void acquireNewPageIfNecessary(int requiredSpace) throws IOException { + assert (requiredSpace <= pageSizeBytes); if (requiredSpace > freeSpaceInCurrentPage) { logger.trace("Required space {} is less than free space in current page ({})", requiredSpace, freeSpaceInCurrentPage); @@ -339,9 +340,7 @@ public final class UnsafeExternalSorter { } else { final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes); if (memoryAcquired < pageSizeBytes) { - if (memoryAcquired > 0) { - shuffleMemoryManager.release(memoryAcquired); - } + shuffleMemoryManager.release(memoryAcquired); spill(); final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes); if (memoryAcquiredAfterSpilling != pageSizeBytes) { @@ -365,26 +364,59 @@ public final class UnsafeExternalSorter { long recordBaseOffset, int lengthInBytes, long prefix) throws IOException { + + growPointerArrayIfNecessary(); // Need 4 bytes to store the record length. final int totalSpaceRequired = lengthInBytes + 4; - if (!haveSpaceForRecord(totalSpaceRequired)) { - allocateSpaceForRecord(totalSpaceRequired); + + // --- Figure out where to insert the new record ---------------------------------------------- + + final MemoryBlock dataPage; + long dataPagePosition; + boolean useOverflowPage = totalSpaceRequired > pageSizeBytes; + if (useOverflowPage) { + long overflowPageSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired); + // The record is larger than the page size, so allocate a special overflow page just to hold + // that record. + final long memoryGranted = shuffleMemoryManager.tryToAcquire(overflowPageSize); + if (memoryGranted != overflowPageSize) { + shuffleMemoryManager.release(memoryGranted); + spill(); + final long memoryGrantedAfterSpill = shuffleMemoryManager.tryToAcquire(overflowPageSize); + if (memoryGrantedAfterSpill != overflowPageSize) { + shuffleMemoryManager.release(memoryGrantedAfterSpill); + throw new IOException("Unable to acquire " + overflowPageSize + " bytes of memory"); + } + } + MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize); + allocatedPages.add(overflowPage); + dataPage = overflowPage; + dataPagePosition = overflowPage.getBaseOffset(); + } else { + // The record is small enough to fit in a regular data page, but the current page might not + // have enough space to hold it (or no pages have been allocated yet). + acquireNewPageIfNecessary(totalSpaceRequired); + dataPage = currentPage; + dataPagePosition = currentPagePosition; + // Update bookkeeping information + freeSpaceInCurrentPage -= totalSpaceRequired; + currentPagePosition += totalSpaceRequired; } - assert(inMemSorter != null); + final Object dataPageBaseObject = dataPage.getBaseObject(); + + // --- Insert the record ---------------------------------------------------------------------- final long recordAddress = - taskMemoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition); - final Object dataPageBaseObject = currentPage.getBaseObject(); - PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, lengthInBytes); - currentPagePosition += 4; + taskMemoryManager.encodePageNumberAndOffset(dataPage, dataPagePosition); + PlatformDependent.UNSAFE.putInt(dataPageBaseObject, dataPagePosition, lengthInBytes); + dataPagePosition += 4; PlatformDependent.copyMemory( recordBaseObject, recordBaseOffset, dataPageBaseObject, - currentPagePosition, + dataPagePosition, lengthInBytes); - currentPagePosition += lengthInBytes; - freeSpaceInCurrentPage -= totalSpaceRequired; + assert(inMemSorter != null); inMemSorter.insertRecord(recordAddress, prefix); } @@ -399,33 +431,70 @@ public final class UnsafeExternalSorter { public void insertKVRecord( Object keyBaseObj, long keyOffset, int keyLen, Object valueBaseObj, long valueOffset, int valueLen, long prefix) throws IOException { + + growPointerArrayIfNecessary(); final int totalSpaceRequired = keyLen + valueLen + 4 + 4; - if (!haveSpaceForRecord(totalSpaceRequired)) { - allocateSpaceForRecord(totalSpaceRequired); + + // --- Figure out where to insert the new record ---------------------------------------------- + + final MemoryBlock dataPage; + long dataPagePosition; + boolean useOverflowPage = totalSpaceRequired > pageSizeBytes; + if (useOverflowPage) { + long overflowPageSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired); + // The record is larger than the page size, so allocate a special overflow page just to hold + // that record. + final long memoryGranted = shuffleMemoryManager.tryToAcquire(overflowPageSize); + if (memoryGranted != overflowPageSize) { + shuffleMemoryManager.release(memoryGranted); + spill(); + final long memoryGrantedAfterSpill = shuffleMemoryManager.tryToAcquire(overflowPageSize); + if (memoryGrantedAfterSpill != overflowPageSize) { + shuffleMemoryManager.release(memoryGrantedAfterSpill); + throw new IOException("Unable to acquire " + overflowPageSize + " bytes of memory"); + } + } + MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize); + allocatedPages.add(overflowPage); + dataPage = overflowPage; + dataPagePosition = overflowPage.getBaseOffset(); + } else { + // The record is small enough to fit in a regular data page, but the current page might not + // have enough space to hold it (or no pages have been allocated yet). + acquireNewPageIfNecessary(totalSpaceRequired); + dataPage = currentPage; + dataPagePosition = currentPagePosition; + // Update bookkeeping information + freeSpaceInCurrentPage -= totalSpaceRequired; + currentPagePosition += totalSpaceRequired; } - assert(inMemSorter != null); + final Object dataPageBaseObject = dataPage.getBaseObject(); + + // --- Insert the record ---------------------------------------------------------------------- final long recordAddress = - taskMemoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition); - final Object dataPageBaseObject = currentPage.getBaseObject(); - PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, keyLen + valueLen + 4); - currentPagePosition += 4; + taskMemoryManager.encodePageNumberAndOffset(dataPage, dataPagePosition); + PlatformDependent.UNSAFE.putInt(dataPageBaseObject, dataPagePosition, keyLen + valueLen + 4); + dataPagePosition += 4; - PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, keyLen); - currentPagePosition += 4; + PlatformDependent.UNSAFE.putInt(dataPageBaseObject, dataPagePosition, keyLen); + dataPagePosition += 4; PlatformDependent.copyMemory( - keyBaseObj, keyOffset, dataPageBaseObject, currentPagePosition, keyLen); - currentPagePosition += keyLen; + keyBaseObj, keyOffset, dataPageBaseObject, dataPagePosition, keyLen); + dataPagePosition += keyLen; PlatformDependent.copyMemory( - valueBaseObj, valueOffset, dataPageBaseObject, currentPagePosition, valueLen); - currentPagePosition += valueLen; + valueBaseObj, valueOffset, dataPageBaseObject, dataPagePosition, valueLen); - freeSpaceInCurrentPage -= totalSpaceRequired; + assert(inMemSorter != null); inMemSorter.insertRecord(recordAddress, prefix); } + /** + * Returns a sorted iterator. It is the caller's responsibility to call `cleanupResources()` + * after consuming this iterator. + */ public UnsafeSorterIterator getSortedIterator() throws IOException { assert(inMemSorter != null); final UnsafeInMemorySorter.SortedIterator inMemoryIterator = inMemSorter.getSortedIterator(); diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/PackedRecordPointerSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/PackedRecordPointerSuite.java index db9e827590..934b7e0305 100644 --- a/core/src/test/java/org/apache/spark/shuffle/unsafe/PackedRecordPointerSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/PackedRecordPointerSuite.java @@ -32,8 +32,8 @@ public class PackedRecordPointerSuite { public void heap() { final TaskMemoryManager memoryManager = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP)); - final MemoryBlock page0 = memoryManager.allocatePage(100); - final MemoryBlock page1 = memoryManager.allocatePage(100); + final MemoryBlock page0 = memoryManager.allocatePage(128); + final MemoryBlock page1 = memoryManager.allocatePage(128); final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1, page1.getBaseOffset() + 42); PackedRecordPointer packedPointer = new PackedRecordPointer(); @@ -50,8 +50,8 @@ public class PackedRecordPointerSuite { public void offHeap() { final TaskMemoryManager memoryManager = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.UNSAFE)); - final MemoryBlock page0 = memoryManager.allocatePage(100); - final MemoryBlock page1 = memoryManager.allocatePage(100); + final MemoryBlock page0 = memoryManager.allocatePage(128); + final MemoryBlock page1 = memoryManager.allocatePage(128); final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1, page1.getBaseOffset() + 42); PackedRecordPointer packedPointer = new PackedRecordPointer(); diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java index c11949d57a..968185bde7 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java @@ -18,8 +18,10 @@ package org.apache.spark.util.collection.unsafe.sort; import java.io.File; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Arrays; import java.util.LinkedList; import java.util.UUID; @@ -34,6 +36,7 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.*; import static org.mockito.AdditionalAnswers.returnsSecondArg; import static org.mockito.Answers.RETURNS_SMART_NULLS; @@ -77,12 +80,13 @@ public class UnsafeExternalSorterSuite { } }; + SparkConf sparkConf; + File tempDir; ShuffleMemoryManager shuffleMemoryManager; @Mock(answer = RETURNS_SMART_NULLS) BlockManager blockManager; @Mock(answer = RETURNS_SMART_NULLS) DiskBlockManager diskBlockManager; @Mock(answer = RETURNS_SMART_NULLS) TaskContext taskContext; - File tempDir; private final long pageSizeBytes = new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "64m"); @@ -96,6 +100,7 @@ public class UnsafeExternalSorterSuite { @Before public void setUp() { MockitoAnnotations.initMocks(this); + sparkConf = new SparkConf(); tempDir = new File(Utils.createTempDir$default$1()); shuffleMemoryManager = new ShuffleMemoryManager(Long.MAX_VALUE); spillFilesCreated.clear(); @@ -155,14 +160,19 @@ public class UnsafeExternalSorterSuite { } private static void insertNumber(UnsafeExternalSorter sorter, int value) throws Exception { - final int[] arr = new int[] { value }; + final int[] arr = new int[]{ value }; sorter.insertRecord(arr, PlatformDependent.INT_ARRAY_OFFSET, 4, value); } - @Test - public void testSortingOnlyByPrefix() throws Exception { + private static void insertRecord( + UnsafeExternalSorter sorter, + int[] record, + long prefix) throws IOException { + sorter.insertRecord(record, PlatformDependent.INT_ARRAY_OFFSET, record.length * 4, prefix); + } - final UnsafeExternalSorter sorter = UnsafeExternalSorter.create( + private UnsafeExternalSorter newSorter() throws IOException { + return UnsafeExternalSorter.create( taskMemoryManager, shuffleMemoryManager, blockManager, @@ -171,7 +181,11 @@ public class UnsafeExternalSorterSuite { prefixComparator, /* initialSize */ 1024, pageSizeBytes); + } + @Test + public void testSortingOnlyByPrefix() throws Exception { + final UnsafeExternalSorter sorter = newSorter(); insertNumber(sorter, 5); insertNumber(sorter, 1); insertNumber(sorter, 3); @@ -186,26 +200,16 @@ public class UnsafeExternalSorterSuite { iter.loadNext(); assertEquals(i, iter.getKeyPrefix()); assertEquals(4, iter.getRecordLength()); - // TODO: read rest of value. + assertEquals(i, PlatformDependent.UNSAFE.getInt(iter.getBaseObject(), iter.getBaseOffset())); } - sorter.freeMemory(); + sorter.cleanupResources(); assertSpillFilesWereCleanedUp(); } @Test public void testSortingEmptyArrays() throws Exception { - - final UnsafeExternalSorter sorter = UnsafeExternalSorter.create( - taskMemoryManager, - shuffleMemoryManager, - blockManager, - taskContext, - recordComparator, - prefixComparator, - /* initialSize */ 1024, - pageSizeBytes); - + final UnsafeExternalSorter sorter = newSorter(); sorter.insertRecord(null, 0, 0, 0); sorter.insertRecord(null, 0, 0, 0); sorter.spill(); @@ -222,28 +226,89 @@ public class UnsafeExternalSorterSuite { assertEquals(0, iter.getRecordLength()); } - sorter.freeMemory(); + sorter.cleanupResources(); assertSpillFilesWereCleanedUp(); } @Test - public void testFillingPage() throws Exception { + public void spillingOccursInResponseToMemoryPressure() throws Exception { + shuffleMemoryManager = new ShuffleMemoryManager(pageSizeBytes * 2); + final UnsafeExternalSorter sorter = newSorter(); + final int numRecords = 100000; + for (int i = 0; i <= numRecords; i++) { + insertNumber(sorter, numRecords - i); + } + // Ensure that spill files were created + assertThat(tempDir.listFiles().length, greaterThanOrEqualTo(1)); + // Read back the sorted data: + UnsafeSorterIterator iter = sorter.getSortedIterator(); - final UnsafeExternalSorter sorter = UnsafeExternalSorter.create( - taskMemoryManager, - shuffleMemoryManager, - blockManager, - taskContext, - recordComparator, - prefixComparator, - /* initialSize */ 1024, - pageSizeBytes); + int i = 0; + while (iter.hasNext()) { + iter.loadNext(); + assertEquals(i, iter.getKeyPrefix()); + assertEquals(4, iter.getRecordLength()); + assertEquals(i, PlatformDependent.UNSAFE.getInt(iter.getBaseObject(), iter.getBaseOffset())); + i++; + } + sorter.cleanupResources(); + assertSpillFilesWereCleanedUp(); + } + @Test + public void testFillingPage() throws Exception { + final UnsafeExternalSorter sorter = newSorter(); byte[] record = new byte[16]; while (sorter.getNumberOfAllocatedPages() < 2) { sorter.insertRecord(record, PlatformDependent.BYTE_ARRAY_OFFSET, record.length, 0); } - sorter.freeMemory(); + sorter.cleanupResources(); + assertSpillFilesWereCleanedUp(); + } + + @Test + public void sortingRecordsThatExceedPageSize() throws Exception { + final UnsafeExternalSorter sorter = newSorter(); + final int[] largeRecord = new int[(int) pageSizeBytes + 16]; + Arrays.fill(largeRecord, 456); + final int[] smallRecord = new int[100]; + Arrays.fill(smallRecord, 123); + + insertRecord(sorter, largeRecord, 456); + sorter.spill(); + insertRecord(sorter, smallRecord, 123); + sorter.spill(); + insertRecord(sorter, smallRecord, 123); + insertRecord(sorter, largeRecord, 456); + + UnsafeSorterIterator iter = sorter.getSortedIterator(); + // Small record + assertTrue(iter.hasNext()); + iter.loadNext(); + assertEquals(123, iter.getKeyPrefix()); + assertEquals(smallRecord.length * 4, iter.getRecordLength()); + assertEquals(123, PlatformDependent.UNSAFE.getInt(iter.getBaseObject(), iter.getBaseOffset())); + // Small record + assertTrue(iter.hasNext()); + iter.loadNext(); + assertEquals(123, iter.getKeyPrefix()); + assertEquals(smallRecord.length * 4, iter.getRecordLength()); + assertEquals(123, PlatformDependent.UNSAFE.getInt(iter.getBaseObject(), iter.getBaseOffset())); + // Large record + assertTrue(iter.hasNext()); + iter.loadNext(); + assertEquals(456, iter.getKeyPrefix()); + assertEquals(largeRecord.length * 4, iter.getRecordLength()); + assertEquals(456, PlatformDependent.UNSAFE.getInt(iter.getBaseObject(), iter.getBaseOffset())); + // Large record + assertTrue(iter.hasNext()); + iter.loadNext(); + assertEquals(456, iter.getKeyPrefix()); + assertEquals(largeRecord.length * 4, iter.getRecordLength()); + assertEquals(456, PlatformDependent.UNSAFE.getInt(iter.getBaseObject(), iter.getBaseOffset())); + + assertFalse(iter.hasNext()); + sorter.cleanupResources(); assertSpillFilesWereCleanedUp(); } @@ -289,8 +354,10 @@ public class UnsafeExternalSorterSuite { newPeakMemory = sorter.getPeakMemoryUsedBytes(); assertEquals(previousPeakMemory, newPeakMemory); } finally { - sorter.freeMemory(); + sorter.cleanupResources(); + assertSpillFilesWereCleanedUp(); } } } + |