aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwindpiger <songjun@outlook.com>2017-03-02 23:54:01 -0800
committerWenchen Fan <wenchen@databricks.com>2017-03-02 23:54:01 -0800
commit982f3223b4f55f988091402063fe8746c5e2cee4 (patch)
tree3054ad65a839775ae1478e2cf1eadadd4373ee7e
parente24f21b5f8365ed25346e986748b393e0b4be25c (diff)
downloadspark-982f3223b4f55f988091402063fe8746c5e2cee4.tar.gz
spark-982f3223b4f55f988091402063fe8746c5e2cee4.tar.bz2
spark-982f3223b4f55f988091402063fe8746c5e2cee4.zip
[SPARK-18726][SQL] resolveRelation for FileFormat DataSource don't need to listFiles twice
## What changes were proposed in this pull request? Currently when we resolveRelation for a `FileFormat DataSource` without providing user schema, it will execute `listFiles` twice in `InMemoryFileIndex` during `resolveRelation`. This PR add a `FileStatusCache` for DataSource, this can avoid listFiles twice. But there is a bug in `InMemoryFileIndex` see: [SPARK-19748](https://github.com/apache/spark/pull/17079) [SPARK-19761](https://github.com/apache/spark/pull/17093), so this pr should be after SPARK-19748/ SPARK-19761. ## How was this patch tested? unit test added Author: windpiger <songjun@outlook.com> Closes #17081 from windpiger/resolveDataSourceScanFilesTwice.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala13
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala11
2 files changed, 20 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index c1353d41e0..4947dfda6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -106,10 +106,13 @@ case class DataSource(
* be any further inference in any triggers.
*
* @param format the file format object for this DataSource
+ * @param fileStatusCache the shared cache for file statuses to speed up listing
* @return A pair of the data schema (excluding partition columns) and the schema of the partition
* columns.
*/
- private def getOrInferFileFormatSchema(format: FileFormat): (StructType, StructType) = {
+ private def getOrInferFileFormatSchema(
+ format: FileFormat,
+ fileStatusCache: FileStatusCache = NoopCache): (StructType, StructType) = {
// the operations below are expensive therefore try not to do them if we don't need to, e.g.,
// in streaming mode, we have already inferred and registered partition columns, we will
// never have to materialize the lazy val below
@@ -122,7 +125,7 @@ case class DataSource(
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.globPathIfNecessary(qualified)
}.toArray
- new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
+ new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
}
val partitionSchema = if (partitionColumns.isEmpty) {
// Try to infer partitioning, because no DataSource in the read path provides the partitioning
@@ -354,7 +357,8 @@ case class DataSource(
globPath
}.toArray
- val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
+ val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+ val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)
val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
@@ -364,7 +368,8 @@ case class DataSource(
catalogTable.get,
catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
} else {
- new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema))
+ new InMemoryFileIndex(
+ sparkSession, globbedPaths, options, Some(partitionSchema), fileStatusCache)
}
HadoopFsRelation(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index b792a168a4..50506197b3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -411,4 +411,15 @@ class PartitionedTablePerfStatsSuite
}
}
}
+
+ test("resolveRelation for a FileFormat DataSource without userSchema scan filesystem only once") {
+ withTempDir { dir =>
+ import spark.implicits._
+ Seq(1).toDF("a").write.mode("overwrite").save(dir.getAbsolutePath)
+ HiveCatalogMetrics.reset()
+ spark.read.parquet(dir.getAbsolutePath)
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 1)
+ assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 1)
+ }
+ }
}