diff options
author | Davies Liu <davies@databricks.com> | 2016-06-03 16:45:09 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2016-06-03 16:45:09 -0700 |
commit | 3074f575a3c84108fddab3f5f56eb1929a4b2cff (patch) | |
tree | 381fdb85dd2b52bc79b0e18c27eded1bd66d622d /core/src/main | |
parent | 67cc89ff028324ba4a7a7d9c19a268b9afea0031 (diff) | |
download | spark-3074f575a3c84108fddab3f5f56eb1929a4b2cff.tar.gz spark-3074f575a3c84108fddab3f5f56eb1929a4b2cff.tar.bz2 spark-3074f575a3c84108fddab3f5f56eb1929a4b2cff.zip |
[SPARK-15391] [SQL] manage the temporary memory of timsort
## What changes were proposed in this pull request?
Currently, the memory for temporary buffer used by TimSort is always allocated as on-heap without bookkeeping, it could cause OOM both in on-heap and off-heap mode.
This PR will try to manage that by preallocate it together with the pointer array, same with RadixSort. It both works for on-heap and off-heap mode.
This PR also change the loadFactor of BytesToBytesMap to 0.5 (it was 0.70), it enables use to radix sort also makes sure that we have enough memory for timsort.
## How was this patch tested?
Existing tests.
Author: Davies Liu <davies@databricks.com>
Closes #13318 from davies/fix_timsort.
Diffstat (limited to 'core/src/main')
5 files changed, 73 insertions, 37 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java index 75a0e807d7..dc36809d89 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java @@ -22,12 +22,12 @@ import java.util.Comparator; import org.apache.spark.memory.MemoryConsumer; import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.LongArray; +import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.util.collection.Sorter; import org.apache.spark.util.collection.unsafe.sort.RadixSort; final class ShuffleInMemorySorter { - private final Sorter<PackedRecordPointer, LongArray> sorter; private static final class SortComparator implements Comparator<PackedRecordPointer> { @Override public int compare(PackedRecordPointer left, PackedRecordPointer right) { @@ -44,6 +44,9 @@ final class ShuffleInMemorySorter { * An array of record pointers and partition ids that have been encoded by * {@link PackedRecordPointer}. The sort operates on this array instead of directly manipulating * records. + * + * Only part of the array will be used to store the pointers, the rest part is preserved as + * temporary buffer for sorting. */ private LongArray array; @@ -54,14 +57,14 @@ final class ShuffleInMemorySorter { private final boolean useRadixSort; /** - * Set to 2x for radix sort to reserve extra memory for sorting, otherwise 1x. + * The position in the pointer array where new records can be inserted. */ - private final int memoryAllocationFactor; + private int pos = 0; /** - * The position in the pointer array where new records can be inserted. + * How many records could be inserted, because part of the array should be left for sorting. */ - private int pos = 0; + private int usableCapacity = 0; private int initialSize; @@ -70,9 +73,14 @@ final class ShuffleInMemorySorter { assert (initialSize > 0); this.initialSize = initialSize; this.useRadixSort = useRadixSort; - this.memoryAllocationFactor = useRadixSort ? 2 : 1; this.array = consumer.allocateArray(initialSize); - this.sorter = new Sorter<>(ShuffleSortDataFormat.INSTANCE); + this.usableCapacity = getUsableCapacity(); + } + + private int getUsableCapacity() { + // Radix sort requires same amount of used memory as buffer, Tim sort requires + // half of the used memory as buffer. + return (int) (array.size() / (useRadixSort ? 2 : 1.5)); } public void free() { @@ -89,7 +97,8 @@ final class ShuffleInMemorySorter { public void reset() { if (consumer != null) { consumer.freeArray(array); - this.array = consumer.allocateArray(initialSize); + array = consumer.allocateArray(initialSize); + usableCapacity = getUsableCapacity(); } pos = 0; } @@ -101,14 +110,15 @@ final class ShuffleInMemorySorter { array.getBaseOffset(), newArray.getBaseObject(), newArray.getBaseOffset(), - array.size() * (8 / memoryAllocationFactor) + pos * 8L ); consumer.freeArray(array); array = newArray; + usableCapacity = getUsableCapacity(); } public boolean hasSpaceForAnotherRecord() { - return pos < array.size() / memoryAllocationFactor; + return pos < usableCapacity; } public long getMemoryUsage() { @@ -170,6 +180,14 @@ final class ShuffleInMemorySorter { PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX, PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX, false, false); } else { + MemoryBlock unused = new MemoryBlock( + array.getBaseObject(), + array.getBaseOffset() + pos * 8L, + (array.size() - pos) * 8L); + LongArray buffer = new LongArray(unused); + Sorter<PackedRecordPointer, LongArray> sorter = + new Sorter<>(new ShuffleSortDataFormat(buffer)); + sorter.sort(array, 0, pos, SORT_COMPARATOR); } return new ShuffleSorterIterator(pos, array, offset); diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java index 1e924d2aec..717bdd79d4 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java @@ -19,14 +19,15 @@ package org.apache.spark.shuffle.sort; import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.LongArray; -import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.util.collection.SortDataFormat; final class ShuffleSortDataFormat extends SortDataFormat<PackedRecordPointer, LongArray> { - public static final ShuffleSortDataFormat INSTANCE = new ShuffleSortDataFormat(); + private final LongArray buffer; - private ShuffleSortDataFormat() { } + ShuffleSortDataFormat(LongArray buffer) { + this.buffer = buffer; + } @Override public PackedRecordPointer getKey(LongArray data, int pos) { @@ -70,8 +71,8 @@ final class ShuffleSortDataFormat extends SortDataFormat<PackedRecordPointer, Lo @Override public LongArray allocate(int length) { - // This buffer is used temporary (usually small), so it's fine to allocated from JVM heap. - return new LongArray(MemoryBlock.fromLongArray(new long[length])); + assert (length <= buffer.size()) : + "the buffer is smaller than required: " + buffer.size() + " < " + length; + return buffer; } - } diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 6c00608302..dc04025692 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -221,7 +221,8 @@ public final class BytesToBytesMap extends MemoryConsumer { SparkEnv.get() != null ? SparkEnv.get().blockManager() : null, SparkEnv.get() != null ? SparkEnv.get().serializerManager() : null, initialCapacity, - 0.70, + // In order to re-use the longArray for sorting, the load factor cannot be larger than 0.5. + 0.5, pageSizeBytes, enablePerfMetrics); } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index 0cce792f33..c7b070f519 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -25,6 +25,7 @@ import org.apache.spark.memory.MemoryConsumer; import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.LongArray; +import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.util.collection.Sorter; /** @@ -69,8 +70,6 @@ public final class UnsafeInMemorySorter { private final MemoryConsumer consumer; private final TaskMemoryManager memoryManager; @Nullable - private final Sorter<RecordPointerAndKeyPrefix, LongArray> sorter; - @Nullable private final Comparator<RecordPointerAndKeyPrefix> sortComparator; /** @@ -80,13 +79,11 @@ public final class UnsafeInMemorySorter { private final PrefixComparators.RadixSortSupport radixSortSupport; /** - * Set to 2x for radix sort to reserve extra memory for sorting, otherwise 1x. - */ - private final int memoryAllocationFactor; - - /** * Within this buffer, position {@code 2 * i} holds a pointer pointer to the record at * index {@code i}, while position {@code 2 * i + 1} in the array holds an 8-byte key prefix. + * + * Only part of the array will be used to store the pointers, the rest part is preserved as + * temporary buffer for sorting. */ private LongArray array; @@ -95,6 +92,11 @@ public final class UnsafeInMemorySorter { */ private int pos = 0; + /** + * How many records could be inserted, because part of the array should be left for sorting. + */ + private int usableCapacity = 0; + private long initialSize; private long totalSortTimeNanos = 0L; @@ -121,7 +123,6 @@ public final class UnsafeInMemorySorter { this.memoryManager = memoryManager; this.initialSize = array.size(); if (recordComparator != null) { - this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE); this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager); if (canUseRadixSort && prefixComparator instanceof PrefixComparators.RadixSortSupport) { this.radixSortSupport = (PrefixComparators.RadixSortSupport)prefixComparator; @@ -129,12 +130,17 @@ public final class UnsafeInMemorySorter { this.radixSortSupport = null; } } else { - this.sorter = null; this.sortComparator = null; this.radixSortSupport = null; } - this.memoryAllocationFactor = this.radixSortSupport != null ? 2 : 1; this.array = array; + this.usableCapacity = getUsableCapacity(); + } + + private int getUsableCapacity() { + // Radix sort requires same amount of used memory as buffer, Tim sort requires + // half of the used memory as buffer. + return (int) (array.size() / (radixSortSupport != null ? 2 : 1.5)); } /** @@ -150,7 +156,8 @@ public final class UnsafeInMemorySorter { public void reset() { if (consumer != null) { consumer.freeArray(array); - this.array = consumer.allocateArray(initialSize); + array = consumer.allocateArray(initialSize); + usableCapacity = getUsableCapacity(); } pos = 0; } @@ -174,7 +181,7 @@ public final class UnsafeInMemorySorter { } public boolean hasSpaceForAnotherRecord() { - return pos + 1 < (array.size() / memoryAllocationFactor); + return pos + 1 < usableCapacity; } public void expandPointerArray(LongArray newArray) { @@ -186,9 +193,10 @@ public final class UnsafeInMemorySorter { array.getBaseOffset(), newArray.getBaseObject(), newArray.getBaseOffset(), - array.size() * (8 / memoryAllocationFactor)); + pos * 8L); consumer.freeArray(array); array = newArray; + usableCapacity = getUsableCapacity(); } /** @@ -275,13 +283,20 @@ public final class UnsafeInMemorySorter { public SortedIterator getSortedIterator() { int offset = 0; long start = System.nanoTime(); - if (sorter != null) { + if (sortComparator != null) { if (this.radixSortSupport != null) { // TODO(ekl) we should handle NULL values before radix sort for efficiency, since they // force a full-width sort (and we cannot radix-sort nullable long fields at all). offset = RadixSort.sortKeyPrefixArray( array, pos / 2, 0, 7, radixSortSupport.sortDescending(), radixSortSupport.sortSigned()); } else { + MemoryBlock unused = new MemoryBlock( + array.getBaseObject(), + array.getBaseOffset() + pos * 8L, + (array.size() - pos) * 8L); + LongArray buffer = new LongArray(unused); + Sorter<RecordPointerAndKeyPrefix, LongArray> sorter = + new Sorter<>(new UnsafeSortDataFormat(buffer)); sorter.sort(array, 0, pos / 2, sortComparator); } } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java index 7bda76907f..430bf677ed 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java @@ -19,7 +19,6 @@ package org.apache.spark.util.collection.unsafe.sort; import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.LongArray; -import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.util.collection.SortDataFormat; /** @@ -32,9 +31,11 @@ import org.apache.spark.util.collection.SortDataFormat; public final class UnsafeSortDataFormat extends SortDataFormat<RecordPointerAndKeyPrefix, LongArray> { - public static final UnsafeSortDataFormat INSTANCE = new UnsafeSortDataFormat(); + private final LongArray buffer; - private UnsafeSortDataFormat() { } + public UnsafeSortDataFormat(LongArray buffer) { + this.buffer = buffer; + } @Override public RecordPointerAndKeyPrefix getKey(LongArray data, int pos) { @@ -83,9 +84,9 @@ public final class UnsafeSortDataFormat @Override public LongArray allocate(int length) { - assert (length < Integer.MAX_VALUE / 2) : "Length " + length + " is too large"; - // This is used as temporary buffer, it's fine to allocate from JVM heap. - return new LongArray(MemoryBlock.fromLongArray(new long[length * 2])); + assert (length * 2 <= buffer.size()) : + "the buffer is smaller than required: " + buffer.size() + " < " + (length * 2); + return buffer; } } |