aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorSital Kedia <skedia@fb.com>2016-06-30 10:53:18 -0700
committerDavies Liu <davies.liu@gmail.com>2016-06-30 10:53:18 -0700
commit07f46afc733b1718d528a6ea5c0d774f047024fa (patch)
treed150b6afc44dfdc05e57ebaba4136b0c0e4cf063 /core
parent5344bade8efb6f12aa43fbfbbbc2e3c0c7d16d98 (diff)
downloadspark-07f46afc733b1718d528a6ea5c0d774f047024fa.tar.gz
spark-07f46afc733b1718d528a6ea5c0d774f047024fa.tar.bz2
spark-07f46afc733b1718d528a6ea5c0d774f047024fa.zip
[SPARK-13850] Force the sorter to Spill when number of elements in th…
## What changes were proposed in this pull request? Force the sorter to Spill when number of elements in the pointer array reach a certain size. This is to workaround the issue of timSort failing on large buffer size. ## How was this patch tested? Tested by running a job which was failing without this change due to TimSort bug. Author: Sital Kedia <skedia@fb.com> Closes #13107 from sitalkedia/fix_TimSort.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java10
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java23
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java3
3 files changed, 30 insertions, 6 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index 014aef86b5..696ee73a76 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -72,7 +72,10 @@ final class ShuffleExternalSorter extends MemoryConsumer {
private final TaskContext taskContext;
private final ShuffleWriteMetrics writeMetrics;
- /** Force this sorter to spill when there are this many elements in memory. For testing only */
+ /**
+ * Force this sorter to spill when there are this many elements in memory. The default value is
+ * 1024 * 1024 * 1024, which allows the maximum size of the pointer array to be 8G.
+ */
private final long numElementsForSpillThreshold;
/** The buffer size to use when writing spills using DiskBlockObjectWriter */
@@ -114,7 +117,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.numElementsForSpillThreshold =
- conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", Long.MAX_VALUE);
+ conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 1024 * 1024 * 1024);
this.writeMetrics = writeMetrics;
this.inMemSorter = new ShuffleInMemorySorter(
this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true));
@@ -372,7 +375,8 @@ final class ShuffleExternalSorter extends MemoryConsumer {
// for tests
assert(inMemSorter != null);
- if (inMemSorter.numRecords() > numElementsForSpillThreshold) {
+ if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
+ logger.info("Spilling data because number of spilledRecords crossed the threshold " + numElementsForSpillThreshold);
spill();
}
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index ec15f0b59d..d6a255ed9d 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.spark.SparkEnv;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.memory.MemoryConsumer;
@@ -60,6 +61,13 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
private final int fileBufferSizeBytes;
/**
+ * Force this sorter to spill when there are this many elements in memory. The default value is
+ * 1024 * 1024 * 1024 / 2 which allows the maximum size of the pointer array to be 8G.
+ */
+ public static final long DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD = 1024 * 1024 * 1024 / 2;
+
+ private final long numElementsForSpillThreshold;
+ /**
* Memory pages that hold the records being sorted. The pages in this list are freed when
* spilling, although in principle we could recycle these pages across spills (on the other hand,
* this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager
@@ -88,9 +96,10 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
PrefixComparator prefixComparator,
int initialSize,
long pageSizeBytes,
+ long numElementsForSpillThreshold,
UnsafeInMemorySorter inMemorySorter) throws IOException {
UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager,
- serializerManager, taskContext, recordComparator, prefixComparator, initialSize,
+ serializerManager, taskContext, recordComparator, prefixComparator, initialSize, numElementsForSpillThreshold,
pageSizeBytes, inMemorySorter, false /* ignored */);
sorter.spill(Long.MAX_VALUE, sorter);
// The external sorter will be used to insert records, in-memory sorter is not needed.
@@ -107,9 +116,10 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
PrefixComparator prefixComparator,
int initialSize,
long pageSizeBytes,
+ long numElementsForSpillThreshold,
boolean canUseRadixSort) {
return new UnsafeExternalSorter(taskMemoryManager, blockManager, serializerManager,
- taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, null,
+ taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, numElementsForSpillThreshold, null,
canUseRadixSort);
}
@@ -122,6 +132,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
PrefixComparator prefixComparator,
int initialSize,
long pageSizeBytes,
+ long numElementsForSpillThreshold,
@Nullable UnsafeInMemorySorter existingInMemorySorter,
boolean canUseRadixSort) {
super(taskMemoryManager, pageSizeBytes, taskMemoryManager.getTungstenMemoryMode());
@@ -143,6 +154,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
this.inMemSorter = existingInMemorySorter;
}
this.peakMemoryUsedBytes = getMemoryUsage();
+ this.numElementsForSpillThreshold = numElementsForSpillThreshold;
// Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
// the end of the task. This is necessary to avoid memory leaks in when the downstream operator
@@ -373,6 +385,12 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
Object recordBase, long recordOffset, int length, long prefix, boolean prefixIsNull)
throws IOException {
+ assert(inMemSorter != null);
+ if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
+ logger.info("Spilling data because number of spilledRecords crossed the threshold " + numElementsForSpillThreshold);
+ spill();
+ }
+
growPointerArrayIfNecessary();
// Need 4 bytes to store the record length.
final int required = length + 4;
@@ -384,7 +402,6 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
pageCursor += 4;
Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
pageCursor += length;
- assert(inMemSorter != null);
inMemSorter.insertRecord(recordAddress, prefix, prefixIsNull);
}
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index bce958c3dc..3ea99233fe 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -176,6 +176,7 @@ public class UnsafeExternalSorterSuite {
prefixComparator,
/* initialSize */ 1024,
pageSizeBytes,
+ UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD,
shouldUseRadixSort());
}
@@ -399,6 +400,7 @@ public class UnsafeExternalSorterSuite {
null,
/* initialSize */ 1024,
pageSizeBytes,
+ UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD,
shouldUseRadixSort());
long[] record = new long[100];
int recordSize = record.length * 8;
@@ -435,6 +437,7 @@ public class UnsafeExternalSorterSuite {
prefixComparator,
1024,
pageSizeBytes,
+ UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD,
shouldUseRadixSort());
// Peak memory should be monotonically increasing. More specifically, every time