path: root/sql/core
author    Michael Allman <michael@videoamp.com>    2016-10-14 18:26:18 -0700
committer Reynold Xin <rxin@databricks.com>        2016-10-14 18:26:18 -0700
commit    6ce1b675ee9fc9a6034439c3ca00441f9f172f84 (patch)
tree      5733f50b02eea423ce0f6c22114e356ffa321648 /sql/core
parent    2d96d35dc0fed6df249606d9ce9272c0f0109fa2 (diff)
download  spark-6ce1b675ee9fc9a6034439c3ca00441f9f172f84.tar.gz
          spark-6ce1b675ee9fc9a6034439c3ca00441f9f172f84.tar.bz2
          spark-6ce1b675ee9fc9a6034439c3ca00441f9f172f84.zip
[SPARK-16980][SQL] Load only catalog table partition metadata required to answer a query
(This PR addresses https://issues.apache.org/jira/browse/SPARK-16980.)

## What changes were proposed in this pull request?

In a new Spark session, when a partitioned Hive table is converted to use Spark's `HadoopFsRelation` in `HiveMetastoreCatalog`, metadata for every partition of that table is retrieved from the metastore and loaded into driver memory. In addition, every partition's metadata files are read from the filesystem to perform schema inference.

If a user queries such a table with predicates which prune that table's partitions, we would like to be able to answer that query without consulting partition metadata which is not involved in the query. When querying a table with a large number of partitions for data from a small number of partitions (maybe even a single partition), the current conversion strategy is highly inefficient. I suspect this scenario is not uncommon in the wild.

In addition to being inefficient in running time, the current strategy is inefficient in its use of driver memory. When the sum of the number of partitions of all tables loaded in a driver reaches a certain level (somewhere in the tens of thousands), their cached data exhausts all driver heap memory in the default configuration. I suspect this scenario is less common (in that not too many deployments work with tables with tens of thousands of partitions), but it does illustrate how large the memory footprint of this metadata can be. With tables with hundreds or thousands of partitions, I would expect the `HiveMetastoreCatalog` table cache to represent a significant portion of the driver's heap space.

This PR proposes an alternative approach. Basically, it makes four changes:

1. It adds a new method, `listPartitionsByFilter`, to the Catalyst `ExternalCatalog` trait which returns the partition metadata for a given sequence of partition-pruning predicates.
2. It refactors the `FileCatalog` type hierarchy to include a new `TableFileCatalog` to efficiently return files only for partitions matching a sequence of partition-pruning predicates.
3. It removes partition loading and caching from `HiveMetastoreCatalog`.
4. It adds a new Catalyst optimizer rule, `PruneFileSourcePartitions`, which applies a plan's partition-pruning predicates to prune out unnecessary partition files from a `HadoopFsRelation`'s underlying file catalog.

The net effect is that when a query over a partitioned Hive table is planned, the analyzer retrieves the table metadata from `HiveMetastoreCatalog`. As part of this operation, the `HiveMetastoreCatalog` builds a `HadoopFsRelation` with a `TableFileCatalog`. It does not load any partition metadata or scan any files. The optimizer prunes away unnecessary table partitions by sending the partition-pruning predicates to the relation's `TableFileCatalog`. The `TableFileCatalog` in turn calls the `listPartitionsByFilter` method on its external catalog, which queries the Hive metastore, passing along those filters.

As a bonus, performing partition pruning during optimization leads to a more accurate relation size estimate. This, along with c481bdf, can lead to automatic, safe application of the broadcast optimization in a join where it might previously have been omitted.

## Open Issues

1. This PR omits partition metadata caching. I can add this once the overall strategy for the cold path is established, perhaps in a future PR.
2. This PR removes and omits partitioned Hive table schema reconciliation. As a result, it fails to find Parquet schema columns with upper-case letters because of the Hive metastore's case insensitivity. This issue may be fixed by #14750, but that PR appears to have stalled. ericl has contributed a workaround for Parquet to this PR wherein schema reconciliation occurs at query execution time instead of at planning time. Whether ORC requires a similar patch is an open issue.
3. This PR omits an implementation of `listPartitionsByFilter` for the `InMemoryCatalog`.
4. This PR breaks Parquet log output redirection during query execution. I can work around this by running `Class.forName("org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$")` first thing in a Spark shell session, but I haven't figured out how to fix this properly.

## How was this patch tested?

The existing Spark unit tests were run, and some ad-hoc tests were performed to validate that only the necessary partition metadata is loaded.

Author: Michael Allman <michael@videoamp.com>
Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>

Closes #14690 from mallman/spark-16980-lazy_partition_fetching.
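For context, here is a minimal, illustrative Spark shell sketch of the scenario this change targets. The Hive-enabled session, the `events` table, and its `ds` partition column are assumptions made for illustration; they are not part of this patch.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: assumes a Hive metastore containing a table `events`
// partitioned by `ds`, with many partitions.
val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// Before this change, planning the query below loaded metadata for every
// partition of `events` into the driver. With TableFileCatalog and the
// PruneFileSourcePartitions rule, only the partitions satisfying
// ds = '2016-10-14' are fetched from the metastore via listPartitionsByFilter.
val q = spark.sql("SELECT count(*) FROM events WHERE ds = '2016-10-14'")
q.explain(true)  // the optimized plan reflects the pruned file catalog
q.show()
```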
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala | 28
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala | 46
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala | 16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala | 197
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala | 24
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala | 72
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala | 225
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala | 113
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 9
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala | 5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalogSuite.scala) | 16
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala | 6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala | 28
23 files changed, 561 insertions, 258 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 7ae3275245..7dccbbd3f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.{FileCatalog, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery}
@@ -2614,7 +2614,7 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
def inputFiles: Array[String] = {
- val files: Seq[String] = logicalPlan.collect {
+ val files: Seq[String] = queryExecution.optimizedPlan.collect {
case LogicalRelation(fsBasedRelation: FileRelation, _, _) =>
fsBasedRelation.inputFiles
case fr: FileRelation =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 83b7c779ab..92fd366e10 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -185,7 +185,7 @@ class CacheManager extends Logging {
plan match {
case lr: LogicalRelation => lr.relation match {
case hr: HadoopFsRelation =>
- val invalidate = hr.location.paths
+ val invalidate = hr.location.rootPaths
.map(_.makeQualified(fs.getUri, fs.getWorkingDirectory))
.contains(qualifiedPath)
if (invalidate) hr.location.refresh()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 6cdba40693..623d2be55d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -225,13 +225,27 @@ case class FileSourceScanExec(
}
// These metadata values make scan plans uniquely identifiable for equality checking.
- override val metadata: Map[String, String] = Map(
- "Format" -> relation.fileFormat.toString,
- "ReadSchema" -> outputSchema.catalogString,
- "Batched" -> supportsBatch.toString,
- "PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"),
- "PushedFilters" -> dataFilters.mkString("[", ", ", "]"),
- "InputPaths" -> relation.location.paths.mkString(", "))
+ override val metadata: Map[String, String] = {
+ def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
+ val location = relation.location
+ val locationDesc =
+ location.getClass.getSimpleName + seqToString(location.rootPaths)
+ val metadata =
+ Map(
+ "Format" -> relation.fileFormat.toString,
+ "ReadSchema" -> outputSchema.catalogString,
+ "Batched" -> supportsBatch.toString,
+ "PartitionFilters" -> seqToString(partitionFilters),
+ "PushedFilters" -> seqToString(dataFilters),
+ "Location" -> locationDesc)
+ val withOptPartitionCount =
+ relation.partitionSchemaOption.map { _ =>
+ metadata + ("PartitionCount" -> selectedPartitions.size.toString)
+ } getOrElse {
+ metadata
+ }
+ withOptPartitionCount
+ }
private lazy val inputRDD: RDD[InternalRow] = {
val readFile: (PartitionedFile) => Iterator[InternalRow] =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index 8b762b5d6c..981728331d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.sql.ExperimentalMethods
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions
import org.apache.spark.sql.execution.python.ExtractPythonUDFFromAggregate
import org.apache.spark.sql.internal.SQLConf
@@ -32,5 +33,6 @@ class SparkOptimizer(
override def batches: Seq[Batch] = super.batches :+
Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog, conf)) :+
Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+
+ Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+
Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index a04a13e698..a8c75a7f29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -67,7 +67,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
dataSource match {
case fs: HadoopFsRelation =>
- if (table.tableType == CatalogTableType.EXTERNAL && fs.location.paths.isEmpty) {
+ if (table.tableType == CatalogTableType.EXTERNAL && fs.location.rootPaths.isEmpty) {
throw new AnalysisException(
"Cannot create a file-based external data source table without path")
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index e75e7d2770..92b1fff7d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -471,9 +471,7 @@ case class DataSource(
val existingPartitionColumns = Try {
resolveRelation()
.asInstanceOf[HadoopFsRelation]
- .location
- .partitionSpec()
- .partitionColumns
+ .partitionSchema
.fieldNames
.toSeq
}.getOrElse(Seq.empty[String])
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 6f9ed50a02..7d0abe86a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -163,14 +163,14 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
if query.resolved && t.schema.asNullable == query.schema.asNullable =>
// Sanity checks
- if (t.location.paths.size != 1) {
+ if (t.location.rootPaths.size != 1) {
throw new AnalysisException(
"Can only write data to relations with a single path.")
}
- val outputPath = t.location.paths.head
+ val outputPath = t.location.rootPaths.head
val inputPaths = query.collect {
- case LogicalRelation(r: HadoopFsRelation, _, _) => r.location.paths
+ case LogicalRelation(r: HadoopFsRelation, _, _) => r.location.rootPaths
}.flatten
val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
@@ -184,7 +184,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver),
t.bucketSpec,
t.fileFormat,
- () => t.refresh(),
+ () => t.location.refresh(),
t.options,
query,
mode)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index bde2d2b89d..e7239ef91b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
+
/**
* Used to read and write data stored in files to/from the [[InternalRow]] format.
*/
@@ -182,16 +183,17 @@ abstract class TextBasedFileFormat extends FileFormat {
case class Partition(values: InternalRow, files: Seq[FileStatus])
/**
- * An interface for objects capable of enumerating the files that comprise a relation as well
- * as the partitioning characteristics of those files.
+ * An interface for objects capable of enumerating the root paths of a relation as well as the
+ * partitions of a relation subject to some pruning expressions.
*/
-trait FileCatalog {
-
- /** Returns the list of input paths from which the catalog will get files. */
- def paths: Seq[Path]
+trait BasicFileCatalog {
- /** Returns the specification of the partitions inferred from the data. */
- def partitionSpec(): PartitionSpec
+ /**
+ * Returns the list of root input paths from which the catalog will get files. There may be a
+ * single root path from which partitions are discovered, or individual partitions may be
+ * specified by each path.
+ */
+ def rootPaths: Seq[Path]
/**
* Returns all valid files grouped into partitions when the data is partitioned. If the data is
@@ -204,9 +206,33 @@ trait FileCatalog {
*/
def listFiles(filters: Seq[Expression]): Seq[Partition]
+ /** Returns the list of files that will be read when scanning this relation. */
+ def inputFiles: Array[String]
+
+ /** Refresh any cached file listings */
+ def refresh(): Unit
+
+ /** Sum of table file sizes, in bytes */
+ def sizeInBytes: Long
+}
+
+/**
+ * A [[BasicFileCatalog]] which can enumerate all of the files comprising a relation and, from
+ * those, infer the relation's partition specification.
+ */
+// TODO: Consider a more descriptive, appropriate name that suggests this is a file catalog
+// for which it is safe to list all of its files.
+trait FileCatalog extends BasicFileCatalog {
+
+ /** Returns the specification of the partitions inferred from the data. */
+ def partitionSpec(): PartitionSpec
+
/** Returns all the valid files. */
def allFiles(): Seq[FileStatus]
- /** Refresh the file listing */
- def refresh(): Unit
+ /** Returns the list of files that will be read when scanning this relation. */
+ override def inputFiles: Array[String] =
+ allFiles().map(_.getPath.toUri.toString).toArray
+
+ override def sizeInBytes: Long = allFiles().map(_.getLen).sum
}
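To make the new split between `BasicFileCatalog` and `FileCatalog` introduced above concrete, the following minimal sketch (not part of this patch; the class name is illustrative) shows a catalog for a single, unpartitioned directory that only needs the `BasicFileCatalog` surface and never has to infer a partition spec or eagerly enumerate all files.

```scala
// Illustrative only -- not part of this patch.
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{BasicFileCatalog, Partition}

class SingleDirectoryCatalog(root: Path, files: Seq[FileStatus]) extends BasicFileCatalog {
  override def rootPaths: Seq[Path] = root :: Nil

  // No partition columns, so every file belongs to one unpartitioned Partition
  // regardless of the pruning filters passed in.
  override def listFiles(filters: Seq[Expression]): Seq[Partition] =
    Partition(InternalRow.empty, files) :: Nil

  override def inputFiles: Array[String] =
    files.map(_.getPath.toUri.toString).toArray

  override def refresh(): Unit = ()  // nothing is cached

  override def sizeInBytes: Long = files.map(_.getLen).sum
}
```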
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index c7ebe0b76a..db889edf03 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -28,8 +28,8 @@ import org.apache.spark.sql.types.StructType
* Acts as a container for all of the metadata required to read from a datasource. All discovery,
* resolution and merging logic for schemas and partitions has been removed.
*
- * @param location A [[FileCatalog]] that can enumerate the locations of all the files that comprise
- * this relation.
+ * @param location A [[BasicFileCatalog]] that can enumerate the locations of all the files that
+ * comprise this relation.
* @param partitionSchema The schema of the columns (if any) that are used to partition the relation
* @param dataSchema The schema of any remaining columns. Note that if any partition columns are
* present in the actual data files as well, they are preserved.
@@ -38,7 +38,7 @@ import org.apache.spark.sql.types.StructType
* @param options Configuration used when reading / writing data.
*/
case class HadoopFsRelation(
- location: FileCatalog,
+ location: BasicFileCatalog,
partitionSchema: StructType,
dataSchema: StructType,
bucketSpec: Option[BucketSpec],
@@ -58,10 +58,6 @@ case class HadoopFsRelation(
def partitionSchemaOption: Option[StructType] =
if (partitionSchema.isEmpty) None else Some(partitionSchema)
- def partitionSpec: PartitionSpec = location.partitionSpec()
-
- def refresh(): Unit = location.refresh()
-
override def toString: String = {
fileFormat match {
case source: DataSourceRegister => source.shortName()
@@ -69,9 +65,7 @@ case class HadoopFsRelation(
}
}
- /** Returns the list of files that will be read when scanning this relation. */
- override def inputFiles: Array[String] =
- location.allFiles().map(_.getPath.toUri.toString).toArray
+ override def sizeInBytes: Long = location.sizeInBytes
- override def sizeInBytes: Long = location.allFiles().map(_.getLen).sum
+ override def inputFiles: Array[String] = location.inputFiles
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index a68ae523e0..6d10501b72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -17,32 +17,26 @@
package org.apache.spark.sql.execution.datasources
-import java.io.FileNotFoundException
-
import scala.collection.mutable
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
-import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
-import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableConfiguration
/**
* A [[FileCatalog]] that generates the list of files to process by recursively listing all the
* files present in `paths`.
*
+ * @param rootPaths the list of root table paths to scan
* @param parameters as set of options to control discovery
- * @param paths a list of paths to scan
* @param partitionSchema an optional partition schema that will be use to provide types for the
* discovered partitions
*/
class ListingFileCatalog(
sparkSession: SparkSession,
- override val paths: Seq[Path],
+ override val rootPaths: Seq[Path],
parameters: Map[String, String],
partitionSchema: Option[StructType])
extends PartitioningAwareFileCatalog(sparkSession, parameters, partitionSchema) {
@@ -70,198 +64,17 @@ class ListingFileCatalog(
}
override def refresh(): Unit = {
- val files = listLeafFiles(paths)
+ val files = listLeafFiles(rootPaths)
cachedLeafFiles =
new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
cachedPartitionSpec = null
}
- /**
- * List leaf files of given paths. This method will submit a Spark job to do parallel
- * listing whenever there is a path having more files than the parallel partition discovery
- * discovery threshold.
- *
- * This is publicly visible for testing.
- */
- def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
- val files =
- if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
- ListingFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
- } else {
- ListingFileCatalog.listLeafFilesInSerial(paths, hadoopConf)
- }
-
- mutable.LinkedHashSet(files: _*)
- }
-
override def equals(other: Any): Boolean = other match {
- case hdfs: ListingFileCatalog => paths.toSet == hdfs.paths.toSet
+ case hdfs: ListingFileCatalog => rootPaths.toSet == hdfs.rootPaths.toSet
case _ => false
}
- override def hashCode(): Int = paths.toSet.hashCode()
-}
-
-
-object ListingFileCatalog extends Logging {
-
- /** A serializable variant of HDFS's BlockLocation. */
- private case class SerializableBlockLocation(
- names: Array[String],
- hosts: Array[String],
- offset: Long,
- length: Long)
-
- /** A serializable variant of HDFS's FileStatus. */
- private case class SerializableFileStatus(
- path: String,
- length: Long,
- isDir: Boolean,
- blockReplication: Short,
- blockSize: Long,
- modificationTime: Long,
- accessTime: Long,
- blockLocations: Array[SerializableBlockLocation])
-
- /**
- * List a collection of path recursively.
- */
- private def listLeafFilesInSerial(
- paths: Seq[Path],
- hadoopConf: Configuration): Seq[FileStatus] = {
- // Dummy jobconf to get to the pathFilter defined in configuration
- val jobConf = new JobConf(hadoopConf, this.getClass)
- val filter = FileInputFormat.getInputPathFilter(jobConf)
-
- paths.flatMap { path =>
- val fs = path.getFileSystem(hadoopConf)
- listLeafFiles0(fs, path, filter)
- }
- }
-
- /**
- * List a collection of path recursively in parallel (using Spark executors).
- * Each task launched will use [[listLeafFilesInSerial]] to list.
- */
- private def listLeafFilesInParallel(
- paths: Seq[Path],
- hadoopConf: Configuration,
- sparkSession: SparkSession): Seq[FileStatus] = {
- assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
- logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
-
- val sparkContext = sparkSession.sparkContext
- val serializableConfiguration = new SerializableConfiguration(hadoopConf)
- val serializedPaths = paths.map(_.toString)
-
- // Set the number of parallelism to prevent following file listing from generating many tasks
- // in case of large #defaultParallelism.
- val numParallelism = Math.min(paths.size, 10000)
-
- val statuses = sparkContext
- .parallelize(serializedPaths, numParallelism)
- .mapPartitions { paths =>
- val hadoopConf = serializableConfiguration.value
- listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
- }.map { status =>
- // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
- val blockLocations = status match {
- case f: LocatedFileStatus =>
- f.getBlockLocations.map { loc =>
- SerializableBlockLocation(
- loc.getNames,
- loc.getHosts,
- loc.getOffset,
- loc.getLength)
- }
-
- case _ =>
- Array.empty[SerializableBlockLocation]
- }
-
- SerializableFileStatus(
- status.getPath.toString,
- status.getLen,
- status.isDirectory,
- status.getReplication,
- status.getBlockSize,
- status.getModificationTime,
- status.getAccessTime,
- blockLocations)
- }.collect()
-
- // Turn SerializableFileStatus back to Status
- statuses.map { f =>
- val blockLocations = f.blockLocations.map { loc =>
- new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
- }
- new LocatedFileStatus(
- new FileStatus(
- f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path)),
- blockLocations)
- }
- }
-
- /**
- * List a single path, provided as a FileStatus, in serial.
- */
- private def listLeafFiles0(
- fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = {
- logTrace(s"Listing $path")
- val name = path.getName.toLowerCase
- if (shouldFilterOut(name)) {
- Seq.empty[FileStatus]
- } else {
- // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist
- // Note that statuses only include FileStatus for the files and dirs directly under path,
- // and does not include anything else recursively.
- val statuses = try fs.listStatus(path) catch {
- case _: FileNotFoundException =>
- logWarning(s"The directory $path was not found. Was it deleted very recently?")
- Array.empty[FileStatus]
- }
-
- val allLeafStatuses = {
- val (dirs, files) = statuses.partition(_.isDirectory)
- val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter))
- if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
- }
-
- allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
- case f: LocatedFileStatus =>
- f
-
- // NOTE:
- //
- // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
- // operations, calling `getFileBlockLocations` does no harm here since these file system
- // implementations don't actually issue RPC for this method.
- //
- // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
- // be a big deal since we always use to `listLeafFilesInParallel` when the number of
- // paths exceeds threshold.
- case f =>
- // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
- // which is very slow on some file system (RawLocalFileSystem, which is launch a
- // subprocess and parse the stdout).
- val locations = fs.getFileBlockLocations(f, 0, f.getLen)
- val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
- f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
- if (f.isSymlink) {
- lfs.setSymlink(f.getSymlink)
- }
- lfs
- }
- }
- }
-
- /** Checks if we should filter out this path name. */
- def shouldFilterOut(pathName: String): Boolean = {
- // We filter everything that starts with _ and ., except _common_metadata and _metadata
- // because Parquet needs to find those metadata files from leaf files returned by this method.
- // We should refactor this logic to not mix metadata files with data files.
- ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) &&
- !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
- }
+ override def hashCode(): Int = rootPaths.toSet.hashCode()
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index d9562fd32e..7c28d48f26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -94,7 +94,7 @@ case class LogicalRelation(
}
override def refresh(): Unit = relation match {
- case fs: HadoopFsRelation => fs.refresh()
+ case fs: HadoopFsRelation => fs.location.refresh()
case _ => // Do nothing.
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 702ba97222..b2508115c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -21,7 +21,6 @@ import scala.collection.mutable
import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
@@ -40,9 +39,10 @@ abstract class PartitioningAwareFileCatalog(
sparkSession: SparkSession,
parameters: Map[String, String],
partitionSchema: Option[StructType])
- extends FileCatalog with Logging {
+ extends SessionFileCatalog(sparkSession) with FileCatalog {
+ import PartitioningAwareFileCatalog.BASE_PATH_PARAM
- protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)
+ override protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)
protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus]
@@ -72,8 +72,8 @@ abstract class PartitioningAwareFileCatalog(
override def allFiles(): Seq[FileStatus] = {
if (partitionSpec().partitionColumns.isEmpty) {
- // For each of the input paths, get the list of files inside them
- paths.flatMap { path =>
+ // For each of the root input paths, get the list of files inside them
+ rootPaths.flatMap { path =>
// Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
val fs = path.getFileSystem(hadoopConf)
val qualifiedPathPre = fs.makeQualified(path)
@@ -105,8 +105,6 @@ abstract class PartitioningAwareFileCatalog(
protected def inferPartitioning(): PartitionSpec = {
// We use leaf dirs containing data files to discover the schema.
val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
- // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
- // counted as data files, so that they shouldn't participate partition discovery.
files.exists(f => isDataPath(f.getPath))
}.keys.toSeq
partitionSchema match {
@@ -194,24 +192,30 @@ abstract class PartitioningAwareFileCatalog(
* and the returned DataFrame will have the column of `something`.
*/
private def basePaths: Set[Path] = {
- parameters.get("basePath").map(new Path(_)) match {
+ parameters.get(BASE_PATH_PARAM).map(new Path(_)) match {
case Some(userDefinedBasePath) =>
val fs = userDefinedBasePath.getFileSystem(hadoopConf)
if (!fs.isDirectory(userDefinedBasePath)) {
- throw new IllegalArgumentException("Option 'basePath' must be a directory")
+ throw new IllegalArgumentException(s"Option '$BASE_PATH_PARAM' must be a directory")
}
Set(fs.makeQualified(userDefinedBasePath))
case None =>
- paths.map { path =>
+ rootPaths.map { path =>
// Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
val qualifiedPath = path.getFileSystem(hadoopConf).makeQualified(path)
if (leafFiles.contains(qualifiedPath)) qualifiedPath.getParent else qualifiedPath }.toSet
}
}
+ // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
+ // counted as data files, so that they shouldn't participate partition discovery.
private def isDataPath(path: Path): Boolean = {
val name = path.getName
!((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
}
}
+
+object PartitioningAwareFileCatalog {
+ val BASE_PATH_PARAM = "basePath"
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
new file mode 100644
index 0000000000..29121a47d9
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+ case op @ PhysicalOperation(projects, filters,
+ logicalRelation @
+ LogicalRelation(fsRelation @
+ HadoopFsRelation(
+ tableFileCatalog: TableFileCatalog,
+ partitionSchema,
+ _,
+ _,
+ _,
+ _),
+ _,
+ _))
+ if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined =>
+ // The attribute names in a predicate may differ from those in the schema when the
+ // analysis is case insensitive. Normalize them to the names in the schema so that we
+ // do not need to worry about case sensitivity here.
+ val normalizedFilters = filters.map { e =>
+ e transform {
+ case a: AttributeReference =>
+ a.withName(logicalRelation.output.find(_.semanticEquals(a)).get.name)
+ }
+ }
+
+ val sparkSession = fsRelation.sparkSession
+ val partitionColumns =
+ logicalRelation.resolve(
+ partitionSchema, sparkSession.sessionState.analyzer.resolver)
+ val partitionSet = AttributeSet(partitionColumns)
+ val partitionKeyFilters =
+ ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
+
+ if (partitionKeyFilters.nonEmpty) {
+ val prunedFileCatalog = tableFileCatalog.filterPartitions(partitionKeyFilters.toSeq)
+ val prunedFsRelation =
+ fsRelation.copy(location = prunedFileCatalog)(sparkSession)
+ val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation)
+
+ // Keep partition-pruning predicates so that they are visible in physical planning
+ val filterExpression = filters.reduceLeft(And)
+ val filter = Filter(filterExpression, prunedLogicalRelation)
+ Project(projects, filter)
+ } else {
+ op
+ }
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala
new file mode 100644
index 0000000000..4807a92c2e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.FileNotFoundException
+
+import scala.collection.mutable
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.SerializableConfiguration
+
+
+/**
+ * A base class for [[BasicFileCatalog]]s that need a [[SparkSession]] and the ability to find leaf
+ * files in a list of HDFS paths.
+ *
+ * @param sparkSession a [[SparkSession]]
+ */
+abstract class SessionFileCatalog(sparkSession: SparkSession)
+ extends BasicFileCatalog with Logging {
+ protected val hadoopConf: Configuration
+
+ /**
+ * List leaf files of given paths. This method will submit a Spark job to do parallel
+ * listing whenever there is a path having more files than the parallel partition discovery
+ * threshold.
+ *
+ * This is publicly visible for testing.
+ */
+ def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
+ val files =
+ if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+ SessionFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
+ } else {
+ SessionFileCatalog.listLeafFilesInSerial(paths, hadoopConf)
+ }
+
+ HiveCatalogMetrics.incrementFilesDiscovered(files.size)
+ mutable.LinkedHashSet(files: _*)
+ }
+}
+
+object SessionFileCatalog extends Logging {
+
+ /** A serializable variant of HDFS's BlockLocation. */
+ private case class SerializableBlockLocation(
+ names: Array[String],
+ hosts: Array[String],
+ offset: Long,
+ length: Long)
+
+ /** A serializable variant of HDFS's FileStatus. */
+ private case class SerializableFileStatus(
+ path: String,
+ length: Long,
+ isDir: Boolean,
+ blockReplication: Short,
+ blockSize: Long,
+ modificationTime: Long,
+ accessTime: Long,
+ blockLocations: Array[SerializableBlockLocation])
+
+ /**
+ * List a collection of path recursively.
+ */
+ private def listLeafFilesInSerial(
+ paths: Seq[Path],
+ hadoopConf: Configuration): Seq[FileStatus] = {
+ // Dummy jobconf to get to the pathFilter defined in configuration
+ val jobConf = new JobConf(hadoopConf, this.getClass)
+ val filter = FileInputFormat.getInputPathFilter(jobConf)
+
+ paths.flatMap { path =>
+ val fs = path.getFileSystem(hadoopConf)
+ listLeafFiles0(fs, path, filter)
+ }
+ }
+
+ /**
+ * List a collection of path recursively in parallel (using Spark executors).
+ * Each task launched will use [[listLeafFilesInSerial]] to list.
+ */
+ private def listLeafFilesInParallel(
+ paths: Seq[Path],
+ hadoopConf: Configuration,
+ sparkSession: SparkSession): Seq[FileStatus] = {
+ assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
+ logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
+
+ val sparkContext = sparkSession.sparkContext
+ val serializableConfiguration = new SerializableConfiguration(hadoopConf)
+ val serializedPaths = paths.map(_.toString)
+
+ // Cap the parallelism to prevent the following file listing from generating too many
+ // tasks when #defaultParallelism is large.
+ val numParallelism = Math.min(paths.size, 10000)
+
+ val statuses = sparkContext
+ .parallelize(serializedPaths, numParallelism)
+ .mapPartitions { paths =>
+ val hadoopConf = serializableConfiguration.value
+ listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
+ }.map { status =>
+ // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
+ val blockLocations = status match {
+ case f: LocatedFileStatus =>
+ f.getBlockLocations.map { loc =>
+ SerializableBlockLocation(
+ loc.getNames,
+ loc.getHosts,
+ loc.getOffset,
+ loc.getLength)
+ }
+
+ case _ =>
+ Array.empty[SerializableBlockLocation]
+ }
+
+ SerializableFileStatus(
+ status.getPath.toString,
+ status.getLen,
+ status.isDirectory,
+ status.getReplication,
+ status.getBlockSize,
+ status.getModificationTime,
+ status.getAccessTime,
+ blockLocations)
+ }.collect()
+
+ // Turn SerializableFileStatus back to Status
+ statuses.map { f =>
+ val blockLocations = f.blockLocations.map { loc =>
+ new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
+ }
+ new LocatedFileStatus(
+ new FileStatus(
+ f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path)),
+ blockLocations)
+ }
+ }
+
+ /**
+ * List a single path, provided as a FileStatus, in serial.
+ */
+ private def listLeafFiles0(
+ fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = {
+ logTrace(s"Listing $path")
+ val name = path.getName.toLowerCase
+ if (shouldFilterOut(name)) {
+ Seq.empty[FileStatus]
+ } else {
+ // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist
+ // Note that statuses only include FileStatus for the files and dirs directly under path,
+ // and does not include anything else recursively.
+ val statuses = try fs.listStatus(path) catch {
+ case _: FileNotFoundException =>
+ logWarning(s"The directory $path was not found. Was it deleted very recently?")
+ Array.empty[FileStatus]
+ }
+
+ val allLeafStatuses = {
+ val (dirs, files) = statuses.partition(_.isDirectory)
+ val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter))
+ if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
+ }
+
+ allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
+ case f: LocatedFileStatus =>
+ f
+
+ // NOTE:
+ //
+ // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
+ // operations, calling `getFileBlockLocations` does no harm here since these file system
+ // implementations don't actually issue RPC for this method.
+ //
+ // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
+ // be a big deal since we always use `listLeafFilesInParallel` when the number of
+ // paths exceeds threshold.
+ case f =>
+ // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
+ // which is very slow on some file systems (RawLocalFileSystem, which launches a
+ // subprocess and parses the stdout).
+ val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+ val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
+ f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
+ if (f.isSymlink) {
+ lfs.setSymlink(f.getSymlink)
+ }
+ lfs
+ }
+ }
+ }
+
+ /** Checks if we should filter out this path name. */
+ def shouldFilterOut(pathName: String): Boolean = {
+ // We filter everything that starts with _ and ., except _common_metadata and _metadata
+ // because Parquet needs to find those metadata files from leaf files returned by this method.
+ // We should refactor this logic to not mix metadata files with data files.
+ ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) &&
+ !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
new file mode 100644
index 0000000000..a5c41b2445
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.StructType
+
+
+/**
+ * A [[BasicFileCatalog]] for a metastore catalog table.
+ *
+ * @param sparkSession a [[SparkSession]]
+ * @param db the table's database name
+ * @param table the table's (unqualified) name
+ * @param partitionSchema the schema of a partitioned table's partition columns
+ * @param sizeInBytes the table's data size in bytes
+ */
+class TableFileCatalog(
+ sparkSession: SparkSession,
+ db: String,
+ table: String,
+ partitionSchema: Option[StructType],
+ override val sizeInBytes: Long)
+ extends SessionFileCatalog(sparkSession) {
+
+ override protected val hadoopConf = sparkSession.sessionState.newHadoopConf
+
+ private val externalCatalog = sparkSession.sharedState.externalCatalog
+
+ private val catalogTable = externalCatalog.getTable(db, table)
+
+ private val baseLocation = catalogTable.storage.locationUri
+
+ override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
+
+ override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
+ filterPartitions(filters).listFiles(Nil)
+ }
+
+ override def refresh(): Unit = {}
+
+ /**
+ * Returns a [[ListingFileCatalog]] for this table restricted to the subset of partitions
+ * specified by the given partition-pruning filters.
+ *
+ * @param filters partition-pruning filters
+ */
+ def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = {
+ if (filters.isEmpty) {
+ cachedAllPartitions
+ } else {
+ filterPartitions0(filters)
+ }
+ }
+
+ private def filterPartitions0(filters: Seq[Expression]): ListingFileCatalog = {
+ val parameters = baseLocation
+ .map(loc => Map(PartitioningAwareFileCatalog.BASE_PATH_PARAM -> loc))
+ .getOrElse(Map.empty)
+ partitionSchema match {
+ case Some(schema) =>
+ val selectedPartitions = externalCatalog.listPartitionsByFilter(db, table, filters)
+ val partitions = selectedPartitions.map { p =>
+ PartitionDirectory(p.toRow(schema), p.storage.locationUri.get)
+ }
+ val partitionSpec = PartitionSpec(schema, partitions)
+ new PrunedTableFileCatalog(
+ sparkSession, new Path(baseLocation.get), partitionSpec)
+ case None =>
+ new ListingFileCatalog(sparkSession, rootPaths, parameters, None)
+ }
+ }
+
+ // Not used in the hot path of queries when metastore partition pruning is enabled
+ lazy val cachedAllPartitions: ListingFileCatalog = filterPartitions0(Nil)
+
+ override def inputFiles: Array[String] = cachedAllPartitions.inputFiles
+}
+
+/**
+ * An override of the standard HDFS listing based catalog, that overrides the partition spec with
+ * the information from the metastore.
+ *
+ * @param tableBasePath The default base path of the Hive metastore table
+ * @param partitionSpec The partition specifications from Hive metastore
+ */
+private class PrunedTableFileCatalog(
+ sparkSession: SparkSession,
+ tableBasePath: Path,
+ override val partitionSpec: PartitionSpec)
+ extends ListingFileCatalog(
+ sparkSession,
+ partitionSpec.partitions.map(_.path),
+ Map.empty,
+ Some(partitionSpec.partitionColumns))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index f1a35dd8a6..4dea8cf29e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -269,11 +269,15 @@ private[parquet] object ParquetReadSupport {
*/
private def clipParquetGroupFields(
parquetRecord: GroupType, structType: StructType): Seq[Type] = {
- val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
+ val parquetFieldMap = parquetRecord.getFields.asScala
+ .map(f => f.getName -> f).toMap
+ val caseInsensitiveParquetFieldMap = parquetRecord.getFields.asScala
+ .map(f => f.getName.toLowerCase -> f).toMap
val toParquet = new ParquetSchemaConverter(writeLegacyParquetFormat = false)
structType.map { f =>
parquetFieldMap
.get(f.name)
+ .orElse(caseInsensitiveParquetFieldMap.get(f.name.toLowerCase))
.map(clipParquetType(_, f.dataType))
.getOrElse(toParquet.convertField(f))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
index a32c4671e3..82b67cb1ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
@@ -47,7 +47,7 @@ class MetadataLogFileCatalog(sparkSession: SparkSession, path: Path)
allFilesFromLog.toArray.groupBy(_.getPath.getParent)
}
- override def paths: Seq[Path] = path :: Nil
+ override def rootPaths: Seq[Path] = path :: Nil
override def refresh(): Unit = { }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c8447651dd..e73d0187b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -269,6 +269,13 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val HIVE_FILESOURCE_PARTITION_PRUNING =
+ SQLConfigBuilder("spark.sql.hive.filesourcePartitionPruning")
+ .doc("When true, enable metastore partition pruning for file source tables as well. " +
+ "This is currently implemented for converted Hive tables only.")
+ .booleanConf
+ .createWithDefault(true)
+
val OPTIMIZER_METADATA_ONLY = SQLConfigBuilder("spark.sql.optimizer.metadataOnly")
.doc("When true, enable the metadata-only query optimization that use the table's metadata " +
"to produce the partition columns instead of table scans. It applies when all the columns " +
@@ -676,6 +683,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
+ def filesourcePartitionPruning: Boolean = getConf(HIVE_FILESOURCE_PARTITION_PRUNING)
+
def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)
def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
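As an illustrative aside (not part of the patch), the new `spark.sql.hive.filesourcePartitionPruning` flag added above can be toggled at runtime from a Spark shell. The `events` table and `ds` column below are the same hypothetical names used earlier.

```scala
// Illustrative only. Disabling the flag restores the old behavior of eagerly
// listing every partition of a converted Hive table; it defaults to true.
spark.conf.set("spark.sql.hive.filesourcePartitionPruning", "false")
spark.sql("SELECT count(*) FROM events WHERE ds = '2016-10-14'").explain()

// Re-enable metastore-side partition pruning (the default).
spark.conf.set("spark.sql.hive.filesourcePartitionPruning", "true")
```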
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
index fa3abd0098..2695974b84 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
@@ -77,13 +77,14 @@ class FileCatalogSuite extends SharedSQLContext {
val catalog1 = new ListingFileCatalog(
spark, Seq(new Path(deletedFolder.getCanonicalPath)), Map.empty, None)
// doesn't throw an exception
- assert(catalog1.listLeafFiles(catalog1.paths).isEmpty)
+ assert(catalog1.listLeafFiles(catalog1.rootPaths).isEmpty)
}
}
test("SPARK-17613 - PartitioningAwareFileCatalog: base path w/o '/' at end") {
class MockCatalog(
- override val paths: Seq[Path]) extends PartitioningAwareFileCatalog(spark, Map.empty, None) {
+ override val rootPaths: Seq[Path])
+ extends PartitioningAwareFileCatalog(spark, Map.empty, None) {
override def refresh(): Unit = {}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index c5deb31fec..c32254d9df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -395,7 +395,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
val fileCatalog = new ListingFileCatalog(
sparkSession = spark,
- paths = Seq(new Path(tempDir)),
+ rootPaths = Seq(new Path(tempDir)),
parameters = Map.empty[String, String],
partitionSchema = None)
// This should not fail.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala
index f15730aeb1..df50958337 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala
@@ -19,16 +19,16 @@ package org.apache.spark.sql.execution.datasources
import org.apache.spark.SparkFunSuite
-class ListingFileCatalogSuite extends SparkFunSuite {
+class SessionFileCatalogSuite extends SparkFunSuite {
test("file filtering") {
- assert(!ListingFileCatalog.shouldFilterOut("abcd"))
- assert(ListingFileCatalog.shouldFilterOut(".ab"))
- assert(ListingFileCatalog.shouldFilterOut("_cd"))
+ assert(!SessionFileCatalog.shouldFilterOut("abcd"))
+ assert(SessionFileCatalog.shouldFilterOut(".ab"))
+ assert(SessionFileCatalog.shouldFilterOut("_cd"))
- assert(!ListingFileCatalog.shouldFilterOut("_metadata"))
- assert(!ListingFileCatalog.shouldFilterOut("_common_metadata"))
- assert(ListingFileCatalog.shouldFilterOut("_ab_metadata"))
- assert(ListingFileCatalog.shouldFilterOut("_cd_common_metadata"))
+ assert(!SessionFileCatalog.shouldFilterOut("_metadata"))
+ assert(!SessionFileCatalog.shouldFilterOut("_common_metadata"))
+ assert(SessionFileCatalog.shouldFilterOut("_ab_metadata"))
+ assert(SessionFileCatalog.shouldFilterOut("_cd_common_metadata"))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 8d18be9300..43357c97c3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -30,7 +30,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitionDirectory => Partition, PartitioningUtils, PartitionSpec}
+import org.apache.spark.sql.execution.datasources.{FileCatalog, HadoopFsRelation, LogicalRelation, PartitionDirectory => Partition, PartitioningUtils, PartitionSpec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -626,8 +626,8 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
(1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath)
val queryExecution = spark.read.parquet(dir.getCanonicalPath).queryExecution
queryExecution.analyzed.collectFirst {
- case LogicalRelation(relation: HadoopFsRelation, _, _) =>
- assert(relation.partitionSpec === PartitionSpec.emptySpec)
+ case LogicalRelation(HadoopFsRelation(location: FileCatalog, _, _, _, _, _), _, _) =>
+ assert(location.partitionSpec === PartitionSpec.emptySpec)
}.getOrElse {
fail(s"Expecting a ParquetRelation2, but got:\n$queryExecution")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 8a980a7eb5..c3d202ced2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -1081,6 +1081,34 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
}
testSchemaClipping(
+ "falls back to case insensitive resolution",
+
+ parquetSchema =
+ """message root {
+ | required group A {
+ | optional int32 B;
+ | }
+ | optional int32 c;
+ |}
+ """.stripMargin,
+
+ catalystSchema = {
+ val nestedType = new StructType().add("b", IntegerType, nullable = true)
+ new StructType()
+ .add("a", nestedType, nullable = true)
+ .add("c", IntegerType, nullable = true)
+ },
+
+ expectedSchema =
+ """message root {
+ | required group A {
+ | optional int32 B;
+ | }
+ | optional int32 c;
+ |}
+ """.stripMargin)
+
+ testSchemaClipping(
"simple nested struct",
parquetSchema =