author     zsxwing <zsxwing@gmail.com>            2014-04-18 17:49:22 -0700
committer  Matei Zaharia <matei@databricks.com>   2014-04-18 17:49:22 -0700
commit     2089e0e7e7c73656daee7b56f8100332f4d2175c (patch)
tree       a816822cb1ca1c26a786bc6623fed762e16bb8fa /core
parent     c399baa0fc40be7aa51835aaeadcd5d768dfdb95 (diff)
SPARK-1482: Fix potential resource leaks in saveAsHadoopDataset and saveAsNewAPIHadoopDataset

`writer.close` should be put in the `finally` block to avoid potential resource leaks.

JIRA: https://issues.apache.org/jira/browse/SPARK-1482

Author: zsxwing <zsxwing@gmail.com>

Closes #400 from zsxwing/SPARK-1482 and squashes the following commits:

06b197a [zsxwing] SPARK-1482: Fix potential resource leaks in saveAsHadoopDataset and saveAsNewAPIHadoopDataset
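The reasoning behind the fix, in isolation: if `writer.write` throws, any statement placed after the loop is skipped, so a `close` call outside a `finally` block never runs and the underlying stream leaks. A minimal standalone sketch of the pattern (the `Writer` class here is a hypothetical stand-in for Hadoop's RecordWriter, not code from the patch):

// Minimal sketch of the try/finally pattern this patch applies.
// `Writer` is a hypothetical stand-in for Hadoop's RecordWriter.
object TryFinallyDemo {
  final class Writer {
    def write(record: String): Unit =
      if (record.isEmpty) throw new IllegalArgumentException("empty record")
    def close(): Unit = println("writer closed")
  }

  def main(args: Array[String]): Unit = {
    val writer = new Writer
    try {
      try {
        // The third record throws; control jumps straight to `finally`.
        Seq("a", "b", "").foreach(writer.write)
      } finally {
        writer.close() // runs on both the success and the failure path
      }
    } catch {
      case e: IllegalArgumentException =>
        println(s"write failed (${e.getMessage}), but the writer was closed first")
    }
  }
}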
Diffstat (limited to 'core')
 core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 30 ++++++++++++++++++------------
 1 file changed, 18 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 343e4325c0..d250bef6aa 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -693,11 +693,15 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
       val committer = format.getOutputCommitter(hadoopContext)
       committer.setupTask(hadoopContext)
       val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
-      while (iter.hasNext) {
-        val (k, v) = iter.next()
-        writer.write(k, v)
+      try {
+        while (iter.hasNext) {
+          val (k, v) = iter.next()
+          writer.write(k, v)
+        }
+      }
+      finally {
+        writer.close(hadoopContext)
       }
-      writer.close(hadoopContext)
       committer.commitTask(hadoopContext)
       return 1
     }
@@ -750,15 +754,17 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
 
       writer.setup(context.stageId, context.partitionId, attemptNumber)
       writer.open()
-
-      var count = 0
-      while(iter.hasNext) {
-        val record = iter.next()
-        count += 1
-        writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
+      try {
+        var count = 0
+        while(iter.hasNext) {
+          val record = iter.next()
+          count += 1
+          writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
+        }
+      }
+      finally {
+        writer.close()
       }
-
-      writer.close()
       writer.commit()
     }
 
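
As a follow-up note (not part of this patch): the same close-in-finally logic can be factored into a loan-pattern helper so call sites cannot reintroduce the leak; Scala 2.13 later shipped `scala.util.Using` for exactly this. A hedged sketch, where `withCloseable` is a hypothetical helper and not a Spark API:

// Hypothetical loan-pattern helper; not part of this patch or of Spark's API.
// It centralizes the try/finally so every caller gets the leak-free behavior.
object Loan {
  def withCloseable[C <: AutoCloseable, A](resource: C)(body: C => A): A =
    try body(resource)
    finally resource.close()
}

// Usage sketch: the writer is closed whether write succeeds or throws.
object LoanDemo {
  def main(args: Array[String]): Unit =
    Loan.withCloseable(new java.io.StringWriter()) { w =>
      w.write("record")
      println(w.toString)
    }
}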