From 1a175d13b967b0080cfa4b5d8d1c278e0e61565a Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Wed, 13 Mar 2013 10:17:39 -0500 Subject: Add NextIterator.closeIfNeeded. --- core/src/main/scala/spark/rdd/HadoopRDD.scala | 2 +- core/src/main/scala/spark/util/NextIterator.scala | 16 +++++++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala index 43c6749ddc..a6322dc58d 100644 --- a/core/src/main/scala/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala @@ -72,7 +72,7 @@ class HadoopRDD[K, V]( reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL) // Register an on-task-completion callback to close the input stream. - context.addOnCompleteCallback{ () => close() } + context.addOnCompleteCallback{ () => closeIfNeeded() } val key: K = reader.createKey() val value: V = reader.createValue() diff --git a/core/src/main/scala/spark/util/NextIterator.scala b/core/src/main/scala/spark/util/NextIterator.scala index da76b5f6d0..48b5018ddd 100644 --- a/core/src/main/scala/spark/util/NextIterator.scala +++ b/core/src/main/scala/spark/util/NextIterator.scala @@ -5,6 +5,7 @@ private[spark] abstract class NextIterator[U] extends Iterator[U] { private var gotNext = false private var nextValue: U = _ + private var closed = false protected var finished = false /** @@ -34,12 +35,25 @@ private[spark] abstract class NextIterator[U] extends Iterator[U] { */ protected def close() + /** + * Calls the subclass-defined close method, but only once. + * + * Usually calling `close` multiple times should be fine, but historically + * there have been issues with some InputFormats throwing exceptions. + */ + def closeIfNeeded() { + if (!closed) { + close() + closed = true + } + } + override def hasNext: Boolean = { if (!finished) { if (!gotNext) { nextValue = getNext() if (finished) { - close() + closeIfNeeded() } gotNext = true } -- cgit v1.2.3