aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java')
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java426
1 files changed, 223 insertions, 203 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index e317ea391c..49a5a4b13b 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -17,39 +17,34 @@
package org.apache.spark.util.collection.unsafe.sort;
+import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
-import javax.annotation.Nullable;
-
-import scala.runtime.AbstractFunction0;
-import scala.runtime.BoxedUnit;
-
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.storage.BlockManager;
-import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;
-import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.util.TaskCompletionListener;
import org.apache.spark.util.Utils;
/**
* External sorter based on {@link UnsafeInMemorySorter}.
*/
-public final class UnsafeExternalSorter {
+public final class UnsafeExternalSorter extends MemoryConsumer {
private final Logger logger = LoggerFactory.getLogger(UnsafeExternalSorter.class);
- private final long pageSizeBytes;
private final PrefixComparator prefixComparator;
private final RecordComparator recordComparator;
- private final int initialSize;
private final TaskMemoryManager taskMemoryManager;
private final BlockManager blockManager;
private final TaskContext taskContext;
@@ -69,14 +64,12 @@ public final class UnsafeExternalSorter {
private final LinkedList<UnsafeSorterSpillWriter> spillWriters = new LinkedList<>();
// These variables are reset after spilling:
- @Nullable private UnsafeInMemorySorter inMemSorter;
- // Whether the in-mem sorter is created internally, or passed in from outside.
- // If it is passed in from outside, we shouldn't release the in-mem sorter's memory.
- private boolean isInMemSorterExternal = false;
+ @Nullable private volatile UnsafeInMemorySorter inMemSorter;
+
private MemoryBlock currentPage = null;
- private long currentPagePosition = -1;
- private long freeSpaceInCurrentPage = 0;
+ private long pageCursor = -1;
private long peakMemoryUsedBytes = 0;
+ private volatile SpillableIterator readingIterator = null;
public static UnsafeExternalSorter createWithExistingInMemorySorter(
TaskMemoryManager taskMemoryManager,
@@ -86,7 +79,7 @@ public final class UnsafeExternalSorter {
PrefixComparator prefixComparator,
int initialSize,
long pageSizeBytes,
- UnsafeInMemorySorter inMemorySorter) throws IOException {
+ UnsafeInMemorySorter inMemorySorter) {
return new UnsafeExternalSorter(taskMemoryManager, blockManager,
taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, inMemorySorter);
}
@@ -98,7 +91,7 @@ public final class UnsafeExternalSorter {
RecordComparator recordComparator,
PrefixComparator prefixComparator,
int initialSize,
- long pageSizeBytes) throws IOException {
+ long pageSizeBytes) {
return new UnsafeExternalSorter(taskMemoryManager, blockManager,
taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, null);
}
@@ -111,60 +104,41 @@ public final class UnsafeExternalSorter {
PrefixComparator prefixComparator,
int initialSize,
long pageSizeBytes,
- @Nullable UnsafeInMemorySorter existingInMemorySorter) throws IOException {
+ @Nullable UnsafeInMemorySorter existingInMemorySorter) {
+ super(taskMemoryManager, pageSizeBytes);
this.taskMemoryManager = taskMemoryManager;
this.blockManager = blockManager;
this.taskContext = taskContext;
this.recordComparator = recordComparator;
this.prefixComparator = prefixComparator;
- this.initialSize = initialSize;
// Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
// this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.fileBufferSizeBytes = 32 * 1024;
- this.pageSizeBytes = pageSizeBytes;
+ // TODO: metrics tracking + integration with shuffle write metrics
+ // need to connect the write metrics to task metrics so we count the spill IO somewhere.
this.writeMetrics = new ShuffleWriteMetrics();
if (existingInMemorySorter == null) {
- initializeForWriting();
- // Acquire a new page as soon as we construct the sorter to ensure that we have at
- // least one page to work with. Otherwise, other operators in the same task may starve
- // this sorter (SPARK-9709). We don't need to do this if we already have an existing sorter.
- acquireNewPage();
+ this.inMemSorter =
+ new UnsafeInMemorySorter(taskMemoryManager, recordComparator, prefixComparator, initialSize);
+ acquireMemory(inMemSorter.getMemoryUsage());
} else {
- this.isInMemSorterExternal = true;
this.inMemSorter = existingInMemorySorter;
+ // will acquire the memory after the map is freed
}
+ this.peakMemoryUsedBytes = getMemoryUsage();
// Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
// the end of the task. This is necessary to avoid memory leaks in when the downstream operator
// does not fully consume the sorter's output (e.g. sort followed by limit).
- taskContext.addOnCompleteCallback(new AbstractFunction0<BoxedUnit>() {
- @Override
- public BoxedUnit apply() {
- cleanupResources();
- return null;
+ taskContext.addTaskCompletionListener(
+ new TaskCompletionListener() {
+ @Override
+ public void onTaskCompletion(TaskContext context) {
+ cleanupResources();
+ }
}
- });
- }
-
- // TODO: metrics tracking + integration with shuffle write metrics
- // need to connect the write metrics to task metrics so we count the spill IO somewhere.
-
- /**
- * Allocates new sort data structures. Called when creating the sorter and after each spill.
- */
- private void initializeForWriting() throws IOException {
- // Note: Do not track memory for the pointer array for now because of SPARK-10474.
- // In more detail, in TungstenAggregate we only reserve a page, but when we fall back to
- // sort-based aggregation we try to acquire a page AND a pointer array, which inevitably
- // fails if all other memory is already occupied. It should be safe to not track the array
- // because its memory footprint is frequently much smaller than that of a page. This is a
- // temporary hack that we should address in 1.6.0.
- // TODO: track the pointer array memory!
- this.writeMetrics = new ShuffleWriteMetrics();
- this.inMemSorter =
- new UnsafeInMemorySorter(taskMemoryManager, recordComparator, prefixComparator, initialSize);
- this.isInMemSorterExternal = false;
+ );
}
/**
@@ -173,14 +147,27 @@ public final class UnsafeExternalSorter {
*/
@VisibleForTesting
public void closeCurrentPage() {
- freeSpaceInCurrentPage = 0;
+ if (currentPage != null) {
+ pageCursor = currentPage.getBaseOffset() + currentPage.size();
+ }
}
/**
* Sort and spill the current records in response to memory pressure.
*/
- public void spill() throws IOException {
- assert(inMemSorter != null);
+ @Override
+ public long spill(long size, MemoryConsumer trigger) throws IOException {
+ if (trigger != this) {
+ if (readingIterator != null) {
+ return readingIterator.spill();
+ }
+ return 0L;
+ }
+
+ if (inMemSorter == null || inMemSorter.numRecords() <= 0) {
+ return 0L;
+ }
+
logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
Thread.currentThread().getId(),
Utils.bytesToString(getMemoryUsage()),
@@ -202,6 +189,8 @@ public final class UnsafeExternalSorter {
spillWriter.write(baseObject, baseOffset, recordLength, sortedRecords.getKeyPrefix());
}
spillWriter.close();
+
+ inMemSorter.reset();
}
final long spillSize = freeMemory();
@@ -210,7 +199,7 @@ public final class UnsafeExternalSorter {
// written to disk. This also counts the space needed to store the sorter's pointer array.
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
- initializeForWriting();
+ return spillSize;
}
/**
@@ -246,7 +235,7 @@ public final class UnsafeExternalSorter {
}
/**
- * Free this sorter's in-memory data structures, including its data pages and pointer array.
+ * Free this sorter's data pages.
*
* @return the number of bytes freed.
*/
@@ -254,14 +243,12 @@ public final class UnsafeExternalSorter {
updatePeakMemoryUsed();
long memoryFreed = 0;
for (MemoryBlock block : allocatedPages) {
- taskMemoryManager.freePage(block);
memoryFreed += block.size();
+ freePage(block);
}
- // TODO: track in-memory sorter memory usage (SPARK-10474)
allocatedPages.clear();
currentPage = null;
- currentPagePosition = -1;
- freeSpaceInCurrentPage = 0;
+ pageCursor = 0;
return memoryFreed;
}
@@ -283,8 +270,15 @@ public final class UnsafeExternalSorter {
* Frees this sorter's in-memory data structures and cleans up its spill files.
*/
public void cleanupResources() {
- deleteSpillFiles();
- freeMemory();
+ synchronized (this) {
+ deleteSpillFiles();
+ freeMemory();
+ if (inMemSorter != null) {
+ long used = inMemSorter.getMemoryUsage();
+ inMemSorter = null;
+ releaseMemory(used);
+ }
+ }
}
/**
@@ -295,8 +289,28 @@ public final class UnsafeExternalSorter {
private void growPointerArrayIfNecessary() throws IOException {
assert(inMemSorter != null);
if (!inMemSorter.hasSpaceForAnotherRecord()) {
- // TODO: track the pointer array memory! (SPARK-10474)
- inMemSorter.expandPointerArray();
+ long used = inMemSorter.getMemoryUsage();
+ long needed = used + inMemSorter.getMemoryToExpand();
+ try {
+ acquireMemory(needed); // could trigger spilling
+ } catch (OutOfMemoryError e) {
+ // should have triggered spilling
+ assert(inMemSorter.hasSpaceForAnotherRecord());
+ return;
+ }
+ // check if spilling is triggered or not
+ if (inMemSorter.hasSpaceForAnotherRecord()) {
+ releaseMemory(needed);
+ } else {
+ try {
+ inMemSorter.expandPointerArray();
+ releaseMemory(used);
+ } catch (OutOfMemoryError oom) {
+ // Just in case the JVM has run out of memory
+ releaseMemory(needed);
+ spill();
+ }
+ }
}
}
@@ -304,101 +318,38 @@ public final class UnsafeExternalSorter {
* Allocates more memory in order to insert an additional record. This will request additional
* memory from the memory manager and spill if the requested memory can not be obtained.
*
- * @param requiredSpace the required space in the data page, in bytes, including space for storing
+ * @param required the required space in the data page, in bytes, including space for storing
* the record size. This must be less than or equal to the page size (records
* that exceed the page size are handled via a different code path which uses
* special overflow pages).
*/
- private void acquireNewPageIfNecessary(int requiredSpace) throws IOException {
- assert (requiredSpace <= pageSizeBytes);
- if (requiredSpace > freeSpaceInCurrentPage) {
- logger.trace("Required space {} is less than free space in current page ({})", requiredSpace,
- freeSpaceInCurrentPage);
- // TODO: we should track metrics on the amount of space wasted when we roll over to a new page
- // without using the free space at the end of the current page. We should also do this for
- // BytesToBytesMap.
- if (requiredSpace > pageSizeBytes) {
- throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
- pageSizeBytes + ")");
- } else {
- acquireNewPage();
- }
+ private void acquireNewPageIfNecessary(int required) {
+ if (currentPage == null ||
+ pageCursor + required > currentPage.getBaseOffset() + currentPage.size()) {
+ // TODO: try to find space on previous pages
+ currentPage = allocatePage(required);
+ pageCursor = currentPage.getBaseOffset();
+ allocatedPages.add(currentPage);
}
}
/**
- * Acquire a new page from the memory manager.
- *
- * If there is not enough space to allocate the new page, spill all existing ones
- * and try again. If there is still not enough space, report error to the caller.
- */
- private void acquireNewPage() throws IOException {
- currentPage = taskMemoryManager.allocatePage(pageSizeBytes);
- if (currentPage == null) {
- spill();
- currentPage = taskMemoryManager.allocatePage(pageSizeBytes);
- if (currentPage == null) {
- throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
- }
- }
- currentPagePosition = currentPage.getBaseOffset();
- freeSpaceInCurrentPage = pageSizeBytes;
- allocatedPages.add(currentPage);
- }
-
- /**
* Write a record to the sorter.
*/
- public void insertRecord(
- Object recordBaseObject,
- long recordBaseOffset,
- int lengthInBytes,
- long prefix) throws IOException {
+ public void insertRecord(Object recordBase, long recordOffset, int length, long prefix)
+ throws IOException {
growPointerArrayIfNecessary();
// Need 4 bytes to store the record length.
- final int totalSpaceRequired = lengthInBytes + 4;
-
- // --- Figure out where to insert the new record ----------------------------------------------
-
- final MemoryBlock dataPage;
- long dataPagePosition;
- boolean useOverflowPage = totalSpaceRequired > pageSizeBytes;
- if (useOverflowPage) {
- long overflowPageSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired);
- // The record is larger than the page size, so allocate a special overflow page just to hold
- // that record.
- MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
- if (overflowPage == null) {
- spill();
- overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
- if (overflowPage == null) {
- throw new IOException("Unable to acquire " + overflowPageSize + " bytes of memory");
- }
- }
- allocatedPages.add(overflowPage);
- dataPage = overflowPage;
- dataPagePosition = overflowPage.getBaseOffset();
- } else {
- // The record is small enough to fit in a regular data page, but the current page might not
- // have enough space to hold it (or no pages have been allocated yet).
- acquireNewPageIfNecessary(totalSpaceRequired);
- dataPage = currentPage;
- dataPagePosition = currentPagePosition;
- // Update bookkeeping information
- freeSpaceInCurrentPage -= totalSpaceRequired;
- currentPagePosition += totalSpaceRequired;
- }
- final Object dataPageBaseObject = dataPage.getBaseObject();
-
- // --- Insert the record ----------------------------------------------------------------------
-
- final long recordAddress =
- taskMemoryManager.encodePageNumberAndOffset(dataPage, dataPagePosition);
- Platform.putInt(dataPageBaseObject, dataPagePosition, lengthInBytes);
- dataPagePosition += 4;
- Platform.copyMemory(
- recordBaseObject, recordBaseOffset, dataPageBaseObject, dataPagePosition, lengthInBytes);
+ final int required = length + 4;
+ acquireNewPageIfNecessary(required);
+
+ final Object base = currentPage.getBaseObject();
+ final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
+ Platform.putInt(base, pageCursor, length);
+ pageCursor += 4;
+ Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
+ pageCursor += length;
assert(inMemSorter != null);
inMemSorter.insertRecord(recordAddress, prefix);
}
@@ -411,59 +362,24 @@ public final class UnsafeExternalSorter {
*
* record length = key length + value length + 4
*/
- public void insertKVRecord(
- Object keyBaseObj, long keyOffset, int keyLen,
- Object valueBaseObj, long valueOffset, int valueLen, long prefix) throws IOException {
+ public void insertKVRecord(Object keyBase, long keyOffset, int keyLen,
+ Object valueBase, long valueOffset, int valueLen, long prefix)
+ throws IOException {
growPointerArrayIfNecessary();
- final int totalSpaceRequired = keyLen + valueLen + 4 + 4;
-
- // --- Figure out where to insert the new record ----------------------------------------------
-
- final MemoryBlock dataPage;
- long dataPagePosition;
- boolean useOverflowPage = totalSpaceRequired > pageSizeBytes;
- if (useOverflowPage) {
- long overflowPageSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired);
- // The record is larger than the page size, so allocate a special overflow page just to hold
- // that record.
- MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
- if (overflowPage == null) {
- spill();
- overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
- if (overflowPage == null) {
- throw new IOException("Unable to acquire " + overflowPageSize + " bytes of memory");
- }
- }
- allocatedPages.add(overflowPage);
- dataPage = overflowPage;
- dataPagePosition = overflowPage.getBaseOffset();
- } else {
- // The record is small enough to fit in a regular data page, but the current page might not
- // have enough space to hold it (or no pages have been allocated yet).
- acquireNewPageIfNecessary(totalSpaceRequired);
- dataPage = currentPage;
- dataPagePosition = currentPagePosition;
- // Update bookkeeping information
- freeSpaceInCurrentPage -= totalSpaceRequired;
- currentPagePosition += totalSpaceRequired;
- }
- final Object dataPageBaseObject = dataPage.getBaseObject();
-
- // --- Insert the record ----------------------------------------------------------------------
-
- final long recordAddress =
- taskMemoryManager.encodePageNumberAndOffset(dataPage, dataPagePosition);
- Platform.putInt(dataPageBaseObject, dataPagePosition, keyLen + valueLen + 4);
- dataPagePosition += 4;
-
- Platform.putInt(dataPageBaseObject, dataPagePosition, keyLen);
- dataPagePosition += 4;
-
- Platform.copyMemory(keyBaseObj, keyOffset, dataPageBaseObject, dataPagePosition, keyLen);
- dataPagePosition += keyLen;
-
- Platform.copyMemory(valueBaseObj, valueOffset, dataPageBaseObject, dataPagePosition, valueLen);
+ final int required = keyLen + valueLen + 4 + 4;
+ acquireNewPageIfNecessary(required);
+
+ final Object base = currentPage.getBaseObject();
+ final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
+ Platform.putInt(base, pageCursor, keyLen + valueLen + 4);
+ pageCursor += 4;
+ Platform.putInt(base, pageCursor, keyLen);
+ pageCursor += 4;
+ Platform.copyMemory(keyBase, keyOffset, base, pageCursor, keyLen);
+ pageCursor += keyLen;
+ Platform.copyMemory(valueBase, valueOffset, base, pageCursor, valueLen);
+ pageCursor += valueLen;
assert(inMemSorter != null);
inMemSorter.insertRecord(recordAddress, prefix);
@@ -475,10 +391,10 @@ public final class UnsafeExternalSorter {
*/
public UnsafeSorterIterator getSortedIterator() throws IOException {
assert(inMemSorter != null);
- final UnsafeInMemorySorter.SortedIterator inMemoryIterator = inMemSorter.getSortedIterator();
- int numIteratorsToMerge = spillWriters.size() + (inMemoryIterator.hasNext() ? 1 : 0);
+ readingIterator = new SpillableIterator(inMemSorter.getSortedIterator());
+ int numIteratorsToMerge = spillWriters.size() + (readingIterator.hasNext() ? 1 : 0);
if (spillWriters.isEmpty()) {
- return inMemoryIterator;
+ return readingIterator;
} else {
final UnsafeSorterSpillMerger spillMerger =
new UnsafeSorterSpillMerger(recordComparator, prefixComparator, numIteratorsToMerge);
@@ -486,9 +402,113 @@ public final class UnsafeExternalSorter {
spillMerger.addSpillIfNotEmpty(spillWriter.getReader(blockManager));
}
spillWriters.clear();
- spillMerger.addSpillIfNotEmpty(inMemoryIterator);
+ spillMerger.addSpillIfNotEmpty(readingIterator);
return spillMerger.getSortedIterator();
}
}
+
+ /**
+ * An UnsafeSorterIterator that supports spilling.
+ */
+ class SpillableIterator extends UnsafeSorterIterator {
+ private UnsafeSorterIterator upstream;
+ private UnsafeSorterIterator nextUpstream = null;
+ private MemoryBlock lastPage = null;
+ private boolean loaded = false;
+ private int numRecords = 0;
+
+ public SpillableIterator(UnsafeInMemorySorter.SortedIterator inMemIterator) {
+ this.upstream = inMemIterator;
+ this.numRecords = inMemIterator.numRecordsLeft();
+ }
+
+ public long spill() throws IOException {
+ synchronized (this) {
+ if (!(upstream instanceof UnsafeInMemorySorter.SortedIterator && nextUpstream == null
+ && numRecords > 0)) {
+ return 0L;
+ }
+
+ UnsafeInMemorySorter.SortedIterator inMemIterator =
+ ((UnsafeInMemorySorter.SortedIterator) upstream).clone();
+
+ final UnsafeSorterSpillWriter spillWriter =
+ new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics, numRecords);
+ while (inMemIterator.hasNext()) {
+ inMemIterator.loadNext();
+ final Object baseObject = inMemIterator.getBaseObject();
+ final long baseOffset = inMemIterator.getBaseOffset();
+ final int recordLength = inMemIterator.getRecordLength();
+ spillWriter.write(baseObject, baseOffset, recordLength, inMemIterator.getKeyPrefix());
+ }
+ spillWriter.close();
+ spillWriters.add(spillWriter);
+ nextUpstream = spillWriter.getReader(blockManager);
+
+ long released = 0L;
+ synchronized (UnsafeExternalSorter.this) {
+ // release the pages except the one that is used
+ for (MemoryBlock page : allocatedPages) {
+ if (!loaded || page.getBaseObject() != inMemIterator.getBaseObject()) {
+ released += page.size();
+ freePage(page);
+ } else {
+ lastPage = page;
+ }
+ }
+ allocatedPages.clear();
+ }
+ return released;
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return numRecords > 0;
+ }
+
+ @Override
+ public void loadNext() throws IOException {
+ synchronized (this) {
+ loaded = true;
+ if (nextUpstream != null) {
+ // Just consumed the last record from the in-memory iterator
+ if (lastPage != null) {
+ freePage(lastPage);
+ lastPage = null;
+ }
+ upstream = nextUpstream;
+ nextUpstream = null;
+
+ assert(inMemSorter != null);
+ long used = inMemSorter.getMemoryUsage();
+ inMemSorter = null;
+ releaseMemory(used);
+ }
+ numRecords--;
+ upstream.loadNext();
+ }
+ }
+
+ @Override
+ public Object getBaseObject() {
+ return upstream.getBaseObject();
+ }
+
+ @Override
+ public long getBaseOffset() {
+ return upstream.getBaseOffset();
+ }
+
+ @Override
+ public int getRecordLength() {
+ return upstream.getRecordLength();
+ }
+
+ @Override
+ public long getKeyPrefix() {
+ return upstream.getKeyPrefix();
+ }
+ }
}