From 8537c00e0a17eff2a8c6745fbdd1d08873c0434d Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Thu, 16 Mar 2017 18:31:57 -0700 Subject: [SPARK-19987][SQL] Pass all filters into FileIndex ## What changes were proposed in this pull request? This is a tiny teeny refactoring to pass data filters also to the FileIndex, so FileIndex can have a more global view on predicates. ## How was this patch tested? Change should be covered by existing test cases. Author: Reynold Xin Closes #17322 from rxin/SPARK-19987. --- .../spark/sql/execution/DataSourceScanExec.scala | 23 +++++++++++++--------- .../sql/execution/OptimizeMetadataOnlyQuery.scala | 2 +- .../execution/datasources/CatalogFileIndex.scala | 5 +++-- .../sql/execution/datasources/FileIndex.scala | 15 +++++++++----- .../execution/datasources/FileSourceStrategy.scala | 5 +---- .../datasources/PartitioningAwareFileIndex.scala | 8 +++++--- 6 files changed, 34 insertions(+), 24 deletions(-) (limited to 'sql/core/src/main/scala/org/apache') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 8ebad676ca..bfe9c8e351 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -23,18 +23,18 @@ import org.apache.commons.lang3.StringUtils import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.{BaseRelation, Filter} -import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils trait DataSourceScanExec extends LeafExecNode with CodegenSupport { @@ -135,7 +135,7 @@ case class RowDataSourceScanExec( * @param output Output attributes of the scan. * @param outputSchema Output schema of the scan. * @param partitionFilters Predicates to use for partition pruning. - * @param dataFilters Data source filters to use for filtering data within partitions. + * @param dataFilters Filters on non-partition columns. * @param metastoreTableIdentifier identifier for the table in the metastore. 
*/ case class FileSourceScanExec( @@ -143,7 +143,7 @@ case class FileSourceScanExec( output: Seq[Attribute], outputSchema: StructType, partitionFilters: Seq[Expression], - dataFilters: Seq[Filter], + dataFilters: Seq[Expression], override val metastoreTableIdentifier: Option[TableIdentifier]) extends DataSourceScanExec with ColumnarBatchScan { @@ -156,7 +156,8 @@ case class FileSourceScanExec( false } - @transient private lazy val selectedPartitions = relation.location.listFiles(partitionFilters) + @transient private lazy val selectedPartitions = + relation.location.listFiles(partitionFilters, dataFilters) override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = { val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) { @@ -225,6 +226,10 @@ case class FileSourceScanExec( } } + @transient + private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter) + logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}") + // These metadata values make scan plans uniquely identifiable for equality checking. override val metadata: Map[String, String] = { def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]") @@ -237,7 +242,7 @@ case class FileSourceScanExec( "ReadSchema" -> outputSchema.catalogString, "Batched" -> supportsBatch.toString, "PartitionFilters" -> seqToString(partitionFilters), - "PushedFilters" -> seqToString(dataFilters), + "PushedFilters" -> seqToString(pushedDownFilters), "Location" -> locationDesc) val withOptPartitionCount = relation.partitionSchemaOption.map { _ => @@ -255,7 +260,7 @@ case class FileSourceScanExec( dataSchema = relation.dataSchema, partitionSchema = relation.partitionSchema, requiredSchema = outputSchema, - filters = dataFilters, + filters = pushedDownFilters, options = relation.options, hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala index 769deb1890..3c046ce494 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala @@ -98,7 +98,7 @@ case class OptimizeMetadataOnlyQuery( relation match { case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) => val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l) - val partitionData = fsRelation.location.listFiles(filters = Nil) + val partitionData = fsRelation.location.listFiles(Nil, Nil) LocalRelation(partAttrs, partitionData.map(_.values)) case relation: CatalogRelation => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala index d6c4b97ebd..db0254f8d5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala @@ -54,8 +54,9 @@ class CatalogFileIndex( override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq - override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = { - filterPartitions(filters).listFiles(Nil) + override def listFiles( + partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { + 
filterPartitions(partitionFilters).listFiles(Nil, dataFilters) } override def refresh(): Unit = fileStatusCache.invalidateAll() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala index 277223d52e..6b99d38fe5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala @@ -46,12 +46,17 @@ trait FileIndex { * Returns all valid files grouped into partitions when the data is partitioned. If the data is * unpartitioned, this will return a single partition with no partition values. * - * @param filters The filters used to prune which partitions are returned. These filters must - * only refer to partition columns and this method will only return files - * where these predicates are guaranteed to evaluate to `true`. Thus, these - * filters will not need to be evaluated again on the returned data. + * @param partitionFilters The filters used to prune which partitions are returned. These filters + * must only refer to partition columns and this method will only return + * files where these predicates are guaranteed to evaluate to `true`. + * Thus, these filters will not need to be evaluated again on the + * returned data. + * @param dataFilters Filters that can be applied on non-partitioned columns. The implementation + * does not need to guarantee these filters are applied, i.e. the execution + * engine will ensure these filters are still applied on the returned files. */ - def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] + def listFiles( + partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] /** * Returns the list of files that will be read when scanning this relation. 
This call may be diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 26e1380eca..17f7e0e601 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -100,9 +100,6 @@ object FileSourceStrategy extends Strategy with Logging { val outputSchema = readDataColumns.toStructType logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}") - val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter) - logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}") - val outputAttributes = readDataColumns ++ partitionColumns val scan = @@ -111,7 +108,7 @@ object FileSourceStrategy extends Strategy with Logging { outputAttributes, outputSchema, partitionKeyFilters.toSeq, - pushedDownFilters, + dataFilters, table.map(_.identifier)) val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index db8bbc52aa..71500a0105 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -54,17 +54,19 @@ abstract class PartitioningAwareFileIndex( override def partitionSchema: StructType = partitionSpec().partitionColumns - protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters) + protected val hadoopConf: Configuration = + sparkSession.sessionState.newHadoopConfWithOptions(parameters) protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] - override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = { + override def listFiles( + partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) { PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil } else { - prunePartitions(filters, partitionSpec()).map { + prunePartitions(partitionFilters, partitionSpec()).map { case PartitionPath(values, path) => val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match { case Some(existingDir) => -- cgit v1.2.3
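
For illustration only (not part of the patch above): a minimal sketch of a custom FileIndex written against the new two-argument listFiles signature that this change introduces. The class name StatsBasedFileIndex and the helper fileMightMatch are hypothetical; the only APIs assumed are FileIndex, PartitionDirectory, and the listFiles(partitionFilters, dataFilters) contract shown in the diff. Per the updated scaladoc, an implementation may use dataFilters opportunistically (e.g. per-file skipping), and returning extra files is always safe because the execution engine still evaluates these filters on the rows that are read.

// Illustrative sketch only -- not part of this patch. Assumes the Spark 2.2-era
// internal APIs (FileIndex, PartitionDirectory) shown in the diff above.
// `StatsBasedFileIndex` and `fileMightMatch` are hypothetical names.
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory}
import org.apache.spark.sql.types.StructType

class StatsBasedFileIndex(paths: Seq[Path], files: Seq[FileStatus]) extends FileIndex {

  override def rootPaths: Seq[Path] = paths

  // Unpartitioned for simplicity, so partitionFilters will always be empty here.
  override def partitionSchema: StructType = StructType(Nil)

  // With SPARK-19987, data filters are passed in alongside partition filters,
  // so an index that keeps per-file statistics can skip whole files here.
  // Keeping extra files is always safe: the execution engine re-applies
  // these filters to the returned data.
  override def listFiles(
      partitionFilters: Seq[Expression],
      dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
    val selected = files.filter(f => fileMightMatch(f, dataFilters))
    PartitionDirectory(InternalRow.empty, selected) :: Nil
  }

  // Hypothetical hook: consult per-file min/max statistics and return false
  // only when no row in `file` can possibly satisfy all of `dataFilters`.
  private def fileMightMatch(file: FileStatus, dataFilters: Seq[Expression]): Boolean = true

  override def inputFiles: Array[String] = files.map(_.getPath.toString).toArray

  override def refresh(): Unit = ()

  override def sizeInBytes: Long = files.map(_.getLen).sum
}

Because FileSourceScanExec now calls relation.location.listFiles(partitionFilters, dataFilters) (see the first hunk above), an index like this sketch would receive the data filters automatically on the normal file-source scan path, with no change needed in FileSourceStrategy.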