author | windpiger <songjun@outlook.com> | 2017-01-14 10:53:33 -0800 |
---|---|---|
committer | gatorsmile <gatorsmile@gmail.com> | 2017-01-14 10:53:33 -0800 |
commit | 8942353905c354c4ce31b0d1a44d33feb3dcf737 (patch) | |
tree | 23281a909c8c44da8dc13eede62dcd18a97ba6f7 /sql/core | |
parent | b6a7aa4f770634e6db7244e88f8b6273fb9b6d1e (diff) | |
[SPARK-19151][SQL] DataFrameWriter.saveAsTable support hive overwrite
## What changes were proposed in this pull request?
After [SPARK-19107](https://issues.apache.org/jira/browse/SPARK-19107), we can now treat Hive as a data source and create Hive tables with DataFrameWriter and Catalog. However, the support is not yet complete; some cases are still unsupported.
This PR implements:
DataFrameWriter.saveAsTable working with the Hive format in Overwrite mode (a usage sketch follows).
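For illustration, a minimal usage sketch of the newly supported path, assuming a Hive-enabled session; the table name `hive_tbl` is hypothetical:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("SaveAsTableHiveOverwrite")
  .enableHiveSupport() // Hive support is required for format("hive")
  .getOrCreate()

// Create a Hive-format table, then overwrite it in place.
spark.range(10).toDF("id").write.format("hive").saveAsTable("hive_tbl")
spark.range(5).toDF("id").write
  .format("hive")
  .mode(SaveMode.Overwrite)
  .saveAsTable("hive_tbl")

// Overwrite replaces the previous contents entirely.
assert(spark.table("hive_tbl").count() == 5)
```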
## How was this patch tested?
A unit test was added (a plausible sketch of its shape follows).
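The test itself lives under sql/hive, so it does not appear in the sql/core diffstat below; a plausible shape for it, assuming Spark's usual test helpers (`withTable`, `checkAnswer`, `testImplicits`) and hypothetical table names:

```scala
import org.apache.spark.sql.{Row, SaveMode}

test("saveAsTable with hive format and Overwrite mode") {
  import testImplicits._
  withTable("tab1") {
    // Seed a Hive-format table, then overwrite it with different rows.
    Seq(1, 2, 3).toDF("i").write.format("hive").saveAsTable("tab1")
    Seq(4, 5).toDF("i").write
      .format("hive")
      .mode(SaveMode.Overwrite)
      .saveAsTable("tab1")
    // Only the overwritten rows should remain.
    checkAnswer(spark.table("tab1"), Seq(Row(4), Row(5)))
  }
}
```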
Author: windpiger <songjun@outlook.com>
Closes #16549 from windpiger/saveAsTableWithHiveOverwrite.
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 15 |
1 file changed, 10 insertions, 5 deletions
```diff
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 82331fdb9b..7fc03bd5ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
@@ -380,17 +380,22 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         throw new AnalysisException(s"Table $tableIdent already exists.")
 
       case (true, SaveMode.Overwrite) =>
-        // Get all input data source relations of the query.
+        // Get all input data source or hive relations of the query.
         val srcRelations = df.logicalPlan.collect {
           case LogicalRelation(src: BaseRelation, _, _) => src
+          case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable) =>
+            relation.catalogTable.identifier
         }
 
         EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match {
-          // Only do the check if the table is a data source table (the relation is a BaseRelation).
-          // TODO(cloud-fan): also check hive table relation here when we support overwrite mode
-          // for creating hive tables.
+          // check if the table is a data source table (the relation is a BaseRelation).
           case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
             throw new AnalysisException(
               s"Cannot overwrite table $tableName that is also being read from")
+          // check hive table relation when overwrite mode
+          case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable)
+              && srcRelations.contains(relation.catalogTable.identifier) =>
+            throw new AnalysisException(
+              s"Cannot overwrite table $tableName that is also being read from")
           case _ => // OK
         }
```
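The new `CatalogRelation` cases above are what reject self-overwrites for Hive tables: the first collects the identifiers of Hive tables the query reads from, and the second throws if the overwrite target is among them. A hedged illustration, assuming a Hive-enabled `SparkSession` named `spark` and a hypothetical existing Hive table `hive_src`:

```scala
import org.apache.spark.sql.SaveMode

// Build a query that reads from hive_src...
val df = spark.table("hive_src").filter("id > 0")

// ...and try to overwrite the same table. srcRelations contains
// hive_src's identifier and the destination resolves to the same
// CatalogRelation, so this now fails fast with an AnalysisException
// like "Cannot overwrite table hive_src that is also being read from"
// instead of corrupting the data being read.
df.write.format("hive").mode(SaveMode.Overwrite).saveAsTable("hive_src")
```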