author      windpiger <songjun@outlook.com>      2017-01-14 10:53:33 -0800
committer   gatorsmile <gatorsmile@gmail.com>    2017-01-14 10:53:33 -0800
commit      8942353905c354c4ce31b0d1a44d33feb3dcf737 (patch)
tree        23281a909c8c44da8dc13eede62dcd18a97ba6f7 /sql
parent      b6a7aa4f770634e6db7244e88f8b6273fb9b6d1e (diff)
[SPARK-19151][SQL] DataFrameWriter.saveAsTable support hive overwrite
## What changes were proposed in this pull request?

After [SPARK-19107](https://issues.apache.org/jira/browse/SPARK-19107), we can now treat hive as a data source and create hive tables with DataFrameWriter and Catalog. However, the support is not yet complete; there are still some cases we do not support. This PR makes DataFrameWriter.saveAsTable work with the hive format in overwrite mode.

## How was this patch tested?

Unit test added.

Author: windpiger <songjun@outlook.com>

Closes #16549 from windpiger/saveAsTableWithHiveOverwrite.
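For illustration, a minimal sketch of the behavior this patch enables, modeled on the test change below (assumes a hive-enabled SparkSession named `spark` with its implicits imported):

import org.apache.spark.sql.SaveMode
import spark.implicits._

// Create a hive serde table through the data source API (enabled by SPARK-19107).
Seq(1 -> "a").toDF("i", "j")
  .write.format("hive").option("fileFormat", "avro").saveAsTable("t")

// New in this patch: Overwrite mode now works for hive tables,
// and can even switch the underlying file format.
Seq("c" -> 1).toDF("i", "j")
  .write.format("hive").mode(SaveMode.Overwrite)
  .option("fileFormat", "parquet").saveAsTable("t")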
Diffstat (limited to 'sql')
-rw-r--r--   sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala   15
-rw-r--r--   sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala   9
-rw-r--r--   sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala   24
3 files changed, 34 insertions(+), 14 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 82331fdb9b..7fc03bd5ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
@@ -380,17 +380,22 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
throw new AnalysisException(s"Table $tableIdent already exists.")
case (true, SaveMode.Overwrite) =>
- // Get all input data source relations of the query.
+ // Get all input data source or hive relations of the query.
val srcRelations = df.logicalPlan.collect {
case LogicalRelation(src: BaseRelation, _, _) => src
+ case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable) =>
+ relation.catalogTable.identifier
}
EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match {
- // Only do the check if the table is a data source table (the relation is a BaseRelation).
- // TODO(cloud-fan): also check hive table relation here when we support overwrite mode
- // for creating hive tables.
+ // Check if the table is a data source table (the relation is a BaseRelation).
case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
throw new AnalysisException(
s"Cannot overwrite table $tableName that is also being read from")
+ // Also check the hive table relation when in overwrite mode.
+ case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable)
+ && srcRelations.contains(relation.catalogTable.identifier) =>
+ throw new AnalysisException(
+ s"Cannot overwrite table $tableName that is also being read from")
case _ => // OK
}
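The new `CatalogRelation` branch mirrors the existing `BaseRelation` guard. A sketch of the plan it rejects, taken from the test change further down (assumes the hive table `t` already exists):

// Reading and overwriting the same hive table in one plan now fails with:
// org.apache.spark.sql.AnalysisException:
//   Cannot overwrite table default.t that is also being read from
spark.table("t").write.format("hive").mode("overwrite").saveAsTable("t")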
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 6d5cc5778a..d1f11e78b4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -109,12 +109,11 @@ private[hive] trait HiveStrategies {
table, partition, planLater(child), overwrite, ifNotExists) :: Nil
case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
- // Currently we will never hit this branch, as SQL string API can only use `Ignore` or
- // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde
- // tables yet.
- if (mode == SaveMode.Append || mode == SaveMode.Overwrite) {
+ // Currently `DataFrameWriter.saveAsTable` doesn't support
+ // the Append mode for hive serde tables yet.
+ if (mode == SaveMode.Append) {
throw new AnalysisException(
- "CTAS for hive serde tables does not support append or overwrite semantics.")
+ "CTAS for hive serde tables does not support append semantics.")
}
val dbName = tableDesc.identifier.database.getOrElse(sparkSession.catalog.currentDatabase)
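Overwrite mode no longer reaches this error; only Append does. A hedged sketch of a call that still fails under this branch (assumes `df` is an arbitrary DataFrame writing to a hive serde table):

// Still unsupported, now with a narrower message:
// org.apache.spark.sql.AnalysisException:
//   CTAS for hive serde tables does not support append semantics.
df.write.format("hive").mode(SaveMode.Append).saveAsTable("t")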
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 0af331e67b..e3f1667249 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1314,7 +1314,24 @@ class HiveDDLSuite
.write.format("hive").option("fileFormat", "avro").saveAsTable("t")
checkAnswer(spark.table("t"), Row(1, "a"))
- val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ Seq("c" -> 1).toDF("i", "j").write.format("hive")
+ .mode(SaveMode.Overwrite).option("fileFormat", "parquet").saveAsTable("t")
+ checkAnswer(spark.table("t"), Row("c", 1))
+
+ var table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ assert(DDLUtils.isHiveTable(table))
+ assert(table.storage.inputFormat ==
+ Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
+ assert(table.storage.outputFormat ==
+ Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+ assert(table.storage.serde ==
+ Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+
+ Seq(9 -> "x").toDF("i", "j")
+ .write.format("hive").mode(SaveMode.Overwrite).option("fileFormat", "avro").saveAsTable("t")
+ checkAnswer(spark.table("t"), Row(9, "x"))
+
+ table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(DDLUtils.isHiveTable(table))
assert(table.storage.inputFormat ==
Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"))
@@ -1324,7 +1341,7 @@ class HiveDDLSuite
Some("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))
sql("INSERT INTO t SELECT 2, 'b'")
- checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
+ checkAnswer(spark.table("t"), Row(9, "x") :: Row(2, "b") :: Nil)
val e = intercept[AnalysisException] {
Seq(1 -> "a").toDF("i", "j").write.format("hive").partitionBy("i").saveAsTable("t2")
@@ -1340,8 +1357,7 @@ class HiveDDLSuite
val e3 = intercept[AnalysisException] {
spark.table("t").write.format("hive").mode("overwrite").saveAsTable("t")
}
- assert(e3.message.contains(
- "CTAS for hive serde tables does not support append or overwrite semantics"))
+ assert(e3.message.contains("Cannot overwrite table default.t that is also being read from"))
}
}