From 8942353905c354c4ce31b0d1a44d33feb3dcf737 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Sat, 14 Jan 2017 10:53:33 -0800
Subject: [SPARK-19151][SQL] DataFrameWriter.saveAsTable support hive overwrite

## What changes were proposed in this pull request?

After [SPARK-19107](https://issues.apache.org/jira/browse/SPARK-19107), we can now treat Hive as a data source and create Hive tables with DataFrameWriter and Catalog. However, the support is not yet complete; some cases are still unsupported.

This PR implements one of them:
DataFrameWriter.saveAsTable working with the hive format in Overwrite mode

## How was this patch tested?
Unit test added.

Author: windpiger

Closes #16549 from windpiger/saveAsTableWithHiveOverwrite.
---
 .../org/apache/spark/sql/DataFrameWriter.scala     | 15 +++++++++-----
 .../org/apache/spark/sql/hive/HiveStrategies.scala |  9 ++++----
 .../spark/sql/hive/execution/HiveDDLSuite.scala    | 24 ++++++++++++++++++----
 3 files changed, 34 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 82331fdb9b..7fc03bd5ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
@@ -380,17 +380,22 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         throw new AnalysisException(s"Table $tableIdent already exists.")
 
       case (true, SaveMode.Overwrite) =>
-        // Get all input data source relations of the query.
+        // Get all input data source or hive relations of the query.
         val srcRelations = df.logicalPlan.collect {
           case LogicalRelation(src: BaseRelation, _, _) => src
+          case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable) =>
+            relation.catalogTable.identifier
         }
 
         EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match {
-          // Only do the check if the table is a data source table (the relation is a BaseRelation).
-          // TODO(cloud-fan): also check hive table relation here when we support overwrite mode
-          // for creating hive tables.
+          // check if the table is a data source table (the relation is a BaseRelation).
           case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
             throw new AnalysisException(
               s"Cannot overwrite table $tableName that is also being read from")
+          // check hive table relation when overwrite mode
+          case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable)
+            && srcRelations.contains(relation.catalogTable.identifier) =>
+            throw new AnalysisException(
+              s"Cannot overwrite table $tableName that is also being read from")
           case _ => // OK
         }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 6d5cc5778a..d1f11e78b4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -109,12 +109,11 @@ private[hive] trait HiveStrategies {
             table, partition, planLater(child), overwrite, ifNotExists) :: Nil
 
       case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
-        // Currently we will never hit this branch, as SQL string API can only use `Ignore` or
-        // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde
-        // tables yet.
-        if (mode == SaveMode.Append || mode == SaveMode.Overwrite) {
+        // Currently `DataFrameWriter.saveAsTable` doesn't support
+        // the Append mode of hive serde tables yet.
+        if (mode == SaveMode.Append) {
           throw new AnalysisException(
-            "CTAS for hive serde tables does not support append or overwrite semantics.")
+            "CTAS for hive serde tables does not support append semantics.")
         }
 
         val dbName = tableDesc.identifier.database.getOrElse(sparkSession.catalog.currentDatabase)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 0af331e67b..e3f1667249 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1314,7 +1314,24 @@ class HiveDDLSuite
         .write.format("hive").option("fileFormat", "avro").saveAsTable("t")
       checkAnswer(spark.table("t"), Row(1, "a"))
 
-      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+      Seq("c" -> 1).toDF("i", "j").write.format("hive")
+        .mode(SaveMode.Overwrite).option("fileFormat", "parquet").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row("c", 1))
+
+      var table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+      assert(DDLUtils.isHiveTable(table))
+      assert(table.storage.inputFormat ==
+        Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
+      assert(table.storage.outputFormat ==
+        Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+      assert(table.storage.serde ==
+        Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+
+      Seq(9 -> "x").toDF("i", "j")
+        .write.format("hive").mode(SaveMode.Overwrite).option("fileFormat", "avro").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row(9, "x"))
+
+      table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
       assert(DDLUtils.isHiveTable(table))
       assert(table.storage.inputFormat ==
         Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"))
@@ -1324,7 +1341,7 @@ class HiveDDLSuite
         Some("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))
 
       sql("INSERT INTO t SELECT 2, 'b'")
-      checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
+      checkAnswer(spark.table("t"), Row(9, "x") :: Row(2, "b") :: Nil)
 
       val e = intercept[AnalysisException] {
         Seq(1 -> "a").toDF("i", "j").write.format("hive").partitionBy("i").saveAsTable("t2")
@@ -1340,8 +1357,7 @@ class HiveDDLSuite
       val e3 = intercept[AnalysisException] {
         spark.table("t").write.format("hive").mode("overwrite").saveAsTable("t")
       }
-      assert(e3.message.contains(
-        "CTAS for hive serde tables does not support append or overwrite semantics"))
+      assert(e3.message.contains("Cannot overwrite table default.t that is also being read from"))
     }
   }
 
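For reference, here is a minimal usage sketch of the behavior this patch enables, distilled from the test changes above. It is not part of the patch itself; the table name `t`, the sample rows, and the app name are illustrative, and a Hive-enabled SparkSession is assumed.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object SaveAsTableOverwriteSketch {
  def main(args: Array[String]): Unit = {
    // Hive support is required for format("hive") to resolve to a Hive serde table.
    val spark = SparkSession.builder()
      .appName("hive-overwrite-sketch") // illustrative name
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    // Create a Hive serde table through the DataFrameWriter API
    // (possible since SPARK-19107).
    Seq(1 -> "a").toDF("i", "j")
      .write.format("hive").saveAsTable("t")

    // With this patch, Overwrite mode now works for Hive serde tables as well,
    // optionally switching the underlying file format of the rewritten table.
    Seq(2 -> "b").toDF("i", "j")
      .write.format("hive")
      .mode(SaveMode.Overwrite)
      .option("fileFormat", "parquet")
      .saveAsTable("t")

    // Overwriting a table that the query also reads from is still rejected,
    // and the check now covers Hive relations too:
    //   spark.table("t").write.format("hive").mode("overwrite").saveAsTable("t")
    //   => AnalysisException: Cannot overwrite table default.t that is also being read from

    spark.stop()
  }
}
```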