author    Matei Zaharia <matei@eecs.berkeley.edu>    2013-02-23 11:19:07 -0800
committer Matei Zaharia <matei@eecs.berkeley.edu>    2013-02-23 11:19:07 -0800
commit    d942d3907241d50b693a316785af56023ec218b4
tree      942241dd66d8563b0967b090e19ac83d636b0fcf
parent    c89824046a779713a19fc7b03a368f14f6ed1939
Handle exceptions in RecordReader.close() better (suggested by Jim Donahue)
-rw-r--r--  core/src/main/scala/spark/rdd/HadoopRDD.scala     | 17
-rw-r--r--  core/src/main/scala/spark/rdd/NewHadoopRDD.scala  | 15
2 files changed, 23 insertions(+), 9 deletions(-)
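
For context, the pattern this commit introduces is to route RecordReader cleanup through a small private close() that catches and logs exceptions instead of letting them propagate and fail (or mask the real outcome of) the task. A minimal, self-contained sketch of that pattern follows, using hypothetical names rather than Spark's Logging trait:

    import java.io.Closeable

    // Hypothetical helper, not part of Spark: close a resource defensively,
    // logging failures instead of rethrowing them.
    object SafeClose {
      def apply(resource: Closeable, name: String): Unit = {
        try {
          resource.close()
        } catch {
          // Mirrors the diff: a failing close() becomes a warning, not a task failure.
          case e: Exception =>
            System.err.println("WARN: exception in " + name + ".close(): " + e)
        }
      }
    }

In the diff below, the private close() added to each RDD's compute iterator plays this role: a reader.close() failure is downgraded to a logWarning rather than overriding the task's result.
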
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index 8139a2a40c..78097502bc 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -15,7 +15,7 @@ import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.ReflectionUtils
-import spark.{Dependency, RDD, SerializableWritable, SparkContext, Partition, TaskContext}
+import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
/**
@@ -42,7 +42,7 @@ class HadoopRDD[K, V](
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int)
- extends RDD[(K, V)](sc, Nil) {
+ extends RDD[(K, V)](sc, Nil) with Logging {
// A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
@@ -71,7 +71,7 @@ class HadoopRDD[K, V](
reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
// Register an on-task-completion callback to close the input stream.
- context.addOnCompleteCallback(() => reader.close())
+ context.addOnCompleteCallback{ () => close() }
val key: K = reader.createKey()
val value: V = reader.createValue()
@@ -88,9 +88,6 @@ class HadoopRDD[K, V](
}
gotNext = true
}
- if (finished) {
- reader.close()
- }
!finished
}
@@ -104,6 +101,14 @@ class HadoopRDD[K, V](
gotNext = false
(key, value)
}
+
+ private def close() {
+ try {
+ reader.close()
+ } catch {
+ case e: Exception => logWarning("Exception in RecordReader.close()", e)
+ }
+ }
}
override def getPreferredLocations(split: Partition): Seq[String] = {
diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
index ebd4c3f0e2..df2361025c 100644
--- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
@@ -7,7 +7,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
-import spark.{Dependency, RDD, SerializableWritable, SparkContext, Partition, TaskContext}
+import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
private[spark]
@@ -26,7 +26,8 @@ class NewHadoopRDD[K, V](
valueClass: Class[V],
@transient conf: Configuration)
extends RDD[(K, V)](sc, Nil)
- with HadoopMapReduceUtil {
+ with HadoopMapReduceUtil
+ with Logging {
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
@@ -61,7 +62,7 @@ class NewHadoopRDD[K, V](
reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
// Register an on-task-completion callback to close the input stream.
- context.addOnCompleteCallback(() => reader.close())
+ context.addOnCompleteCallback(() => close())
var havePair = false
var finished = false
@@ -81,6 +82,14 @@ class NewHadoopRDD[K, V](
havePair = false
return (reader.getCurrentKey, reader.getCurrentValue)
}
+
+ private def close() {
+ try {
+ reader.close()
+ } catch {
+ case e: Exception => logWarning("Exception in RecordReader.close()", e)
+ }
+ }
}
override def getPreferredLocations(split: Partition): Seq[String] = {
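
The diff also drops the inline if (finished) { reader.close() } from HadoopRDD, leaving the on-task-completion callback as the single place where the reader is closed. As a rough, hedged sketch of that flow with invented names (the real spark.TaskContext API differs in detail):

    import scala.collection.mutable.ArrayBuffer

    // Invented stand-in for the task context used in the diff; it only
    // illustrates the flow, not the real spark.TaskContext API.
    class FakeTaskContext {
      private val onComplete = new ArrayBuffer[() => Unit]()
      private var completed = false

      def addOnCompleteCallback(f: () => Unit): Unit = { onComplete += f }

      // Called once by the task runner, whether the task succeeded or failed,
      // so a reader registered here is closed exactly once.
      def markCompleted(): Unit = {
        if (!completed) {
          completed = true
          onComplete.foreach { cb =>
            // Defensive: one misbehaving callback should not skip the others.
            try cb() catch {
              case e: Exception =>
                System.err.println("WARN: completion callback failed: " + e)
            }
          }
        }
      }
    }

Registering close() through such a callback means the reader is released whether the iterator is fully drained or the task ends early, and the try/catch inside close() keeps a flaky RecordReader from turning that cleanup into a task failure.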