diff options
author | zsxwing <zsxwing@gmail.com> | 2014-04-18 17:49:22 -0700 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2014-04-18 17:49:22 -0700 |
commit | 2089e0e7e7c73656daee7b56f8100332f4d2175c (patch) | |
tree | a816822cb1ca1c26a786bc6623fed762e16bb8fa /core | |
parent | c399baa0fc40be7aa51835aaeadcd5d768dfdb95 (diff) | |
download | spark-2089e0e7e7c73656daee7b56f8100332f4d2175c.tar.gz spark-2089e0e7e7c73656daee7b56f8100332f4d2175c.tar.bz2 spark-2089e0e7e7c73656daee7b56f8100332f4d2175c.zip |
SPARK-1482: Fix potential resource leaks in saveAsHadoopDataset and saveAsNewAPIHadoopDataset
`writer.close` should be put in the `finally` block to avoid potential resource leaks.
JIRA: https://issues.apache.org/jira/browse/SPARK-1482
Author: zsxwing <zsxwing@gmail.com>
Closes #400 from zsxwing/SPARK-1482 and squashes the following commits:
06b197a [zsxwing] SPARK-1482: Fix potential resource leaks in saveAsHadoopDataset and saveAsNewAPIHadoopDataset
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 30 |
1 file changed, 18 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 343e4325c0..d250bef6aa 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -693,11 +693,15 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
       val committer = format.getOutputCommitter(hadoopContext)
       committer.setupTask(hadoopContext)
       val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
-      while (iter.hasNext) {
-        val (k, v) = iter.next()
-        writer.write(k, v)
+      try {
+        while (iter.hasNext) {
+          val (k, v) = iter.next()
+          writer.write(k, v)
+        }
+      }
+      finally {
+        writer.close(hadoopContext)
       }
-      writer.close(hadoopContext)
       committer.commitTask(hadoopContext)
       return 1
     }
@@ -750,15 +754,17 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
       writer.setup(context.stageId, context.partitionId, attemptNumber)
       writer.open()
-
-      var count = 0
-      while(iter.hasNext) {
-        val record = iter.next()
-        count += 1
-        writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
+      try {
+        var count = 0
+        while(iter.hasNext) {
+          val record = iter.next()
+          count += 1
+          writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
+        }
+      }
+      finally {
+        writer.close()
       }
-
-      writer.close()
       writer.commit()
     }