diff options
author | Eric Liang <ekhliang@gmail.com> | 2016-10-30 20:27:38 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2016-10-30 20:27:38 +0800 |
commit | 3ad99f166494950665c137fd5dea636afa0feb10 (patch) | |
tree | 86720d5929bd3e36ee3cc81da7a5d30d870e665d /sql/core | |
parent | a489567e36e671cee290f8d69188837a8b1a75b3 (diff) | |
download | spark-3ad99f166494950665c137fd5dea636afa0feb10.tar.gz spark-3ad99f166494950665c137fd5dea636afa0feb10.tar.bz2 spark-3ad99f166494950665c137fd5dea636afa0feb10.zip |
[SPARK-18146][SQL] Avoid using Union to chain together create table and repair partition commands
## What changes were proposed in this pull request?
The behavior of `Union` is not well defined here. It is safer to execute these commands explicitly in order. The other use of `Union` in this way will be removed by https://github.com/apache/spark/pull/15633
## How was this patch tested?
Existing tests.
cc yhuai cloud-fan
Author: Eric Liang <ekhliang@gmail.com>
Author: Eric Liang <ekl@databricks.com>
Closes #15665 from ericl/spark-18146.
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 12 |
1 file changed, 5 insertions(+), 7 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 7ff3522f54..11dd1df909 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -388,16 +388,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { partitionColumnNames = partitioningColumns.getOrElse(Nil), bucketSpec = getBucketSpec ) - val createCmd = CreateTable(tableDesc, mode, Some(df.logicalPlan)) - val cmd = if (tableDesc.partitionColumnNames.nonEmpty && + df.sparkSession.sessionState.executePlan( + CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd + if (tableDesc.partitionColumnNames.nonEmpty && df.sparkSession.sqlContext.conf.manageFilesourcePartitions) { // Need to recover partitions into the metastore so our saved data is visible. - val recoverPartitionCmd = AlterTableRecoverPartitionsCommand(tableDesc.identifier) - Union(createCmd, recoverPartitionCmd) - } else { - createCmd + df.sparkSession.sessionState.executePlan( + AlterTableRecoverPartitionsCommand(tableDesc.identifier)).toRdd } - df.sparkSession.sessionState.executePlan(cmd).toRdd } } |