From 418e5e4cbdaab87addb91ac0bb2245ff0213ac81 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Sun, 27 Sep 2015 09:08:38 -0700
Subject: [SPARK-10741] [SQL] Hive Query Having/OrderBy against Parquet table is not working

https://issues.apache.org/jira/browse/SPARK-10741

I chose the second approach: do not change the output exprIds when converting
MetastoreRelation to LogicalRelation.

Author: Wenchen Fan

Closes #8889 from cloud-fan/hot-bug.
---
 .../catalyst/analysis/MultiInstanceRelation.scala  |  1 -
 .../catalyst/expressions/namedExpressions.scala    |  8 +++
 .../scala/org/apache/spark/sql/DataFrame.scala     |  2 +-
 .../execution/datasources/DataSourceStrategy.scala | 18 +++---
 .../execution/datasources/LogicalRelation.scala    | 33 ++++++++---
 .../spark/sql/execution/datasources/rules.scala    | 14 ++---
 .../datasources/parquet/ParquetFilterSuite.scala   |  2 +-
 .../parquet/ParquetPartitionDiscoverySuite.scala   |  2 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala      | 64 ++++++----------------
 .../apache/spark/sql/hive/execution/commands.scala |  2 +-
 .../spark/sql/hive/InsertIntoHiveTableSuite.scala  |  4 +-
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala |  2 +-
 .../spark/sql/hive/execution/SQLQuerySuite.scala   | 27 ++++++++-
 .../org/apache/spark/sql/hive/parquetSuites.scala  |  8 +--
 .../spark/sql/sources/hadoopFsRelationSuites.scala |  2 +-
 15 files changed, 103 insertions(+), 86 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
index 35b74024a4..394be47a58 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
@@ -17,7 +17,6 @@

 package org.apache.spark.sql.catalyst.analysis

-import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

 /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 6f173b52ad..5768c6087d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -236,6 +236,14 @@ case class AttributeReference(
     }
   }

+  def withExprId(newExprId: ExprId): AttributeReference = {
+    if (exprId == newExprId) {
+      this
+    } else {
+      AttributeReference(name, dataType, nullable, metadata)(newExprId, qualifiers)
+    }
+  }
+
   override def toString: String = s"$name#${exprId.id}$typeSuffix"
 }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index f9995da3a8..9c67ad18c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1595,7 +1595,7 @@ class DataFrame private[sql](
    */
   def inputFiles: Array[String] = {
     val files: Seq[String] = logicalPlan.collect {
-      case LogicalRelation(fsBasedRelation: FileRelation) =>
+      case LogicalRelation(fsBasedRelation: FileRelation, _) =>
         fsBasedRelation.inputFiles
       case fr: FileRelation =>
         fr.inputFiles
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index c58213155d..918db8e7d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -38,21 +38,21 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
  */
 private[sql] object DataSourceStrategy extends Strategy with Logging {
   def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _)) =>
       pruneFilterProjectRaw(
         l,
         projects,
         filters,
         (a, f) => toCatalystRDD(l, a, t.buildScan(a, f))) :: Nil

-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan, _)) =>
       pruneFilterProject(
         l,
         projects,
         filters,
         (a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f))) :: Nil

-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _)) =>
       pruneFilterProject(
         l,
         projects,
@@ -60,7 +60,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         (a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: Nil

     // Scanning partitioned HadoopFsRelation
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation))
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _))
         if t.partitionSpec.partitionColumns.nonEmpty =>
       val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray

@@ -88,7 +88,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         selectedPartitions) :: Nil

     // Scanning non-partitioned HadoopFsRelation
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _)) =>
       // See buildPartitionedTableScan for the reason that we need to create a shard
       // broadcast HadoopConf.
       val sharedHadoopConf = SparkHadoopUtil.get.conf
@@ -101,16 +101,16 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         (a, f) =>
           toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f, t.paths, confBroadcast))) :: Nil

