diff options
author | Sital Kedia <skedia@fb.com> | 2016-06-30 10:53:18 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2016-06-30 10:53:18 -0700 |
commit | 07f46afc733b1718d528a6ea5c0d774f047024fa (patch) | |
tree | d150b6afc44dfdc05e57ebaba4136b0c0e4cf063 /core | |
parent | 5344bade8efb6f12aa43fbfbbbc2e3c0c7d16d98 (diff) | |
download | spark-07f46afc733b1718d528a6ea5c0d774f047024fa.tar.gz spark-07f46afc733b1718d528a6ea5c0d774f047024fa.tar.bz2 spark-07f46afc733b1718d528a6ea5c0d774f047024fa.zip |
[SPARK-13850] Force the sorter to Spill when number of elements in th…
## What changes were proposed in this pull request?
Force the sorter to Spill when number of elements in the pointer array reach a certain size. This is to workaround the issue of timSort failing on large buffer size.
## How was this patch tested?
Tested by running a job which was failing without this change due to TimSort bug.
Author: Sital Kedia <skedia@fb.com>
Closes #13107 from sitalkedia/fix_TimSort.
Diffstat (limited to 'core')
3 files changed, 30 insertions, 6 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java index 014aef86b5..696ee73a76 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java @@ -72,7 +72,10 @@ final class ShuffleExternalSorter extends MemoryConsumer { private final TaskContext taskContext; private final ShuffleWriteMetrics writeMetrics; - /** Force this sorter to spill when there are this many elements in memory. For testing only */ + /** + * Force this sorter to spill when there are this many elements in memory. The default value is + * 1024 * 1024 * 1024, which allows the maximum size of the pointer array to be 8G. + */ private final long numElementsForSpillThreshold; /** The buffer size to use when writing spills using DiskBlockObjectWriter */ @@ -114,7 +117,7 @@ final class ShuffleExternalSorter extends MemoryConsumer { // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; this.numElementsForSpillThreshold = - conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", Long.MAX_VALUE); + conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 1024 * 1024 * 1024); this.writeMetrics = writeMetrics; this.inMemSorter = new ShuffleInMemorySorter( this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true)); @@ -372,7 +375,8 @@ final class ShuffleExternalSorter extends MemoryConsumer { // for tests assert(inMemSorter != null); - if (inMemSorter.numRecords() > numElementsForSpillThreshold) { + if (inMemSorter.numRecords() >= numElementsForSpillThreshold) { + logger.info("Spilling data because number of spilledRecords crossed the threshold " + numElementsForSpillThreshold); spill(); } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index ec15f0b59d..d6a255ed9d 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.spark.SparkEnv; import org.apache.spark.TaskContext; import org.apache.spark.executor.ShuffleWriteMetrics; import org.apache.spark.memory.MemoryConsumer; @@ -60,6 +61,13 @@ public final class UnsafeExternalSorter extends MemoryConsumer { private final int fileBufferSizeBytes; /** + * Force this sorter to spill when there are this many elements in memory. The default value is + * 1024 * 1024 * 1024 / 2 which allows the maximum size of the pointer array to be 8G. + */ + public static final long DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD = 1024 * 1024 * 1024 / 2; + + private final long numElementsForSpillThreshold; + /** * Memory pages that hold the records being sorted. The pages in this list are freed when * spilling, although in principle we could recycle these pages across spills (on the other hand, * this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager @@ -88,9 +96,10 @@ public final class UnsafeExternalSorter extends MemoryConsumer { PrefixComparator prefixComparator, int initialSize, long pageSizeBytes, + long numElementsForSpillThreshold, UnsafeInMemorySorter inMemorySorter) throws IOException { UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager, - serializerManager, taskContext, recordComparator, prefixComparator, initialSize, + serializerManager, taskContext, recordComparator, prefixComparator, initialSize, numElementsForSpillThreshold, pageSizeBytes, inMemorySorter, false /* ignored */); sorter.spill(Long.MAX_VALUE, sorter); // The external sorter will be used to insert records, in-memory sorter is not needed. @@ -107,9 +116,10 @@ public final class UnsafeExternalSorter extends MemoryConsumer { PrefixComparator prefixComparator, int initialSize, long pageSizeBytes, + long numElementsForSpillThreshold, boolean canUseRadixSort) { return new UnsafeExternalSorter(taskMemoryManager, blockManager, serializerManager, - taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, null, + taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, numElementsForSpillThreshold, null, canUseRadixSort); } @@ -122,6 +132,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer { PrefixComparator prefixComparator, int initialSize, long pageSizeBytes, + long numElementsForSpillThreshold, @Nullable UnsafeInMemorySorter existingInMemorySorter, boolean canUseRadixSort) { super(taskMemoryManager, pageSizeBytes, taskMemoryManager.getTungstenMemoryMode()); @@ -143,6 +154,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer { this.inMemSorter = existingInMemorySorter; } this.peakMemoryUsedBytes = getMemoryUsage(); + this.numElementsForSpillThreshold = numElementsForSpillThreshold; // Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at // the end of the task. This is necessary to avoid memory leaks in when the downstream operator @@ -373,6 +385,12 @@ public final class UnsafeExternalSorter extends MemoryConsumer { Object recordBase, long recordOffset, int length, long prefix, boolean prefixIsNull) throws IOException { + assert(inMemSorter != null); + if (inMemSorter.numRecords() >= numElementsForSpillThreshold) { + logger.info("Spilling data because number of spilledRecords crossed the threshold " + numElementsForSpillThreshold); + spill(); + } + growPointerArrayIfNecessary(); // Need 4 bytes to store the record length. final int required = length + 4; @@ -384,7 +402,6 @@ public final class UnsafeExternalSorter extends MemoryConsumer { pageCursor += 4; Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length); pageCursor += length; - assert(inMemSorter != null); inMemSorter.insertRecord(recordAddress, prefix, prefixIsNull); } diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java index bce958c3dc..3ea99233fe 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java @@ -176,6 +176,7 @@ public class UnsafeExternalSorterSuite { prefixComparator, /* initialSize */ 1024, pageSizeBytes, + UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD, shouldUseRadixSort()); } @@ -399,6 +400,7 @@ public class UnsafeExternalSorterSuite { null, /* initialSize */ 1024, pageSizeBytes, + UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD, shouldUseRadixSort()); long[] record = new long[100]; int recordSize = record.length * 8; @@ -435,6 +437,7 @@ public class UnsafeExternalSorterSuite { prefixComparator, 1024, pageSizeBytes, + UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD, shouldUseRadixSort()); // Peak memory should be monotonically increasing. More specifically, every time |