diff options
Diffstat (limited to 'sql')
3 files changed, 122 insertions, 147 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java index dfe6967647..06cd9ea2d2 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java @@ -31,6 +31,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import scala.Option; + import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups; import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; import static org.apache.parquet.format.converter.ParquetMetadataConverter.range; @@ -59,8 +61,12 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.hadoop.util.ConfigurationUtil; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Types; +import org.apache.spark.TaskContext; +import org.apache.spark.TaskContext$; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.types.StructType$; +import org.apache.spark.util.AccumulatorV2; +import org.apache.spark.util.LongAccumulator; /** * Base class for custom RecordReaders for Parquet that directly materialize to `T`. @@ -145,6 +151,18 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo for (BlockMetaData block : blocks) { this.totalRowCount += block.getRowCount(); } + + // For test purpose. + // If the predefined accumulator exists, the row group number to read will be updated + // to the accumulator. So we can check if the row groups are filtered or not in test case. + TaskContext taskContext = TaskContext$.MODULE$.get(); + if (taskContext != null) { + Option<AccumulatorV2<?, ?>> accu = (Option<AccumulatorV2<?, ?>>) taskContext.taskMetrics() + .lookForAccumulatorByName("numRowGroups"); + if (accu.isDefined()) { + ((LongAccumulator)accu.get()).add((long)blocks.size()); + } + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 612a295c0e..7794f31331 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -46,6 +46,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ @@ -357,6 +358,11 @@ class ParquetFileFormat val hadoopAttemptContext = new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId) + // Try to push down filters when filter push-down is enabled. + // Notice: This push-down is RowGroups level, not individual records. + if (pushed.isDefined) { + ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) + } val parquetReader = if (enableVectorizedReader) { val vectorizedReader = new VectorizedParquetRecordReader() vectorizedReader.initialize(split, hadoopAttemptContext) @@ -563,87 +569,7 @@ private[parquet] class ParquetOutputWriter( override def close(): Unit = recordWriter.close(context) } - object ParquetFileFormat extends Logging { - /** - * If parquet's block size (row group size) setting is larger than the min split size, - * we use parquet's block size setting as the min split size. Otherwise, we will create - * tasks processing nothing (because a split does not cover the starting point of a - * parquet block). See https://issues.apache.org/jira/browse/SPARK-10143 for more information. - */ - private def overrideMinSplitSize(parquetBlockSize: Long, conf: Configuration): Unit = { - val minSplitSize = - math.max( - conf.getLong("mapred.min.split.size", 0L), - conf.getLong("mapreduce.input.fileinputformat.split.minsize", 0L)) - if (parquetBlockSize > minSplitSize) { - val message = - s"Parquet's block size (row group size) is larger than " + - s"mapred.min.split.size/mapreduce.input.fileinputformat.split.minsize. Setting " + - s"mapred.min.split.size and mapreduce.input.fileinputformat.split.minsize to " + - s"$parquetBlockSize." - logDebug(message) - conf.set("mapred.min.split.size", parquetBlockSize.toString) - conf.set("mapreduce.input.fileinputformat.split.minsize", parquetBlockSize.toString) - } - } - - /** This closure sets various Parquet configurations at both driver side and executor side. */ - private[parquet] def initializeLocalJobFunc( - requiredColumns: Array[String], - filters: Array[Filter], - dataSchema: StructType, - parquetBlockSize: Long, - useMetadataCache: Boolean, - parquetFilterPushDown: Boolean, - assumeBinaryIsString: Boolean, - assumeInt96IsTimestamp: Boolean)(job: Job): Unit = { - val conf = job.getConfiguration - conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) - - // Try to push down filters when filter push-down is enabled. - if (parquetFilterPushDown) { - filters - // Collects all converted Parquet filter predicates. Notice that not all predicates can be - // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` - // is used here. - .flatMap(ParquetFilters.createFilter(dataSchema, _)) - .reduceOption(FilterApi.and) - .foreach(ParquetInputFormat.setFilterPredicate(conf, _)) - } - - conf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, { - val requestedSchema = StructType(requiredColumns.map(dataSchema(_))) - ParquetSchemaConverter.checkFieldNames(requestedSchema).json - }) - - conf.set( - ParquetWriteSupport.SPARK_ROW_SCHEMA, - ParquetSchemaConverter.checkFieldNames(dataSchema).json) - - // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata - conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache) - - // Sets flags for `CatalystSchemaConverter` - conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString) - conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp) - - overrideMinSplitSize(parquetBlockSize, conf) - } - - /** This closure sets input paths at the driver side. */ - private[parquet] def initializeDriverSideJobFunc( - inputFiles: Array[FileStatus], - parquetBlockSize: Long)(job: Job): Unit = { - // We side the input paths at the driver side. - logInfo(s"Reading Parquet file(s) from ${inputFiles.map(_.getPath).mkString(", ")}") - if (inputFiles.nonEmpty) { - FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*) - } - - overrideMinSplitSize(parquetBlockSize, job.getConfiguration) - } - private[parquet] def readSchema( footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index d846b27ffe..4246b54c21 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ +import org.apache.spark.util.{AccumulatorContext, LongAccumulator} /** * A test suite that tests Parquet filter2 API based filter pushdown optimization. @@ -368,73 +369,75 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") { import testImplicits._ - - withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true", - SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") { - withTempPath { dir => - val pathOne = s"${dir.getCanonicalPath}/table1" - (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne) - val pathTwo = s"${dir.getCanonicalPath}/table2" - (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo) - - // If the "c = 1" filter gets pushed down, this query will throw an exception which - // Parquet emits. This is a Parquet issue (PARQUET-389). - val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a") - checkAnswer( - df, - Row(1, "1", null)) - - // The fields "a" and "c" only exist in one Parquet file. - assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField)) - assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField)) - - val pathThree = s"${dir.getCanonicalPath}/table3" - df.write.parquet(pathThree) - - // We will remove the temporary metadata when writing Parquet file. - val schema = spark.read.parquet(pathThree).schema - assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField))) - - val pathFour = s"${dir.getCanonicalPath}/table4" - val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b") - dfStruct.select(struct("a").as("s")).write.parquet(pathFour) - - val pathFive = s"${dir.getCanonicalPath}/table5" - val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b") - dfStruct2.select(struct("c").as("s")).write.parquet(pathFive) - - // If the "s.c = 1" filter gets pushed down, this query will throw an exception which - // Parquet emits. - val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 1") - .selectExpr("s") - checkAnswer(dfStruct3, Row(Row(null, 1))) - - // The fields "s.a" and "s.c" only exist in one Parquet file. - val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType] - assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField)) - assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField)) - - val pathSix = s"${dir.getCanonicalPath}/table6" - dfStruct3.write.parquet(pathSix) - - // We will remove the temporary metadata when writing Parquet file. - val forPathSix = spark.read.parquet(pathSix).schema - assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField))) - - // sanity test: make sure optional metadata field is not wrongly set. - val pathSeven = s"${dir.getCanonicalPath}/table7" - (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven) - val pathEight = s"${dir.getCanonicalPath}/table8" - (4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight) - - val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b") - checkAnswer( - df2, - Row(1, "1")) - - // The fields "a" and "b" exist in both two Parquet files. No metadata is set. - assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField)) - assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField)) + Seq("true", "false").map { vectorized => + withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true", + SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true", + SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) { + withTempPath { dir => + val pathOne = s"${dir.getCanonicalPath}/table1" + (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne) + val pathTwo = s"${dir.getCanonicalPath}/table2" + (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo) + + // If the "c = 1" filter gets pushed down, this query will throw an exception which + // Parquet emits. This is a Parquet issue (PARQUET-389). + val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a") + checkAnswer( + df, + Row(1, "1", null)) + + // The fields "a" and "c" only exist in one Parquet file. + assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField)) + assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField)) + + val pathThree = s"${dir.getCanonicalPath}/table3" + df.write.parquet(pathThree) + + // We will remove the temporary metadata when writing Parquet file. + val schema = spark.read.parquet(pathThree).schema + assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField))) + + val pathFour = s"${dir.getCanonicalPath}/table4" + val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b") + dfStruct.select(struct("a").as("s")).write.parquet(pathFour) + + val pathFive = s"${dir.getCanonicalPath}/table5" + val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b") + dfStruct2.select(struct("c").as("s")).write.parquet(pathFive) + + // If the "s.c = 1" filter gets pushed down, this query will throw an exception which + // Parquet emits. + val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 1") + .selectExpr("s") + checkAnswer(dfStruct3, Row(Row(null, 1))) + + // The fields "s.a" and "s.c" only exist in one Parquet file. + val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType] + assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField)) + assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField)) + + val pathSix = s"${dir.getCanonicalPath}/table6" + dfStruct3.write.parquet(pathSix) + + // We will remove the temporary metadata when writing Parquet file. + val forPathSix = spark.read.parquet(pathSix).schema + assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField))) + + // sanity test: make sure optional metadata field is not wrongly set. + val pathSeven = s"${dir.getCanonicalPath}/table7" + (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven) + val pathEight = s"${dir.getCanonicalPath}/table8" + (4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight) + + val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b") + checkAnswer( + df2, + Row(1, "1")) + + // The fields "a" and "b" exist in both two Parquet files. No metadata is set. + assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField)) + assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField)) + } } } } @@ -527,4 +530,32 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex assert(df.filter("_1 IS NOT NULL").count() === 4) } } + + test("Fiters should be pushed down for vectorized Parquet reader at row group level") { + import testImplicits._ + + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true", + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + withTempPath { dir => + val path = s"${dir.getCanonicalPath}/table" + (1 to 1024).map(i => (101, i)).toDF("a", "b").write.parquet(path) + + Seq(("true", (x: Long) => x == 0), ("false", (x: Long) => x > 0)).map { case (push, func) => + withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> push) { + val accu = new LongAccumulator + accu.register(sparkContext, Some("numRowGroups")) + + val df = spark.read.parquet(path).filter("a < 100") + df.foreachPartition(_.foreach(v => accu.add(0))) + df.collect + + val numRowGroups = AccumulatorContext.lookForAccumulatorByName("numRowGroups") + assert(numRowGroups.isDefined) + assert(func(numRowGroups.get.asInstanceOf[LongAccumulator].value)) + AccumulatorContext.remove(accu.id) + } + } + } + } + } } |