-    case l @ LogicalRelation(baseRelation: TableScan) =>
+    case l @ LogicalRelation(baseRelation: TableScan, _) =>
       execution.PhysicalRDD.createFromDataSource(
         l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil

-    case i @ logical.InsertIntoTable(
-      l @ LogicalRelation(t: InsertableRelation), part, query, overwrite, false) if part.isEmpty =>
+    case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _),
+      part, query, overwrite, false) if part.isEmpty =>
       execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil

     case i @ logical.InsertIntoTable(
-      l @ LogicalRelation(t: HadoopFsRelation), part, query, overwrite, false) =>
+      l @ LogicalRelation(t: HadoopFsRelation, _), part, query, overwrite, false) =>
       val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
       execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: Nil

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index a7123dc845..4069179aa7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -17,23 +17,40 @@
 package org.apache.spark.sql.execution.datasources

 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.sources.BaseRelation

 /**
  * Used to link a [[BaseRelation]] in to a logical query plan.
+ *
+ * Note that sometimes we need to use `LogicalRelation` to replace an existing leaf node without
+ * changing the output attributes' IDs. The `expectedOutputAttributes` parameter is used for
+ * this purpose. See https://issues.apache.org/jira/browse/SPARK-10741 for more details.
  */
-private[sql] case class LogicalRelation(relation: BaseRelation)
-  extends LeafNode
-  with MultiInstanceRelation {
+private[sql] case class LogicalRelation(
+    relation: BaseRelation,
+    expectedOutputAttributes: Option[Seq[Attribute]] = None)
+  extends LeafNode with MultiInstanceRelation {

-  override val output: Seq[AttributeReference] = relation.schema.toAttributes
+  override val output: Seq[AttributeReference] = {
+    val attrs = relation.schema.toAttributes
+    expectedOutputAttributes.map { expectedAttrs =>
+      assert(expectedAttrs.length == attrs.length)
+      attrs.zip(expectedAttrs).map {
+        // We should respect the attribute names provided by base relation and only use the
+        // exprId in `expectedOutputAttributes`.
+        // The reason is that, some relations(like parquet) will reconcile attribute names to
+        // workaround case insensitivity issue.
+        case (attr, expected) => attr.withExprId(expected.exprId)
+      }
+    }.getOrElse(attrs)
+  }

   // Logical Relations are distinct if they have different output for the sake of transformations.
   override def equals(other: Any): Boolean = other match {
-    case l @ LogicalRelation(otherRelation) => relation == otherRelation && output == l.output
-    case _ => false
+    case l @ LogicalRelation(otherRelation, _) => relation == otherRelation && output == l.output
+    case _ => false
   }

   override def hashCode: Int = {
@@ -41,7 +58,7 @@ private[sql] case class LogicalRelation(relation: BaseRelation)
   override def sameResult(otherPlan: LogicalPlan): Boolean = otherPlan match {
-    case LogicalRelation(otherRelation) => relation == otherRelation
+    case LogicalRelation(otherRelation, _) => relation == otherRelation
     case _ => false
   }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 16c9138419..8efc8016f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -37,7 +37,7 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {

       // We are inserting into an InsertableRelation or HadoopFsRelation.
       case i @ InsertIntoTable(
-      l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation), _, child, _, _) => {
+      l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _), _, child, _, _) => {
         // First, make sure the data to be inserted have the same number of fields with the
         // schema of the relation.
         if (l.output.size != child.output.size) {
@@ -84,14 +84,14 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
       case i @ logical.InsertIntoTable(
-        l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite, ifNotExists) =>
+        l @ LogicalRelation(t: InsertableRelation, _), partition, query, overwrite, ifNotExists) =>
         // Right now, we do not support insert into a data source table with partition specs.
         if (partition.nonEmpty) {
           failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.")
         } else {
           // Get all input data source relations of the query.
           val srcRelations = query.collect {
-            case LogicalRelation(src: BaseRelation) => src
+            case LogicalRelation(src: BaseRelation, _) => src
           }
           if (srcRelations.contains(t)) {
             failAnalysis(
@@ -102,7 +102,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
         }

       case logical.InsertIntoTable(
-        LogicalRelation(r: HadoopFsRelation), part, query, overwrite, _) =>
+        LogicalRelation(r: HadoopFsRelation, _), part, query, overwrite, _) =>
         // We need to make sure the partition columns specified by users do match partition
         // columns of the relation.
         val existingPartitionColumns = r.partitionColumns.fieldNames.toSet
@@ -120,7 +120,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>

         // Get all input data source relations of the query.
         val srcRelations = query.collect {
-          case LogicalRelation(src: BaseRelation) => src
+          case LogicalRelation(src: BaseRelation, _) => src
         }
         if (srcRelations.contains(r)) {
           failAnalysis(
@@ -148,10 +148,10 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
         EliminateSubQueries(catalog.lookupRelation(tableIdent.toSeq)) match {
           // Only do the check if the table is a data source table
           // (the relation is a BaseRelation).
-          case l @ LogicalRelation(dest: BaseRelation) =>
+          case l @ LogicalRelation(dest: BaseRelation, _) =>
             // Get all input data source relations of the query.
             val srcRelations = query.collect {
-              case LogicalRelation(src: BaseRelation) => src
+              case LogicalRelation(src: BaseRelation, _) => src
             }
             if (srcRelations.contains(dest)) {
               failAnalysis(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index f067112cfc..45ad3fde55 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -55,7 +55,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
         .where(Column(predicate))

       val analyzedPredicate = query.queryExecution.optimizedPlan.collect {
-        case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation)) => filters
+        case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation, _)) => filters
       }.flatten
       assert(analyzedPredicate.nonEmpty)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 7bac8609e1..3a23b8ed66 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -465,7 +465,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       (1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath)
       val queryExecution = sqlContext.read.parquet(dir.getCanonicalPath).queryExecution
       queryExecution.analyzed.collectFirst {
-        case LogicalRelation(relation: ParquetRelation) =>
+        case LogicalRelation(relation: ParquetRelation, _) =>
           assert(relation.partitionSpec === PartitionSpec.emptySpec)
       }.getOrElse {
         fail(s"Expecting a ParquetRelation2, but got:\n$queryExecution")
       }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 012634cb5a..ea1521a48c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -448,7 +448,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
     cachedDataSourceTables.getIfPresent(tableIdentifier) match {
       case null => None // Cache miss
-      case logical @ LogicalRelation(parquetRelation: ParquetRelation) =>
+      case logical @ LogicalRelation(parquetRelation: ParquetRelation, _) =>
         // If we have the same paths, same schema, and same partition spec,
         // we will use the cached Parquet Relation.
         val useCached =
@@ -514,7 +514,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
         parquetRelation
       }

-    result.newInstance()
+    result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
   }

   override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
@@ -553,60 +553,28 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
         return plan
       }

-      // Collects all `MetastoreRelation`s which should be replaced
-      val toBeReplaced = plan.collect {
+      plan transformUp {
         // Write path
-        case InsertIntoTable(relation: MetastoreRelation, _, _, _, _)
-          // Inserting into partitioned table is not supported in Parquet data source (yet).
-          if !relation.hiveQlTable.isPartitioned &&
-            hive.convertMetastoreParquet &&
-            relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
-          val parquetRelation = convertToParquetRelation(relation)
-          val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation, parquetRelation, attributedRewrites)
+        case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
+          // Inserting into partitioned table is not supported in Parquet data source (yet).
+          if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
+            r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+          val parquetRelation = convertToParquetRelation(r)
+          InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)

         // Write path
-        case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _, _)
+        case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
           // Inserting into partitioned table is not supported in Parquet data source (yet).
-          if !relation.hiveQlTable.isPartitioned &&
-            hive.convertMetastoreParquet &&
-            relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
-          val parquetRelation = convertToParquetRelation(relation)
-          val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation, parquetRelation, attributedRewrites)
+          if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
+            r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+          val parquetRelation = convertToParquetRelation(r)
+          InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)

         // Read path
         case relation: MetastoreRelation if hive.convertMetastoreParquet &&
