diff options
author | zsxwing <zsxwing@gmail.com> | 2015-07-21 09:55:42 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-07-21 09:55:42 -0700 |
commit | d45355ee224b734727255ff278a47801f5da7e93 (patch) | |
tree | 858a07093d50238d15457346aca8caeb8f37d8b5 /core/src | |
parent | 4f7f1ee378e80b33686508d56e133fc25dec5316 (diff) | |
download | spark-d45355ee224b734727255ff278a47801f5da7e93.tar.gz spark-d45355ee224b734727255ff278a47801f5da7e93.tar.bz2 spark-d45355ee224b734727255ff278a47801f5da7e93.zip |
[SPARK-5423] [CORE] Register a TaskCompletionListener to make sure all resources are released
Make `DiskMapIterator.cleanup` idempotent and register a TaskCompletionListener to make sure `cleanup` is called even if the iterator is not fully drained.
Author: zsxwing <zsxwing@gmail.com>
Closes #7529 from zsxwing/SPARK-5423 and squashes the following commits:
3e3c413 [zsxwing] Remove TODO
9556c78 [zsxwing] Fix NullPointerException for tests
3d574d9 [zsxwing] Register a TaskCompletionListener to make sure release all resources
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 25 |
1 files changed, 19 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 1e4531ef39..d166037351 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import com.google.common.io.ByteStreams
 
-import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.{Logging, SparkEnv, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.serializer.{DeserializationStream, Serializer}
 import org.apache.spark.storage.{BlockId, BlockManager}
@@ -470,14 +470,27 @@ class ExternalAppendOnlyMap[K, V, C](
       item
     }
 
-    // TODO: Ensure this gets called even if the iterator isn't drained.
     private def cleanup() {
       batchIndex = batchOffsets.length // Prevent reading any other batch
       val ds = deserializeStream
-      deserializeStream = null
-      fileStream = null
-      ds.close()
-      file.delete()
+      if (ds != null) {
+        ds.close()
+        deserializeStream = null
+      }
+      if (fileStream != null) {
+        fileStream.close()
+        fileStream = null
+      }
+      if (file.exists()) {
+        file.delete()
+      }
+    }
+
+    val context = TaskContext.get()
+    // context is null in some tests of ExternalAppendOnlyMapSuite because these tests don't run in
+    // a TaskContext.
+    if (context != null) {
+      context.addTaskCompletionListener(context => cleanup())
     }
   }
 