author     Reynold Xin <rxin@databricks.com>    2017-03-16 18:31:57 -0700
committer  Xiao Li <gatorsmile@gmail.com>       2017-03-16 18:31:57 -0700
commit     8537c00e0a17eff2a8c6745fbdd1d08873c0434d (patch)
tree       79e01f299fa6d76bd87f79315e2a18a86a685880 /sql/core/src/main/scala/org/apache
parent     4c3200546c5c55e671988a957011417ba76a0600 (diff)
[SPARK-19987][SQL] Pass all filters into FileIndex
## What changes were proposed in this pull request?

This is a tiny teeny refactoring to pass data filters also to the FileIndex, so FileIndex can have a more global view on predicates.

## How was this patch tested?

Change should be covered by existing test cases.

Author: Reynold Xin <rxin@databricks.com>

Closes #17322 from rxin/SPARK-19987.
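To make the shape of the refactoring concrete, below is a minimal, self-contained Scala sketch of the new two-argument listFiles contract. The names SimpleFileIndex, PartitionDir, and InMemoryIndex are illustrative stand-ins, not the real Spark classes: partition filters must be fully honored by the index, while data filters are optional hints an implementation may use for extra pruning but which the execution engine re-applies anyway.

// Minimal sketch of the new listFiles contract (illustrative types only).
case class PartitionDir(partitionValues: Map[String, String], files: Seq[String])

trait SimpleFileIndex {
  // partitionFilters: must only reference partition columns; returned files are
  // guaranteed to satisfy them, so they need not be re-evaluated downstream.
  // dataFilters: filters on non-partition columns; an index MAY use them to
  // skip files (e.g. via file-level statistics) but is not required to,
  // because the engine still evaluates them on the rows it reads.
  def listFiles(
      partitionFilters: Seq[Map[String, String] => Boolean],
      dataFilters: Seq[String => Boolean]): Seq[PartitionDir]
}

class InMemoryIndex(partitions: Seq[PartitionDir]) extends SimpleFileIndex {
  override def listFiles(
      partitionFilters: Seq[Map[String, String] => Boolean],
      dataFilters: Seq[String => Boolean]): Seq[PartitionDir] = {
    // Mandatory partition pruning: keep only partitions passing every filter.
    val pruned = partitions.filter(p => partitionFilters.forall(f => f(p.partitionValues)))
    // This toy index ignores dataFilters entirely, which is a legal choice:
    // the returned files may still contain rows that fail them.
    pruned
  }
}

The patch mirrors this on the caller side: FileSourceScanExec now calls relation.location.listFiles(partitionFilters, dataFilters) and translates dataFilters into pushed-down source Filters itself, instead of receiving already-translated filters from FileSourceStrategy.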
Diffstat (limited to 'sql/core/src/main/scala/org/apache')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala                    | 23
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala             |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala          |  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala                 | 15
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala        |  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala |  8
6 files changed, 34 insertions(+), 24 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 8ebad676ca..bfe9c8e351 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -23,18 +23,18 @@ import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.{BaseRelation, Filter}
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
@@ -135,7 +135,7 @@ case class RowDataSourceScanExec(
* @param output Output attributes of the scan.
* @param outputSchema Output schema of the scan.
* @param partitionFilters Predicates to use for partition pruning.
- * @param dataFilters Data source filters to use for filtering data within partitions.
+ * @param dataFilters Filters on non-partition columns.
* @param metastoreTableIdentifier identifier for the table in the metastore.
*/
case class FileSourceScanExec(
@@ -143,7 +143,7 @@ case class FileSourceScanExec(
output: Seq[Attribute],
outputSchema: StructType,
partitionFilters: Seq[Expression],
- dataFilters: Seq[Filter],
+ dataFilters: Seq[Expression],
override val metastoreTableIdentifier: Option[TableIdentifier])
extends DataSourceScanExec with ColumnarBatchScan {
@@ -156,7 +156,8 @@ case class FileSourceScanExec(
false
}
- @transient private lazy val selectedPartitions = relation.location.listFiles(partitionFilters)
+ @transient private lazy val selectedPartitions =
+ relation.location.listFiles(partitionFilters, dataFilters)
override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
@@ -225,6 +226,10 @@ case class FileSourceScanExec(
}
}
+ @transient
+ private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
+ logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
+
// These metadata values make scan plans uniquely identifiable for equality checking.
override val metadata: Map[String, String] = {
def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
@@ -237,7 +242,7 @@ case class FileSourceScanExec(
"ReadSchema" -> outputSchema.catalogString,
"Batched" -> supportsBatch.toString,
"PartitionFilters" -> seqToString(partitionFilters),
- "PushedFilters" -> seqToString(dataFilters),
+ "PushedFilters" -> seqToString(pushedDownFilters),
"Location" -> locationDesc)
val withOptPartitionCount =
relation.partitionSchemaOption.map { _ =>
@@ -255,7 +260,7 @@ case class FileSourceScanExec(
dataSchema = relation.dataSchema,
partitionSchema = relation.partitionSchema,
requiredSchema = outputSchema,
- filters = dataFilters,
+ filters = pushedDownFilters,
options = relation.options,
hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index 769deb1890..3c046ce494 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -98,7 +98,7 @@ case class OptimizeMetadataOnlyQuery(
relation match {
case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
- val partitionData = fsRelation.location.listFiles(filters = Nil)
+ val partitionData = fsRelation.location.listFiles(Nil, Nil)
LocalRelation(partAttrs, partitionData.map(_.values))
case relation: CatalogRelation =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index d6c4b97ebd..db0254f8d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -54,8 +54,9 @@ class CatalogFileIndex(
override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
- override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
- filterPartitions(filters).listFiles(Nil)
+ override def listFiles(
+ partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+ filterPartitions(partitionFilters).listFiles(Nil, dataFilters)
}
override def refresh(): Unit = fileStatusCache.invalidateAll()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
index 277223d52e..6b99d38fe5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
@@ -46,12 +46,17 @@ trait FileIndex {
* Returns all valid files grouped into partitions when the data is partitioned. If the data is
* unpartitioned, this will return a single partition with no partition values.
*
- * @param filters The filters used to prune which partitions are returned. These filters must
- * only refer to partition columns and this method will only return files
- * where these predicates are guaranteed to evaluate to `true`. Thus, these
- * filters will not need to be evaluated again on the returned data.
+ * @param partitionFilters The filters used to prune which partitions are returned. These filters
+ * must only refer to partition columns and this method will only return
+ * files where these predicates are guaranteed to evaluate to `true`.
+ * Thus, these filters will not need to be evaluated again on the
+ * returned data.
+ * @param dataFilters Filters that can be applied on non-partitioned columns. The implementation
+ * does not need to guarantee these filters are applied, i.e. the execution
+ * engine will ensure these filters are still applied on the returned files.
*/
- def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory]
+ def listFiles(
+ partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory]
/**
* Returns the list of files that will be read when scanning this relation. This call may be
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 26e1380eca..17f7e0e601 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -100,9 +100,6 @@ object FileSourceStrategy extends Strategy with Logging {
val outputSchema = readDataColumns.toStructType
logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")
- val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
- logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
-
val outputAttributes = readDataColumns ++ partitionColumns
val scan =
@@ -111,7 +108,7 @@ object FileSourceStrategy extends Strategy with Logging {
outputAttributes,
outputSchema,
partitionKeyFilters.toSeq,
- pushedDownFilters,
+ dataFilters,
table.map(_.identifier))
val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index db8bbc52aa..71500a0105 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -54,17 +54,19 @@ abstract class PartitioningAwareFileIndex(
override def partitionSchema: StructType = partitionSpec().partitionColumns
- protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)
+ protected val hadoopConf: Configuration =
+ sparkSession.sessionState.newHadoopConfWithOptions(parameters)
protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus]
protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]
- override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
+ override def listFiles(
+ partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
} else {
- prunePartitions(filters, partitionSpec()).map {
+ prunePartitions(partitionFilters, partitionSpec()).map {
case PartitionPath(values, path) =>
val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
case Some(existingDir) =>