aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala11
1 files changed, 5 insertions, 6 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 988c785dbe..468e101fed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.datasources
import org.apache.spark.{Partition, TaskContext}
-import org.apache.spark.rdd.{RDD, SqlNewHadoopRDDState}
+import org.apache.spark.rdd.{InputFileNameHolder, RDD}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
@@ -37,7 +37,6 @@ case class PartitionedFile(
}
}
-
/**
* A collection of files that should be read as a single task possibly from multiple partitioned
* directories.
@@ -50,7 +49,7 @@ class FileScanRDD(
@transient val sqlContext: SQLContext,
readFunction: (PartitionedFile) => Iterator[InternalRow],
@transient val filePartitions: Seq[FilePartition])
- extends RDD[InternalRow](sqlContext.sparkContext, Nil) {
+ extends RDD[InternalRow](sqlContext.sparkContext, Nil) {
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val iterator = new Iterator[Object] with AutoCloseable {
@@ -65,17 +64,17 @@ class FileScanRDD(
if (files.hasNext) {
val nextFile = files.next()
logInfo(s"Reading File $nextFile")
- SqlNewHadoopRDDState.setInputFileName(nextFile.filePath)
+ InputFileNameHolder.setInputFileName(nextFile.filePath)
currentIterator = readFunction(nextFile)
hasNext
} else {
- SqlNewHadoopRDDState.unsetInputFileName()
+ InputFileNameHolder.unsetInputFileName()
false
}
}
override def close() = {
- SqlNewHadoopRDDState.unsetInputFileName()
+ InputFileNameHolder.unsetInputFileName()
}
}