aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-07-21 09:55:42 -0700
committerAndrew Or <andrew@databricks.com>2015-07-21 09:55:42 -0700
commitd45355ee224b734727255ff278a47801f5da7e93 (patch)
tree858a07093d50238d15457346aca8caeb8f37d8b5 /core
parent4f7f1ee378e80b33686508d56e133fc25dec5316 (diff)
downloadspark-d45355ee224b734727255ff278a47801f5da7e93.tar.gz
spark-d45355ee224b734727255ff278a47801f5da7e93.tar.bz2
spark-d45355ee224b734727255ff278a47801f5da7e93.zip
[SPARK-5423] [CORE] Register a TaskCompletionListener to make sure release all resources
Make `DiskMapIterator.cleanup` idempotent and register a TaskCompletionListener to make sure call `cleanup`. Author: zsxwing <zsxwing@gmail.com> Closes #7529 from zsxwing/SPARK-5423 and squashes the following commits: 3e3c413 [zsxwing] Remove TODO 9556c78 [zsxwing] Fix NullPointerException for tests 3d574d9 [zsxwing] Register a TaskCompletionListener to make sure release all resources
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala25
1 files changed, 19 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 1e4531ef39..d166037351 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.ArrayBuffer
import com.google.common.io.ByteStreams
-import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.{Logging, SparkEnv, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.serializer.{DeserializationStream, Serializer}
import org.apache.spark.storage.{BlockId, BlockManager}
@@ -470,14 +470,27 @@ class ExternalAppendOnlyMap[K, V, C](
item
}
- // TODO: Ensure this gets called even if the iterator isn't drained.
private def cleanup() {
batchIndex = batchOffsets.length // Prevent reading any other batch
val ds = deserializeStream
- deserializeStream = null
- fileStream = null
- ds.close()
- file.delete()
+ if (ds != null) {
+ ds.close()
+ deserializeStream = null
+ }
+ if (fileStream != null) {
+ fileStream.close()
+ fileStream = null
+ }
+ if (file.exists()) {
+ file.delete()
+ }
+ }
+
+ val context = TaskContext.get()
+ // context is null in some tests of ExternalAppendOnlyMapSuite because these tests don't run in
+ // a TaskContext.
+ if (context != null) {
+ context.addTaskCompletionListener(context => cleanup())
}
}