diff options
author | Sean Zhong <seanzhong@databricks.com> | 2016-06-18 10:41:33 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-06-18 10:41:33 -0700 |
commit | ce3b98bae28af72299722f56e4e4ef831f471ec0 (patch) | |
tree | 09d4fb492d75939c561897dc15c174e96009a46c | |
parent | 3d010c837582c23b5ddf65602213e3772b418f08 (diff) | |
download | spark-ce3b98bae28af72299722f56e4e4ef831f471ec0.tar.gz spark-ce3b98bae28af72299722f56e4e4ef831f471ec0.tar.bz2 spark-ce3b98bae28af72299722f56e4e4ef831f471ec0.zip |
[SPARK-16034][SQL] Checks the partition columns when calling dataFrame.write.mode("append").saveAsTable
## What changes were proposed in this pull request?
`DataFrameWriter` can be used to append data to existing data source tables. It becomes tricky when partition columns used in `DataFrameWriter.partitionBy(columns)` don't match the actual partition columns of the underlying table. This pull request enforces the check so that the partition columns of these two always match.
## How was this patch tested?
Unit test.
Author: Sean Zhong <seanzhong@databricks.com>
Closes #13749 from clockfly/SPARK-16034.
3 files changed, 50 insertions, 22 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 4918780873..c38eca5156 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -242,8 +242,13 @@ case class CreateDataSourceTableAsSelectCommand( bucketSpec = bucketSpec, options = optionsWithPath) - val result = dataSource.write(mode, df) - + val result = try { + dataSource.write(mode, df) + } catch { + case ex: AnalysisException => + logError(s"Failed to write to table ${tableIdent.identifier} in $mode mode", ex) + throw ex + } if (createMetastoreTable) { // We will use the schema of resolved.relation as the schema of the table (instead of // the schema of df). It is important since the nullability may be changed by the relation diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 7f3683fc98..f274fc77da 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -435,26 +435,25 @@ case class DataSource( // If we are appending to a table that already exists, make sure the partitioning matches // up. If we fail to load the table for whatever reason, ignore the check. if (mode == SaveMode.Append) { - val existingPartitionColumnSet = try { - Some( - resolveRelation() - .asInstanceOf[HadoopFsRelation] - .location - .partitionSpec() - .partitionColumns - .fieldNames - .toSet) - } catch { - case e: Exception => - None - } - - existingPartitionColumnSet.foreach { ex => - if (ex.map(_.toLowerCase) != partitionColumns.map(_.toLowerCase()).toSet) { - throw new AnalysisException( - s"Requested partitioning does not equal existing partitioning: " + - s"$ex != ${partitionColumns.toSet}.") - } + val existingColumns = Try { + resolveRelation() + .asInstanceOf[HadoopFsRelation] + .location + .partitionSpec() + .partitionColumns + .fieldNames + .toSeq + }.getOrElse(Seq.empty[String]) + val sameColumns = + existingColumns.map(_.toLowerCase) == partitionColumns.map(_.toLowerCase) + if (existingColumns.size > 0 && !sameColumns) { + throw new AnalysisException( + s"""Requested partitioning does not match existing partitioning. + |Existing partitioning columns: + | ${existingColumns.mkString(", ")} + |Requested partitioning columns: + | ${partitionColumns.mkString(", ")} + |""".stripMargin) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 7eb2fff91d..8827649d0a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1317,4 +1317,28 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { assertUnsupported("TRUNCATE TABLE my_tab PARTITION (age=10)") } + test("SPARK-16034 Partition columns should match when appending to existing data source tables") { + import testImplicits._ + val df = Seq((1, 2, 3)).toDF("a", "b", "c") + withTable("partitionedTable") { + df.write.mode("overwrite").partitionBy("a", "b").saveAsTable("partitionedTable") + // Misses some partition columns + intercept[AnalysisException] { + df.write.mode("append").partitionBy("a").saveAsTable("partitionedTable") + } + // Wrong order + intercept[AnalysisException] { + df.write.mode("append").partitionBy("b", "a").saveAsTable("partitionedTable") + } + // Partition columns not specified + intercept[AnalysisException] { + df.write.mode("append").saveAsTable("partitionedTable") + } + assert(sql("select * from partitionedTable").collect().size == 1) + // Inserts new data successfully when partition columns are correctly specified in + // partitionBy(...). + df.write.mode("append").partitionBy("a", "b").saveAsTable("partitionedTable") + assert(sql("select * from partitionedTable").collect().size == 2) + } + } } |