From d942d3907241d50b693a316785af56023ec218b4 Mon Sep 17 00:00:00 2001
From: Matei Zaharia
Date: Sat, 23 Feb 2013 11:19:07 -0800
Subject: Handle exceptions in RecordReader.close() better (suggested by Jim Donahue)

---
 core/src/main/scala/spark/rdd/HadoopRDD.scala    | 17 +++++++++++------
 core/src/main/scala/spark/rdd/NewHadoopRDD.scala | 15 ++++++++++++---
 2 files changed, 23 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index 8139a2a40c..78097502bc 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -15,7 +15,7 @@ import org.apache.hadoop.mapred.RecordReader
 import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.util.ReflectionUtils
 
-import spark.{Dependency, RDD, SerializableWritable, SparkContext, Partition, TaskContext}
+import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
 
 
 /**
@@ -42,7 +42,7 @@ class HadoopRDD[K, V](
     keyClass: Class[K],
     valueClass: Class[V],
     minSplits: Int)
-  extends RDD[(K, V)](sc, Nil) {
+  extends RDD[(K, V)](sc, Nil) with Logging {
 
   // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
   private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
@@ -71,7 +71,7 @@ class HadoopRDD[K, V](
     reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
 
     // Register an on-task-completion callback to close the input stream.
-    context.addOnCompleteCallback(() => reader.close())
+    context.addOnCompleteCallback{ () => close() }
 
     val key: K = reader.createKey()
     val value: V = reader.createValue()
@@ -88,9 +88,6 @@ class HadoopRDD[K, V](
         }
         gotNext = true
       }
-      if (finished) {
-        reader.close()
-      }
       !finished
     }
 
@@ -104,6 +101,14 @@ class HadoopRDD[K, V](
       gotNext = false
       (key, value)
     }
+
+    private def close() {
+      try {
+        reader.close()
+      } catch {
+        case e: Exception => logWarning("Exception in RecordReader.close()", e)
+      }
+    }
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
index ebd4c3f0e2..df2361025c 100644
--- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
@@ -7,7 +7,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 
-import spark.{Dependency, RDD, SerializableWritable, SparkContext, Partition, TaskContext}
+import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
 
 
 private[spark]
@@ -26,7 +26,8 @@ class NewHadoopRDD[K, V](
     valueClass: Class[V],
     @transient conf: Configuration)
   extends RDD[(K, V)](sc, Nil)
-  with HadoopMapReduceUtil {
+  with HadoopMapReduceUtil
+  with Logging {
 
   // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
   private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
@@ -61,7 +62,7 @@ class NewHadoopRDD[K, V](
     reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
 
     // Register an on-task-completion callback to close the input stream.
-    context.addOnCompleteCallback(() => reader.close())
+    context.addOnCompleteCallback(() => close())
 
     var havePair = false
     var finished = false
@@ -81,6 +82,14 @@ class NewHadoopRDD[K, V](
       havePair = false
      return (reader.getCurrentKey, reader.getCurrentValue)
     }
+
+    private def close() {
+      try {
+        reader.close()
+      } catch {
+        case e: Exception => logWarning("Exception in RecordReader.close()", e)
+      }
+    }
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
--
cgit v1.2.3
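
Both files get the same treatment: instead of calling reader.close() directly from hasNext() and from the task-completion callback, a private close() wraps the call in a try/catch and logs a warning through the Logging trait, so a RecordReader whose close() throws cannot fail the task or hide its real result. The standalone sketch below illustrates that pattern outside of Spark; the closeQuietly helper, the java.util.logging setup, and the broken reader are illustrative stand-ins and are not part of this patch.

    import java.io.Closeable
    import java.util.logging.{Level, Logger}

    // Simplified, self-contained illustration of the close() pattern added by
    // this patch: an exception thrown while closing a reader is logged as a
    // warning instead of propagating out of the completion path.
    object CloseQuietlyExample {
      private val log = Logger.getLogger("CloseQuietlyExample")

      // Hypothetical helper mirroring the private close() added to both RDDs.
      def closeQuietly(resource: Closeable): Unit = {
        try {
          resource.close()
        } catch {
          case e: Exception => log.log(Level.WARNING, "Exception in close()", e)
        }
      }

      def main(args: Array[String]): Unit = {
        // Stand-in for a misbehaving Hadoop RecordReader whose close() throws.
        val brokenReader = new Closeable {
          override def close(): Unit = throw new RuntimeException("close failed")
        }
        closeQuietly(brokenReader) // logs a warning, does not rethrow
        println("iteration result is unaffected by the failed close()")
      }
    }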