aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala8
1 files changed, 2 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 296179b75b..085829af6e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -1111,9 +1111,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten)
recordsWritten += 1
}
- } {
- writer.close(hadoopContext)
- }
+ }(finallyBlock = writer.close(hadoopContext))
committer.commitTask(hadoopContext)
outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
om.setBytesWritten(callback())
@@ -1200,9 +1198,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten)
recordsWritten += 1
}
- } {
- writer.close()
- }
+ }(finallyBlock = writer.close())
writer.commit()
outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
om.setBytesWritten(callback())