diff options
author | windpiger <songjun@outlook.com> | 2017-03-02 23:54:01 -0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2017-03-02 23:54:01 -0800 |
commit | 982f3223b4f55f988091402063fe8746c5e2cee4 (patch) | |
tree | 3054ad65a839775ae1478e2cf1eadadd4373ee7e /sql/core/src/main/scala/org | |
parent | e24f21b5f8365ed25346e986748b393e0b4be25c (diff) | |
download | spark-982f3223b4f55f988091402063fe8746c5e2cee4.tar.gz spark-982f3223b4f55f988091402063fe8746c5e2cee4.tar.bz2 spark-982f3223b4f55f988091402063fe8746c5e2cee4.zip |
[SPARK-18726][SQL] resolveRelation for FileFormat DataSource don't need to listFiles twice
## What changes were proposed in this pull request?
Currently when we resolveRelation for a `FileFormat DataSource` without providing user schema, it will execute `listFiles` twice in `InMemoryFileIndex` during `resolveRelation`.
This PR adds a `FileStatusCache` for DataSource, which avoids executing `listFiles` twice.
But there is a bug in `InMemoryFileIndex`; see:
[SPARK-19748](https://github.com/apache/spark/pull/17079)
[SPARK-19761](https://github.com/apache/spark/pull/17093),
so this PR should land after SPARK-19748 / SPARK-19761.
## How was this patch tested?
A unit test was added.
Author: windpiger <songjun@outlook.com>
Closes #17081 from windpiger/resolveDataSourceScanFilesTwice.
Diffstat (limited to 'sql/core/src/main/scala/org')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala | 13 |
1 files changed, 9 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index c1353d41e0..4947dfda6f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -106,10 +106,13 @@ case class DataSource( * be any further inference in any triggers. * * @param format the file format object for this DataSource + * @param fileStatusCache the shared cache for file statuses to speed up listing * @return A pair of the data schema (excluding partition columns) and the schema of the partition * columns. */ - private def getOrInferFileFormatSchema(format: FileFormat): (StructType, StructType) = { + private def getOrInferFileFormatSchema( + format: FileFormat, + fileStatusCache: FileStatusCache = NoopCache): (StructType, StructType) = { // the operations below are expensive therefore try not to do them if we don't need to, e.g., // in streaming mode, we have already inferred and registered partition columns, we will // never have to materialize the lazy val below @@ -122,7 +125,7 @@ case class DataSource( val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) SparkHadoopUtil.get.globPathIfNecessary(qualified) }.toArray - new InMemoryFileIndex(sparkSession, globbedPaths, options, None) + new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache) } val partitionSchema = if (partitionColumns.isEmpty) { // Try to infer partitioning, because no DataSource in the read path provides the partitioning @@ -354,7 +357,8 @@ case class DataSource( globPath }.toArray - val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format) + val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) + val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache) val fileCatalog = if 
(sparkSession.sqlContext.conf.manageFilesourcePartitions && catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) { @@ -364,7 +368,8 @@ case class DataSource( catalogTable.get, catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize)) } else { - new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema)) + new InMemoryFileIndex( + sparkSession, globbedPaths, options, Some(partitionSchema), fileStatusCache) } HadoopFsRelation( |