aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSital Kedia <skedia@fb.com>2016-04-12 16:10:07 -0700
committerDavies Liu <davies.liu@gmail.com>2016-04-12 16:10:07 -0700
commitd187e7dea9540d26b7800de4eb79863ef5f574bf (patch)
tree3a01d8f91d0a7a0573bd5eccb93abd844a75e43a
parentc439d88e99c35a5f29f071715addfee8cbb215dc (diff)
downloadspark-d187e7dea9540d26b7800de4eb79863ef5f574bf.tar.gz
spark-d187e7dea9540d26b7800de4eb79863ef5f574bf.tar.bz2
spark-d187e7dea9540d26b7800de4eb79863ef5f574bf.zip
[SPARK-14363] Fix executor OOM due to memory leak in the Sorter
## What changes were proposed in this pull request? Fix memory leak in the Sorter. When the UnsafeExternalSorter spills the data to disk, it does not free up the underlying pointer array. As a result, we see a lot of executor OOM and also memory under utilization. This is a regression partially introduced in PR https://github.com/apache/spark/pull/9241 ## How was this patch tested? Tested by running a job and observed around 30% speedup after this change. Author: Sital Kedia <skedia@fb.com> Closes #12285 from sitalkedia/executor_oom.
-rw-r--r--core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java6
-rw-r--r--core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java7
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java7
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java7
4 files changed, 23 insertions, 4 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index 81ee7ab58a..3c2980e442 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -215,8 +215,6 @@ final class ShuffleExternalSorter extends MemoryConsumer {
}
}
- inMemSorter.reset();
-
if (!isLastFile) { // i.e. this is a spill file
// The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records
// are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter
@@ -255,6 +253,10 @@ final class ShuffleExternalSorter extends MemoryConsumer {
writeSortedFile(false);
final long spillSize = freeMemory();
+ inMemSorter.reset();
+ // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
+ // records. Otherwise, if the task is over allocated memory, then without freeing the memory pages,
+ // we might not be able to get memory for the pointer array.
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
return spillSize;
}
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
index fe79ff0e30..76b0e6a304 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
@@ -51,9 +51,12 @@ final class ShuffleInMemorySorter {
*/
private int pos = 0;
+ private int initialSize;
+
ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize) {
this.consumer = consumer;
assert (initialSize > 0);
+ this.initialSize = initialSize;
this.array = consumer.allocateArray(initialSize);
this.sorter = new Sorter<>(ShuffleSortDataFormat.INSTANCE);
}
@@ -70,6 +73,10 @@ final class ShuffleInMemorySorter {
}
public void reset() {
+ if (consumer != null) {
+ consumer.freeArray(array);
+ this.array = consumer.allocateArray(initialSize);
+ }
pos = 0;
}
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index ded8f0472b..ef79b49083 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -200,14 +200,17 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
spillWriter.write(baseObject, baseOffset, recordLength, sortedRecords.getKeyPrefix());
}
spillWriter.close();
-
- inMemSorter.reset();
}
final long spillSize = freeMemory();
// Note that this is more-or-less going to be a multiple of the page size, so wasted space in
// pages will currently be counted as memory spilled even though that space isn't actually
// written to disk. This also counts the space needed to store the sorter's pointer array.
+ inMemSorter.reset();
+ // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
+ // records. Otherwise, if the task is over allocated memory, then without freeing the memory pages,
+ // we might not be able to get memory for the pointer array.
+
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
return spillSize;
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 145c3a1950..01eae0e8dc 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -84,6 +84,8 @@ public final class UnsafeInMemorySorter {
*/
private int pos = 0;
+ private long initialSize;
+
public UnsafeInMemorySorter(
final MemoryConsumer consumer,
final TaskMemoryManager memoryManager,
@@ -102,6 +104,7 @@ public final class UnsafeInMemorySorter {
LongArray array) {
this.consumer = consumer;
this.memoryManager = memoryManager;
+ this.initialSize = array.size();
if (recordComparator != null) {
this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE);
this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager);
@@ -123,6 +126,10 @@ public final class UnsafeInMemorySorter {
}
public void reset() {
+ if (consumer != null) {
+ consumer.freeArray(array);
+ this.array = consumer.allocateArray(initialSize);
+ }
pos = 0;
}