aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java')
-rw-r--r--core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java6
1 files changed, 4 insertions, 2 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index 81ee7ab58a..3c2980e442 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -215,8 +215,6 @@ final class ShuffleExternalSorter extends MemoryConsumer {
}
}
- inMemSorter.reset();
-
if (!isLastFile) { // i.e. this is a spill file
// The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records
// are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter
@@ -255,6 +253,10 @@ final class ShuffleExternalSorter extends MemoryConsumer {
writeSortedFile(false);
final long spillSize = freeMemory();
+ inMemSorter.reset();
+ // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
+ // records. Otherwise, if the task is over allocated memory, then without freeing the memory pages,
+ // we might not be able to get memory for the pointer array.
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
return spillSize;
}