diff options
author | Reynold Xin <rxin@databricks.com> | 2015-08-02 12:32:14 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-08-02 12:32:14 -0700 |
commit | 2e981b7bfa9dec93fdcf25f3e7220cd6aaba744f (patch) | |
tree | f7458ae297d36bba1acf21fd08169defef6c2ef8 /core/src/main | |
parent | 66924ffa6bdb8e0df1b90b789cb7ad443377e729 (diff) | |
download | spark-2e981b7bfa9dec93fdcf25f3e7220cd6aaba744f.tar.gz spark-2e981b7bfa9dec93fdcf25f3e7220cd6aaba744f.tar.bz2 spark-2e981b7bfa9dec93fdcf25f3e7220cd6aaba744f.zip |
[SPARK-9531] [SQL] UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter
This pull request adds a destructAndCreateExternalSorter method to UnsafeFixedWidthAggregationMap. The new method does the following:
1. Creates a new external sorter UnsafeKVExternalSorter
2. Adds all the data into an in-memory sorter, sorts them
3. Spills the sorted in-memory data to disk
This method can be used to fallback to sort-based aggregation when under memory pressure.
The pull request also includes accounting fixes from JoshRosen.
TODOs (that can be done in follow-up PRs)
- [x] Address Josh's feedbacks from #7849
- [x] More documentation and test cases
- [x] Make sure we are doing memory accounting correctly with test cases (e.g. did we release the memory in BytesToBytesMap twice?)
- [ ] Look harder at possible memory leaks and exception handling
- [ ] Randomized tester for the KV sorter as well as the aggregation map
Author: Reynold Xin <rxin@databricks.com>
Author: Josh Rosen <joshrosen@databricks.com>
Closes #7860 from rxin/kvsorter and squashes the following commits:
986a58c [Reynold Xin] Bug fix.
599317c [Reynold Xin] Style fix and slightly more compact code.
fe7bd4e [Reynold Xin] Bug fixes.
fd71bef [Reynold Xin] Merge remote-tracking branch 'josh/large-records-in-sql-sorter' into kvsorter-with-josh-fix
3efae38 [Reynold Xin] More fixes and documentation.
45f1b09 [Josh Rosen] Ensure that spill files are cleaned up
f6a9bd3 [Reynold Xin] Josh feedback.
9be8139 [Reynold Xin] Remove testSpillFrequency.
7cbe759 [Reynold Xin] [SPARK-9531][SQL] UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter.
ae4a8af [Josh Rosen] Detect leaked unsafe memory in UnsafeExternalSorterSuite.
52f9b06 [Josh Rosen] Detect ShuffleMemoryManager leaks in UnsafeExternalSorter.
Diffstat (limited to 'core/src/main')
5 files changed, 187 insertions, 53 deletions
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index cf222b7272..01a66084e9 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -39,14 +39,22 @@ import org.apache.spark.unsafe.memory.*; /** * An append-only hash map where keys and values are contiguous regions of bytes. - * <p> + * * This is backed by a power-of-2-sized hash table, using quadratic probing with triangular numbers, * which is guaranteed to exhaust the space. - * <p> + * * The map can support up to 2^29 keys. If the key cardinality is higher than this, you should * probably be using sorting instead of hashing for better cache locality. - * <p> - * This class is not thread safe. + * + * The key and values under the hood are stored together, in the following format: + * Bytes 0 to 4: len(k) (key length in bytes) + len(v) (value length in bytes) + 4 + * Bytes 4 to 8: len(k) + * Bytes 8 to 8 + len(k): key data + * Bytes 8 + len(k) to 8 + len(k) + len(v): value data + * + * This means that the first four bytes store the entire record (key + value) length. This format + * is consistent with {@link org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter}, + * so we can pass records from this map directly into the sorter to sort records in place. */ public final class BytesToBytesMap { @@ -253,7 +261,7 @@ public final class BytesToBytesMap { totalLength = PlatformDependent.UNSAFE.getInt(pageBaseObject, offsetInPage); } loc.with(currentPage, offsetInPage); - offsetInPage += 8 + totalLength; + offsetInPage += 4 + totalLength; currentRecordNumber++; return loc; } @@ -366,7 +374,7 @@ public final class BytesToBytesMap { position += 4; keyLength = PlatformDependent.UNSAFE.getInt(page, position); position += 4; - valueLength = totalLength - keyLength; + valueLength = totalLength - keyLength - 4; keyMemoryLocation.setObjAndOffset(page, position); @@ -565,7 +573,7 @@ public final class BytesToBytesMap { insertCursor += valueLengthBytes; // word used to store the value size PlatformDependent.UNSAFE.putInt(dataPageBaseObject, recordOffset, - keyLengthBytes + valueLengthBytes); + keyLengthBytes + valueLengthBytes + 4); PlatformDependent.UNSAFE.putInt(dataPageBaseObject, keyLengthOffset, keyLengthBytes); // Copy the key PlatformDependent.copyMemory( @@ -620,7 +628,7 @@ public final class BytesToBytesMap { * Free all allocated memory associated with this map, including the storage for keys and values * as well as the hash map array itself. * - * This method is idempotent. + * This method is idempotent and can be called multiple times. */ public void free() { longArray = null; @@ -639,6 +647,14 @@ public final class BytesToBytesMap { return taskMemoryManager; } + public ShuffleMemoryManager getShuffleMemoryManager() { + return shuffleMemoryManager; + } + + public long getPageSizeBytes() { + return pageSizeBytes; + } + /** Returns the total amount of memory, in bytes, consumed by this map's managed structures. */ public long getTotalMemoryConsumption() { long totalDataPagesSize = 0L; diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index c05f2c332e..b984301cbb 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -17,9 +17,12 @@ package org.apache.spark.util.collection.unsafe.sort; +import java.io.File; import java.io.IOException; import java.util.LinkedList; +import javax.annotation.Nullable; + import scala.runtime.AbstractFunction0; import scala.runtime.BoxedUnit; @@ -27,7 +30,6 @@ import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.spark.SparkConf; import org.apache.spark.TaskContext; import org.apache.spark.executor.ShuffleWriteMetrics; import org.apache.spark.shuffle.ShuffleMemoryManager; @@ -48,7 +50,7 @@ public final class UnsafeExternalSorter { private final PrefixComparator prefixComparator; private final RecordComparator recordComparator; private final int initialSize; - private final TaskMemoryManager memoryManager; + private final TaskMemoryManager taskMemoryManager; private final ShuffleMemoryManager shuffleMemoryManager; private final BlockManager blockManager; private final TaskContext taskContext; @@ -63,26 +65,57 @@ public final class UnsafeExternalSorter { * this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager * itself). */ - private final LinkedList<MemoryBlock> allocatedPages = new LinkedList<MemoryBlock>(); + private final LinkedList<MemoryBlock> allocatedPages = new LinkedList<>(); + + private final LinkedList<UnsafeSorterSpillWriter> spillWriters = new LinkedList<>(); // These variables are reset after spilling: - private UnsafeInMemorySorter sorter; + private UnsafeInMemorySorter inMemSorter; + // Whether the in-mem sorter is created internally, or passed in from outside. + // If it is passed in from outside, we shouldn't release the in-mem sorter's memory. + private boolean isInMemSorterExternal = false; private MemoryBlock currentPage = null; private long currentPagePosition = -1; private long freeSpaceInCurrentPage = 0; - private final LinkedList<UnsafeSorterSpillWriter> spillWriters = new LinkedList<>(); + public static UnsafeExternalSorter createWithExistingInMemorySorter( + TaskMemoryManager taskMemoryManager, + ShuffleMemoryManager shuffleMemoryManager, + BlockManager blockManager, + TaskContext taskContext, + RecordComparator recordComparator, + PrefixComparator prefixComparator, + int initialSize, + long pageSizeBytes, + UnsafeInMemorySorter inMemorySorter) throws IOException { + return new UnsafeExternalSorter(taskMemoryManager, shuffleMemoryManager, blockManager, + taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, inMemorySorter); + } - public UnsafeExternalSorter( - TaskMemoryManager memoryManager, + public static UnsafeExternalSorter create( + TaskMemoryManager taskMemoryManager, ShuffleMemoryManager shuffleMemoryManager, BlockManager blockManager, TaskContext taskContext, RecordComparator recordComparator, PrefixComparator prefixComparator, int initialSize, - SparkConf conf) throws IOException { - this.memoryManager = memoryManager; + long pageSizeBytes) throws IOException { + return new UnsafeExternalSorter(taskMemoryManager, shuffleMemoryManager, blockManager, + taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, null); + } + + private UnsafeExternalSorter( + TaskMemoryManager taskMemoryManager, + ShuffleMemoryManager shuffleMemoryManager, + BlockManager blockManager, + TaskContext taskContext, + RecordComparator recordComparator, + PrefixComparator prefixComparator, + int initialSize, + long pageSizeBytes, + @Nullable UnsafeInMemorySorter existingInMemorySorter) throws IOException { + this.taskMemoryManager = taskMemoryManager; this.shuffleMemoryManager = shuffleMemoryManager; this.blockManager = blockManager; this.taskContext = taskContext; @@ -90,9 +123,18 @@ public final class UnsafeExternalSorter { this.prefixComparator = prefixComparator; this.initialSize = initialSize; // Use getSizeAsKb (not bytes) to maintain backwards compatibility for units - this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; - this.pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "64m"); - initializeForWriting(); + // this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; + this.fileBufferSizeBytes = 32 * 1024; + // this.pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "64m"); + this.pageSizeBytes = pageSizeBytes; + this.writeMetrics = new ShuffleWriteMetrics(); + + if (existingInMemorySorter == null) { + initializeForWriting(); + } else { + this.isInMemSorterExternal = true; + this.inMemSorter = existingInMemorySorter; + } // Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at // the end of the task. This is necessary to avoid memory leaks in when the downstream operator @@ -100,6 +142,7 @@ public final class UnsafeExternalSorter { taskContext.addOnCompleteCallback(new AbstractFunction0<BoxedUnit>() { @Override public BoxedUnit apply() { + deleteSpillFiles(); freeMemory(); return null; } @@ -114,22 +157,31 @@ public final class UnsafeExternalSorter { */ private void initializeForWriting() throws IOException { this.writeMetrics = new ShuffleWriteMetrics(); - // TODO: move this sizing calculation logic into a static method of sorter: - final long memoryRequested = initialSize * 8L * 2; - final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryRequested); - if (memoryAcquired != memoryRequested) { + final long pointerArrayMemory = + UnsafeInMemorySorter.getMemoryRequirementsForPointerArray(initialSize); + final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pointerArrayMemory); + if (memoryAcquired != pointerArrayMemory) { shuffleMemoryManager.release(memoryAcquired); - throw new IOException("Could not acquire " + memoryRequested + " bytes of memory"); + throw new IOException("Could not acquire " + pointerArrayMemory + " bytes of memory"); } - this.sorter = - new UnsafeInMemorySorter(memoryManager, recordComparator, prefixComparator, initialSize); + this.inMemSorter = + new UnsafeInMemorySorter(taskMemoryManager, recordComparator, prefixComparator, initialSize); + this.isInMemSorterExternal = false; } /** - * Sort and spill the current records in response to memory pressure. + * Marks the current page as no-more-space-available, and as a result, either allocate a + * new page or spill when we see the next record. */ @VisibleForTesting + public void closeCurrentPage() { + freeSpaceInCurrentPage = 0; + } + + /** + * Sort and spill the current records in response to memory pressure. + */ public void spill() throws IOException { logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)", Thread.currentThread().getId(), @@ -139,9 +191,9 @@ public final class UnsafeExternalSorter { final UnsafeSorterSpillWriter spillWriter = new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics, - sorter.numRecords()); + inMemSorter.numRecords()); spillWriters.add(spillWriter); - final UnsafeSorterIterator sortedRecords = sorter.getSortedIterator(); + final UnsafeSorterIterator sortedRecords = inMemSorter.getSortedIterator(); while (sortedRecords.hasNext()) { sortedRecords.loadNext(); final Object baseObject = sortedRecords.getBaseObject(); @@ -150,20 +202,24 @@ public final class UnsafeExternalSorter { spillWriter.write(baseObject, baseOffset, recordLength, sortedRecords.getKeyPrefix()); } spillWriter.close(); - final long sorterMemoryUsage = sorter.getMemoryUsage(); - sorter = null; - shuffleMemoryManager.release(sorterMemoryUsage); final long spillSize = freeMemory(); + // Note that this is more-or-less going to be a multiple of the page size, so wasted space in + // pages will currently be counted as memory spilled even though that space isn't actually + // written to disk. This also counts the space needed to store the sorter's pointer array. taskContext.taskMetrics().incMemoryBytesSpilled(spillSize); initializeForWriting(); } + /** + * Return the total memory usage of this sorter, including the data pages and the sorter's pointer + * array. + */ private long getMemoryUsage() { long totalPageSize = 0; for (MemoryBlock page : allocatedPages) { totalPageSize += page.size(); } - return sorter.getMemoryUsage() + totalPageSize; + return inMemSorter.getMemoryUsage() + totalPageSize; } @VisibleForTesting @@ -171,13 +227,26 @@ public final class UnsafeExternalSorter { return allocatedPages.size(); } + /** + * Free this sorter's in-memory data structures, including its data pages and pointer array. + * + * @return the number of bytes freed. + */ public long freeMemory() { long memoryFreed = 0; for (MemoryBlock block : allocatedPages) { - memoryManager.freePage(block); + taskMemoryManager.freePage(block); shuffleMemoryManager.release(block.size()); memoryFreed += block.size(); } + if (inMemSorter != null) { + if (!isInMemSorterExternal) { + long sorterMemoryUsage = inMemSorter.getMemoryUsage(); + memoryFreed += sorterMemoryUsage; + shuffleMemoryManager.release(sorterMemoryUsage); + } + inMemSorter = null; + } allocatedPages.clear(); currentPage = null; currentPagePosition = -1; @@ -186,6 +255,20 @@ public final class UnsafeExternalSorter { } /** + * Deletes any spill files created by this sorter. + */ + public void deleteSpillFiles() { + for (UnsafeSorterSpillWriter spill : spillWriters) { + File file = spill.getFile(); + if (file != null && file.exists()) { + if (!file.delete()) { + logger.error("Was unable to delete spill file {}", file.getAbsolutePath()); + }; + } + } + } + + /** * Checks whether there is enough space to insert a new record into the sorter. * * @param requiredSpace the required space in the data page, in bytes, including space for storing @@ -195,7 +278,7 @@ public final class UnsafeExternalSorter { */ private boolean haveSpaceForRecord(int requiredSpace) { assert (requiredSpace > 0); - return (sorter.hasSpaceForAnotherRecord() && (requiredSpace <= freeSpaceInCurrentPage)); + return (inMemSorter.hasSpaceForAnotherRecord() && (requiredSpace <= freeSpaceInCurrentPage)); } /** @@ -210,16 +293,16 @@ public final class UnsafeExternalSorter { // TODO: merge these steps to first calculate total memory requirements for this insert, // then try to acquire; no point in acquiring sort buffer only to spill due to no space in the // data page. - if (!sorter.hasSpaceForAnotherRecord()) { + if (!inMemSorter.hasSpaceForAnotherRecord()) { logger.debug("Attempting to expand sort pointer array"); - final long oldPointerArrayMemoryUsage = sorter.getMemoryUsage(); + final long oldPointerArrayMemoryUsage = inMemSorter.getMemoryUsage(); final long memoryToGrowPointerArray = oldPointerArrayMemoryUsage * 2; final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryToGrowPointerArray); if (memoryAcquired < memoryToGrowPointerArray) { shuffleMemoryManager.release(memoryAcquired); spill(); } else { - sorter.expandPointerArray(); + inMemSorter.expandPointerArray(); shuffleMemoryManager.release(oldPointerArrayMemoryUsage); } } @@ -236,7 +319,9 @@ public final class UnsafeExternalSorter { } else { final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes); if (memoryAcquired < pageSizeBytes) { - shuffleMemoryManager.release(memoryAcquired); + if (memoryAcquired > 0) { + shuffleMemoryManager.release(memoryAcquired); + } spill(); final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes); if (memoryAcquiredAfterSpilling != pageSizeBytes) { @@ -244,7 +329,7 @@ public final class UnsafeExternalSorter { throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory"); } } - currentPage = memoryManager.allocatePage(pageSizeBytes); + currentPage = taskMemoryManager.allocatePage(pageSizeBytes); currentPagePosition = currentPage.getBaseOffset(); freeSpaceInCurrentPage = pageSizeBytes; allocatedPages.add(currentPage); @@ -267,7 +352,7 @@ public final class UnsafeExternalSorter { } final long recordAddress = - memoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition); + taskMemoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition); final Object dataPageBaseObject = currentPage.getBaseObject(); PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, lengthInBytes); currentPagePosition += 4; @@ -279,26 +364,48 @@ public final class UnsafeExternalSorter { lengthInBytes); currentPagePosition += lengthInBytes; freeSpaceInCurrentPage -= totalSpaceRequired; - sorter.insertRecord(recordAddress, prefix); + inMemSorter.insertRecord(recordAddress, prefix); } /** - * Write a record to the sorter. The record is broken down into two different parts, and + * Write a key-value record to the sorter. The key and value will be put together in-memory, + * using the following format: * + * record length (4 bytes), key length (4 bytes), key data, value data + * + * record length = key length + value length + 4 */ - public void insertRecord( - Object recordBaseObject1, - long recordBaseOffset1, - int lengthInBytes1, - Object recordBaseObject2, - long recordBaseOffset2, - int lengthInBytes2, - long prefix) throws IOException { + public void insertKVRecord( + Object keyBaseObj, long keyOffset, int keyLen, + Object valueBaseObj, long valueOffset, int valueLen, long prefix) throws IOException { + final int totalSpaceRequired = keyLen + valueLen + 4 + 4; + if (!haveSpaceForRecord(totalSpaceRequired)) { + allocateSpaceForRecord(totalSpaceRequired); + } + + final long recordAddress = + taskMemoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition); + final Object dataPageBaseObject = currentPage.getBaseObject(); + PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, keyLen + valueLen + 4); + currentPagePosition += 4; + PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, keyLen); + currentPagePosition += 4; + + PlatformDependent.copyMemory( + keyBaseObj, keyOffset, dataPageBaseObject, currentPagePosition, keyLen); + currentPagePosition += keyLen; + + PlatformDependent.copyMemory( + valueBaseObj, valueOffset, dataPageBaseObject, currentPagePosition, valueLen); + currentPagePosition += valueLen; + + freeSpaceInCurrentPage -= totalSpaceRequired; + inMemSorter.insertRecord(recordAddress, prefix); } public UnsafeSorterIterator getSortedIterator() throws IOException { - final UnsafeSorterIterator inMemoryIterator = sorter.getSortedIterator(); + final UnsafeSorterIterator inMemoryIterator = inMemSorter.getSortedIterator(); int numIteratorsToMerge = spillWriters.size() + (inMemoryIterator.hasNext() ? 1 : 0); if (spillWriters.isEmpty()) { return inMemoryIterator; diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index fc34ad9cff..3131465391 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -100,6 +100,10 @@ public final class UnsafeInMemorySorter { return pointerArray.length * 8L; } + static long getMemoryRequirementsForPointerArray(long numEntries) { + return numEntries * 2L * 8L; + } + public boolean hasSpaceForAnotherRecord() { return pointerArrayInsertPosition + 2 < pointerArray.length; } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java index 29e9e0f30f..ca1ccedc93 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java @@ -31,6 +31,7 @@ import org.apache.spark.unsafe.PlatformDependent; */ final class UnsafeSorterSpillReader extends UnsafeSorterIterator { + private final File file; private InputStream in; private DataInputStream din; @@ -48,6 +49,7 @@ final class UnsafeSorterSpillReader extends UnsafeSorterIterator { File file, BlockId blockId) throws IOException { assert (file.length() > 0); + this.file = file; final BufferedInputStream bs = new BufferedInputStream(new FileInputStream(file)); this.in = blockManager.wrapForCompression(blockId, bs); this.din = new DataInputStream(this.in); @@ -71,6 +73,7 @@ final class UnsafeSorterSpillReader extends UnsafeSorterIterator { numRecordsRemaining--; if (numRecordsRemaining == 0) { in.close(); + file.delete(); in = null; din = null; } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java index 71eed29563..44cf6c756d 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java @@ -140,6 +140,10 @@ final class UnsafeSorterSpillWriter { writeBuffer = null; } + public File getFile() { + return file; + } + public UnsafeSorterSpillReader getReader(BlockManager blockManager) throws IOException { return new UnsafeSorterSpillReader(blockManager, file, blockId); } |