author    zsxwing <zsxwing@gmail.com>  2015-02-19 18:37:31 +0000
committer Ubuntu <ubuntu@ip-172-31-36-14.us-west-2.compute.internal>  2015-02-19 18:37:31 +0000
commit    90095bf3ce9304d09a32ceffaa99069079071b59 (patch)
tree      0c25cd42aa249ae31e3e157a2c6041c4edf4dae2 /core
parent    38e624a732b18e01ad2e7a499ce0bb0d7acdcdf6 (diff)
[SPARK-5423][Core] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file
This PR adds a `finalize` method to DiskMapIterator to clean up its resources even if an exception occurs while processing the data.

Author: zsxwing <zsxwing@gmail.com>

Closes #4219 from zsxwing/SPARK-5423 and squashes the following commits:

d4b2ca6 [zsxwing] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file
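The pattern the commit applies is worth seeing in isolation: an idempotent cleanup() guarded by a flag, invoked both when the iterator is exhausted and from finalize as a last-resort safety net. Below is a minimal Scala sketch of that idea, not the actual Spark code; TempFileIterator and its members are hypothetical names.

    import java.io.{File, FileInputStream}

    class TempFileIterator(file: File) {
      @volatile private var closed = false
      private var in: FileInputStream = new FileInputStream(file)

      // Safe to call more than once; the flag makes cleanup idempotent.
      def cleanup(): Unit = {
        if (!closed) {
          closed = true
          try {
            val s = in
            in = null
            if (s != null) s.close()
          } finally {
            // Always attempt to delete the temp file, even if close() threw.
            if (file.exists()) file.delete()
          }
        }
      }

      // Last-resort cleanup: runs at GC time if the iterator was never drained.
      override def finalize(): Unit = {
        try cleanup() finally super.finalize()
      }
    }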
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 52
1 file changed, 43 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 8a0f5a602d..fc7e86e297 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -387,6 +387,15 @@ class ExternalAppendOnlyMap[K, V, C](
private var batchIndex = 0 // Which batch we're in
private var fileStream: FileInputStream = null
+ @volatile private var closed = false
+
+ // A volatile variable to remember which DeserializationStream is in use. It needs to be set
+ // whenever we open a DeserializationStream, but reads of the content should go through
+ // `deserializeStream` rather than `deserializeStreamToBeClosed`, because touching a volatile
+ // variable on every read would hurt performance. It must be volatile so that its correct
+ // value is visible to the `finalize` method, which could run in any thread.
+ @volatile private var deserializeStreamToBeClosed: DeserializationStream = null
+
// An intermediate stream that reads from exactly one batch
// This guards against pre-fetching and other arbitrary behavior of higher level streams
private var deserializeStream = nextBatchStream()
@@ -401,6 +410,7 @@ class ExternalAppendOnlyMap[K, V, C](
// we're still in a valid batch.
if (batchIndex < batchOffsets.length - 1) {
if (deserializeStream != null) {
+ deserializeStreamToBeClosed = null
deserializeStream.close()
fileStream.close()
deserializeStream = null
@@ -419,7 +429,11 @@ class ExternalAppendOnlyMap[K, V, C](
val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
- ser.deserializeStream(compressedStream)
+ // Before returning the stream, assign it to `deserializeStreamToBeClosed` so that we can
+ // close it in `finalize`, while still avoiding a volatile read of
+ // `deserializeStreamToBeClosed` for every (K, C) pair.
+ deserializeStreamToBeClosed = ser.deserializeStream(compressedStream)
+ deserializeStreamToBeClosed
} else {
// No more batches left
cleanup()
@@ -468,14 +482,34 @@ class ExternalAppendOnlyMap[K, V, C](
item
}
- // TODO: Ensure this gets called even if the iterator isn't drained.
- private def cleanup() {
- batchIndex = batchOffsets.length // Prevent reading any other batch
- val ds = deserializeStream
- deserializeStream = null
- fileStream = null
- ds.close()
- file.delete()
+ // TODO: For now we rely only on `finalize` to ensure `close` gets called to clean up the
+ // resources. In the future, we need a mechanism that runs cleanup as soon as they are unused.
+ private def cleanup(): Unit = {
+ if (!closed) {
+ closed = true
+ batchIndex = batchOffsets.length // Prevent reading any other batch
+ fileStream = null
+ try {
+ val ds = deserializeStreamToBeClosed
+ deserializeStreamToBeClosed = null
+ deserializeStream = null
+ if (ds != null) {
+ ds.close()
+ }
+ } finally {
+ if (file.exists()) {
+ file.delete()
+ }
+ }
+ }
+ }
+
+ override def finalize(): Unit = {
+ try {
+ cleanup()
+ } finally {
+ super.finalize()
+ }
}
}
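The comment about `deserializeStreamToBeClosed` describes a dual-reference idea: keep one @volatile handle solely so the finalizer thread is guaranteed to see it, while the hot read path goes through a plain field with no per-read volatile cost. A minimal sketch of that idea under hypothetical names (BatchReader, hotStream, streamToClose), not the Spark code itself:

    import java.io.InputStream

    class BatchReader(open: () => InputStream) {
      // Published for the finalizer thread; volatile guarantees visibility.
      @volatile private var streamToClose: InputStream = null
      // Read on every record; a plain field avoids per-read volatile cost.
      private var hotStream: InputStream = openStream()

      private def openStream(): InputStream = {
        val s = open()
        streamToClose = s // publish before the stream is used
        s
      }

      def readByte(): Int = hotStream.read() // hot path: no volatile read

      override def finalize(): Unit = {
        try {
          val s = streamToClose
          if (s != null) s.close()
        } finally super.finalize()
      }
    }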