aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala8
1 files changed, 3 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 08db96edd6..ac5ba9e79f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -213,15 +213,13 @@ class HadoopRDD[K, V](
logInfo("Input split: " + split.inputSplit)
val jobConf = getJobConf()
- // TODO: there is a lot of duplicate code between this and NewHadoopRDD and SqlNewHadoopRDD
-
val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
val existingBytesRead = inputMetrics.bytesRead
// Sets the thread local variable for the file's name
split.inputSplit.value match {
- case fs: FileSplit => SqlNewHadoopRDDState.setInputFileName(fs.getPath.toString)
- case _ => SqlNewHadoopRDDState.unsetInputFileName()
+ case fs: FileSplit => InputFileNameHolder.setInputFileName(fs.getPath.toString)
+ case _ => InputFileNameHolder.unsetInputFileName()
}
// Find a function that will return the FileSystem bytes read by this thread. Do this before
@@ -271,7 +269,7 @@ class HadoopRDD[K, V](
override def close() {
if (reader != null) {
- SqlNewHadoopRDDState.unsetInputFileName()
+ InputFileNameHolder.unsetInputFileName()
// Close the reader and release it. Note: it's very important that we don't close the
// reader more than once, since that exposes us to MAPREDUCE-5918 when running against
// Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic