diff options
Diffstat (limited to 'core')
3 files changed, 70 insertions, 30 deletions
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 07241c827c..6656fd1d0b 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -20,6 +20,7 @@ package org.apache.spark.unsafe.map; import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.util.Arrays; import java.util.Iterator; import java.util.LinkedList; @@ -638,7 +639,11 @@ public final class BytesToBytesMap extends MemoryConsumer { assert (valueLength % 8 == 0); assert(longArray != null); - if (numElements == MAX_CAPACITY || !canGrowArray) { + + if (numElements == MAX_CAPACITY + // The map could be reused from last spill (because of no enough memory to grow), + // then we don't try to grow again if hit the `growthThreshold`. + || !canGrowArray && numElements > growthThreshold) { return false; } @@ -730,25 +735,18 @@ public final class BytesToBytesMap extends MemoryConsumer { } /** - * Free the memory used by longArray. + * Free all allocated memory associated with this map, including the storage for keys and values + * as well as the hash map array itself. + * + * This method is idempotent and can be called multiple times. */ - public void freeArray() { + public void free() { updatePeakMemoryUsed(); if (longArray != null) { long used = longArray.memoryBlock().size(); longArray = null; releaseMemory(used); } - } - - /** - * Free all allocated memory associated with this map, including the storage for keys and values - * as well as the hash map array itself. - * - * This method is idempotent and can be called multiple times. - */ - public void free() { - freeArray(); Iterator<MemoryBlock> dataPagesIterator = dataPages.iterator(); while (dataPagesIterator.hasNext()) { MemoryBlock dataPage = dataPagesIterator.next(); @@ -834,6 +832,28 @@ public final class BytesToBytesMap extends MemoryConsumer { } /** + * Returns the underline long[] of longArray. + */ + public long[] getArray() { + assert(longArray != null); + return (long[]) longArray.memoryBlock().getBaseObject(); + } + + /** + * Reset this map to initialized state. + */ + public void reset() { + numElements = 0; + Arrays.fill(getArray(), 0); + while (dataPages.size() > 0) { + MemoryBlock dataPage = dataPages.removeLast(); + freePage(dataPage); + } + currentPage = null; + pageCursor = 0; + } + + /** * Grows the size of the hash table and re-hash everything. */ @VisibleForTesting diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 509fb0a044..cba043bc48 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -79,9 +79,13 @@ public final class UnsafeExternalSorter extends MemoryConsumer { PrefixComparator prefixComparator, int initialSize, long pageSizeBytes, - UnsafeInMemorySorter inMemorySorter) { - return new UnsafeExternalSorter(taskMemoryManager, blockManager, + UnsafeInMemorySorter inMemorySorter) throws IOException { + UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager, taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, inMemorySorter); + sorter.spill(Long.MAX_VALUE, sorter); + // The external sorter will be used to insert records, in-memory sorter is not needed. + sorter.inMemSorter = null; + return sorter; } public static UnsafeExternalSorter create( @@ -124,7 +128,6 @@ public final class UnsafeExternalSorter extends MemoryConsumer { acquireMemory(inMemSorter.getMemoryUsage()); } else { this.inMemSorter = existingInMemorySorter; - // will acquire after free the map } this.peakMemoryUsedBytes = getMemoryUsage(); @@ -157,12 +160,9 @@ public final class UnsafeExternalSorter extends MemoryConsumer { */ @Override public long spill(long size, MemoryConsumer trigger) throws IOException { - assert(inMemSorter != null); if (trigger != this) { if (readingIterator != null) { return readingIterator.spill(); - } else { - } return 0L; // this should throw exception } @@ -389,24 +389,37 @@ public final class UnsafeExternalSorter extends MemoryConsumer { } /** + * Merges another UnsafeExternalSorters into this one, the other one will be emptied. + * + * @throws IOException + */ + public void merge(UnsafeExternalSorter other) throws IOException { + other.spill(); + spillWriters.addAll(other.spillWriters); + // remove them from `spillWriters`, or the files will be deleted in `cleanupResources`. + other.spillWriters.clear(); + other.cleanupResources(); + } + + /** * Returns a sorted iterator. It is the caller's responsibility to call `cleanupResources()` * after consuming this iterator. */ public UnsafeSorterIterator getSortedIterator() throws IOException { - assert(inMemSorter != null); - readingIterator = new SpillableIterator(inMemSorter.getSortedIterator()); - int numIteratorsToMerge = spillWriters.size() + (readingIterator.hasNext() ? 1 : 0); if (spillWriters.isEmpty()) { + assert(inMemSorter != null); + readingIterator = new SpillableIterator(inMemSorter.getSortedIterator()); return readingIterator; } else { final UnsafeSorterSpillMerger spillMerger = - new UnsafeSorterSpillMerger(recordComparator, prefixComparator, numIteratorsToMerge); + new UnsafeSorterSpillMerger(recordComparator, prefixComparator, spillWriters.size()); for (UnsafeSorterSpillWriter spillWriter : spillWriters) { spillMerger.addSpillIfNotEmpty(spillWriter.getReader(blockManager)); } - spillWriters.clear(); - spillMerger.addSpillIfNotEmpty(readingIterator); - + if (inMemSorter != null) { + readingIterator = new SpillableIterator(inMemSorter.getSortedIterator()); + spillMerger.addSpillIfNotEmpty(readingIterator); + } return spillMerger.getSortedIterator(); } } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index 1480f0681e..d57213b9b8 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -19,9 +19,9 @@ package org.apache.spark.util.collection.unsafe.sort; import java.util.Comparator; +import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.unsafe.Platform; import org.apache.spark.util.collection.Sorter; -import org.apache.spark.memory.TaskMemoryManager; /** * Sorts records using an AlphaSort-style key-prefix sort. This sort stores pointers to records @@ -78,12 +78,19 @@ public final class UnsafeInMemorySorter { private int pos = 0; public UnsafeInMemorySorter( + final TaskMemoryManager memoryManager, + final RecordComparator recordComparator, + final PrefixComparator prefixComparator, + int initialSize) { + this(memoryManager, recordComparator, prefixComparator, new long[initialSize * 2]); + } + + public UnsafeInMemorySorter( final TaskMemoryManager memoryManager, final RecordComparator recordComparator, final PrefixComparator prefixComparator, - int initialSize) { - assert (initialSize > 0); - this.array = new long[initialSize * 2]; + long[] array) { + this.array = array; this.memoryManager = memoryManager; this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE); this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager); |