about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala25
1 file changed, 19 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 1e4531ef39..d166037351 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.ArrayBuffer
import com.google.common.io.ByteStreams
-import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.{Logging, SparkEnv, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.serializer.{DeserializationStream, Serializer}
import org.apache.spark.storage.{BlockId, BlockManager}
@@ -470,14 +470,27 @@ class ExternalAppendOnlyMap[K, V, C](
item
}
- // TODO: Ensure this gets called even if the iterator isn't drained.
private def cleanup() {
batchIndex = batchOffsets.length // Prevent reading any other batch
val ds = deserializeStream
- deserializeStream = null
- fileStream = null
- ds.close()
- file.delete()
+ if (ds != null) {
+ ds.close()
+ deserializeStream = null
+ }
+ if (fileStream != null) {
+ fileStream.close()
+ fileStream = null
+ }
+ if (file.exists()) {
+ file.delete()
+ }
+ }
+
+ val context = TaskContext.get()
+ // context is null in some tests of ExternalAppendOnlyMapSuite because these tests don't run in
+ // a TaskContext.
+ if (context != null) {
+ context.addTaskCompletionListener(context => cleanup())
}
}