diff options
Diffstat (limited to 'core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java')
-rw-r--r-- | core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java index 81ee7ab58a..3c2980e442 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java @@ -215,8 +215,6 @@ final class ShuffleExternalSorter extends MemoryConsumer { } } - inMemSorter.reset(); - if (!isLastFile) { // i.e. this is a spill file // The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records // are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter @@ -255,6 +253,10 @@ final class ShuffleExternalSorter extends MemoryConsumer { writeSortedFile(false); final long spillSize = freeMemory(); + inMemSorter.reset(); + // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the + // records. Otherwise, if the task is over allocated memory, then without freeing the memory pages, + // we might not be able to get memory for the pointer array. taskContext.taskMetrics().incMemoryBytesSpilled(spillSize); return spillSize; } |