author     Wenchen Fan <cloud0fan@163.com>    2015-09-27 09:08:38 -0700
committer  Yin Huai <yhuai@databricks.com>    2015-09-27 09:08:38 -0700
commit     418e5e4cbdaab87addb91ac0bb2245ff0213ac81 (patch)
tree       455d23d8a1e2e731e00b2f6039c6ed2a7137e0bc /sql
parent     299b439920f980cce4c4f4e4a8436a5145efeaa3 (diff)
[SPARK-10741] [SQL] Hive Query Having/OrderBy against Parquet table is not working
https://issues.apache.org/jira/browse/SPARK-10741

I chose the second approach: do not change the output exprIds when converting a MetastoreRelation to a LogicalRelation.

Author: Wenchen Fan <cloud0fan@163.com>

Closes #8889 from cloud-fan/hot-bug.
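For context, the regression hits queries that aggregate over a Hive table stored as Parquet and then filter or sort on the aggregate. A minimal reproduction, sketched from the regression test added to SQLQuerySuite further down (it assumes the test10741 table created there):

// Before this fix, converting the MetastoreRelation to a LogicalRelation produced
// attributes with fresh exprIds, so the HAVING/ORDER BY expressions could end up
// referencing IDs that no longer existed in the plan and fail to resolve.
sqlContext.sql(
  """
    |SELECT c1, AVG(c2) AS c_avg
    |FROM test10741
    |GROUP BY c1
    |HAVING (AVG(c2) > 5) ORDER BY c1
  """.stripMargin).show()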
Diffstat (limited to 'sql')
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala | 1
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala | 8
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 2
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala | 18
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala | 33
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala | 14
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala | 2
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala | 2
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 64
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala | 2
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala | 4
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 2
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 27
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 8
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala | 2
15 files changed, 103 insertions, 86 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
index 35b74024a4..394be47a58 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.catalyst.analysis
-import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 6f173b52ad..5768c6087d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -236,6 +236,14 @@ case class AttributeReference(
}
}
+ def withExprId(newExprId: ExprId): AttributeReference = {
+ if (exprId == newExprId) {
+ this
+ } else {
+ AttributeReference(name, dataType, nullable, metadata)(newExprId, qualifiers)
+ }
+ }
+
override def toString: String = s"$name#${exprId.id}$typeSuffix"
}
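The new withExprId helper above re-stamps an AttributeReference with a caller-supplied ExprId while keeping its name, data type, nullability, metadata and qualifiers. A rough sketch of the intended use (attrs/expectedAttrs mirror the LogicalRelation change further down):

// attrs: attributes freshly derived from the data source schema
// expectedAttrs: output attributes of the relation being replaced (carrying the old exprIds)
val rebound = attrs.zip(expectedAttrs).map {
  case (attr, expected) => attr.withExprId(expected.exprId)  // keep the relation's name, reuse the old id
}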
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index f9995da3a8..9c67ad18c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1595,7 +1595,7 @@ class DataFrame private[sql](
*/
def inputFiles: Array[String] = {
val files: Seq[String] = logicalPlan.collect {
- case LogicalRelation(fsBasedRelation: FileRelation) =>
+ case LogicalRelation(fsBasedRelation: FileRelation, _) =>
fsBasedRelation.inputFiles
case fr: FileRelation =>
fr.inputFiles
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index c58213155d..918db8e7d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -38,21 +38,21 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
*/
private[sql] object DataSourceStrategy extends Strategy with Logging {
def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
- case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan)) =>
+ case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _)) =>
pruneFilterProjectRaw(
l,
projects,
filters,
(a, f) => toCatalystRDD(l, a, t.buildScan(a, f))) :: Nil
- case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan)) =>
+ case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan, _)) =>
pruneFilterProject(
l,
projects,
filters,
(a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f))) :: Nil
- case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan)) =>
+ case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _)) =>
pruneFilterProject(
l,
projects,
@@ -60,7 +60,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
(a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: Nil
// Scanning partitioned HadoopFsRelation
- case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation))
+ case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _))
if t.partitionSpec.partitionColumns.nonEmpty =>
val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
@@ -88,7 +88,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
selectedPartitions) :: Nil
// Scanning non-partitioned HadoopFsRelation
- case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
+ case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _)) =>
// See buildPartitionedTableScan for the reason that we need to create a shard
// broadcast HadoopConf.
val sharedHadoopConf = SparkHadoopUtil.get.conf
@@ -101,16 +101,16 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
(a, f) =>
toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f, t.paths, confBroadcast))) :: Nil
- case l @ LogicalRelation(baseRelation: TableScan) =>
+ case l @ LogicalRelation(baseRelation: TableScan, _) =>
execution.PhysicalRDD.createFromDataSource(
l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil
- case i @ logical.InsertIntoTable(
- l @ LogicalRelation(t: InsertableRelation), part, query, overwrite, false) if part.isEmpty =>
+ case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _),
+ part, query, overwrite, false) if part.isEmpty =>
execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil
case i @ logical.InsertIntoTable(
- l @ LogicalRelation(t: HadoopFsRelation), part, query, overwrite, false) =>
+ l @ LogicalRelation(t: HadoopFsRelation, _), part, query, overwrite, false) =>
val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index a7123dc845..4069179aa7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -17,23 +17,40 @@
package org.apache.spark.sql.execution.datasources
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.sources.BaseRelation
/**
* Used to link a [[BaseRelation]] in to a logical query plan.
+ *
+ * Note that sometimes we need to use `LogicalRelation` to replace an existing leaf node without
+ * changing the output attributes' IDs. The `expectedOutputAttributes` parameter is used for
+ * this purpose. See https://issues.apache.org/jira/browse/SPARK-10741 for more details.
*/
-private[sql] case class LogicalRelation(relation: BaseRelation)
- extends LeafNode
- with MultiInstanceRelation {
+private[sql] case class LogicalRelation(
+ relation: BaseRelation,
+ expectedOutputAttributes: Option[Seq[Attribute]] = None)
+ extends LeafNode with MultiInstanceRelation {
- override val output: Seq[AttributeReference] = relation.schema.toAttributes
+ override val output: Seq[AttributeReference] = {
+ val attrs = relation.schema.toAttributes
+ expectedOutputAttributes.map { expectedAttrs =>
+ assert(expectedAttrs.length == attrs.length)
+ attrs.zip(expectedAttrs).map {
+ // We should respect the attribute names provided by base relation and only use the
+ // exprId in `expectedOutputAttributes`.
+ // The reason is that, some relations(like parquet) will reconcile attribute names to
+ // workaround case insensitivity issue.
+ case (attr, expected) => attr.withExprId(expected.exprId)
+ }
+ }.getOrElse(attrs)
+ }
// Logical Relations are distinct if they have different output for the sake of transformations.
override def equals(other: Any): Boolean = other match {
- case l @ LogicalRelation(otherRelation) => relation == otherRelation && output == l.output
- case _ => false
+ case l @ LogicalRelation(otherRelation, _) => relation == otherRelation && output == l.output
+ case _ => false
}
override def hashCode: Int = {
@@ -41,7 +58,7 @@ private[sql] case class LogicalRelation(relation: BaseRelation)
}
override def sameResult(otherPlan: LogicalPlan): Boolean = otherPlan match {
- case LogicalRelation(otherRelation) => relation == otherRelation
+ case LogicalRelation(otherRelation, _) => relation == otherRelation
case _ => false
}
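Putting the two changes together, a caller that swaps one leaf node for a LogicalRelation can now pin the output exprIds to those of the node being replaced. A hedged sketch (oldRelation and parquetRelation are illustrative names; the real call site is the HiveMetastoreCatalog change below):

// Build the replacement LogicalRelation but keep the original output exprIds,
// so expressions elsewhere in the plan that reference them stay resolved.
val replacement = LogicalRelation(
  parquetRelation,                                     // a BaseRelation, e.g. a ParquetRelation
  expectedOutputAttributes = Some(oldRelation.output)) // reuse the old relation's attribute IDs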
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 16c9138419..8efc8016f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -37,7 +37,7 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {
// We are inserting into an InsertableRelation or HadoopFsRelation.
case i @ InsertIntoTable(
- l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation), _, child, _, _) => {
+ l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _), _, child, _, _) => {
// First, make sure the data to be inserted have the same number of fields with the
// schema of the relation.
if (l.output.size != child.output.size) {
@@ -84,14 +84,14 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
def apply(plan: LogicalPlan): Unit = {
plan.foreach {
case i @ logical.InsertIntoTable(
- l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite, ifNotExists) =>
+ l @ LogicalRelation(t: InsertableRelation, _), partition, query, overwrite, ifNotExists) =>
// Right now, we do not support insert into a data source table with partition specs.
if (partition.nonEmpty) {
failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.")
} else {
// Get all input data source relations of the query.
val srcRelations = query.collect {
- case LogicalRelation(src: BaseRelation) => src
+ case LogicalRelation(src: BaseRelation, _) => src
}
if (srcRelations.contains(t)) {
failAnalysis(
@@ -102,7 +102,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
}
case logical.InsertIntoTable(
- LogicalRelation(r: HadoopFsRelation), part, query, overwrite, _) =>
+ LogicalRelation(r: HadoopFsRelation, _), part, query, overwrite, _) =>
// We need to make sure the partition columns specified by users do match partition
// columns of the relation.
val existingPartitionColumns = r.partitionColumns.fieldNames.toSet
@@ -120,7 +120,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
// Get all input data source relations of the query.
val srcRelations = query.collect {
- case LogicalRelation(src: BaseRelation) => src
+ case LogicalRelation(src: BaseRelation, _) => src
}
if (srcRelations.contains(r)) {
failAnalysis(
@@ -148,10 +148,10 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
EliminateSubQueries(catalog.lookupRelation(tableIdent.toSeq)) match {
// Only do the check if the table is a data source table
// (the relation is a BaseRelation).
- case l @ LogicalRelation(dest: BaseRelation) =>
+ case l @ LogicalRelation(dest: BaseRelation, _) =>
// Get all input data source relations of the query.
val srcRelations = query.collect {
- case LogicalRelation(src: BaseRelation) => src
+ case LogicalRelation(src: BaseRelation, _) => src
}
if (srcRelations.contains(dest)) {
failAnalysis(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index f067112cfc..45ad3fde55 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -55,7 +55,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
.where(Column(predicate))
val analyzedPredicate = query.queryExecution.optimizedPlan.collect {
- case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation)) => filters
+ case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation, _)) => filters
}.flatten
assert(analyzedPredicate.nonEmpty)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 7bac8609e1..3a23b8ed66 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -465,7 +465,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
(1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath)
val queryExecution = sqlContext.read.parquet(dir.getCanonicalPath).queryExecution
queryExecution.analyzed.collectFirst {
- case LogicalRelation(relation: ParquetRelation) =>
+ case LogicalRelation(relation: ParquetRelation, _) =>
assert(relation.partitionSpec === PartitionSpec.emptySpec)
}.getOrElse {
fail(s"Expecting a ParquetRelation2, but got:\n$queryExecution")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 012634cb5a..ea1521a48c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -448,7 +448,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => None // Cache miss
- case logical @ LogicalRelation(parquetRelation: ParquetRelation) =>
+ case logical @ LogicalRelation(parquetRelation: ParquetRelation, _) =>
// If we have the same paths, same schema, and same partition spec,
// we will use the cached Parquet Relation.
val useCached =
@@ -514,7 +514,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
parquetRelation
}
- result.newInstance()
+ result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
}
override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
@@ -553,60 +553,28 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
return plan
}
- // Collects all `MetastoreRelation`s which should be replaced
- val toBeReplaced = plan.collect {
+ plan transformUp {
// Write path
- case InsertIntoTable(relation: MetastoreRelation, _, _, _, _)
- // Inserting into partitioned table is not supported in Parquet data source (yet).
- if !relation.hiveQlTable.isPartitioned &&
- hive.convertMetastoreParquet &&
- relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
- val parquetRelation = convertToParquetRelation(relation)
- val attributedRewrites = relation.output.zip(parquetRelation.output)
- (relation, parquetRelation, attributedRewrites)
+ case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
+ // Inserting into partitioned table is not supported in Parquet data source (yet).
+ if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
+ r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+ val parquetRelation = convertToParquetRelation(r)
+ InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
// Write path
- case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _, _)
+ case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
// Inserting into partitioned table is not supported in Parquet data source (yet).
- if !relation.hiveQlTable.isPartitioned &&
- hive.convertMetastoreParquet &&
- relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
- val parquetRelation = convertToParquetRelation(relation)
- val attributedRewrites = relation.output.zip(parquetRelation.output)
- (relation, parquetRelation, attributedRewrites)
+ if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
+ r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+ val parquetRelation = convertToParquetRelation(r)
+ InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
// Read path
case relation: MetastoreRelation if hive.convertMetastoreParquet &&
- relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+ relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(relation)
- val attributedRewrites = relation.output.zip(parquetRelation.output)
- (relation, parquetRelation, attributedRewrites)
- }
-
- val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap
- val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ ++: _))
-
- // Replaces all `MetastoreRelation`s with corresponding `ParquetRelation2`s, and fixes
- // attribute IDs referenced in other nodes.
- plan.transformUp {
- case r: MetastoreRelation if relationMap.contains(r) =>
- val parquetRelation = relationMap(r)
- val alias = r.alias.getOrElse(r.tableName)
- Subquery(alias, parquetRelation)
-
- case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
- if relationMap.contains(r) =>
- val parquetRelation = relationMap(r)
- InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
-
- case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
- if relationMap.contains(r) =>
- val parquetRelation = relationMap(r)
- InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
-
- case other => other.transformExpressions {
- case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
- }
+ Subquery(relation.alias.getOrElse(relation.tableName), parquetRelation)
}
}
}
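Because convertToParquetRelation now copies the MetastoreRelation's output attributes into the returned LogicalRelation, the single transformUp pass above can replace relations in place without the old collect/AttributeMap bookkeeping. Roughly, the read-path rule reduces to the following (shouldConvert is a hypothetical shorthand for the serde and partition checks shown in the diff):

plan transformUp {
  // Swap the metastore relation for its Parquet-backed equivalent; exprIds are
  // preserved via expectedOutputAttributes, so no attribute-rewrite map is needed.
  case relation: MetastoreRelation if shouldConvert(relation) =>
    Subquery(relation.alias.getOrElse(relation.tableName), convertToParquetRelation(relation))
}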
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index d1699dd536..9f654eed57 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -235,7 +235,7 @@ case class CreateMetastoreDataSourceAsSelect(
sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath)
val createdRelation = LogicalRelation(resolved.relation)
EliminateSubQueries(sqlContext.catalog.lookupRelation(tableIdent.toSeq)) match {
- case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation) =>
+ case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _) =>
if (l.relation != createdRelation.relation) {
val errorDescription =
s"Cannot append to table $tableName because the resolved relation does not " +
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 80a61f82fd..81ee9ba71b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -81,9 +81,9 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
test("Double create fails when allowExisting = false") {
sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
- val message = intercept[QueryExecutionException] {
+ intercept[QueryExecutionException] {
sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
- }.getMessage
+ }
}
test("Double create does not fail when allowExisting = true") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index bf0db08490..d356538000 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -570,7 +570,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
Row(3) :: Row(4) :: Nil)
table("test_parquet_ctas").queryExecution.optimizedPlan match {
- case LogicalRelation(p: ParquetRelation) => // OK
+ case LogicalRelation(p: ParquetRelation, _) => // OK
case _ =>
fail(s"test_parquet_ctas should have be converted to ${classOf[ParquetRelation]}")
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 71823e32ad..8c3f9ac202 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -263,7 +263,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
relation match {
- case LogicalRelation(r: ParquetRelation) =>
+ case LogicalRelation(r: ParquetRelation, _) =>
if (!isDataSourceParquet) {
fail(
s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
@@ -1223,4 +1223,29 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
checkAnswer(df, (0 until 5).map(i => Row(i + "#", i + "#")))
}
+
+ test("SPARK-10741: Sort on Aggregate using parquet") {
+ withTable("test10741") {
+ withTempTable("src") {
+ Seq("a" -> 5, "a" -> 9, "b" -> 6).toDF().registerTempTable("src")
+ sql("CREATE TABLE test10741(c1 STRING, c2 INT) STORED AS PARQUET AS SELECT * FROM src")
+ }
+
+ checkAnswer(sql(
+ """
+ |SELECT c1, AVG(c2) AS c_avg
+ |FROM test10741
+ |GROUP BY c1
+ |HAVING (AVG(c2) > 5) ORDER BY c1
+ """.stripMargin), Row("a", 7.0) :: Row("b", 6.0) :: Nil)
+
+ checkAnswer(sql(
+ """
+ |SELECT c1, AVG(c2) AS c_avg
+ |FROM test10741
+ |GROUP BY c1
+ |ORDER BY AVG(c2)
+ """.stripMargin), Row("b", 6.0) :: Row("a", 7.0) :: Nil)
+ }
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 6842ec2b5e..7d8104f935 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -282,7 +282,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
)
table("test_parquet_ctas").queryExecution.optimizedPlan match {
- case LogicalRelation(_: ParquetRelation) => // OK
+ case LogicalRelation(_: ParquetRelation, _) => // OK
case _ => fail(
"test_parquet_ctas should be converted to " +
s"${classOf[ParquetRelation].getCanonicalName }")
@@ -369,7 +369,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
assertResult(2) {
analyzed.collect {
- case r@LogicalRelation(_: ParquetRelation) => r
+ case r @ LogicalRelation(_: ParquetRelation, _) => r
}.size
}
}
@@ -378,7 +378,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
def collectParquetRelation(df: DataFrame): ParquetRelation = {
val plan = df.queryExecution.analyzed
plan.collectFirst {
- case LogicalRelation(r: ParquetRelation) => r
+ case LogicalRelation(r: ParquetRelation, _) => r
}.getOrElse {
fail(s"Expecting a ParquetRelation2, but got:\n$plan")
}
@@ -428,7 +428,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
// Converted test_parquet should be cached.
catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => fail("Converted test_parquet should be cached in the cache.")
- case logical @ LogicalRelation(parquetRelation: ParquetRelation) => // OK
+ case logical @ LogicalRelation(parquetRelation: ParquetRelation, _) => // OK
case other =>
fail(
"The cached test_parquet should be a Parquet Relation. " +
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index d7504936d9..42b9b3d634 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -499,7 +499,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
}
val actualPaths = df.queryExecution.analyzed.collectFirst {
- case LogicalRelation(relation: HadoopFsRelation) =>
+ case LogicalRelation(relation: HadoopFsRelation, _) =>
relation.paths.toSet
}.getOrElse {
fail("Expect an FSBasedRelation, but none could be found")