authorSean Zhong <seanzhong@databricks.com>2016-06-18 10:41:33 -0700
committerYin Huai <yhuai@databricks.com>2016-06-18 10:41:33 -0700
commitce3b98bae28af72299722f56e4e4ef831f471ec0 (patch)
tree09d4fb492d75939c561897dc15c174e96009a46c /sql
parent3d010c837582c23b5ddf65602213e3772b418f08 (diff)
[SPARK-16034][SQL] Checks the partition columns when calling dataFrame.write.mode("append").saveAsTable
## What changes were proposed in this pull request?

`DataFrameWriter` can be used to append data to existing data source tables. It becomes tricky when the partition columns used in `DataFrameWriter.partitionBy(columns)` don't match the actual partition columns of the underlying table. This pull request enforces the check so that the partition columns of these two always match.

## How was this patch tested?

Unit test.

Author: Sean Zhong <seanzhong@databricks.com>

Closes #13749 from clockfly/SPARK-16034.
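For reference, a minimal sketch of the behavior this change enforces, assuming an existing `SparkSession` in scope as `spark` (the table and column names below are illustrative, not part of the patch):

```scala
import org.apache.spark.sql.AnalysisException

val df = spark.range(10).selectExpr("id", "id % 2 as p")

// Create a data source table partitioned by `p`.
df.write.mode("overwrite").partitionBy("p").saveAsTable("t")

// Appending with a different partitioning is now rejected with an
// AnalysisException instead of silently writing mismatched data.
try {
  df.write.mode("append").partitionBy("id").saveAsTable("t")
} catch {
  case e: AnalysisException =>
    println(s"Rejected append: ${e.getMessage}")
}

// Appending with the matching partition columns still succeeds.
df.write.mode("append").partitionBy("p").saveAsTable("t")
```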
Diffstat (limited to 'sql')
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala |  9
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala         | 39
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala               | 24
3 files changed, 50 insertions(+), 22 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 4918780873..c38eca5156 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -242,8 +242,13 @@ case class CreateDataSourceTableAsSelectCommand(
bucketSpec = bucketSpec,
options = optionsWithPath)
- val result = dataSource.write(mode, df)
-
+ val result = try {
+ dataSource.write(mode, df)
+ } catch {
+ case ex: AnalysisException =>
+ logError(s"Failed to write to table ${tableIdent.identifier} in $mode mode", ex)
+ throw ex
+ }
if (createMetastoreTable) {
// We will use the schema of resolved.relation as the schema of the table (instead of
// the schema of df). It is important since the nullability may be changed by the relation
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 7f3683fc98..f274fc77da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -435,26 +435,25 @@ case class DataSource(
// If we are appending to a table that already exists, make sure the partitioning matches
// up. If we fail to load the table for whatever reason, ignore the check.
if (mode == SaveMode.Append) {
- val existingPartitionColumnSet = try {
- Some(
- resolveRelation()
- .asInstanceOf[HadoopFsRelation]
- .location
- .partitionSpec()
- .partitionColumns
- .fieldNames
- .toSet)
- } catch {
- case e: Exception =>
- None
- }
-
- existingPartitionColumnSet.foreach { ex =>
- if (ex.map(_.toLowerCase) != partitionColumns.map(_.toLowerCase()).toSet) {
- throw new AnalysisException(
- s"Requested partitioning does not equal existing partitioning: " +
- s"$ex != ${partitionColumns.toSet}.")
- }
+ val existingColumns = Try {
+ resolveRelation()
+ .asInstanceOf[HadoopFsRelation]
+ .location
+ .partitionSpec()
+ .partitionColumns
+ .fieldNames
+ .toSeq
+ }.getOrElse(Seq.empty[String])
+ val sameColumns =
+ existingColumns.map(_.toLowerCase) == partitionColumns.map(_.toLowerCase)
+ if (existingColumns.size > 0 && !sameColumns) {
+ throw new AnalysisException(
+ s"""Requested partitioning does not match existing partitioning.
+ |Existing partitioning columns:
+ | ${existingColumns.mkString(", ")}
+ |Requested partitioning columns:
+ | ${partitionColumns.mkString(", ")}
+ |""".stripMargin)
}
}
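A notable semantic change in the hunk above is that the comparison now uses ordered `Seq`s instead of `Set`s, so an append that lists the same columns in a different order is rejected as well. A standalone sketch of the check under that reading (the helper name is made up for illustration; `IllegalArgumentException` stands in for Spark's `AnalysisException`, whose constructor is not public):

```scala
import scala.util.Try

// Illustrative re-statement of the new partition-column check.
def checkPartitioning(loadExisting: => Seq[String], requested: Seq[String]): Unit = {
  // If the existing table cannot be loaded, skip the check, mirroring Try(...).getOrElse above.
  val existingColumns = Try(loadExisting).getOrElse(Seq.empty[String])
  val sameColumns =
    existingColumns.map(_.toLowerCase) == requested.map(_.toLowerCase)
  if (existingColumns.nonEmpty && !sameColumns) {
    throw new IllegalArgumentException(
      s"Requested partitioning does not match existing partitioning: " +
        s"${existingColumns.mkString(", ")} vs ${requested.mkString(", ")}")
  }
}

checkPartitioning(Seq("a", "b"), Seq("a", "b"))               // matches: no error
println(Try(checkPartitioning(Seq("a", "b"), Seq("b", "a")))) // Failure(...): order now matters
```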
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 7eb2fff91d..8827649d0a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1317,4 +1317,28 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assertUnsupported("TRUNCATE TABLE my_tab PARTITION (age=10)")
}
+ test("SPARK-16034 Partition columns should match when appending to existing data source tables") {
+ import testImplicits._
+ val df = Seq((1, 2, 3)).toDF("a", "b", "c")
+ withTable("partitionedTable") {
+ df.write.mode("overwrite").partitionBy("a", "b").saveAsTable("partitionedTable")
+ // Misses some partition columns
+ intercept[AnalysisException] {
+ df.write.mode("append").partitionBy("a").saveAsTable("partitionedTable")
+ }
+ // Wrong order
+ intercept[AnalysisException] {
+ df.write.mode("append").partitionBy("b", "a").saveAsTable("partitionedTable")
+ }
+ // Partition columns not specified
+ intercept[AnalysisException] {
+ df.write.mode("append").saveAsTable("partitionedTable")
+ }
+ assert(sql("select * from partitionedTable").collect().size == 1)
+ // Inserts new data successfully when partition columns are correctly specified in
+ // partitionBy(...).
+ df.write.mode("append").partitionBy("a", "b").saveAsTable("partitionedTable")
+ assert(sql("select * from partitionedTable").collect().size == 2)
+ }
+ }
}