From 57e97fcbd6fe62af4acd60896feeacfa21efc222 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Fri, 21 Oct 2016 12:27:53 +0800
Subject: [SPARK-18029][SQL] PruneFileSourcePartitions should not change the output of LogicalRelation

## What changes were proposed in this pull request?

In `PruneFileSourcePartitions`, we will replace the `LogicalRelation` with a pruned one. However, this replacement may change the output of the `LogicalRelation` if it doesn't have `expectedOutputAttributes`. This PR fixes it.

## How was this patch tested?

the new `PruneFileSourcePartitionsSuite`

Author: Wenchen Fan

Closes #15569 from cloud-fan/partition-bug.
---
 .../spark/sql/catalyst/catalog/interface.scala     |  4 +-
 .../datasources/PruneFileSourcePartitions.scala    |  4 +-
 .../apache/spark/sql/hive/HiveDataFrameSuite.scala |  7 +-
 .../spark/sql/hive/HiveMetadataCacheSuite.scala    |  3 +-
 .../execution/PruneFileSourcePartitionsSuite.scala | 74 ++++++++++++++++++++++
 5 files changed, 85 insertions(+), 7 deletions(-)
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 1a57a7707c..a97ed701c4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -102,8 +102,8 @@ case class CatalogTablePartition(
    * Given the partition schema, returns a row with that schema holding the partition values.
    */
   def toRow(partitionSchema: StructType): InternalRow = {
-    InternalRow.fromSeq(partitionSchema.map { case StructField(name, dataType, _, _) =>
-      Cast(Literal(spec(name)), dataType).eval()
+    InternalRow.fromSeq(partitionSchema.map { field =>
+      Cast(Literal(spec(field.name)), field.dataType).eval()
     })
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 29121a47d9..8689017c3e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -59,7 +59,9 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
         val prunedFileCatalog = tableFileCatalog.filterPartitions(partitionKeyFilters.toSeq)
         val prunedFsRelation =
           fsRelation.copy(location = prunedFileCatalog)(sparkSession)
-        val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation)
+        val prunedLogicalRelation = logicalRelation.copy(
+          relation = prunedFsRelation,
+          expectedOutputAttributes = Some(logicalRelation.output))

         // Keep partition-pruning predicates so that they are visible in physical planning
         val filterExpression = filters.reduceLeft(And)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
index f65e74de87..15523437a3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
@@ -20,9 +20,10 @@ package org.apache.spark.sql.hive
 import java.io.File

 import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.QueryTest

 class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
   test("table name with schema") {
@@ -78,7 +79,7 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUt
   }

   test("lazy partition pruning reads only necessary partition data") {
-    withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> "true") {
+    withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "true") {
       withTable("test") {
         withTempDir { dir =>
           setupPartitionedTable("test", dir)
@@ -114,7 +115,7 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUt
   }

   test("all partitions read and cached when filesource partition pruning is off") {
-    withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> "false") {
+    withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "false") {
       withTable("test") {
         withTempDir { dir =>
           setupPartitionedTable("test", dir)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
index 2ca1cd4c07..d290fe9962 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkException
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils

 /**
@@ -62,7 +63,7 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi

   def testCaching(pruningEnabled: Boolean): Unit = {
     test(s"partitioned table is cached when partition pruning is $pruningEnabled") {
-      withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> pruningEnabled.toString) {
+      withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> pruningEnabled.toString) {
         withTable("test") {
           withTempDir { dir =>
             spark.range(5).selectExpr("id", "id as f1", "id as f2").write
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
new file mode 100644
index 0000000000..346ea0ca43
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions, TableFileCatalog}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.StructType
+
+class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches = Batch("PruneFileSourcePartitions", Once, PruneFileSourcePartitions) :: Nil
+  }
+
+  test("PruneFileSourcePartitions should not change the output of LogicalRelation") {
+    withTable("test") {
+      withTempDir { dir =>
+        sql(
+          s"""
+            |CREATE EXTERNAL TABLE test(i int)
+            |PARTITIONED BY (p int)
+            |STORED AS parquet
+            |LOCATION '${dir.getAbsolutePath}'""".stripMargin)
+
+        val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
+        val tableFileCatalog = new TableFileCatalog(
+          spark,
+          tableMeta.database,
+          tableMeta.identifier.table,
+          Some(tableMeta.partitionSchema),
+          0)
+
+        val dataSchema = StructType(tableMeta.schema.filterNot { f =>
+          tableMeta.partitionColumnNames.contains(f.name)
+        })
+        val relation = HadoopFsRelation(
+          location = tableFileCatalog,
+          partitionSchema = tableMeta.partitionSchema,
+          dataSchema = dataSchema,
+          bucketSpec = None,
+          fileFormat = new ParquetFileFormat(),
+          options = Map.empty)(sparkSession = spark)
+
+        val logicalRelation = LogicalRelation(relation, catalogTable = Some(tableMeta))
+        val query = Project(Seq('i, 'p), Filter('p === 1, logicalRelation)).analyze
+
+        val optimized = Optimize.execute(query)
+        assert(optimized.missingInput.isEmpty)
+      }
+    }
+  }
+}
-- 
cgit v1.2.3
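
Editor's note: the sketch below is not part of the patch. It is a minimal, hypothetical illustration (assuming only a Spark 2.x `spark-catalyst` dependency on the classpath; the object name `ExprIdSketch` is made up) of why the fix passes `expectedOutputAttributes`. Catalyst identifies a column by its `ExprId`, not by its name, so a `LogicalRelation` whose output is re-derived from the relation's schema gets freshly minted attribute ids, and a parent `Filter`/`Project` that still references the old ids reports them in `missingInput`, which is exactly what the new suite asserts against.

```scala
// Hypothetical sketch, not taken from the patch: attribute identity in Catalyst is the ExprId.
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.IntegerType

object ExprIdSketch {
  def main(args: Array[String]): Unit = {
    // Constructing a "new" attribute from the same name and type mints a fresh ExprId.
    // This is what effectively happens when a LogicalRelation's output is rebuilt from its
    // schema instead of being pinned with expectedOutputAttributes = Some(logicalRelation.output).
    val original = AttributeReference("i", IntegerType)()
    val rebuilt = AttributeReference("i", IntegerType)()

    assert(original.name == rebuilt.name)     // same name...
    assert(original.exprId != rebuilt.exprId) // ...but a different column as far as Catalyst is concerned

    // A Filter/Project above the pruned relation still references `original`, so the
    // rebuilt attribute would leave the optimized plan with a non-empty missingInput.
  }
}
```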