diff options
author | Cheng Lian <lian@databricks.com> | 2016-06-10 20:41:48 -0700 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2016-06-10 20:41:48 -0700 |
commit | 8e7b56f3d4917692d3ff44d91aa264738a6fc2ed (patch) | |
tree | 429202da591d53091c96f08d625ca9301cbb451b /sql | |
parent | 99f3c82776fe5ea4f89a9965a288c7447585dc2c (diff) | |
download | spark-8e7b56f3d4917692d3ff44d91aa264738a6fc2ed.tar.gz spark-8e7b56f3d4917692d3ff44d91aa264738a6fc2ed.tar.bz2 spark-8e7b56f3d4917692d3ff44d91aa264738a6fc2ed.zip |
Revert "[SPARK-15639][SQL] Try to push down filter at RowGroups level for parquet reader"
This reverts commit bba5d7999f7b3ae9d816ea552ba9378fea1615a6.
Diffstat (limited to 'sql')
3 files changed, 57 insertions, 21 deletions
```diff
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index c06a1ea356..306a99d5a3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -292,14 +292,6 @@ case class AttributeReference(
     }
   }
 
-  def withMetadata(newMetadata: Metadata): AttributeReference = {
-    if (metadata == newMetadata) {
-      this
-    } else {
-      AttributeReference(name, dataType, nullable, newMetadata)(exprId, qualifier, isGenerated)
-    }
-  }
-
   override protected final def otherCopyArgs: Seq[AnyRef] = {
     exprId :: qualifier :: isGenerated :: Nil
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 7fc842f83e..13a86bfb38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -84,14 +84,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
 
       val dataColumns =
-        l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver).map { c =>
-          files.dataSchema.find(_.name == c.name).map { f =>
-            c match {
-              case a: AttributeReference => a.withMetadata(f.metadata)
-              case _ => c
-            }
-          }.getOrElse(c)
-        }
+        l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver)
 
       // Partition keys are not available in the statistics of the files.
       val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index bc4a9de7a9..3735c94968 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -357,11 +357,6 @@ private[sql] class ParquetFileFormat
       val hadoopAttemptContext =
         new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
 
-      // Try to push down filters when filter push-down is enabled.
-      // Notice: This push-down is RowGroups level, not individual records.
-      pushed.foreach {
-        ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, _)
-      }
       val parquetReader = if (enableVectorizedReader) {
         val vectorizedReader = new VectorizedParquetRecordReader()
         vectorizedReader.initialize(split, hadoopAttemptContext)
@@ -597,6 +592,62 @@ private[sql] object ParquetFileFormat extends Logging {
     }
   }
 
+  /** This closure sets various Parquet configurations at both driver side and executor side. */
+  private[parquet] def initializeLocalJobFunc(
+      requiredColumns: Array[String],
+      filters: Array[Filter],
+      dataSchema: StructType,
+      parquetBlockSize: Long,
+      useMetadataCache: Boolean,
+      parquetFilterPushDown: Boolean,
+      assumeBinaryIsString: Boolean,
+      assumeInt96IsTimestamp: Boolean)(job: Job): Unit = {
+    val conf = job.getConfiguration
+    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
+
+    // Try to push down filters when filter push-down is enabled.
+    if (parquetFilterPushDown) {
+      filters
+        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+        // is used here.
+        .flatMap(ParquetFilters.createFilter(dataSchema, _))
+        .reduceOption(FilterApi.and)
+        .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
+    }
+
+    conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
+      val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
+      CatalystSchemaConverter.checkFieldNames(requestedSchema).json
+    })
+
+    conf.set(
+      CatalystWriteSupport.SPARK_ROW_SCHEMA,
+      CatalystSchemaConverter.checkFieldNames(dataSchema).json)
+
+    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
+    conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
+
+    // Sets flags for `CatalystSchemaConverter`
+    conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
+    conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp)
+
+    overrideMinSplitSize(parquetBlockSize, conf)
+  }
+
+  /** This closure sets input paths at the driver side. */
+  private[parquet] def initializeDriverSideJobFunc(
+      inputFiles: Array[FileStatus],
+      parquetBlockSize: Long)(job: Job): Unit = {
+    // We side the input paths at the driver side.
+    logInfo(s"Reading Parquet file(s) from ${inputFiles.map(_.getPath).mkString(", ")}")
+    if (inputFiles.nonEmpty) {
+      FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
+    }
+
+    overrideMinSplitSize(parquetBlockSize, job.getConfiguration)
+  }
+
   private[parquet] def readSchema(
       footers: Seq[Footer],
       sparkSession: SparkSession): Option[StructType] = {
```