diff options
author | Wenchen Fan <cloud0fan@163.com> | 2015-09-27 09:08:38 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2015-09-27 09:08:38 -0700 |
commit | 418e5e4cbdaab87addb91ac0bb2245ff0213ac81 (patch) | |
tree | 455d23d8a1e2e731e00b2f6039c6ed2a7137e0bc /sql/hive/src/main | |
parent | 299b439920f980cce4c4f4e4a8436a5145efeaa3 (diff) | |
download | spark-418e5e4cbdaab87addb91ac0bb2245ff0213ac81.tar.gz spark-418e5e4cbdaab87addb91ac0bb2245ff0213ac81.tar.bz2 spark-418e5e4cbdaab87addb91ac0bb2245ff0213ac81.zip |
[SPARK-10741] [SQL] Hive Query Having/OrderBy against Parquet table is not working
https://issues.apache.org/jira/browse/SPARK-10741
I chose the second approach: do not change output exprIds when converting a MetastoreRelation to a LogicalRelation
Author: Wenchen Fan <cloud0fan@163.com>
Closes #8889 from cloud-fan/hot-bug.
Diffstat (limited to 'sql/hive/src/main')
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 64 | ||||
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala | 2 |
2 files changed, 17 insertions, 49 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 012634cb5a..ea1521a48c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -448,7 +448,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = { cachedDataSourceTables.getIfPresent(tableIdentifier) match { case null => None // Cache miss - case logical @ LogicalRelation(parquetRelation: ParquetRelation) => + case logical @ LogicalRelation(parquetRelation: ParquetRelation, _) => // If we have the same paths, same schema, and same partition spec, // we will use the cached Parquet Relation. val useCached = @@ -514,7 +514,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive parquetRelation } - result.newInstance() + result.copy(expectedOutputAttributes = Some(metastoreRelation.output)) } override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = { @@ -553,60 +553,28 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive return plan } - // Collects all `MetastoreRelation`s which should be replaced - val toBeReplaced = plan.collect { + plan transformUp { // Write path - case InsertIntoTable(relation: MetastoreRelation, _, _, _, _) - // Inserting into partitioned table is not supported in Parquet data source (yet). 
- if !relation.hiveQlTable.isPartitioned && - hive.convertMetastoreParquet && - relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") => - val parquetRelation = convertToParquetRelation(relation) - val attributedRewrites = relation.output.zip(parquetRelation.output) - (relation, parquetRelation, attributedRewrites) + case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists) + // Inserting into partitioned table is not supported in Parquet data source (yet). + if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet && + r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") => + val parquetRelation = convertToParquetRelation(r) + InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists) // Write path - case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _, _) + case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists) // Inserting into partitioned table is not supported in Parquet data source (yet). 
- if !relation.hiveQlTable.isPartitioned && - hive.convertMetastoreParquet && - relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") => - val parquetRelation = convertToParquetRelation(relation) - val attributedRewrites = relation.output.zip(parquetRelation.output) - (relation, parquetRelation, attributedRewrites) + if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet && + r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") => + val parquetRelation = convertToParquetRelation(r) + InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists) // Read path case relation: MetastoreRelation if hive.convertMetastoreParquet && - relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") => + relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") => val parquetRelation = convertToParquetRelation(relation) - val attributedRewrites = relation.output.zip(parquetRelation.output) - (relation, parquetRelation, attributedRewrites) - } - - val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap - val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ ++: _)) - - // Replaces all `MetastoreRelation`s with corresponding `ParquetRelation2`s, and fixes - // attribute IDs referenced in other nodes. 
- plan.transformUp { - case r: MetastoreRelation if relationMap.contains(r) => - val parquetRelation = relationMap(r) - val alias = r.alias.getOrElse(r.tableName) - Subquery(alias, parquetRelation) - - case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists) - if relationMap.contains(r) => - val parquetRelation = relationMap(r) - InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists) - - case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists) - if relationMap.contains(r) => - val parquetRelation = relationMap(r) - InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists) - - case other => other.transformExpressions { - case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a) - } + Subquery(relation.alias.getOrElse(relation.tableName), parquetRelation) } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index d1699dd536..9f654eed57 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -235,7 +235,7 @@ case class CreateMetastoreDataSourceAsSelect( sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath) val createdRelation = LogicalRelation(resolved.relation) EliminateSubQueries(sqlContext.catalog.lookupRelation(tableIdent.toSeq)) match { - case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation) => + case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _) => if (l.relation != createdRelation.relation) { val errorDescription = s"Cannot append to table $tableName because the resolved relation does not " + |