author     Wenchen Fan <cloud0fan@163.com>    2015-09-27 09:08:38 -0700
committer  Yin Huai <yhuai@databricks.com>    2015-09-27 09:08:38 -0700
commit     418e5e4cbdaab87addb91ac0bb2245ff0213ac81 (patch)
tree       455d23d8a1e2e731e00b2f6039c6ed2a7137e0bc /sql/hive/src/main
parent     299b439920f980cce4c4f4e4a8436a5145efeaa3 (diff)
[SPARK-10741] [SQL] Hive Query Having/OrderBy against Parquet table is not working
https://issues.apache.org/jira/browse/SPARK-10741

I chose the second approach: do not change the output exprIds when converting a MetastoreRelation to a LogicalRelation.

Author: Wenchen Fan <cloud0fan@163.com>

Closes #8889 from cloud-fan/hot-bug.
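For context, the symptom in the JIRA is roughly a query of this shape against a Parquet-backed Hive table; before this patch, the HAVING/ORDER BY clauses could end up referencing attributes whose exprIds no longer matched the converted relation's output. A hypothetical reproduction (Spark 1.5-era API; the table name is made up, and `sc` is the usual shell SparkContext):

// Hypothetical reproduction of the SPARK-10741 symptom.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("CREATE TABLE test_parquet (key INT, value STRING) STORED AS PARQUET")
hiveContext.sql(
  """SELECT value, COUNT(key)
    |FROM test_parquet
    |GROUP BY value
    |HAVING COUNT(key) > 1
    |ORDER BY value""".stripMargin).show()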
Diffstat (limited to 'sql/hive/src/main')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala  | 64
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala    |  2
2 files changed, 17 insertions(+), 49 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 012634cb5a..ea1521a48c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -448,7 +448,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
     cachedDataSourceTables.getIfPresent(tableIdentifier) match {
       case null => None // Cache miss
-      case logical @ LogicalRelation(parquetRelation: ParquetRelation) =>
+      case logical @ LogicalRelation(parquetRelation: ParquetRelation, _) =>
         // If we have the same paths, same schema, and same partition spec,
         // we will use the cached Parquet Relation.
         val useCached =
@@ -514,7 +514,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
         parquetRelation
       }

-    result.newInstance()
+    result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
   }

   override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
@@ -553,60 +553,28 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
        return plan
      }

-      // Collects all `MetastoreRelation`s which should be replaced
-      val toBeReplaced = plan.collect {
+      plan transformUp {
        // Write path
-        case InsertIntoTable(relation: MetastoreRelation, _, _, _, _)
-            // Inserting into partitioned table is not supported in Parquet data source (yet).
-            if !relation.hiveQlTable.isPartitioned &&
-              hive.convertMetastoreParquet &&
-              relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
-          val parquetRelation = convertToParquetRelation(relation)
-          val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation, parquetRelation, attributedRewrites)
+        case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
+          // Inserting into partitioned table is not supported in Parquet data source (yet).
+          if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
+            r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+          val parquetRelation = convertToParquetRelation(r)
+          InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)

        // Write path
-        case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _, _)
+        case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
          // Inserting into partitioned table is not supported in Parquet data source (yet).
-          if !relation.hiveQlTable.isPartitioned &&
-            hive.convertMetastoreParquet &&
-            relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
-          val parquetRelation = convertToParquetRelation(relation)
-          val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation, parquetRelation, attributedRewrites)
+          if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
+            r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+          val parquetRelation = convertToParquetRelation(r)
+          InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)

        // Read path
        case relation: MetastoreRelation if hive.convertMetastoreParquet &&
-            relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+          relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
          val parquetRelation = convertToParquetRelation(relation)
-          val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation, parquetRelation, attributedRewrites)
-      }
-
-      val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap
-      val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ ++: _))
-
-      // Replaces all `MetastoreRelation`s with corresponding `ParquetRelation2`s, and fixes
-      // attribute IDs referenced in other nodes.
-      plan.transformUp {
-        case r: MetastoreRelation if relationMap.contains(r) =>
-          val parquetRelation = relationMap(r)
-          val alias = r.alias.getOrElse(r.tableName)
-          Subquery(alias, parquetRelation)
-
-        case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
-          if relationMap.contains(r) =>
-          val parquetRelation = relationMap(r)
-          InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
-
-        case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
-          if relationMap.contains(r) =>
-          val parquetRelation = relationMap(r)
-          InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
-
-        case other => other.transformExpressions {
-          case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
-        }
+          Subquery(relation.alias.getOrElse(relation.tableName), parquetRelation)
      }
    }
  }
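The core of the fix above: the old code called `result.newInstance()`, which re-allocates the relation's output exprIds, while operators already resolved against the MetastoreRelation (such as the Sort and Filter produced by ORDER BY/HAVING) still held the old ones, and the second rewrite pass via `attributedRewrites` could miss them. Passing `expectedOutputAttributes = Some(metastoreRelation.output)` keeps the original attributes, so no rewriting is needed. A minimal sketch of the failure mode in plain Scala (illustrative classes, not Spark's):

object ExprIdSketch {
  private var counter = 0L
  private def freshId(): Long = { counter += 1; counter }

  case class Attribute(name: String, exprId: Long)

  case class Relation(output: Seq[Attribute]) {
    // Models Spark's newInstance(): every output attribute gets a fresh exprId.
    def newInstance(): Relation = Relation(output.map(_.copy(exprId = freshId())))
  }

  def main(args: Array[String]): Unit = {
    val metastore = Relation(Seq(Attribute("key", freshId()), Attribute("value", freshId())))
    // An ORDER BY resolved against the metastore relation holds this attribute:
    val sortKey = metastore.output.head

    // Old behavior: fresh exprIds orphan the already-resolved sort key.
    val rewritten = metastore.newInstance()
    println(rewritten.output.exists(_.exprId == sortKey.exprId)) // false -> unresolved plan

    // New behavior: reuse the metastore relation's output attributes as-is.
    val preserved = Relation(metastore.output)
    println(preserved.output.exists(_.exprId == sortKey.exprId)) // true -> still resolved
  }
}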
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index d1699dd536..9f654eed57 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -235,7 +235,7 @@ case class CreateMetastoreDataSourceAsSelect(
           sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath)
         val createdRelation = LogicalRelation(resolved.relation)
         EliminateSubQueries(sqlContext.catalog.lookupRelation(tableIdent.toSeq)) match {
-          case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation) =>
+          case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _) =>
             if (l.relation != createdRelation.relation) {
               val errorDescription =
                 s"Cannot append to table $tableName because the resolved relation does not " +