From d45355ee224b734727255ff278a47801f5da7e93 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 21 Jul 2015 09:55:42 -0700 Subject: [SPARK-5423] [CORE] Register a TaskCompletionListener to make sure release all resources Make `DiskMapIterator.cleanup` idempotent and register a TaskCompletionListener to make sure call `cleanup`. Author: zsxwing Closes #7529 from zsxwing/SPARK-5423 and squashes the following commits: 3e3c413 [zsxwing] Remove TODO 9556c78 [zsxwing] Fix NullPointerException for tests 3d574d9 [zsxwing] Register a TaskCompletionListener to make sure release all resources --- .../util/collection/ExternalAppendOnlyMap.scala | 25 ++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 1e4531ef39..d166037351 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -26,7 +26,7 @@ import scala.collection.mutable.ArrayBuffer import com.google.common.io.ByteStreams -import org.apache.spark.{Logging, SparkEnv} +import org.apache.spark.{Logging, SparkEnv, TaskContext} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.serializer.{DeserializationStream, Serializer} import org.apache.spark.storage.{BlockId, BlockManager} @@ -470,14 +470,27 @@ class ExternalAppendOnlyMap[K, V, C]( item } - // TODO: Ensure this gets called even if the iterator isn't drained. private def cleanup() { batchIndex = batchOffsets.length // Prevent reading any other batch val ds = deserializeStream - deserializeStream = null - fileStream = null - ds.close() - file.delete() + if (ds != null) { + ds.close() + deserializeStream = null + } + if (fileStream != null) { + fileStream.close() + fileStream = null + } + if (file.exists()) { + file.delete() + } + } + + val context = TaskContext.get() + // context is null in some tests of ExternalAppendOnlyMapSuite because these tests don't run in + // a TaskContext. + if (context != null) { + context.addTaskCompletionListener(context => cleanup()) } } -- cgit v1.2.3