author     Liang-Chi Hsieh <simonh@tw.ibm.com>    2016-06-10 18:23:59 -0700
committer  Cheng Lian <lian@databricks.com>       2016-06-10 18:23:59 -0700
commit     bba5d7999f7b3ae9d816ea552ba9378fea1615a6
tree       bf303da6296982bd924fb057a4c0a2306d6b68fa
parent     54f758b5fc60ecb0da6b191939a72ef5829be38c
[SPARK-15639][SQL] Try to push down filter at RowGroups level for parquet reader
## What changes were proposed in this pull request?

The base class `SpecificParquetRecordReaderBase` used by the vectorized Parquet reader tries to get pushed-down filters from the given configuration. These pushed-down filters are used for RowGroups-level filtering. However, we never set the filters to push down into the configuration, so they are not actually pushed down and no RowGroups-level filtering happens. This patch fixes that by setting up the filters to push down in the configuration for the reader.

## How was this patch tested?

Existing tests should pass.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>

Closes #13371 from viirya/vectorized-reader-push-down-filter.
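For readers unfamiliar with the mechanism, here is a minimal sketch of the push-down path this patch wires up: Spark data source `Filter`s are converted to a Parquet `FilterPredicate` with `ParquetFilters.createFilter`, AND-ed together with `FilterApi.and`, and stored in the Hadoop configuration via `ParquetInputFormat.setFilterPredicate`, which is where `SpecificParquetRecordReaderBase` reads it back for RowGroups-level filtering. This is an illustration of the pattern only (Spark 2.0-era internals; `ParquetFilters` is package-private), not code from the patch.

```scala
// Illustration only -- mirrors the pattern used in this patch. ParquetFilters is a
// package-private Spark internal, so this only compiles inside that package.
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
import org.apache.spark.sql.sources.{Filter, GreaterThan, IsNotNull}
import org.apache.spark.sql.types._

val dataSchema = StructType(Seq(StructField("a", IntegerType), StructField("b", StringType)))
val filters: Seq[Filter] = Seq(IsNotNull("a"), GreaterThan("a", 10))

// Not every source filter has a Parquet equivalent: createFilter returns an Option,
// hence the flatMap. The surviving predicates are combined into a single one.
val pushed = filters
  .flatMap(ParquetFilters.createFilter(dataSchema, _))
  .reduceOption(FilterApi.and)

// Before this patch, the predicate was never written into the configuration seen by the
// vectorized reader, so RowGroups-level filtering silently did nothing.
val hadoopConf = new Configuration()
pushed.foreach(ParquetInputFormat.setFilterPredicate(hadoopConf, _))
```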
 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala       |  8
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala        |  9
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala | 61
 3 files changed, 21 insertions, 57 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 306a99d5a3..c06a1ea356 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -292,6 +292,14 @@ case class AttributeReference(
}
}
+ def withMetadata(newMetadata: Metadata): AttributeReference = {
+ if (metadata == newMetadata) {
+ this
+ } else {
+ AttributeReference(name, dataType, nullable, newMetadata)(exprId, qualifier, isGenerated)
+ }
+ }
+
override protected final def otherCopyArgs: Seq[AnyRef] = {
exprId :: qualifier :: isGenerated :: Nil
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 13a86bfb38..7fc842f83e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -84,7 +84,14 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
val dataColumns =
- l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver)
+ l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver).map { c =>
+ files.dataSchema.find(_.name == c.name).map { f =>
+ c match {
+ case a: AttributeReference => a.withMetadata(f.metadata)
+ case _ => c
+ }
+ }.getOrElse(c)
+ }
// Partition keys are not available in the statistics of the files.
val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
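Viewed in isolation, the mapping added above only re-attaches each field's metadata from the relation's data schema to the resolved attribute, leaving non-`AttributeReference` attributes untouched. A hedged standalone restatement, with a hypothetical helper name:

```scala
// Hypothetical helper restating the mapping above; not code from the patch.
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types.StructType

def attachFileSchemaMetadata(resolved: Seq[Attribute], dataSchema: StructType): Seq[Attribute] =
  resolved.map { c =>
    dataSchema.find(_.name == c.name).map { f =>
      c match {
        case a: AttributeReference => a.withMetadata(f.metadata) // carry field metadata forward
        case _ => c
      }
    }.getOrElse(c)                                               // field not in the data schema
  }
```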
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 3735c94968..bc4a9de7a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -357,6 +357,11 @@ private[sql] class ParquetFileFormat
val hadoopAttemptContext =
new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
+ // Try to push down filters when filter push-down is enabled.
+ // Notice: This push-down is RowGroups level, not individual records.
+ pushed.foreach {
+ ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, _)
+ }
val parquetReader = if (enableVectorizedReader) {
val vectorizedReader = new VectorizedParquetRecordReader()
vectorizedReader.initialize(split, hadoopAttemptContext)
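On the reading side, the commit description says the base class of the vectorized reader obtains the pushed-down filter from this same configuration. A small hedged sketch of that retrieval using the public Parquet API, with `hadoopAttemptContext` referring to the variable in the hunk above:

```scala
// Sketch: reading back the predicate set above from the same configuration. Per the
// commit message, SpecificParquetRecordReaderBase obtains it (in this or an equivalent
// way) and uses it to skip whole row groups before any records are read.
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.hadoop.ParquetInputFormat

val rowGroupFilter: FilterCompat.Filter =
  ParquetInputFormat.getFilter(hadoopAttemptContext.getConfiguration)
```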
@@ -592,62 +597,6 @@ private[sql] object ParquetFileFormat extends Logging {
}
}
- /** This closure sets various Parquet configurations at both driver side and executor side. */
- private[parquet] def initializeLocalJobFunc(
- requiredColumns: Array[String],
- filters: Array[Filter],
- dataSchema: StructType,
- parquetBlockSize: Long,
- useMetadataCache: Boolean,
- parquetFilterPushDown: Boolean,
- assumeBinaryIsString: Boolean,
- assumeInt96IsTimestamp: Boolean)(job: Job): Unit = {
- val conf = job.getConfiguration
- conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
-
- // Try to push down filters when filter push-down is enabled.
- if (parquetFilterPushDown) {
- filters
- // Collects all converted Parquet filter predicates. Notice that not all predicates can be
- // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
- // is used here.
- .flatMap(ParquetFilters.createFilter(dataSchema, _))
- .reduceOption(FilterApi.and)
- .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
- }
-
- conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
- val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
- CatalystSchemaConverter.checkFieldNames(requestedSchema).json
- })
-
- conf.set(
- CatalystWriteSupport.SPARK_ROW_SCHEMA,
- CatalystSchemaConverter.checkFieldNames(dataSchema).json)
-
- // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
- conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
-
- // Sets flags for `CatalystSchemaConverter`
- conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
- conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp)
-
- overrideMinSplitSize(parquetBlockSize, conf)
- }
-
- /** This closure sets input paths at the driver side. */
- private[parquet] def initializeDriverSideJobFunc(
- inputFiles: Array[FileStatus],
- parquetBlockSize: Long)(job: Job): Unit = {
- // We side the input paths at the driver side.
- logInfo(s"Reading Parquet file(s) from ${inputFiles.map(_.getPath).mkString(", ")}")
- if (inputFiles.nonEmpty) {
- FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
- }
-
- overrideMinSplitSize(parquetBlockSize, job.getConfiguration)
- }
-
private[parquet] def readSchema(
footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {