author     Cheng Lian <lian@databricks.com>    2016-06-10 20:41:48 -0700
committer  Cheng Lian <lian@databricks.com>    2016-06-10 20:41:48 -0700
commit     8e7b56f3d4917692d3ff44d91aa264738a6fc2ed (patch)
tree       429202da591d53091c96f08d625ca9301cbb451b /sql
parent     99f3c82776fe5ea4f89a9965a288c7447585dc2c (diff)
Revert "[SPARK-15639][SQL] Try to push down filter at RowGroups level for parquet reader"
This reverts commit bba5d7999f7b3ae9d816ea552ba9378fea1615a6.
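
Note: the reverted approach registered converted filter predicates in the task's Hadoop configuration so Parquet could skip whole row groups whose statistics rule them out. Below is a minimal sketch of that pattern using only the calls visible in the hunks that follow; the wrapper function and its name are illustrative, and ParquetFilters is package-private in Spark, so this only compiles inside the Spark source tree.

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Convert the data source filters that Parquet understands, AND them together,
// and register the result so row groups whose statistics cannot match are skipped.
def pushRowGroupFilters(conf: Configuration, dataSchema: StructType, filters: Seq[Filter]): Unit = {
  filters
    .flatMap(ParquetFilters.createFilter(dataSchema, _)) // not every filter is convertible
    .reduceOption(FilterApi.and)
    .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
}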
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala      |  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala       |  9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala | 61
3 files changed, 57 insertions(+), 21 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index c06a1ea356..306a99d5a3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -292,14 +292,6 @@ case class AttributeReference(
}
}
- def withMetadata(newMetadata: Metadata): AttributeReference = {
- if (metadata == newMetadata) {
- this
- } else {
- AttributeReference(name, dataType, nullable, newMetadata)(exprId, qualifier, isGenerated)
- }
- }
-
override protected final def otherCopyArgs: Seq[AnyRef] = {
exprId :: qualifier :: isGenerated :: Nil
}
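
Note: the helper removed above is only a metadata-preserving copy. After the revert, the same copy can still be written at a call site with the AttributeReference constructor, as in this sketch (the helper name here is illustrative):

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.Metadata

// Same effect as the removed withMetadata: swap only the metadata while keeping
// exprId, qualifier and isGenerated, so the attribute still resolves to the same column.
def copyWithMetadata(a: AttributeReference, newMetadata: Metadata): AttributeReference =
  if (a.metadata == newMetadata) a
  else AttributeReference(a.name, a.dataType, a.nullable, newMetadata)(a.exprId, a.qualifier, a.isGenerated)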
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 7fc842f83e..13a86bfb38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -84,14 +84,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
val dataColumns =
- l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver).map { c =>
- files.dataSchema.find(_.name == c.name).map { f =>
- c match {
- case a: AttributeReference => a.withMetadata(f.metadata)
- case _ => c
- }
- }.getOrElse(c)
- }
+ l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver)
// Partition keys are not available in the statistics of the files.
val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
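
Note: the context lines above also show the filter split this strategy performs: predicates touching only partition columns prune directories, while the rest become data filters evaluated against file contents. A small sketch of that split, reusing the hunk's names (normalizedFilters, partitionSet); the partition-side condition is inferred rather than shown in the diff.

import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression}

// Predicates that reference only partition columns can prune directories;
// predicates that reference no partition columns are pushed toward the file scan.
def splitFilters(normalizedFilters: Seq[Expression], partitionSet: AttributeSet): (Seq[Expression], Seq[Expression]) = {
  val partitionKeyFilters = normalizedFilters.filter(_.references.subsetOf(partitionSet))
  val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
  (partitionKeyFilters, dataFilters)
}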
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index bc4a9de7a9..3735c94968 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -357,11 +357,6 @@ private[sql] class ParquetFileFormat
val hadoopAttemptContext =
new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
- // Try to push down filters when filter push-down is enabled.
- // Notice: This push-down is RowGroups level, not individual records.
- pushed.foreach {
- ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, _)
- }
val parquetReader = if (enableVectorizedReader) {
val vectorizedReader = new VectorizedParquetRecordReader()
vectorizedReader.initialize(split, hadoopAttemptContext)
@@ -597,6 +592,62 @@ private[sql] object ParquetFileFormat extends Logging {
}
}
+ /** This closure sets various Parquet configurations at both driver side and executor side. */
+ private[parquet] def initializeLocalJobFunc(
+ requiredColumns: Array[String],
+ filters: Array[Filter],
+ dataSchema: StructType,
+ parquetBlockSize: Long,
+ useMetadataCache: Boolean,
+ parquetFilterPushDown: Boolean,
+ assumeBinaryIsString: Boolean,
+ assumeInt96IsTimestamp: Boolean)(job: Job): Unit = {
+ val conf = job.getConfiguration
+ conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
+
+ // Try to push down filters when filter push-down is enabled.
+ if (parquetFilterPushDown) {
+ filters
+ // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+ // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+ // is used here.
+ .flatMap(ParquetFilters.createFilter(dataSchema, _))
+ .reduceOption(FilterApi.and)
+ .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
+ }
+
+ conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
+ val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
+ CatalystSchemaConverter.checkFieldNames(requestedSchema).json
+ })
+
+ conf.set(
+ CatalystWriteSupport.SPARK_ROW_SCHEMA,
+ CatalystSchemaConverter.checkFieldNames(dataSchema).json)
+
+ // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
+ conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
+
+ // Sets flags for `CatalystSchemaConverter`
+ conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
+ conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp)
+
+ overrideMinSplitSize(parquetBlockSize, conf)
+ }
+
+ /** This closure sets input paths at the driver side. */
+ private[parquet] def initializeDriverSideJobFunc(
+ inputFiles: Array[FileStatus],
+ parquetBlockSize: Long)(job: Job): Unit = {
+ // We set the input paths at the driver side.
+ logInfo(s"Reading Parquet file(s) from ${inputFiles.map(_.getPath).mkString(", ")}")
+ if (inputFiles.nonEmpty) {
+ FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
+ }
+
+ overrideMinSplitSize(parquetBlockSize, job.getConfiguration)
+ }
+
private[parquet] def readSchema(
footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {