aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStephen Haberman <stephen@exigencecorp.com>2013-03-13 10:17:39 -0500
committerStephen Haberman <stephen@exigencecorp.com>2013-03-13 10:17:39 -0500
commit1a175d13b967b0080cfa4b5d8d1c278e0e61565a (patch)
tree5407609608be0dbbca8a86d49700949666e250cb
parent8f00d23598bc3d96f1e270fd0c652b1602efb18e (diff)
downloadspark-1a175d13b967b0080cfa4b5d8d1c278e0e61565a.tar.gz
spark-1a175d13b967b0080cfa4b5d8d1c278e0e61565a.tar.bz2
spark-1a175d13b967b0080cfa4b5d8d1c278e0e61565a.zip
Add NextIterator.closeIfNeeded.
-rw-r--r--core/src/main/scala/spark/rdd/HadoopRDD.scala2
-rw-r--r--core/src/main/scala/spark/util/NextIterator.scala16
2 files changed, 16 insertions, 2 deletions
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index 43c6749ddc..a6322dc58d 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -72,7 +72,7 @@ class HadoopRDD[K, V](
reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
// Register an on-task-completion callback to close the input stream.
- context.addOnCompleteCallback{ () => close() }
+ context.addOnCompleteCallback{ () => closeIfNeeded() }
val key: K = reader.createKey()
val value: V = reader.createValue()
diff --git a/core/src/main/scala/spark/util/NextIterator.scala b/core/src/main/scala/spark/util/NextIterator.scala
index da76b5f6d0..48b5018ddd 100644
--- a/core/src/main/scala/spark/util/NextIterator.scala
+++ b/core/src/main/scala/spark/util/NextIterator.scala
@@ -5,6 +5,7 @@ private[spark] abstract class NextIterator[U] extends Iterator[U] {
private var gotNext = false
private var nextValue: U = _
+ private var closed = false
protected var finished = false
/**
@@ -34,12 +35,25 @@ private[spark] abstract class NextIterator[U] extends Iterator[U] {
*/
protected def close()
+ /**
+ * Calls the subclass-defined close method, but only once.
+ *
+ * Usually calling `close` multiple times should be fine, but historically
+ * there have been issues with some InputFormats throwing exceptions.
+ */
+ def closeIfNeeded() {
+ if (!closed) {
+ close()
+ closed = true
+ }
+ }
+
override def hasNext: Boolean = {
if (!finished) {
if (!gotNext) {
nextValue = getNext()
if (finished) {
- close()
+ closeIfNeeded()
}
gotNext = true
}