Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java |  18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala             |  86
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala            | 165
3 files changed, 122 insertions(+), 147 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index dfe6967647..06cd9ea2d2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -31,6 +31,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import scala.Option;
+
import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
@@ -59,8 +61,12 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.ConfigurationUtil;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
+import org.apache.spark.util.AccumulatorV2;
+import org.apache.spark.util.LongAccumulator;
/**
* Base class for custom RecordReaders for Parquet that directly materialize to `T`.
@@ -145,6 +151,18 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
for (BlockMetaData block : blocks) {
this.totalRowCount += block.getRowCount();
}
+
+ // For test purposes only.
+ // If a predefined accumulator named "numRowGroups" exists, the number of row groups
+ // selected for reading is added to it, so tests can verify whether row groups were filtered.
+ TaskContext taskContext = TaskContext$.MODULE$.get();
+ if (taskContext != null) {
+   Option<AccumulatorV2<?, ?>> accu = (Option<AccumulatorV2<?, ?>>) taskContext.taskMetrics()
+     .lookForAccumulatorByName("numRowGroups");
+   if (accu.isDefined()) {
+     ((LongAccumulator) accu.get()).add((long) blocks.size());
+   }
+ }
}
/**
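
For context, the lookup added above can be expressed in Scala as the short sketch below. It mirrors the Java change: if a test has registered a LongAccumulator named "numRowGroups", the reader records how many row groups it is about to scan. The helper name reportRowGroups is illustrative and not part of the patch.

import org.apache.spark.TaskContext
import org.apache.spark.util.LongAccumulator

def reportRowGroups(numRowGroups: Int): Unit = {
  val taskContext = TaskContext.get()
  if (taskContext != null) {
    // Same accumulator name that the new test registers on the driver side.
    taskContext.taskMetrics().lookForAccumulatorByName("numRowGroups").foreach {
      case acc: LongAccumulator => acc.add(numRowGroups.toLong)
      case _ => // a different accumulator type was registered under this name; ignore it
    }
  }
}
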
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 612a295c0e..7794f31331 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -46,6 +46,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -357,6 +358,11 @@ class ParquetFileFormat
val hadoopAttemptContext =
new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
+ // Try to push down filters when filter push-down is enabled.
+ // Note: this push-down happens at the row group level, not for individual records.
+ if (pushed.isDefined) {
+ ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
+ }
val parquetReader = if (enableVectorizedReader) {
val vectorizedReader = new VectorizedParquetRecordReader()
vectorizedReader.initialize(split, hadoopAttemptContext)
@@ -563,87 +569,7 @@ private[parquet] class ParquetOutputWriter(
override def close(): Unit = recordWriter.close(context)
}
-
object ParquetFileFormat extends Logging {
- /**
- * If parquet's block size (row group size) setting is larger than the min split size,
- * we use parquet's block size setting as the min split size. Otherwise, we will create
- * tasks processing nothing (because a split does not cover the starting point of a
- * parquet block). See https://issues.apache.org/jira/browse/SPARK-10143 for more information.
- */
- private def overrideMinSplitSize(parquetBlockSize: Long, conf: Configuration): Unit = {
- val minSplitSize =
- math.max(
- conf.getLong("mapred.min.split.size", 0L),
- conf.getLong("mapreduce.input.fileinputformat.split.minsize", 0L))
- if (parquetBlockSize > minSplitSize) {
- val message =
- s"Parquet's block size (row group size) is larger than " +
- s"mapred.min.split.size/mapreduce.input.fileinputformat.split.minsize. Setting " +
- s"mapred.min.split.size and mapreduce.input.fileinputformat.split.minsize to " +
- s"$parquetBlockSize."
- logDebug(message)
- conf.set("mapred.min.split.size", parquetBlockSize.toString)
- conf.set("mapreduce.input.fileinputformat.split.minsize", parquetBlockSize.toString)
- }
- }
-
- /** This closure sets various Parquet configurations at both driver side and executor side. */
- private[parquet] def initializeLocalJobFunc(
- requiredColumns: Array[String],
- filters: Array[Filter],
- dataSchema: StructType,
- parquetBlockSize: Long,
- useMetadataCache: Boolean,
- parquetFilterPushDown: Boolean,
- assumeBinaryIsString: Boolean,
- assumeInt96IsTimestamp: Boolean)(job: Job): Unit = {
- val conf = job.getConfiguration
- conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
-
- // Try to push down filters when filter push-down is enabled.
- if (parquetFilterPushDown) {
- filters
- // Collects all converted Parquet filter predicates. Notice that not all predicates can be
- // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
- // is used here.
- .flatMap(ParquetFilters.createFilter(dataSchema, _))
- .reduceOption(FilterApi.and)
- .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
- }
-
- conf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
- val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
- ParquetSchemaConverter.checkFieldNames(requestedSchema).json
- })
-
- conf.set(
- ParquetWriteSupport.SPARK_ROW_SCHEMA,
- ParquetSchemaConverter.checkFieldNames(dataSchema).json)
-
- // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
- conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
-
- // Sets flags for `CatalystSchemaConverter`
- conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
- conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp)
-
- overrideMinSplitSize(parquetBlockSize, conf)
- }
-
- /** This closure sets input paths at the driver side. */
- private[parquet] def initializeDriverSideJobFunc(
- inputFiles: Array[FileStatus],
- parquetBlockSize: Long)(job: Job): Unit = {
- // We side the input paths at the driver side.
- logInfo(s"Reading Parquet file(s) from ${inputFiles.map(_.getPath).mkString(", ")}")
- if (inputFiles.nonEmpty) {
- FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
- }
-
- overrideMinSplitSize(parquetBlockSize, job.getConfiguration)
- }
-
private[parquet] def readSchema(
footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {
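
The deleted initializeLocalJobFunc above and the new per-task call to ParquetInputFormat.setFilterPredicate in buildReader rely on the same conversion pipeline: each Spark source Filter that has a Parquet equivalent is converted, the results are AND-ed together, and the combined predicate is handed to Parquet so that row groups whose statistics contradict it are skipped. Below is a minimal sketch, assuming the Spark 2.0-era ParquetFilters.createFilter(schema, filter) signature shown in the deleted code; the object and method names are illustrative.

package org.apache.spark.sql.execution.datasources.parquet

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

object FilterPushDownSketch {
  // ParquetFilters is private[parquet], so this sketch lives in the same package.
  def pushDownFilters(conf: Configuration, dataSchema: StructType, filters: Seq[Filter]): Unit = {
    filters
      // Not every source Filter maps to a Parquet predicate, so createFilter returns an Option.
      .flatMap(ParquetFilters.createFilter(dataSchema, _))
      // AND the convertible predicates together and register them with Parquet.
      .reduceOption(FilterApi.and)
      .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
  }
}
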
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index d846b27ffe..4246b54c21 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
+import org.apache.spark.util.{AccumulatorContext, LongAccumulator}
/**
* A test suite that tests Parquet filter2 API based filter pushdown optimization.
@@ -368,73 +369,75 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
import testImplicits._
-
- withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
- SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
- withTempPath { dir =>
- val pathOne = s"${dir.getCanonicalPath}/table1"
- (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
- val pathTwo = s"${dir.getCanonicalPath}/table2"
- (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)
-
- // If the "c = 1" filter gets pushed down, this query will throw an exception which
- // Parquet emits. This is a Parquet issue (PARQUET-389).
- val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a")
- checkAnswer(
- df,
- Row(1, "1", null))
-
- // The fields "a" and "c" only exist in one Parquet file.
- assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
- assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-
- val pathThree = s"${dir.getCanonicalPath}/table3"
- df.write.parquet(pathThree)
-
- // We will remove the temporary metadata when writing Parquet file.
- val schema = spark.read.parquet(pathThree).schema
- assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
-
- val pathFour = s"${dir.getCanonicalPath}/table4"
- val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
- dfStruct.select(struct("a").as("s")).write.parquet(pathFour)
-
- val pathFive = s"${dir.getCanonicalPath}/table5"
- val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b")
- dfStruct2.select(struct("c").as("s")).write.parquet(pathFive)
-
- // If the "s.c = 1" filter gets pushed down, this query will throw an exception which
- // Parquet emits.
- val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 1")
- .selectExpr("s")
- checkAnswer(dfStruct3, Row(Row(null, 1)))
-
- // The fields "s.a" and "s.c" only exist in one Parquet file.
- val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType]
- assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
- assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-
- val pathSix = s"${dir.getCanonicalPath}/table6"
- dfStruct3.write.parquet(pathSix)
-
- // We will remove the temporary metadata when writing Parquet file.
- val forPathSix = spark.read.parquet(pathSix).schema
- assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
-
- // sanity test: make sure optional metadata field is not wrongly set.
- val pathSeven = s"${dir.getCanonicalPath}/table7"
- (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven)
- val pathEight = s"${dir.getCanonicalPath}/table8"
- (4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight)
-
- val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b")
- checkAnswer(
- df2,
- Row(1, "1"))
-
- // The fields "a" and "b" exist in both two Parquet files. No metadata is set.
- assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
- assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
+ Seq("true", "false").map { vectorized =>
+ withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
+ SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
+ SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
+ withTempPath { dir =>
+ val pathOne = s"${dir.getCanonicalPath}/table1"
+ (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
+ val pathTwo = s"${dir.getCanonicalPath}/table2"
+ (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)
+
+ // If the "c = 1" filter gets pushed down, this query will throw an exception which
+ // Parquet emits. This is a Parquet issue (PARQUET-389).
+ val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a")
+ checkAnswer(
+ df,
+ Row(1, "1", null))
+
+ // The fields "a" and "c" only exist in one Parquet file.
+ assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+ assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+
+ val pathThree = s"${dir.getCanonicalPath}/table3"
+ df.write.parquet(pathThree)
+
+ // We will remove the temporary metadata when writing Parquet file.
+ val schema = spark.read.parquet(pathThree).schema
+ assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
+
+ val pathFour = s"${dir.getCanonicalPath}/table4"
+ val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
+ dfStruct.select(struct("a").as("s")).write.parquet(pathFour)
+
+ val pathFive = s"${dir.getCanonicalPath}/table5"
+ val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b")
+ dfStruct2.select(struct("c").as("s")).write.parquet(pathFive)
+
+ // If the "s.c = 1" filter gets pushed down, this query will throw an exception which
+ // Parquet emits.
+ val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 1")
+ .selectExpr("s")
+ checkAnswer(dfStruct3, Row(Row(null, 1)))
+
+ // The fields "s.a" and "s.c" only exist in one Parquet file.
+ val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType]
+ assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+ assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+
+ val pathSix = s"${dir.getCanonicalPath}/table6"
+ dfStruct3.write.parquet(pathSix)
+
+ // We will remove the temporary metadata when writing Parquet file.
+ val forPathSix = spark.read.parquet(pathSix).schema
+ assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
+
+ // sanity test: make sure optional metadata field is not wrongly set.
+ val pathSeven = s"${dir.getCanonicalPath}/table7"
+ (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven)
+ val pathEight = s"${dir.getCanonicalPath}/table8"
+ (4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight)
+
+ val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b")
+ checkAnswer(
+ df2,
+ Row(1, "1"))
+
+ // The fields "a" and "b" exist in both two Parquet files. No metadata is set.
+ assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
+ assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
+ }
}
}
}
@@ -527,4 +530,32 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
assert(df.filter("_1 IS NOT NULL").count() === 4)
}
}
+
+ test("Fiters should be pushed down for vectorized Parquet reader at row group level") {
+ import testImplicits._
+
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
+ SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+ withTempPath { dir =>
+ val path = s"${dir.getCanonicalPath}/table"
+ (1 to 1024).map(i => (101, i)).toDF("a", "b").write.parquet(path)
+
+ Seq(("true", (x: Long) => x == 0), ("false", (x: Long) => x > 0)).map { case (push, func) =>
+ withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> push) {
+ val accu = new LongAccumulator
+ accu.register(sparkContext, Some("numRowGroups"))
+
+ val df = spark.read.parquet(path).filter("a < 100")
+ df.foreachPartition(_.foreach(v => accu.add(0)))
+ df.collect
+
+ val numRowGroups = AccumulatorContext.lookForAccumulatorByName("numRowGroups")
+ assert(numRowGroups.isDefined)
+ assert(func(numRowGroups.get.asInstanceOf[LongAccumulator].value))
+ AccumulatorContext.remove(accu.id)
+ }
+ }
+ }
+ }
+ }
}
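
For reference, the driver-side half of the new test reduces to the sketch below: register a named LongAccumulator, run a Parquet scan whose predicate no row group can satisfy, and read back the value collected by the record reader. It assumes a SparkSession in scope as spark and a Parquet path written like the test data above (column "a" is always 101, so "a < 100" prunes every row group when push-down is enabled); the path is illustrative.

import org.apache.spark.util.{AccumulatorContext, LongAccumulator}

val accu = new LongAccumulator
accu.register(spark.sparkContext, Some("numRowGroups"))

// With filter push-down enabled, the row group statistics exclude every block.
spark.read.parquet("/tmp/table").filter("a < 100").collect()

val numRowGroups = AccumulatorContext.lookForAccumulatorByName("numRowGroups")
assert(numRowGroups.isDefined)
assert(numRowGroups.get.asInstanceOf[LongAccumulator].value == 0L)

// Clean up so later queries and tests do not see the stale accumulator.
AccumulatorContext.remove(accu.id)
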