aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2015-03-03 13:03:52 -0800
committerAndrew Or <andrew@databricks.com>2015-03-03 13:03:52 -0800
commit9af001749a37a86ccbf78063ec514a21801645fa (patch)
tree079c23cc94a9280771b7a77db1ff1bbe420d393b
parente359794cec7d30ece38752f62dc2a1d3d26b8feb (diff)
downloadspark-9af001749a37a86ccbf78063ec514a21801645fa.tar.gz
spark-9af001749a37a86ccbf78063ec514a21801645fa.tar.bz2
spark-9af001749a37a86ccbf78063ec514a21801645fa.zip
Revert "[SPARK-5423][Core] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file"
This reverts commit 90095bf3ce9304d09a32ceffaa99069079071b59.
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala52
1 file changed, 9 insertions, 43 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index fc7e86e297..8a0f5a602d 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -387,15 +387,6 @@ class ExternalAppendOnlyMap[K, V, C](
private var batchIndex = 0 // Which batch we're in
private var fileStream: FileInputStream = null
- @volatile private var closed = false
-
- // A volatile variable to remember which DeserializationStream is using. Need to set it when we
- // open a DeserializationStream. But we should use `deserializeStream` rather than
- // `deserializeStreamToBeClosed` to read the content because touching a volatile variable will
- // reduce the performance. It must be volatile so that we can see its correct value in the
- // `finalize` method, which could run in any thread.
- @volatile private var deserializeStreamToBeClosed: DeserializationStream = null
-
// An intermediate stream that reads from exactly one batch
// This guards against pre-fetching and other arbitrary behavior of higher level streams
private var deserializeStream = nextBatchStream()
@@ -410,7 +401,6 @@ class ExternalAppendOnlyMap[K, V, C](
// we're still in a valid batch.
if (batchIndex < batchOffsets.length - 1) {
if (deserializeStream != null) {
- deserializeStreamToBeClosed = null
deserializeStream.close()
fileStream.close()
deserializeStream = null
@@ -429,11 +419,7 @@ class ExternalAppendOnlyMap[K, V, C](
val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
- // Before returning the stream, assign it to `deserializeStreamToBeClosed` so that we can
- // close it in `finalize` and also avoid to touch the volatile `deserializeStreamToBeClosed`
- // during reading the (K, C) pairs.
- deserializeStreamToBeClosed = ser.deserializeStream(compressedStream)
- deserializeStreamToBeClosed
+ ser.deserializeStream(compressedStream)
} else {
// No more batches left
cleanup()
@@ -482,34 +468,14 @@ class ExternalAppendOnlyMap[K, V, C](
item
}
- // TODO: Now only use `finalize` to ensure `close` gets called to clean up the resources. In the
- // future, we need some mechanism to ensure this gets called once the resources are not used.
- private def cleanup(): Unit = {
- if (!closed) {
- closed = true
- batchIndex = batchOffsets.length // Prevent reading any other batch
- fileStream = null
- try {
- val ds = deserializeStreamToBeClosed
- deserializeStreamToBeClosed = null
- deserializeStream = null
- if (ds != null) {
- ds.close()
- }
- } finally {
- if (file.exists()) {
- file.delete()
- }
- }
- }
- }
-
- override def finalize(): Unit = {
- try {
- cleanup()
- } finally {
- super.finalize()
- }
+ // TODO: Ensure this gets called even if the iterator isn't drained.
+ private def cleanup() {
+ batchIndex = batchOffsets.length // Prevent reading any other batch
+ val ds = deserializeStream
+ deserializeStream = null
+ fileStream = null
+ ds.close()
+ file.delete()
}
}