-          relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+            relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
           val parquetRelation = convertToParquetRelation(relation)
-          val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation, parquetRelation, attributedRewrites)
-      }
-
-      val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap
-      val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ ++: _))
-
-      // Replaces all `MetastoreRelation`s with corresponding `ParquetRelation2`s, and fixes
-      // attribute IDs referenced in other nodes.
-      plan.transformUp {
-        case r: MetastoreRelation if relationMap.contains(r) =>
-          val parquetRelation = relationMap(r)
-          val alias = r.alias.getOrElse(r.tableName)
-          Subquery(alias, parquetRelation)
-
-        case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
-          if relationMap.contains(r) =>
-          val parquetRelation = relationMap(r)
-          InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
-
-        case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
-          if relationMap.contains(r) =>
-          val parquetRelation = relationMap(r)
-          InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
-
-        case other => other.transformExpressions {
-          case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
-        }
+          Subquery(relation.alias.getOrElse(relation.tableName), parquetRelation)
       }
     }
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index d1699dd536..9f654eed57 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -235,7 +235,7 @@ case class CreateMetastoreDataSourceAsSelect(
           sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath)
         val createdRelation = LogicalRelation(resolved.relation)
         EliminateSubQueries(sqlContext.catalog.lookupRelation(tableIdent.toSeq)) match {
-          case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation) =>
+          case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _) =>
             if (l.relation != createdRelation.relation) {
               val errorDescription =
                 s"Cannot append to table $tableName because the resolved relation does not " +
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 80a61f82fd..81ee9ba71b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -81,9 +81,9 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
   test("Double create fails when allowExisting = false") {
     sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")

-    val message = intercept[QueryExecutionException] {
+    intercept[QueryExecutionException] {
       sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
-    }.getMessage
+    }
   }

   test("Double create does not fail when allowExisting = true") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index bf0db08490..d356538000 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -570,7 +570,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         Row(3) :: Row(4) :: Nil)

       table("test_parquet_ctas").queryExecution.optimizedPlan match {
-        case LogicalRelation(p: ParquetRelation) => // OK
+        case LogicalRelation(p: ParquetRelation, _) => // OK
        case _ =>
          fail(s"test_parquet_ctas should have be converted to ${classOf[ParquetRelation]}")
       }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 71823e32ad..8c3f9ac202 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -263,7 +263,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
       val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
       relation match {
-        case LogicalRelation(r: ParquetRelation) =>
+        case LogicalRelation(r: ParquetRelation, _) =>
           if (!isDataSourceParquet) {
             fail(
               s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
@@ -1223,4 +1223,29 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {

     checkAnswer(df, (0 until 5).map(i => Row(i + "#", i + "#")))
   }
+
+  test("SPARK-10741: Sort on Aggregate using parquet") {
+    withTable("test10741") {
+      withTempTable("src") {
+        Seq("a" -> 5, "a" -> 9, "b" -> 6).toDF().registerTempTable("src")
+        sql("CREATE TABLE test10741(c1 STRING, c2 INT) STORED AS PARQUET AS SELECT * FROM src")
+      }
+
+      checkAnswer(sql(
+        """
+          |SELECT c1, AVG(c2) AS c_avg
+          |FROM test10741
+          |GROUP BY c1
+          |HAVING (AVG(c2) > 5) ORDER BY c1
+        """.stripMargin), Row("a", 7.0) :: Row("b", 6.0) :: Nil)
+
+      checkAnswer(sql(
+        """
+          |SELECT c1, AVG(c2) AS c_avg
+          |FROM test10741
+          |GROUP BY c1
+          |ORDER BY AVG(c2)
+        """.stripMargin), Row("b", 6.0) :: Row("a", 7.0) :: Nil)
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 6842ec2b5e..7d8104f935 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -282,7 +282,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
     )

     table("test_parquet_ctas").queryExecution.optimizedPlan match {
-      case LogicalRelation(_: ParquetRelation) => // OK
+      case LogicalRelation(_: ParquetRelation, _) => // OK
       case _ => fail(
         "test_parquet_ctas should be converted to " +
           s"${classOf[ParquetRelation].getCanonicalName }")
@@ -369,7 +369,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {

     assertResult(2) {
       analyzed.collect {
-        case r@LogicalRelation(_: ParquetRelation) => r
+        case r @ LogicalRelation(_: ParquetRelation, _) => r
       }.size
     }
   }
@@ -378,7 +378,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
   def collectParquetRelation(df: DataFrame): ParquetRelation = {
     val plan = df.queryExecution.analyzed
     plan.collectFirst {
-      case LogicalRelation(r: ParquetRelation) => r
+      case LogicalRelation(r: ParquetRelation, _) => r
     }.getOrElse {
       fail(s"Expecting a ParquetRelation2, but got:\n$plan")
     }
@@ -428,7 +428,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
     // Converted test_parquet should be cached.
     catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match {
       case null => fail("Converted test_parquet should be cached in the cache.")
-      case logical @ LogicalRelation(parquetRelation: ParquetRelation) => // OK
+      case logical @ LogicalRelation(parquetRelation: ParquetRelation, _) => // OK
       case other =>
         fail(
           "The cached test_parquet should be a Parquet Relation. " +
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index d7504936d9..42b9b3d634 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -499,7 +499,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     }

     val actualPaths = df.queryExecution.analyzed.collectFirst {
-      case LogicalRelation(relation: HadoopFsRelation) =>
+      case LogicalRelation(relation: HadoopFsRelation, _) =>
         relation.paths.toSet
     }.getOrElse {
       fail("Expect an FSBasedRelation, but none could be found")
--
cgit v1.2.3