author     Cheng Lian <lian@databricks.com>    2015-05-18 12:45:37 -0700
committer  Yin Huai <yhuai@databricks.com>     2015-05-18 12:45:37 -0700
commit     9dadf019b93038e1e18336ccd06c5eecb4bae32f (patch)
tree       6aa0efac2814082d9ea2a4d30d82c935b3bdf9f1 /sql
parent     530397ba2f5c0fcabb86ba73048c95177ed0b9fc (diff)
[SPARK-7673] [SQL] WIP: HadoopFsRelation and ParquetRelation2 performance optimizations
This PR introduces several performance optimizations to `HadoopFsRelation` and `ParquetRelation2`:

1. Moving `FileStatus` listing from `DataSourceStrategy` into a cache within `HadoopFsRelation`. This new cache generalizes and replaces the one used in `ParquetRelation2`. It also introduces an interface change: to reuse cached `FileStatus` objects, `HadoopFsRelation.buildScan` methods now receive `Array[FileStatus]` instead of `Array[String]` (a sketch of the adapted signature follows this message).
2. When Parquet task side metadata reading is enabled, skipping row group information when reading Parquet footers. This is basically what PR #5334 does. Also, we now use `ParquetFileReader.readAllFootersInParallel` to read footers in parallel.

Another optimization in question: instead of asking `HadoopFsRelation.buildScan` to return an `RDD[Row]` for a single selected partition and then unioning them all, we could ask it to return one `RDD[Row]` covering all selected partitions. This optimization is based on the fact that the Hadoop configuration broadcasting used in `NewHadoopRDD` takes about 34% of the time in the following microbenchmark. However, it complicates data source user code, because user code must merge partition values manually.

To check the cost of broadcasting in `NewHadoopRDD`, I also ran the microbenchmark after removing the `broadcast` call in `NewHadoopRDD`. All results are shown below.

### Microbenchmark

#### Preparation code

Generating a partitioned table with 5k partitions, 1k rows per partition:

```scala
import sqlContext._
import sqlContext.implicits._

val path = "hdfs://localhost:9000/user/lian/5k"

for (n <- 0 until 500) {
  val data = for {
    p <- (n * 10) until ((n + 1) * 10)
    i <- 0 until 1000
  } yield (i, f"val_$i%04d", f"$p%04d")

  data.
    toDF("a", "b", "p").
    write.
    partitionBy("p").
    mode("append").
    parquet(path)
}
```

#### Benchmarking code

```scala
import sqlContext._
import sqlContext.implicits._
import org.apache.spark.sql.types._

import com.google.common.base.Stopwatch

val path = "hdfs://localhost:9000/user/lian/5k"

def benchmark(n: Int)(f: => Unit) {
  val stopwatch = new Stopwatch()

  def run() = {
    stopwatch.reset()
    stopwatch.start()
    f
    stopwatch.stop()
    stopwatch.elapsedMillis()
  }

  val records = (0 until n).map(_ => run())

  (0 until n).foreach(i => println(s"Round $i: ${records(i)} ms"))
  println(s"Average: ${records.sum / n.toDouble} ms")
}

benchmark(3) { read.parquet(path).explain(extended = true) }
```

#### Results

Before:

```
Round 0: 72528 ms
Round 1: 68938 ms
Round 2: 65372 ms
Average: 68946.0 ms
```

After:

```
Round 0: 59499 ms
Round 1: 53645 ms
Round 2: 53844 ms
Round 3: 49093 ms
Round 4: 50555 ms
Average: 53327.2 ms
```

Also removing Hadoop configuration broadcasting (note that I was testing on a local laptop, so network cost is pretty low):

```
Round 0: 15806 ms
Round 1: 14394 ms
Round 2: 14699 ms
Round 3: 15334 ms
Round 4: 14123 ms
Average: 14871.2 ms
```

Author: Cheng Lian <lian@databricks.com>

Closes #6225 from liancheng/spark-7673 and squashes the following commits:

2d58a2b [Cheng Lian] Skips reading row group information when using task side metadata reading
7aa3748 [Cheng Lian] Optimizes FileStatusCache by introducing a map from parent directories to child files
ba41250 [Cheng Lian] Reuses HadoopFsRelation FileStatusCache in ParquetRelation2
3d278f7 [Cheng Lian] Fixes a bug when reading a single Parquet data file
b84612a [Cheng Lian] Fixes Scala style issue
6a08b02 [Cheng Lian] WIP: Moves file status cache into HadoopFSRelation
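For data sources built on `HadoopFsRelation`, the interface change in item 1 means overriding the `FileStatus`-based `buildScan` variant rather than the old string-path one. Below is a minimal read-only sketch against the Spark 1.4 API as changed by this patch; the `LineRelation` name and its one-column schema are illustrative only, and write support is stubbed out:

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{HadoopFsRelation, OutputWriterFactory}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Illustrative relation that exposes each input line as a single string column.
class LineRelation(override val paths: Array[String])(@transient val sqlContext: SQLContext)
  extends HadoopFsRelation {

  override def dataSchema: StructType = StructType(StructField("line", StringType) :: Nil)

  // New in this patch: the scan receives cached FileStatus objects collected by
  // HadoopFsRelation's FileStatusCache instead of raw path strings, so no extra
  // file listing happens in DataSourceStrategy.
  override def buildScan(inputFiles: Array[FileStatus]): RDD[Row] = {
    sqlContext.sparkContext
      .textFile(inputFiles.map(_.getPath).mkString(","))
      .map(line => Row(line))
  }

  // Write support is out of scope for this sketch.
  override def prepareJobForWrite(job: Job): OutputWriterFactory =
    throw new UnsupportedOperationException("read-only example")
}
```

With an override like this in place, `DataSourceStrategy` resolves the relation's `paths` (or the selected partition directories) through the new `FileStatusCache` and hands the resulting `FileStatus`es straight to `buildScan`, as shown in the diff below.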
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala          61
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala   37
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala          104
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala    6
4 files changed, 117 insertions, 91 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index fea54a2514..7ca44f7b81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -23,12 +23,11 @@ import scala.collection.JavaConversions._
import scala.util.Try
import com.google.common.base.Objects
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import parquet.filter2.predicate.FilterApi
-import parquet.format.converter.ParquetMetadataConverter
import parquet.hadoop._
import parquet.hadoop.metadata.CompressionCodecName
import parquet.hadoop.util.ContextUtil
@@ -175,8 +174,8 @@ private[sql] class ParquetRelation2(
override def dataSchema: StructType = metadataCache.dataSchema
override private[sql] def refresh(): Unit = {
- metadataCache.refresh()
super.refresh()
+ metadataCache.refresh()
}
// Parquet data source always uses Catalyst internal representations.
@@ -234,15 +233,15 @@ private[sql] class ParquetRelation2(
override def buildScan(
requiredColumns: Array[String],
filters: Array[Filter],
- inputPaths: Array[String]): RDD[Row] = {
+ inputFiles: Array[FileStatus]): RDD[Row] = {
val job = new Job(SparkHadoopUtil.get.conf)
val conf = ContextUtil.getConfiguration(job)
ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
- if (inputPaths.nonEmpty) {
- FileInputFormat.setInputPaths(job, inputPaths.map(new Path(_)): _*)
+ if (inputFiles.nonEmpty) {
+ FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
}
// Try to push down filters when filter push-down is enabled.
@@ -269,10 +268,7 @@ private[sql] class ParquetRelation2(
val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
- val inputFileStatuses =
- metadataCache.dataStatuses.filter(f => inputPaths.contains(f.getPath.toString))
-
- val footers = inputFileStatuses.map(metadataCache.footers)
+ val footers = inputFiles.map(f => metadataCache.footers(f.getPath))
// TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
// After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
@@ -287,7 +283,7 @@ private[sql] class ParquetRelation2(
val cacheMetadata = useMetadataCache
- @transient val cachedStatuses = inputFileStatuses.map { f =>
+ @transient val cachedStatuses = inputFiles.map { f =>
// In order to encode the authority of a Path containing special characters such as /,
// we need to use the string returned by the URI of the path to create a new Path.
val pathWithAuthority = new Path(f.getPath.toUri.toString)
@@ -333,7 +329,7 @@ private[sql] class ParquetRelation2(
private var commonMetadataStatuses: Array[FileStatus] = _
// Parquet footer cache.
- var footers: Map[FileStatus, Footer] = _
+ var footers: Map[Path, Footer] = _
// `FileStatus` objects of all data files (Parquet part-files).
var dataStatuses: Array[FileStatus] = _
@@ -349,35 +345,30 @@ private[sql] class ParquetRelation2(
* Refreshes `FileStatus`es, footers, partition spec, and table schema.
*/
def refresh(): Unit = {
- // Support either reading a collection of raw Parquet part-files, or a collection of folders
- // containing Parquet files (e.g. partitioned Parquet table).
- val baseStatuses = paths.distinct.flatMap { p =>
- val path = new Path(p)
- val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
- val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
- Try(fs.getFileStatus(qualified)).toOption
- }
- assert(baseStatuses.forall(!_.isDir) || baseStatuses.forall(_.isDir))
-
// Lists `FileStatus`es of all leaf nodes (files) under all base directories.
- val leaves = baseStatuses.flatMap { f =>
- val fs = FileSystem.get(f.getPath.toUri, SparkHadoopUtil.get.conf)
- SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f =>
- isSummaryFile(f.getPath) ||
- !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
- }
- }
+ val leaves = cachedLeafStatuses().filter { f =>
+ isSummaryFile(f.getPath) ||
+ !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
+ }.toArray
dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
commonMetadataStatuses =
leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
- footers = (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f =>
- val parquetMetadata = ParquetFileReader.readFooter(
- SparkHadoopUtil.get.conf, f, ParquetMetadataConverter.NO_FILTER)
- f -> new Footer(f.getPath, parquetMetadata)
- }.seq.toMap
+ footers = {
+ val conf = SparkHadoopUtil.get.conf
+ val taskSideMetaData = conf.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
+ val rawFooters = if (shouldMergeSchemas) {
+ ParquetFileReader.readAllFootersInParallel(
+ conf, seqAsJavaList(leaves), taskSideMetaData)
+ } else {
+ ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
+ conf, seqAsJavaList(leaves), taskSideMetaData)
+ }
+
+ rawFooters.map(footer => footer.getFile -> footer).toMap
+ }
// If we already get the schema, don't need to re-compute it since the schema merging is
// time-consuming.
@@ -448,7 +439,7 @@ private[sql] class ParquetRelation2(
"No schema defined, " +
s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.")
- ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext)
+ ParquetRelation2.readSchema(filesToTouch.map(f => footers.apply(f.getPath)), sqlContext)
}
}
}
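The footer-handling hunk above is the second optimization from the commit message: footers are now read through parquet-mr's parallel readers, and row-group metadata is skipped when task-side metadata reading is enabled. A condensed standalone sketch of that logic is shown here; the `readFooters` helper name is invented for illustration, while the parquet-mr calls are the same ones the patch uses:

```scala
import scala.collection.JavaConversions._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import parquet.hadoop.{Footer, ParquetFileReader, ParquetInputFormat}

// Reads all Parquet footers in parallel and keys them by file path. When task-side
// metadata reading is enabled (the default), row-group information is skipped, so
// the driver does not deserialize per-row-group metadata it will never use.
def readFooters(
    conf: Configuration,
    leaves: Seq[FileStatus],
    mergeSchemas: Boolean): Map[Path, Footer] = {
  val taskSideMetaData = conf.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)

  val rawFooters = if (mergeSchemas) {
    // Schema merging needs every data file's footer.
    ParquetFileReader.readAllFootersInParallel(conf, seqAsJavaList(leaves), taskSideMetaData)
  } else {
    // Otherwise summary files (_metadata / _common_metadata) are used where present.
    ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
      conf, seqAsJavaList(leaves), taskSideMetaData)
  }

  rawFooters.map(footer => footer.getFile -> footer).toMap
}
```

In the patch this logic lives inside the metadata cache's `refresh()`, and the map is keyed by `Path` so that `buildScan` can look footers up via `f.getPath`.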
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index e6324b20b3..1615a6dcbd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -17,20 +17,16 @@
package org.apache.spark.sql.sources
-import org.apache.hadoop.fs.Path
-
import org.apache.spark.Logging
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.{UnionRDD, RDD}
-import org.apache.spark.sql.Row
+import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.types.{StructType, UTF8String, StringType}
-import org.apache.spark.sql._
+import org.apache.spark.sql.types.{StringType, StructType, UTF8String}
+import org.apache.spark.sql.{SaveMode, Strategy, execution, sources}
/**
* A Strategy for planning scans over data sources defined using the sources API.
@@ -58,7 +54,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
filters,
(a, _) => t.buildScan(a)) :: Nil
- // Scanning partitioned FSBasedRelation
+ // Scanning partitioned HadoopFsRelation
case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation))
if t.partitionSpec.partitionColumns.nonEmpty =>
val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
@@ -86,22 +82,13 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
t.partitionSpec.partitionColumns,
selectedPartitions) :: Nil
- // Scanning non-partitioned FSBasedRelation
+ // Scanning non-partitioned HadoopFsRelation
case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
- val inputPaths = t.paths.map(new Path(_)).flatMap { path =>
- val fs = path.getFileSystem(t.sqlContext.sparkContext.hadoopConfiguration)
- val qualifiedPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
- SparkHadoopUtil.get.listLeafStatuses(fs, qualifiedPath).map(_.getPath).filterNot { path =>
- val name = path.getName
- name.startsWith("_") || name.startsWith(".")
- }.map(fs.makeQualified(_).toString)
- }
-
pruneFilterProject(
l,
projectList,
filters,
- (a, f) => t.buildScan(a, f, inputPaths)) :: Nil
+ (a, f) => t.buildScan(a, f, t.paths)) :: Nil
case l @ LogicalRelation(t: TableScan) =>
createPhysicalRDD(l.relation, l.output, t.buildScan()) :: Nil
@@ -130,16 +117,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// Builds RDD[Row]s for each selected partition.
val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
- // Paths to all data files within this partition
- val dataFilePaths = {
- val dirPath = new Path(dir)
- val fs = dirPath.getFileSystem(SparkHadoopUtil.get.conf)
- fs.listStatus(dirPath).map(_.getPath).filterNot { path =>
- val name = path.getName
- name.startsWith("_") || name.startsWith(".")
- }.map(fs.makeQualified(_).toString)
- }
-
// The table scan operator (PhysicalRDD) which retrieves required columns from data files.
// Notice that the schema of data files, represented by `relation.dataSchema`, may contain
// some partition column(s).
@@ -155,7 +132,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// assuming partition columns data stored in data files are always consistent with those
// partition values encoded in partition directory paths.
val nonPartitionColumns = requiredColumns.filterNot(partitionColNames.contains)
- val dataRows = relation.buildScan(nonPartitionColumns, filters, dataFilePaths)
+ val dataRows = relation.buildScan(nonPartitionColumns, filters, Array(dir))
// Merges data values with partition values.
mergeWithPartitionValues(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index a82a6758d2..9b52d1be3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -17,14 +17,14 @@
package org.apache.spark.sql.sources
+import scala.collection.mutable
import scala.util.Try
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
@@ -368,18 +368,61 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
private var _partitionSpec: PartitionSpec = _
+ private class FileStatusCache {
+ var leafFiles = mutable.Map.empty[Path, FileStatus]
+
+ var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
+
+ var leafDirs = mutable.Map.empty[Path, FileStatus]
+
+ def refresh(): Unit = {
+ def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = {
+ val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
+ val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus]
+ files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
+ }
+
+ leafDirs.clear()
+ leafFiles.clear()
+
+ // We don't filter files/directories like _temporary/_SUCCESS here, as specific data sources
+ // may take advantages over them (e.g. Parquet _metadata and _common_metadata files).
+ val statuses = paths.flatMap { path =>
+ val hdfsPath = new Path(path)
+ val fs = hdfsPath.getFileSystem(hadoopConf)
+ val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ Try(fs.getFileStatus(qualified)).toOption.toArray.flatMap(listLeafFilesAndDirs(fs, _))
+ }
+
+ val (dirs, files) = statuses.partition(_.isDir)
+ leafDirs ++= dirs.map(d => d.getPath -> d).toMap
+ leafFiles ++= files.map(f => f.getPath -> f).toMap
+ leafDirToChildrenFiles ++= files.groupBy(_.getPath.getParent)
+ }
+ }
+
+ private lazy val fileStatusCache = {
+ val cache = new FileStatusCache
+ cache.refresh()
+ cache
+ }
+
+ protected def cachedLeafStatuses(): Set[FileStatus] = {
+ fileStatusCache.leafFiles.values.toSet
+ }
+
final private[sql] def partitionSpec: PartitionSpec = {
if (_partitionSpec == null) {
_partitionSpec = maybePartitionSpec
.map(spec => spec.copy(partitionColumns = spec.partitionColumns.asNullable))
.orElse(userDefinedPartitionColumns.map(PartitionSpec(_, Array.empty[Partition])))
.getOrElse {
- if (sqlContext.conf.partitionDiscoveryEnabled()) {
- discoverPartitions()
- } else {
- PartitionSpec(StructType(Nil), Array.empty[Partition])
+ if (sqlContext.conf.partitionDiscoveryEnabled()) {
+ discoverPartitions()
+ } else {
+ PartitionSpec(StructType(Nil), Array.empty[Partition])
+ }
}
- }
}
_partitionSpec
}
@@ -409,20 +452,14 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
def userDefinedPartitionColumns: Option[StructType] = None
private[sql] def refresh(): Unit = {
+ fileStatusCache.refresh()
if (sqlContext.conf.partitionDiscoveryEnabled()) {
_partitionSpec = discoverPartitions()
}
}
private def discoverPartitions(): PartitionSpec = {
- val basePaths = paths.map(new Path(_))
- val leafDirs = basePaths.flatMap { path =>
- val fs = path.getFileSystem(hadoopConf)
- Try(fs.getFileStatus(path.makeQualified(fs.getUri, fs.getWorkingDirectory)))
- .filter(_.isDir)
- .map(SparkHadoopUtil.get.listLeafDirStatuses(fs, _))
- .getOrElse(Seq.empty[FileStatus])
- }.map(_.getPath)
+ val leafDirs = fileStatusCache.leafDirs.keys.toSeq
if (leafDirs.nonEmpty) {
PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
@@ -444,6 +481,27 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
})
}
+ private[sources] final def buildScan(
+ requiredColumns: Array[String],
+ filters: Array[Filter],
+ inputPaths: Array[String]): RDD[Row] = {
+ val inputStatuses = inputPaths.flatMap { input =>
+ val path = new Path(input)
+
+ // First assumes `input` is a directory path, and tries to get all files contained in it.
+ fileStatusCache.leafDirToChildrenFiles.getOrElse(
+ path,
+ // Otherwise, `input` might be a file path
+ fileStatusCache.leafFiles.get(path).toArray
+ ).filter { status =>
+ val name = status.getPath.getName
+ !name.startsWith("_") && !name.startsWith(".")
+ }
+ }
+
+ buildScan(requiredColumns, filters, inputStatuses)
+ }
+
/**
* Specifies schema of actual data files. For partitioned relations, if one or more partitioned
* columns are contained in the data files, they should also appear in `dataSchema`.
@@ -457,13 +515,13 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
* this relation. For partitioned relations, this method is called for each selected partition,
* and builds an `RDD[Row]` containing all rows within that single partition.
*
- * @param inputPaths For a non-partitioned relation, it contains paths of all data files in the
+ * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the
* relation. For a partitioned relation, it contains paths of all data files in a single
* selected partition.
*
* @since 1.4.0
*/
- def buildScan(inputPaths: Array[String]): RDD[Row] = {
+ def buildScan(inputFiles: Array[FileStatus]): RDD[Row] = {
throw new UnsupportedOperationException(
"At least one buildScan() method should be overridden to read the relation.")
}
@@ -474,13 +532,13 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
* and builds an `RDD[Row]` containing all rows within that single partition.
*
* @param requiredColumns Required columns.
- * @param inputPaths For a non-partitioned relation, it contains paths of all data files in the
+ * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the
* relation. For a partitioned relation, it contains paths of all data files in a single
* selected partition.
*
* @since 1.4.0
*/
- def buildScan(requiredColumns: Array[String], inputPaths: Array[String]): RDD[Row] = {
+ def buildScan(requiredColumns: Array[String], inputFiles: Array[FileStatus]): RDD[Row] = {
// Yeah, to workaround serialization...
val dataSchema = this.dataSchema
val codegenEnabled = this.codegenEnabled
@@ -490,7 +548,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
BoundReference(dataSchema.fieldIndex(col), field.dataType, field.nullable)
}.toSeq
- buildScan(inputPaths).mapPartitions { rows =>
+ buildScan(inputFiles).mapPartitions { rows =>
val buildProjection = if (codegenEnabled) {
GenerateMutableProjection.generate(requiredOutput, dataSchema.toAttributes)
} else {
@@ -512,7 +570,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
* of all `filters`. The pushed down filters are currently purely an optimization as they
* will all be evaluated again. This means it is safe to use them with methods that produce
* false positives such as filtering partitions based on a bloom filter.
- * @param inputPaths For a non-partitioned relation, it contains paths of all data files in the
+ * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the
* relation. For a partitioned relation, it contains paths of all data files in a single
* selected partition.
*
@@ -521,8 +579,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
def buildScan(
requiredColumns: Array[String],
filters: Array[Filter],
- inputPaths: Array[String]): RDD[Row] = {
- buildScan(requiredColumns, inputPaths)
+ inputFiles: Array[FileStatus]): RDD[Row] = {
+ buildScan(requiredColumns, inputFiles)
}
/**
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 29b21586f9..09eed6646c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -21,7 +21,7 @@ import java.text.NumberFormat
import java.util.UUID
import com.google.common.base.Objects
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
@@ -101,10 +101,10 @@ class SimpleTextRelation(
override def hashCode(): Int =
Objects.hashCode(paths, maybeDataSchema, dataSchema)
- override def buildScan(inputPaths: Array[String]): RDD[Row] = {
+ override def buildScan(inputStatuses: Array[FileStatus]): RDD[Row] = {
val fields = dataSchema.map(_.dataType)
- sparkContext.textFile(inputPaths.mkString(",")).map { record =>
+ sparkContext.textFile(inputStatuses.map(_.getPath).mkString(",")).map { record =>
Row(record.split(",").zip(fields).map { case (value, dataType) =>
Cast(Literal(value), dataType).eval()
}: _*)