diff options
author | Cheng Lian <lian@databricks.com> | 2016-06-17 20:13:04 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-06-17 20:13:04 -0700 |
commit | 10b671447bc04af250cbd8a7ea86f2769147a78a (patch) | |
tree | ff5b99ba0ad1e95937a0c55bfc0ef4934d02c3b5 /sql/core | |
parent | ebb9a3b6fd834e2c856a192b4455aab83e9c4dc8 (diff) | |
download | spark-10b671447bc04af250cbd8a7ea86f2769147a78a.tar.gz spark-10b671447bc04af250cbd8a7ea86f2769147a78a.tar.bz2 spark-10b671447bc04af250cbd8a7ea86f2769147a78a.zip |
[SPARK-16033][SQL] insertInto() can't be used together with partitionBy()
## What changes were proposed in this pull request?
When inserting into an existing partitioned table, partitioning columns should always be determined by catalog metadata of the existing table to be inserted. Extra `partitionBy()` calls don't make sense, and mess up existing data because newly inserted data may have wrong partitioning directory layout.
## How was this patch tested?
New test case added in `InsertIntoHiveTableSuite`.
Author: Cheng Lian <lian@databricks.com>
Closes #13747 from liancheng/spark-16033-insert-into-without-partition-by.
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 12 |
1 files changed, 9 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 60a9d1f020..e6fc9749c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -21,8 +21,6 @@ import java.util.Properties import scala.collection.JavaConverters._ -import org.apache.hadoop.fs.Path - import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project} @@ -243,7 +241,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { private def insertInto(tableIdent: TableIdentifier): Unit = { assertNotBucketed("insertInto") - val partitions = normalizedParCols.map(_.map(col => col -> (Option.empty[String])).toMap) + + if (partitioningColumns.isDefined) { + throw new AnalysisException( + "insertInto() can't be used together with partitionBy(). " + + "Partition columns are defined by the table into which is being inserted." + ) + } + + val partitions = normalizedParCols.map(_.map(col => col -> Option.empty[String]).toMap) val overwrite = mode == SaveMode.Overwrite // A partitioned relation's schema can be different from the input logicalPlan, since |