aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java14
1 files changed, 14 insertions, 0 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index c21990f4e4..866e0b4151 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -20,6 +20,9 @@ package org.apache.spark.util.collection.unsafe.sort;
import java.io.IOException;
import java.util.LinkedList;
+import scala.runtime.AbstractFunction0;
+import scala.runtime.BoxedUnit;
+
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,6 +93,17 @@ public final class UnsafeExternalSorter {
this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "64m");
initializeForWriting();
+
+ // Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
+ // the end of the task. This is necessary to avoid memory leaks when the downstream operator
+ // does not fully consume the sorter's output (e.g. sort followed by limit).
+ taskContext.addOnCompleteCallback(new AbstractFunction0<BoxedUnit>() {
+ @Override
+ public BoxedUnit apply() {
+ freeMemory();
+ return null;
+ }
+ });
}
// TODO: metrics tracking + integration with shuffle write metrics