commit    57e97fcbd6fe62af4acd60896feeacfa21efc222
tree      2e5a2459e7395a24fb13d9a75decbf8c53ef3f72
parent    3180272d2d49e440516085c0e4aebd5bad18bcad
author    Wenchen Fan <wenchen@databricks.com>  2016-10-21 12:27:53 +0800
committer Wenchen Fan <wenchen@databricks.com>  2016-10-21 12:27:53 +0800
[SPARK-18029][SQL] PruneFileSourcePartitions should not change the output of LogicalRelation
## What changes were proposed in this pull request?

In `PruneFileSourcePartitions`, we replace the `LogicalRelation` with a pruned one. However, this replacement may change the output of the `LogicalRelation` if it doesn't have `expectedOutputAttributes`. This PR fixes it.

## How was this patch tested?

The new `PruneFileSourcePartitionsSuite`.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15569 from cloud-fan/partition-bug.
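For readers unfamiliar with the internals: when `LogicalRelation` is copied without `expectedOutputAttributes`, its output attributes are re-derived from the relation's schema, which mints fresh expression IDs, so parent operators that still reference the old IDs end up with dangling references. A minimal sketch of the mechanism, using hypothetical stand-in classes rather than Spark's actual ones:

```scala
// Hypothetical stand-ins (NOT Spark's classes) illustrating the bug: re-deriving
// output attributes mints fresh IDs, so a parent's old references no longer match.
import java.util.concurrent.atomic.AtomicLong

object ExprId { private val c = new AtomicLong(); def next(): Long = c.getAndIncrement() }

case class Attribute(name: String, id: Long = ExprId.next())

// Like a LogicalRelation whose output is recomputed from the schema on every copy.
case class Relation(schema: Seq[String]) {
  val output: Seq[Attribute] = schema.map(n => Attribute(n))
}

object PruningBugSketch extends App {
  val original   = Relation(Seq("i", "p"))
  val parentRefs = original.output              // a parent Filter/Project holds these
  val pruned     = Relation(original.schema)    // "pruned" copy: same schema, new IDs
  val dangling   = parentRefs.filterNot(a => pruned.output.exists(_.id == a.id))
  println(dangling.nonEmpty)                    // true: the parent's references dangle
}
```

Passing `Some(logicalRelation.output)` as `expectedOutputAttributes`, as this patch does, reuses the original attributes so the IDs stay stable.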
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala                |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala |  4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala                       |  7
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala                   |  3
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala | 74
5 files changed, 85 insertions(+), 7 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 1a57a7707c..a97ed701c4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -102,8 +102,8 @@ case class CatalogTablePartition(
* Given the partition schema, returns a row with that schema holding the partition values.
*/
def toRow(partitionSchema: StructType): InternalRow = {
- InternalRow.fromSeq(partitionSchema.map { case StructField(name, dataType, _, _) =>
- Cast(Literal(spec(name)), dataType).eval()
+ InternalRow.fromSeq(partitionSchema.map { field =>
+ Cast(Literal(spec(field.name)), field.dataType).eval()
})
}
}
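The hunk above is a pure refactoring of `toRow`: partition values stored as strings in the spec are cast to the types declared by the partition schema, and only the pattern match was simplified. A small usage sketch, assuming `spark-catalyst` is on the classpath:

```scala
// A sketch of what toRow does: each string-valued partition entry is cast to
// its declared type via Cast/Literal. Assumes spark-catalyst on the classpath.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object ToRowSketch extends App {
  val spec = Map("p" -> "3")                               // partition values arrive as strings
  val partitionSchema = StructType(StructField("p", IntegerType) :: Nil)
  val row: InternalRow = InternalRow.fromSeq(partitionSchema.map { field =>
    Cast(Literal(spec(field.name)), field.dataType).eval() // "3" -> 3
  })
  println(row.getInt(0))                                   // 3
}
```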
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 29121a47d9..8689017c3e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -59,7 +59,9 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
val prunedFileCatalog = tableFileCatalog.filterPartitions(partitionKeyFilters.toSeq)
val prunedFsRelation =
fsRelation.copy(location = prunedFileCatalog)(sparkSession)
- val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation)
+ val prunedLogicalRelation = logicalRelation.copy(
+ relation = prunedFsRelation,
+ expectedOutputAttributes = Some(logicalRelation.output))
// Keep partition-pruning predicates so that they are visible in physical planning
val filterExpression = filters.reduceLeft(And)
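The context lines at the end of this hunk show the rule rebuilding a single `Filter` condition from the kept predicates via `reduceLeft(And)`. A minimal sketch of that fold, assuming `spark-catalyst` on the classpath:

```scala
// A sketch of folding predicates into one conjunction, as the rule does with
// filters.reduceLeft(And); assumes spark-catalyst on the classpath.
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{And, Expression}

object AndFoldSketch extends App {
  val filters: Seq[Expression] = Seq('p > 0, 'p < 10, 'i === 1)
  val filterExpression = filters.reduceLeft(And)  // ((p > 0) AND (p < 10)) AND (i = 1)
  println(filterExpression)
}
```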
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
index f65e74de87..15523437a3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
@@ -20,9 +20,10 @@ package org.apache.spark.sql.hive
import java.io.File
import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.QueryTest
class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
test("table name with schema") {
@@ -78,7 +79,7 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUt
}
test("lazy partition pruning reads only necessary partition data") {
- withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> "true") {
+ withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "true") {
withTable("test") {
withTempDir { dir =>
setupPartitionedTable("test", dir)
@@ -114,7 +115,7 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUt
}
test("all partitions read and cached when filesource partition pruning is off") {
- withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> "false") {
+ withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "false") {
withTable("test") {
withTempDir { dir =>
setupPartitionedTable("test", dir)
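The test changes above swap raw config strings for the typed `SQLConf` entry, so a renamed key fails at compile time instead of silently testing the wrong setting. A quick sketch of the equivalence; `SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING` is taken from the diff itself, not invented here:

```scala
// The typed entry resolves to the same key string the tests previously hard-coded.
import org.apache.spark.sql.internal.SQLConf

object ConfKeySketch extends App {
  println(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key)
  // expected: spark.sql.hive.filesourcePartitionPruning
}
```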
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
index 2ca1cd4c07..d290fe9962 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
/**
@@ -62,7 +63,7 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
def testCaching(pruningEnabled: Boolean): Unit = {
test(s"partitioned table is cached when partition pruning is $pruningEnabled") {
- withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> pruningEnabled.toString) {
+ withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> pruningEnabled.toString) {
withTable("test") {
withTempDir { dir =>
spark.range(5).selectExpr("id", "id as f1", "id as f2").write
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
new file mode 100644
index 0000000000..346ea0ca43
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions, TableFileCatalog}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.StructType
+
+class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+
+ object Optimize extends RuleExecutor[LogicalPlan] {
+ val batches = Batch("PruneFileSourcePartitions", Once, PruneFileSourcePartitions) :: Nil
+ }
+
+ test("PruneFileSourcePartitions should not change the output of LogicalRelation") {
+ withTable("test") {
+ withTempDir { dir =>
+ sql(
+ s"""
+ |CREATE EXTERNAL TABLE test(i int)
+ |PARTITIONED BY (p int)
+ |STORED AS parquet
+ |LOCATION '${dir.getAbsolutePath}'""".stripMargin)
+
+ val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
+ val tableFileCatalog = new TableFileCatalog(
+ spark,
+ tableMeta.database,
+ tableMeta.identifier.table,
+ Some(tableMeta.partitionSchema),
+ 0)
+
+ val dataSchema = StructType(tableMeta.schema.filterNot { f =>
+ tableMeta.partitionColumnNames.contains(f.name)
+ })
+ val relation = HadoopFsRelation(
+ location = tableFileCatalog,
+ partitionSchema = tableMeta.partitionSchema,
+ dataSchema = dataSchema,
+ bucketSpec = None,
+ fileFormat = new ParquetFileFormat(),
+ options = Map.empty)(sparkSession = spark)
+
+ val logicalRelation = LogicalRelation(relation, catalogTable = Some(tableMeta))
+ val query = Project(Seq('i, 'p), Filter('p === 1, logicalRelation)).analyze
+
+ val optimized = Optimize.execute(query)
+ assert(optimized.missingInput.isEmpty)
+ }
+ }
+ }
+}
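For context on the new suite's assertion: `missingInput` on a `QueryPlan` is (roughly) the attributes a node references minus what its children produce, so a non-empty set is exactly the dangling-reference symptom described above. A hedged sketch of the healthy case, assuming `spark-catalyst` on the classpath:

```scala
// A sketch of the property the suite asserts: a well-formed plan has no
// missing input. Assumes spark-catalyst on the classpath.
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

object MissingInputSketch extends App {
  val i = 'i.int                      // resolved AttributeReference, fixed exprId
  val p = 'p.int
  val plan = LocalRelation(i, p).where(p === 1).select(i)
  assert(plan.missingInput.isEmpty)   // every referenced attribute is produced below
  println("ok")
}
```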