path: root/sql/core/src/main
author    Eric Liang <ekl@databricks.com>  2016-11-28 21:58:01 -0800
committer Reynold Xin <rxin@databricks.com>  2016-11-28 21:58:01 -0800
commit    e2318ede04fa7a756d1c8151775e1f2406a176ca (patch)
tree      68c6e8b8e6c0061428e063478fcd7aa51d4704df /sql/core/src/main
parent    d449988b8819775fcfd27da53bb5143a7aab01f7 (diff)
[SPARK-18544][SQL] Append with df.saveAsTable writes data to wrong location
## What changes were proposed in this pull request?

We failed to properly propagate table metadata for existing tables in the saveAsTable command. This caused a downstream component to think the table was MANAGED, writing data to the wrong location.

## How was this patch tested?

Unit test that fails before the patch.

Author: Eric Liang <ekl@databricks.com>

Closes #15983 from ericl/spark-18544.
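A minimal sketch of the failure mode this patch addresses (this is not the unit test added in the PR); the table name, path, and the `spark` session are illustrative assumptions:

```scala
// Hypothetical reproduction sketch. Table name and path are made up.
import org.apache.spark.sql.SaveMode

val df = spark.range(10).toDF("id")

// An explicit path makes this an EXTERNAL data source table whose data
// lives under /tmp/ext_tbl.
df.write.option("path", "/tmp/ext_tbl").saveAsTable("ext_tbl")

// Append to the existing table. Before this patch, the existing table's
// catalog metadata was not looked up here, so the write could treat the
// table as MANAGED and place files under the warehouse directory instead
// of /tmp/ext_tbl. With this patch, the catalog's storage and tableType
// for "ext_tbl" are reused, so the append goes to the correct location.
df.write.mode(SaveMode.Append).saveAsTable("ext_tbl")
```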
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala  | 21
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala  | 3
2 files changed, 15 insertions(+), 9 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 2d863422fb..8294e4149b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -373,8 +373,19 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
throw new AnalysisException(s"Table $tableIdent already exists.")
case _ =>
- val storage = DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
- val tableType = if (storage.locationUri.isDefined) {
+ val existingTable = if (tableExists) {
+ Some(df.sparkSession.sessionState.catalog.getTableMetadata(tableIdent))
+ } else {
+ None
+ }
+ val storage = if (tableExists) {
+ existingTable.get.storage
+ } else {
+ DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
+ }
+ val tableType = if (tableExists) {
+ existingTable.get.tableType
+ } else if (storage.locationUri.isDefined) {
CatalogTableType.EXTERNAL
} else {
CatalogTableType.MANAGED
@@ -391,12 +402,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
)
df.sparkSession.sessionState.executePlan(
CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd
- if (tableDesc.partitionColumnNames.nonEmpty &&
- df.sparkSession.sqlContext.conf.manageFilesourcePartitions) {
- // Need to recover partitions into the metastore so our saved data is visible.
- df.sparkSession.sessionState.executePlan(
- AlterTableRecoverPartitionsCommand(tableDesc.identifier)).toRdd
- }
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index add732c1af..422700c891 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -212,7 +212,8 @@ case class CreateDataSourceTableAsSelectCommand(
className = provider,
partitionColumns = table.partitionColumnNames,
bucketSpec = table.bucketSpec,
- options = table.storage.properties ++ pathOption)
+ options = table.storage.properties ++ pathOption,
+ catalogTable = Some(table))
val result = try {
dataSource.write(mode, df)