| author | Eric Liang <ekl@databricks.com> | 2016-11-28 21:58:01 -0800 |
|---|---|---|
| committer | Reynold Xin <rxin@databricks.com> | 2016-11-28 21:58:01 -0800 |
| commit | e2318ede04fa7a756d1c8151775e1f2406a176ca (patch) | |
| tree | 68c6e8b8e6c0061428e063478fcd7aa51d4704df /sql/core/src/main | |
| parent | d449988b8819775fcfd27da53bb5143a7aab01f7 (diff) | |
[SPARK-18544][SQL] Append with df.saveAsTable writes data to wrong location
## What changes were proposed in this pull request?
The saveAsTable command failed to properly propagate the metadata of an existing table. This caused a downstream component to treat the table as MANAGED and write its data to the wrong location.
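For context, a minimal spark-shell sketch of the reported scenario might look like the following (the table name and path are invented; `spark` is an active `SparkSession`). The first write declares an EXTERNAL table via an explicit path; before this patch, the append re-derived the table type from the writer's own options and could land the new rows under the warehouse directory instead.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()

// Create an EXTERNAL table: the explicit path makes locationUri defined.
spark.range(10).write
  .option("path", "/tmp/spark18544_ext")
  .saveAsTable("ext_tbl")

// Append without repeating the path option. Before the fix, the writer
// rebuilt storage/tableType from its own (empty) options, concluded the
// table was MANAGED, and wrote under the warehouse directory instead of
// /tmp/spark18544_ext.
spark.range(10).write.mode("append").saveAsTable("ext_tbl")
```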
## How was this patch tested?
Unit test that fails before the patch.
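The actual test lives in the PR; a hypothetical sketch of its shape, assuming Spark's `SQLTestUtils` helpers (`withTempDir`, `withTable`) and the default parquet source, could be:

```scala
test("SPARK-18544: append with saveAsTable writes to the table's location") {
  withTempDir { dir =>
    withTable("ext_tbl") {
      val path = dir.getAbsolutePath
      spark.range(10).write.option("path", path).saveAsTable("ext_tbl")
      spark.range(10).write.mode("append").saveAsTable("ext_tbl")
      // All 20 rows must be under the external path, not the warehouse dir.
      assert(spark.read.parquet(path).count() === 20)
    }
  }
}
```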
Author: Eric Liang <ekl@databricks.com>
Closes #15983 from ericl/spark-18544.
Diffstat (limited to 'sql/core/src/main')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 21 |
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala | 3 |
2 files changed, 15 insertions, 9 deletions
```diff
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 2d863422fb..8294e4149b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -373,8 +373,19 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         throw new AnalysisException(s"Table $tableIdent already exists.")

       case _ =>
-        val storage = DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
-        val tableType = if (storage.locationUri.isDefined) {
+        val existingTable = if (tableExists) {
+          Some(df.sparkSession.sessionState.catalog.getTableMetadata(tableIdent))
+        } else {
+          None
+        }
+        val storage = if (tableExists) {
+          existingTable.get.storage
+        } else {
+          DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
+        }
+        val tableType = if (tableExists) {
+          existingTable.get.tableType
+        } else if (storage.locationUri.isDefined) {
           CatalogTableType.EXTERNAL
         } else {
           CatalogTableType.MANAGED
@@ -391,12 +402,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         )
         df.sparkSession.sessionState.executePlan(
           CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd
-        if (tableDesc.partitionColumnNames.nonEmpty &&
-            df.sparkSession.sqlContext.conf.manageFilesourcePartitions) {
-          // Need to recover partitions into the metastore so our saved data is visible.
-          df.sparkSession.sessionState.executePlan(
-            AlterTableRecoverPartitionsCommand(tableDesc.identifier)).toRdd
-        }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index add732c1af..422700c891 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -212,7 +212,8 @@ case class CreateDataSourceTableAsSelectCommand(
       className = provider,
       partitionColumns = table.partitionColumnNames,
       bucketSpec = table.bucketSpec,
-      options = table.storage.properties ++ pathOption)
+      options = table.storage.properties ++ pathOption,
+      catalogTable = Some(table))

     val result = try {
       dataSource.write(mode, df)
```
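Distilled out of the `DataFrameWriter` hunk above, the new decision logic amounts to: trust the catalog when the table already exists, and only derive storage and table type from the writer's options for a brand-new table. A minimal sketch of that pattern (the helper name and signature are invented for illustration):

```scala
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}

// Hypothetical helper illustrating the patched logic in DataFrameWriter.
def resolveStorageAndType(
    existingTable: Option[CatalogTable],
    storageFromOptions: => CatalogStorageFormat): (CatalogStorageFormat, CatalogTableType) =
  existingTable match {
    // Existing table: reuse its catalog metadata verbatim.
    case Some(t) => (t.storage, t.tableType)
    // New table: an explicit location in the options makes it EXTERNAL.
    case None =>
      val storage = storageFromOptions
      val tableType =
        if (storage.locationUri.isDefined) CatalogTableType.EXTERNAL
        else CatalogTableType.MANAGED
      (storage, tableType)
  }
```

The `createDataSourceTables.scala` change is the other half of the fix: passing `catalogTable = Some(table)` through to `DataSource` lets the write path see the resolved catalog entry, including its location, instead of reconstructing one from options.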