aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala4
1 files changed, 3 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 64e9b436f0..8df8b4f83e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -168,6 +168,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
if (objectsWritten == serializerBatchSize) {
writer.commit()
+ writer.close()
+ _diskBytesSpilled += writer.bytesWritten
writer = getNewWriter
objectsWritten = 0
}
@@ -176,8 +178,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
if (objectsWritten > 0) writer.commit()
} finally {
// Partial failures cannot be tolerated; do not revert partial writes
- _diskBytesSpilled += writer.bytesWritten
writer.close()
+ _diskBytesSpilled += writer.bytesWritten
}
currentMap = new SizeTrackingAppendOnlyMap[K, C]
spilledMaps.append(new DiskMapIterator(file, blockId))