diff options
author | Josh Rosen <joshrosen@databricks.com> | 2015-08-04 14:42:11 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-08-04 14:42:11 -0700 |
commit | ab8ee1a3b93286a62949569615086ef5030e9fae (patch) | |
tree | 88aa364451320f2a303bbeeb4857bcba57896c84 /core/src/main/java | |
parent | f4b1ac08a1327e6d0ddc317cdf3997a0f68dec72 (diff) | |
download | spark-ab8ee1a3b93286a62949569615086ef5030e9fae.tar.gz spark-ab8ee1a3b93286a62949569615086ef5030e9fae.tar.bz2 spark-ab8ee1a3b93286a62949569615086ef5030e9fae.zip |
[SPARK-9452] [SQL] Support records larger than page size in UnsafeExternalSorter
This patch extends UnsafeExternalSorter to support records larger than the page size. The basic strategy is the same as in #7762: store large records in their own overflow pages.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #7891 from JoshRosen/large-records-in-sql-sorter and squashes the following commits:
967580b [Josh Rosen] Merge remote-tracking branch 'origin/master' into large-records-in-sql-sorter
948c344 [Josh Rosen] Add large records tests for KV sorter.
3c17288 [Josh Rosen] Combine memory and disk cleanup into general cleanupResources() method
380f217 [Josh Rosen] Merge remote-tracking branch 'origin/master' into large-records-in-sql-sorter
27eafa0 [Josh Rosen] Fix page size in PackedRecordPointerSuite
a49baef [Josh Rosen] Address initial round of review comments
3edb931 [Josh Rosen] Remove accidentally-committed debug statements.
2b164e2 [Josh Rosen] Support large records in UnsafeExternalSorter.
Diffstat (limited to 'core/src/main/java')
-rw-r--r-- | core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java | 173 |
1 file changed, 121 insertions, 52 deletions
```diff
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index dec7fcfa0d..e6ddd08e5f 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -34,6 +34,7 @@ import org.apache.spark.TaskContext;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.shuffle.ShuffleMemoryManager;
 import org.apache.spark.storage.BlockManager;
+import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.unsafe.PlatformDependent;
 import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.apache.spark.unsafe.memory.TaskMemoryManager;
@@ -143,8 +144,7 @@ public final class UnsafeExternalSorter {
     taskContext.addOnCompleteCallback(new AbstractFunction0<BoxedUnit>() {
       @Override
       public BoxedUnit apply() {
-        deleteSpillFiles();
-        freeMemory();
+        cleanupResources();
         return null;
       }
     });
@@ -249,7 +249,7 @@ public final class UnsafeExternalSorter {
    *
    * @return the number of bytes freed.
    */
-  public long freeMemory() {
+  private long freeMemory() {
     updatePeakMemoryUsed();
     long memoryFreed = 0;
     for (MemoryBlock block : allocatedPages) {
@@ -275,44 +275,32 @@ public final class UnsafeExternalSorter {
   /**
    * Deletes any spill files created by this sorter.
    */
-  public void deleteSpillFiles() {
+  private void deleteSpillFiles() {
     for (UnsafeSorterSpillWriter spill : spillWriters) {
       File file = spill.getFile();
       if (file != null && file.exists()) {
         if (!file.delete()) {
           logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
-        };
+        }
       }
     }
   }
 
   /**
-   * Checks whether there is enough space to insert a new record into the sorter.
-   *
-   * @param requiredSpace the required space in the data page, in bytes, including space for storing
-   *                      the record size.
-
-   * @return true if the record can be inserted without requiring more allocations, false otherwise.
+   * Frees this sorter's in-memory data structures and cleans up its spill files.
    */
-  private boolean haveSpaceForRecord(int requiredSpace) {
-    assert(requiredSpace > 0);
-    assert(inMemSorter != null);
-    return (inMemSorter.hasSpaceForAnotherRecord() && (requiredSpace <= freeSpaceInCurrentPage));
+  public void cleanupResources() {
+    deleteSpillFiles();
+    freeMemory();
   }
 
   /**
-   * Allocates more memory in order to insert an additional record. This will request additional
-   * memory from the {@link ShuffleMemoryManager} and spill if the requested memory can not be
-   * obtained.
-   *
-   * @param requiredSpace the required space in the data page, in bytes, including space for storing
-   *                      the record size.
+   * Checks whether there is enough space to insert an additional record in to the sort pointer
+   * array and grows the array if additional space is required. If the required space cannot be
+   * obtained, then the in-memory data will be spilled to disk.
    */
-  private void allocateSpaceForRecord(int requiredSpace) throws IOException {
+  private void growPointerArrayIfNecessary() throws IOException {
     assert(inMemSorter != null);
-    // TODO: merge these steps to first calculate total memory requirements for this insert,
-    // then try to acquire; no point in acquiring sort buffer only to spill due to no space in the
-    // data page.
     if (!inMemSorter.hasSpaceForAnotherRecord()) {
       logger.debug("Attempting to expand sort pointer array");
       final long oldPointerArrayMemoryUsage = inMemSorter.getMemoryUsage();
@@ -326,7 +314,20 @@ public final class UnsafeExternalSorter {
         shuffleMemoryManager.release(oldPointerArrayMemoryUsage);
       }
     }
+  }
 
+  /**
+   * Allocates more memory in order to insert an additional record. This will request additional
+   * memory from the {@link ShuffleMemoryManager} and spill if the requested memory can not be
+   * obtained.
+   *
+   * @param requiredSpace the required space in the data page, in bytes, including space for storing
+   *                      the record size. This must be less than or equal to the page size (records
+   *                      that exceed the page size are handled via a different code path which uses
+   *                      special overflow pages).
+   */
+  private void acquireNewPageIfNecessary(int requiredSpace) throws IOException {
+    assert (requiredSpace <= pageSizeBytes);
     if (requiredSpace > freeSpaceInCurrentPage) {
       logger.trace("Required space {} is less than free space in current page ({})", requiredSpace,
         freeSpaceInCurrentPage);
@@ -339,9 +340,7 @@ public final class UnsafeExternalSorter {
       } else {
         final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
         if (memoryAcquired < pageSizeBytes) {
-          if (memoryAcquired > 0) {
-            shuffleMemoryManager.release(memoryAcquired);
-          }
+          shuffleMemoryManager.release(memoryAcquired);
           spill();
           final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
           if (memoryAcquiredAfterSpilling != pageSizeBytes) {
@@ -365,26 +364,59 @@ public final class UnsafeExternalSorter {
       long recordBaseOffset,
       int lengthInBytes,
       long prefix) throws IOException {
+
+    growPointerArrayIfNecessary();
     // Need 4 bytes to store the record length.
     final int totalSpaceRequired = lengthInBytes + 4;
-    if (!haveSpaceForRecord(totalSpaceRequired)) {
-      allocateSpaceForRecord(totalSpaceRequired);
+
+    // --- Figure out where to insert the new record ----------------------------------------------
+
+    final MemoryBlock dataPage;
+    long dataPagePosition;
+    boolean useOverflowPage = totalSpaceRequired > pageSizeBytes;
+    if (useOverflowPage) {
+      long overflowPageSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired);
+      // The record is larger than the page size, so allocate a special overflow page just to hold
+      // that record.
+      final long memoryGranted = shuffleMemoryManager.tryToAcquire(overflowPageSize);
+      if (memoryGranted != overflowPageSize) {
+        shuffleMemoryManager.release(memoryGranted);
+        spill();
+        final long memoryGrantedAfterSpill = shuffleMemoryManager.tryToAcquire(overflowPageSize);
+        if (memoryGrantedAfterSpill != overflowPageSize) {
+          shuffleMemoryManager.release(memoryGrantedAfterSpill);
+          throw new IOException("Unable to acquire " + overflowPageSize + " bytes of memory");
+        }
+      }
+      MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
+      allocatedPages.add(overflowPage);
+      dataPage = overflowPage;
+      dataPagePosition = overflowPage.getBaseOffset();
+    } else {
+      // The record is small enough to fit in a regular data page, but the current page might not
+      // have enough space to hold it (or no pages have been allocated yet).
+      acquireNewPageIfNecessary(totalSpaceRequired);
+      dataPage = currentPage;
+      dataPagePosition = currentPagePosition;
+      // Update bookkeeping information
+      freeSpaceInCurrentPage -= totalSpaceRequired;
+      currentPagePosition += totalSpaceRequired;
     }
-    assert(inMemSorter != null);
+    final Object dataPageBaseObject = dataPage.getBaseObject();
+
+    // --- Insert the record ----------------------------------------------------------------------
 
     final long recordAddress =
-      taskMemoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition);
-    final Object dataPageBaseObject = currentPage.getBaseObject();
-    PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, lengthInBytes);
-    currentPagePosition += 4;
+      taskMemoryManager.encodePageNumberAndOffset(dataPage, dataPagePosition);
+    PlatformDependent.UNSAFE.putInt(dataPageBaseObject, dataPagePosition, lengthInBytes);
+    dataPagePosition += 4;
     PlatformDependent.copyMemory(
       recordBaseObject,
       recordBaseOffset,
       dataPageBaseObject,
-      currentPagePosition,
+      dataPagePosition,
       lengthInBytes);
-    currentPagePosition += lengthInBytes;
-    freeSpaceInCurrentPage -= totalSpaceRequired;
+    assert(inMemSorter != null);
     inMemSorter.insertRecord(recordAddress, prefix);
   }
 
@@ -399,33 +431,70 @@ public final class UnsafeExternalSorter {
   public void insertKVRecord(
       Object keyBaseObj, long keyOffset, int keyLen,
       Object valueBaseObj, long valueOffset, int valueLen, long prefix) throws IOException {
+
+    growPointerArrayIfNecessary();
     final int totalSpaceRequired = keyLen + valueLen + 4 + 4;
-    if (!haveSpaceForRecord(totalSpaceRequired)) {
-      allocateSpaceForRecord(totalSpaceRequired);
+
+    // --- Figure out where to insert the new record ----------------------------------------------
+
+    final MemoryBlock dataPage;
+    long dataPagePosition;
+    boolean useOverflowPage = totalSpaceRequired > pageSizeBytes;
+    if (useOverflowPage) {
+      long overflowPageSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired);
+      // The record is larger than the page size, so allocate a special overflow page just to hold
+      // that record.
+      final long memoryGranted = shuffleMemoryManager.tryToAcquire(overflowPageSize);
+      if (memoryGranted != overflowPageSize) {
+        shuffleMemoryManager.release(memoryGranted);
+        spill();
+        final long memoryGrantedAfterSpill = shuffleMemoryManager.tryToAcquire(overflowPageSize);
+        if (memoryGrantedAfterSpill != overflowPageSize) {
+          shuffleMemoryManager.release(memoryGrantedAfterSpill);
+          throw new IOException("Unable to acquire " + overflowPageSize + " bytes of memory");
+        }
+      }
+      MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
+      allocatedPages.add(overflowPage);
+      dataPage = overflowPage;
+      dataPagePosition = overflowPage.getBaseOffset();
+    } else {
+      // The record is small enough to fit in a regular data page, but the current page might not
+      // have enough space to hold it (or no pages have been allocated yet).
+      acquireNewPageIfNecessary(totalSpaceRequired);
+      dataPage = currentPage;
+      dataPagePosition = currentPagePosition;
+      // Update bookkeeping information
+      freeSpaceInCurrentPage -= totalSpaceRequired;
+      currentPagePosition += totalSpaceRequired;
     }
-    assert(inMemSorter != null);
+    final Object dataPageBaseObject = dataPage.getBaseObject();
+
+    // --- Insert the record ----------------------------------------------------------------------
 
     final long recordAddress =
-      taskMemoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition);
-    final Object dataPageBaseObject = currentPage.getBaseObject();
-    PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, keyLen + valueLen + 4);
-    currentPagePosition += 4;
+      taskMemoryManager.encodePageNumberAndOffset(dataPage, dataPagePosition);
+    PlatformDependent.UNSAFE.putInt(dataPageBaseObject, dataPagePosition, keyLen + valueLen + 4);
+    dataPagePosition += 4;
 
-    PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, keyLen);
-    currentPagePosition += 4;
+    PlatformDependent.UNSAFE.putInt(dataPageBaseObject, dataPagePosition, keyLen);
+    dataPagePosition += 4;
 
     PlatformDependent.copyMemory(
-      keyBaseObj, keyOffset, dataPageBaseObject, currentPagePosition, keyLen);
-    currentPagePosition += keyLen;
+      keyBaseObj, keyOffset, dataPageBaseObject, dataPagePosition, keyLen);
+    dataPagePosition += keyLen;
 
     PlatformDependent.copyMemory(
-      valueBaseObj, valueOffset, dataPageBaseObject, currentPagePosition, valueLen);
-    currentPagePosition += valueLen;
+      valueBaseObj, valueOffset, dataPageBaseObject, dataPagePosition, valueLen);
 
-    freeSpaceInCurrentPage -= totalSpaceRequired;
+    assert(inMemSorter != null);
     inMemSorter.insertRecord(recordAddress, prefix);
   }
 
+  /**
+   * Returns a sorted iterator. It is the caller's responsibility to call `cleanupResources()`
+   * after consuming this iterator.
+   */
   public UnsafeSorterIterator getSortedIterator() throws IOException {
     assert(inMemSorter != null);
     final UnsafeInMemorySorter.SortedIterator inMemoryIterator = inMemSorter.getSortedIterator();
```