From bba5d7999f7b3ae9d816ea552ba9378fea1615a6 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 10 Jun 2016 18:23:59 -0700
Subject: [SPARK-15639][SQL] Try to push down filter at RowGroups level for
 parquet reader

## What changes were proposed in this pull request?

The base class `SpecificParquetRecordReaderBase`, used by the vectorized Parquet reader, tries to read pushed-down filters from the given configuration. These pushed-down filters are used for RowGroups-level filtering. However, we never set the filters to push down in that configuration, so the filters are not actually pushed down and no RowGroups-level filtering happens. This patch fixes that by setting the pushed-down filters in the configuration handed to the reader.

## How was this patch tested?

Existing tests should pass.

Author: Liang-Chi Hsieh

Closes #13371 from viirya/vectorized-reader-push-down-filter.
---
 .../catalyst/expressions/namedExpressions.scala    |  8 +++
 .../execution/datasources/FileSourceStrategy.scala |  9 +++-
 .../datasources/parquet/ParquetFileFormat.scala    | 61 ++--
 3 files changed, 21 insertions(+), 57 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 306a99d5a3..c06a1ea356 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -292,6 +292,14 @@ case class AttributeReference(
     }
   }
 
+  def withMetadata(newMetadata: Metadata): AttributeReference = {
+    if (metadata == newMetadata) {
+      this
+    } else {
+      AttributeReference(name, dataType, nullable, newMetadata)(exprId, qualifier, isGenerated)
+    }
+  }
+
   override protected final def otherCopyArgs: Seq[AnyRef] = {
     exprId :: qualifier :: isGenerated :: Nil
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 13a86bfb38..7fc842f83e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -84,7 +84,14 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
 
       val dataColumns =
-        l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver)
+        l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver).map { c =>
+          files.dataSchema.find(_.name == c.name).map { f =>
+            c match {
+              case a: AttributeReference => a.withMetadata(f.metadata)
+              case _ => c
+            }
+          }.getOrElse(c)
+        }
 
       // Partition keys are not available in the statistics of the files.
       val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 3735c94968..bc4a9de7a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -357,6 +357,11 @@ private[sql] class ParquetFileFormat
       val hadoopAttemptContext =
         new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
 
+      // Try to push down filters when filter push-down is enabled.
+      // Notice: This push-down is RowGroups level, not individual records.
+      pushed.foreach {
+        ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, _)
+      }
       val parquetReader = if (enableVectorizedReader) {
         val vectorizedReader = new VectorizedParquetRecordReader()
         vectorizedReader.initialize(split, hadoopAttemptContext)
@@ -592,62 +597,6 @@ private[sql] object ParquetFileFormat extends Logging {
     }
   }
 
-  /** This closure sets various Parquet configurations at both driver side and executor side. */
-  private[parquet] def initializeLocalJobFunc(
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      dataSchema: StructType,
-      parquetBlockSize: Long,
-      useMetadataCache: Boolean,
-      parquetFilterPushDown: Boolean,
-      assumeBinaryIsString: Boolean,
-      assumeInt96IsTimestamp: Boolean)(job: Job): Unit = {
-    val conf = job.getConfiguration
-    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
-
-    // Try to push down filters when filter push-down is enabled.
-    if (parquetFilterPushDown) {
-      filters
-        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
-        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
-        // is used here.
-        .flatMap(ParquetFilters.createFilter(dataSchema, _))
-        .reduceOption(FilterApi.and)
-        .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
-    }
-
-    conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
-      val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
-      CatalystSchemaConverter.checkFieldNames(requestedSchema).json
-    })
-
-    conf.set(
-      CatalystWriteSupport.SPARK_ROW_SCHEMA,
-      CatalystSchemaConverter.checkFieldNames(dataSchema).json)
-
-    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
-    conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
-
-    // Sets flags for `CatalystSchemaConverter`
-    conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
-    conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp)
-
-    overrideMinSplitSize(parquetBlockSize, conf)
-  }
-
-  /** This closure sets input paths at the driver side. */
-  private[parquet] def initializeDriverSideJobFunc(
-      inputFiles: Array[FileStatus],
-      parquetBlockSize: Long)(job: Job): Unit = {
-    // We side the input paths at the driver side.
-    logInfo(s"Reading Parquet file(s) from ${inputFiles.map(_.getPath).mkString(", ")}")
-    if (inputFiles.nonEmpty) {
-      FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
-    }
-
-    overrideMinSplitSize(parquetBlockSize, job.getConfiguration)
-  }
-
   private[parquet] def readSchema(
       footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {
--
cgit v1.2.3
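For readers less familiar with the Parquet datasource internals, the sketch below illustrates the mechanism this patch relies on, distilled from the code above: the convertible data source `Filter`s are turned into a single Parquet `FilterPredicate` and stored in the Hadoop configuration, which is what allows `SpecificParquetRecordReaderBase` to skip whole row groups. This is an illustrative sketch, not part of the patch: the object and method names (`ParquetPushDownSketch`, `pushDownToRowGroups`) and the parameters are made up, and `ParquetFilters` is a Spark-internal helper that is normally not callable from user code.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Illustrative only: mirrors the push-down logic used by this patch.
object ParquetPushDownSketch {
  def pushDownToRowGroups(
      dataSchema: StructType,
      filters: Seq[Filter],
      hadoopConf: Configuration): Unit = {
    // Not every source Filter can be expressed as a Parquet predicate
    // (`ParquetFilters.createFilter` returns an Option), hence the flatMap.
    val pushed: Option[FilterPredicate] = filters
      .flatMap(ParquetFilters.createFilter(dataSchema, _))
      .reduceOption(FilterApi.and)

    // Storing the predicate in the Hadoop configuration is what lets
    // SpecificParquetRecordReaderBase filter out entire row groups.
    pushed.foreach(ParquetInputFormat.setFilterPredicate(hadoopConf, _))
  }
}
```

Because `ParquetFilters.createFilter` returns an `Option`, predicates that cannot be expressed as Parquet filters are simply dropped; row-group skipping is therefore best-effort, and Spark still evaluates the remaining filters on the rows that are returned.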