diff options
Diffstat (limited to 'core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java')
-rw-r--r-- | core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java | 426 |
1 files changed, 223 insertions, 203 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index e317ea391c..49a5a4b13b 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -17,39 +17,34 @@ package org.apache.spark.util.collection.unsafe.sort; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.util.LinkedList; -import javax.annotation.Nullable; - -import scala.runtime.AbstractFunction0; -import scala.runtime.BoxedUnit; - import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.spark.TaskContext; import org.apache.spark.executor.ShuffleWriteMetrics; +import org.apache.spark.memory.MemoryConsumer; +import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.storage.BlockManager; -import org.apache.spark.unsafe.array.ByteArrayMethods; import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.memory.MemoryBlock; -import org.apache.spark.memory.TaskMemoryManager; +import org.apache.spark.util.TaskCompletionListener; import org.apache.spark.util.Utils; /** * External sorter based on {@link UnsafeInMemorySorter}. */ -public final class UnsafeExternalSorter { +public final class UnsafeExternalSorter extends MemoryConsumer { private final Logger logger = LoggerFactory.getLogger(UnsafeExternalSorter.class); - private final long pageSizeBytes; private final PrefixComparator prefixComparator; private final RecordComparator recordComparator; - private final int initialSize; private final TaskMemoryManager taskMemoryManager; private final BlockManager blockManager; private final TaskContext taskContext; @@ -69,14 +64,12 @@ public final class UnsafeExternalSorter { private final LinkedList<UnsafeSorterSpillWriter> spillWriters = new LinkedList<>(); // These variables are reset after spilling: - @Nullable private UnsafeInMemorySorter inMemSorter; - // Whether the in-mem sorter is created internally, or passed in from outside. - // If it is passed in from outside, we shouldn't release the in-mem sorter's memory. - private boolean isInMemSorterExternal = false; + @Nullable private volatile UnsafeInMemorySorter inMemSorter; + private MemoryBlock currentPage = null; - private long currentPagePosition = -1; - private long freeSpaceInCurrentPage = 0; + private long pageCursor = -1; private long peakMemoryUsedBytes = 0; + private volatile SpillableIterator readingIterator = null; public static UnsafeExternalSorter createWithExistingInMemorySorter( TaskMemoryManager taskMemoryManager, @@ -86,7 +79,7 @@ public final class UnsafeExternalSorter { PrefixComparator prefixComparator, int initialSize, long pageSizeBytes, - UnsafeInMemorySorter inMemorySorter) throws IOException { + UnsafeInMemorySorter inMemorySorter) { return new UnsafeExternalSorter(taskMemoryManager, blockManager, taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, inMemorySorter); } @@ -98,7 +91,7 @@ public final class UnsafeExternalSorter { RecordComparator recordComparator, PrefixComparator prefixComparator, int initialSize, - long pageSizeBytes) throws IOException { + long pageSizeBytes) { return new UnsafeExternalSorter(taskMemoryManager, blockManager, taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, null); } @@ -111,60 +104,41 @@ public final class UnsafeExternalSorter { PrefixComparator prefixComparator, int initialSize, long pageSizeBytes, - @Nullable UnsafeInMemorySorter existingInMemorySorter) throws IOException { + @Nullable UnsafeInMemorySorter existingInMemorySorter) { + super(taskMemoryManager, pageSizeBytes); this.taskMemoryManager = taskMemoryManager; this.blockManager = blockManager; this.taskContext = taskContext; this.recordComparator = recordComparator; this.prefixComparator = prefixComparator; - this.initialSize = initialSize; // Use getSizeAsKb (not bytes) to maintain backwards compatibility for units // this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; this.fileBufferSizeBytes = 32 * 1024; - this.pageSizeBytes = pageSizeBytes; + // TODO: metrics tracking + integration with shuffle write metrics + // need to connect the write metrics to task metrics so we count the spill IO somewhere. this.writeMetrics = new ShuffleWriteMetrics(); if (existingInMemorySorter == null) { - initializeForWriting(); - // Acquire a new page as soon as we construct the sorter to ensure that we have at - // least one page to work with. Otherwise, other operators in the same task may starve - // this sorter (SPARK-9709). We don't need to do this if we already have an existing sorter. - acquireNewPage(); + this.inMemSorter = + new UnsafeInMemorySorter(taskMemoryManager, recordComparator, prefixComparator, initialSize); + acquireMemory(inMemSorter.getMemoryUsage()); } else { - this.isInMemSorterExternal = true; this.inMemSorter = existingInMemorySorter; + // will acquire after free the map } + this.peakMemoryUsedBytes = getMemoryUsage(); // Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at // the end of the task. This is necessary to avoid memory leaks in when the downstream operator // does not fully consume the sorter's output (e.g. sort followed by limit). - taskContext.addOnCompleteCallback(new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - cleanupResources(); - return null; + taskContext.addTaskCompletionListener( + new TaskCompletionListener() { + @Override + public void onTaskCompletion(TaskContext context) { + cleanupResources(); + } } - }); - } - - // TODO: metrics tracking + integration with shuffle write metrics - // need to connect the write metrics to task metrics so we count the spill IO somewhere. - - /** - * Allocates new sort data structures. Called when creating the sorter and after each spill. - */ - private void initializeForWriting() throws IOException { - // Note: Do not track memory for the pointer array for now because of SPARK-10474. - // In more detail, in TungstenAggregate we only reserve a page, but when we fall back to - // sort-based aggregation we try to acquire a page AND a pointer array, which inevitably - // fails if all other memory is already occupied. It should be safe to not track the array - // because its memory footprint is frequently much smaller than that of a page. This is a - // temporary hack that we should address in 1.6.0. - // TODO: track the pointer array memory! - this.writeMetrics = new ShuffleWriteMetrics(); - this.inMemSorter = - new UnsafeInMemorySorter(taskMemoryManager, recordComparator, prefixComparator, initialSize); - this.isInMemSorterExternal = false; + ); } /** @@ -173,14 +147,27 @@ public final class UnsafeExternalSorter { */ @VisibleForTesting public void closeCurrentPage() { - freeSpaceInCurrentPage = 0; + if (currentPage != null) { + pageCursor = currentPage.getBaseOffset() + currentPage.size(); + } } /** * Sort and spill the current records in response to memory pressure. */ - public void spill() throws IOException { - assert(inMemSorter != null); + @Override + public long spill(long size, MemoryConsumer trigger) throws IOException { + if (trigger != this) { + if (readingIterator != null) { + return readingIterator.spill(); + } + return 0L; + } + + if (inMemSorter == null || inMemSorter.numRecords() <= 0) { + return 0L; + } + logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)", Thread.currentThread().getId(), Utils.bytesToString(getMemoryUsage()), @@ -202,6 +189,8 @@ public final class UnsafeExternalSorter { spillWriter.write(baseObject, baseOffset, recordLength, sortedRecords.getKeyPrefix()); } spillWriter.close(); + + inMemSorter.reset(); } final long spillSize = freeMemory(); @@ -210,7 +199,7 @@ public final class UnsafeExternalSorter { // written to disk. This also counts the space needed to store the sorter's pointer array. taskContext.taskMetrics().incMemoryBytesSpilled(spillSize); - initializeForWriting(); + return spillSize; } /** @@ -246,7 +235,7 @@ public final class UnsafeExternalSorter { } /** - * Free this sorter's in-memory data structures, including its data pages and pointer array. + * Free this sorter's data pages. * * @return the number of bytes freed. */ @@ -254,14 +243,12 @@ public final class UnsafeExternalSorter { updatePeakMemoryUsed(); long memoryFreed = 0; for (MemoryBlock block : allocatedPages) { - taskMemoryManager.freePage(block); memoryFreed += block.size(); + freePage(block); } - // TODO: track in-memory sorter memory usage (SPARK-10474) allocatedPages.clear(); currentPage = null; - currentPagePosition = -1; - freeSpaceInCurrentPage = 0; + pageCursor = 0; return memoryFreed; } @@ -283,8 +270,15 @@ public final class UnsafeExternalSorter { * Frees this sorter's in-memory data structures and cleans up its spill files. */ public void cleanupResources() { - deleteSpillFiles(); - freeMemory(); + synchronized (this) { + deleteSpillFiles(); + freeMemory(); + if (inMemSorter != null) { + long used = inMemSorter.getMemoryUsage(); + inMemSorter = null; + releaseMemory(used); + } + } } /** @@ -295,8 +289,28 @@ public final class UnsafeExternalSorter { private void growPointerArrayIfNecessary() throws IOException { assert(inMemSorter != null); if (!inMemSorter.hasSpaceForAnotherRecord()) { - // TODO: track the pointer array memory! (SPARK-10474) - inMemSorter.expandPointerArray(); + long used = inMemSorter.getMemoryUsage(); + long needed = used + inMemSorter.getMemoryToExpand(); + try { + acquireMemory(needed); // could trigger spilling + } catch (OutOfMemoryError e) { + // should have trigger spilling + assert(inMemSorter.hasSpaceForAnotherRecord()); + return; + } + // check if spilling is triggered or not + if (inMemSorter.hasSpaceForAnotherRecord()) { + releaseMemory(needed); + } else { + try { + inMemSorter.expandPointerArray(); + releaseMemory(used); + } catch (OutOfMemoryError oom) { + // Just in case that JVM had run out of memory + releaseMemory(needed); + spill(); + } + } } } @@ -304,101 +318,38 @@ public final class UnsafeExternalSorter { * Allocates more memory in order to insert an additional record. This will request additional * memory from the memory manager and spill if the requested memory can not be obtained. * - * @param requiredSpace the required space in the data page, in bytes, including space for storing + * @param required the required space in the data page, in bytes, including space for storing * the record size. This must be less than or equal to the page size (records * that exceed the page size are handled via a different code path which uses * special overflow pages). */ - private void acquireNewPageIfNecessary(int requiredSpace) throws IOException { - assert (requiredSpace <= pageSizeBytes); - if (requiredSpace > freeSpaceInCurrentPage) { - logger.trace("Required space {} is less than free space in current page ({})", requiredSpace, - freeSpaceInCurrentPage); - // TODO: we should track metrics on the amount of space wasted when we roll over to a new page - // without using the free space at the end of the current page. We should also do this for - // BytesToBytesMap. - if (requiredSpace > pageSizeBytes) { - throw new IOException("Required space " + requiredSpace + " is greater than page size (" + - pageSizeBytes + ")"); - } else { - acquireNewPage(); - } + private void acquireNewPageIfNecessary(int required) { + if (currentPage == null || + pageCursor + required > currentPage.getBaseOffset() + currentPage.size()) { + // TODO: try to find space on previous pages + currentPage = allocatePage(required); + pageCursor = currentPage.getBaseOffset(); + allocatedPages.add(currentPage); } } /** - * Acquire a new page from the memory manager. - * - * If there is not enough space to allocate the new page, spill all existing ones - * and try again. If there is still not enough space, report error to the caller. - */ - private void acquireNewPage() throws IOException { - currentPage = taskMemoryManager.allocatePage(pageSizeBytes); - if (currentPage == null) { - spill(); - currentPage = taskMemoryManager.allocatePage(pageSizeBytes); - if (currentPage == null) { - throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory"); - } - } - currentPagePosition = currentPage.getBaseOffset(); - freeSpaceInCurrentPage = pageSizeBytes; - allocatedPages.add(currentPage); - } - - /** * Write a record to the sorter. */ - public void insertRecord( - Object recordBaseObject, - long recordBaseOffset, - int lengthInBytes, - long prefix) throws IOException { + public void insertRecord(Object recordBase, long recordOffset, int length, long prefix) + throws IOException { growPointerArrayIfNecessary(); // Need 4 bytes to store the record length. - final int totalSpaceRequired = lengthInBytes + 4; - - // --- Figure out where to insert the new record ---------------------------------------------- - - final MemoryBlock dataPage; - long dataPagePosition; - boolean useOverflowPage = totalSpaceRequired > pageSizeBytes; - if (useOverflowPage) { - long overflowPageSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired); - // The record is larger than the page size, so allocate a special overflow page just to hold - // that record. - MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize); - if (overflowPage == null) { - spill(); - overflowPage = taskMemoryManager.allocatePage(overflowPageSize); - if (overflowPage == null) { - throw new IOException("Unable to acquire " + overflowPageSize + " bytes of memory"); - } - } - allocatedPages.add(overflowPage); - dataPage = overflowPage; - dataPagePosition = overflowPage.getBaseOffset(); - } else { - // The record is small enough to fit in a regular data page, but the current page might not - // have enough space to hold it (or no pages have been allocated yet). - acquireNewPageIfNecessary(totalSpaceRequired); - dataPage = currentPage; - dataPagePosition = currentPagePosition; - // Update bookkeeping information - freeSpaceInCurrentPage -= totalSpaceRequired; - currentPagePosition += totalSpaceRequired; - } - final Object dataPageBaseObject = dataPage.getBaseObject(); - - // --- Insert the record ---------------------------------------------------------------------- - - final long recordAddress = - taskMemoryManager.encodePageNumberAndOffset(dataPage, dataPagePosition); - Platform.putInt(dataPageBaseObject, dataPagePosition, lengthInBytes); - dataPagePosition += 4; - Platform.copyMemory( - recordBaseObject, recordBaseOffset, dataPageBaseObject, dataPagePosition, lengthInBytes); + final int required = length + 4; + acquireNewPageIfNecessary(required); + + final Object base = currentPage.getBaseObject(); + final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor); + Platform.putInt(base, pageCursor, length); + pageCursor += 4; + Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length); + pageCursor += length; assert(inMemSorter != null); inMemSorter.insertRecord(recordAddress, prefix); } @@ -411,59 +362,24 @@ public final class UnsafeExternalSorter { * * record length = key length + value length + 4 */ - public void insertKVRecord( - Object keyBaseObj, long keyOffset, int keyLen, - Object valueBaseObj, long valueOffset, int valueLen, long prefix) throws IOException { + public void insertKVRecord(Object keyBase, long keyOffset, int keyLen, + Object valueBase, long valueOffset, int valueLen, long prefix) + throws IOException { growPointerArrayIfNecessary(); - final int totalSpaceRequired = keyLen + valueLen + 4 + 4; - - // --- Figure out where to insert the new record ---------------------------------------------- - - final MemoryBlock dataPage; - long dataPagePosition; - boolean useOverflowPage = totalSpaceRequired > pageSizeBytes; - if (useOverflowPage) { - long overflowPageSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired); - // The record is larger than the page size, so allocate a special overflow page just to hold - // that record. - MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize); - if (overflowPage == null) { - spill(); - overflowPage = taskMemoryManager.allocatePage(overflowPageSize); - if (overflowPage == null) { - throw new IOException("Unable to acquire " + overflowPageSize + " bytes of memory"); - } - } - allocatedPages.add(overflowPage); - dataPage = overflowPage; - dataPagePosition = overflowPage.getBaseOffset(); - } else { - // The record is small enough to fit in a regular data page, but the current page might not - // have enough space to hold it (or no pages have been allocated yet). - acquireNewPageIfNecessary(totalSpaceRequired); - dataPage = currentPage; - dataPagePosition = currentPagePosition; - // Update bookkeeping information - freeSpaceInCurrentPage -= totalSpaceRequired; - currentPagePosition += totalSpaceRequired; - } - final Object dataPageBaseObject = dataPage.getBaseObject(); - - // --- Insert the record ---------------------------------------------------------------------- - - final long recordAddress = - taskMemoryManager.encodePageNumberAndOffset(dataPage, dataPagePosition); - Platform.putInt(dataPageBaseObject, dataPagePosition, keyLen + valueLen + 4); - dataPagePosition += 4; - - Platform.putInt(dataPageBaseObject, dataPagePosition, keyLen); - dataPagePosition += 4; - - Platform.copyMemory(keyBaseObj, keyOffset, dataPageBaseObject, dataPagePosition, keyLen); - dataPagePosition += keyLen; - - Platform.copyMemory(valueBaseObj, valueOffset, dataPageBaseObject, dataPagePosition, valueLen); + final int required = keyLen + valueLen + 4 + 4; + acquireNewPageIfNecessary(required); + + final Object base = currentPage.getBaseObject(); + final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor); + Platform.putInt(base, pageCursor, keyLen + valueLen + 4); + pageCursor += 4; + Platform.putInt(base, pageCursor, keyLen); + pageCursor += 4; + Platform.copyMemory(keyBase, keyOffset, base, pageCursor, keyLen); + pageCursor += keyLen; + Platform.copyMemory(valueBase, valueOffset, base, pageCursor, valueLen); + pageCursor += valueLen; assert(inMemSorter != null); inMemSorter.insertRecord(recordAddress, prefix); @@ -475,10 +391,10 @@ public final class UnsafeExternalSorter { */ public UnsafeSorterIterator getSortedIterator() throws IOException { assert(inMemSorter != null); - final UnsafeInMemorySorter.SortedIterator inMemoryIterator = inMemSorter.getSortedIterator(); - int numIteratorsToMerge = spillWriters.size() + (inMemoryIterator.hasNext() ? 1 : 0); + readingIterator = new SpillableIterator(inMemSorter.getSortedIterator()); + int numIteratorsToMerge = spillWriters.size() + (readingIterator.hasNext() ? 1 : 0); if (spillWriters.isEmpty()) { - return inMemoryIterator; + return readingIterator; } else { final UnsafeSorterSpillMerger spillMerger = new UnsafeSorterSpillMerger(recordComparator, prefixComparator, numIteratorsToMerge); @@ -486,9 +402,113 @@ public final class UnsafeExternalSorter { spillMerger.addSpillIfNotEmpty(spillWriter.getReader(blockManager)); } spillWriters.clear(); - spillMerger.addSpillIfNotEmpty(inMemoryIterator); + spillMerger.addSpillIfNotEmpty(readingIterator); return spillMerger.getSortedIterator(); } } + + /** + * An UnsafeSorterIterator that support spilling. + */ + class SpillableIterator extends UnsafeSorterIterator { + private UnsafeSorterIterator upstream; + private UnsafeSorterIterator nextUpstream = null; + private MemoryBlock lastPage = null; + private boolean loaded = false; + private int numRecords = 0; + + public SpillableIterator(UnsafeInMemorySorter.SortedIterator inMemIterator) { + this.upstream = inMemIterator; + this.numRecords = inMemIterator.numRecordsLeft(); + } + + public long spill() throws IOException { + synchronized (this) { + if (!(upstream instanceof UnsafeInMemorySorter.SortedIterator && nextUpstream == null + && numRecords > 0)) { + return 0L; + } + + UnsafeInMemorySorter.SortedIterator inMemIterator = + ((UnsafeInMemorySorter.SortedIterator) upstream).clone(); + + final UnsafeSorterSpillWriter spillWriter = + new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics, numRecords); + while (inMemIterator.hasNext()) { + inMemIterator.loadNext(); + final Object baseObject = inMemIterator.getBaseObject(); + final long baseOffset = inMemIterator.getBaseOffset(); + final int recordLength = inMemIterator.getRecordLength(); + spillWriter.write(baseObject, baseOffset, recordLength, inMemIterator.getKeyPrefix()); + } + spillWriter.close(); + spillWriters.add(spillWriter); + nextUpstream = spillWriter.getReader(blockManager); + + long released = 0L; + synchronized (UnsafeExternalSorter.this) { + // release the pages except the one that is used + for (MemoryBlock page : allocatedPages) { + if (!loaded || page.getBaseObject() != inMemIterator.getBaseObject()) { + released += page.size(); + freePage(page); + } else { + lastPage = page; + } + } + allocatedPages.clear(); + } + return released; + } + } + + @Override + public boolean hasNext() { + return numRecords > 0; + } + + @Override + public void loadNext() throws IOException { + synchronized (this) { + loaded = true; + if (nextUpstream != null) { + // Just consumed the last record from in memory iterator + if (lastPage != null) { + freePage(lastPage); + lastPage = null; + } + upstream = nextUpstream; + nextUpstream = null; + + assert(inMemSorter != null); + long used = inMemSorter.getMemoryUsage(); + inMemSorter = null; + releaseMemory(used); + } + numRecords--; + upstream.loadNext(); + } + } + + @Override + public Object getBaseObject() { + return upstream.getBaseObject(); + } + + @Override + public long getBaseOffset() { + return upstream.getBaseOffset(); + } + + @Override + public int getRecordLength() { + return upstream.getRecordLength(); + } + + @Override + public long getKeyPrefix() { + return upstream.getKeyPrefix(); + } + } } |