author     Shixiong Zhu <shixiong@databricks.com>    2016-05-10 23:53:55 -0700
committer  Yin Huai <yhuai@databricks.com>           2016-05-10 23:53:55 -0700
commit     875ef764280428acd095aec1834fee0ddad08611 (patch)
tree       82ccd9556e78d06e826403aacb366e5ba00f6c14
parent     007882c7ee06de37ba309424fced1e4c6b408572 (diff)
download   spark-875ef764280428acd095aec1834fee0ddad08611.tar.gz
           spark-875ef764280428acd095aec1834fee0ddad08611.tar.bz2
           spark-875ef764280428acd095aec1834fee0ddad08611.zip
[SPARK-15231][SQL] Document the semantic of saveAsTable and insertInto and don't drop columns silently
## What changes were proposed in this pull request?

This PR adds documentation about the different behaviors of `insertInto` and `saveAsTable`, and throws an exception when the user tries to append too many columns using `saveAsTable` in `Append` mode.

## How was this patch tested?

Unit tests added in this PR.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #13013 from zsxwing/SPARK-15231.
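For readers skimming the commit message, a minimal spark-shell sketch of the two resolution semantics documented below (the table name `t1` mirrors the Scaladoc examples in this diff; assumes the shell's standard implicits are in scope):

    // insertInto resolves columns by position; the DataFrame's column names are ignored.
    scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1")
    scala> Seq((3, 4)).toDF("j", "i").write.insertInto("t1")
    // stored as (i = 3, j = 4), despite the swapped names

    // saveAsTable in Append mode resolves columns by name.
    scala> Seq((5, 6)).toDF("j", "i").write.mode("append").saveAsTable("t1")
    // stored as (i = 6, j = 5), matched by column name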
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala                            | 36
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala   |  5
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala             | 43
3 files changed, 82 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index da9d25443e..a9e8329c4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -361,6 +361,23 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* Inserts the content of the [[DataFrame]] to the specified table. It requires that
* the schema of the [[DataFrame]] is the same as the schema of the table.
*
+ * Note: Unlike `saveAsTable`, `insertInto` ignores the column names and just uses position-based
+ * resolution. For example:
+ *
+ * {{{
+ * scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1")
+ * scala> Seq((3, 4)).toDF("j", "i").write.insertInto("t1")
+ * scala> Seq((5, 6)).toDF("a", "b").write.insertInto("t1")
+ * scala> sql("select * from t1").show
+ * +---+---+
+ * | i| j|
+ * +---+---+
+ * | 5| 6|
+ * | 3| 4|
+ * | 1| 2|
+ * +---+---+
+ * }}}
+ *
* Because it inserts data to an existing table, format or options will be ignored.
*
* @since 1.4.0
@@ -454,8 +471,23 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* save mode, specified by the `mode` function (default to throwing an exception).
* When `mode` is `Overwrite`, the schema of the [[DataFrame]] does not need to be
* the same as that of the existing table.
- * When `mode` is `Append`, the schema of the [[DataFrame]] need to be
- * the same as that of the existing table, and format or options will be ignored.
+ *
+ * When `mode` is `Append`, if there is an existing table, we will use the format and options of
+ * the existing table. The column order in the schema of the [[DataFrame]] doesn't need to be the
+ * as that of the existing table. Unlike `insertInto`, `saveAsTable` will use the column names to
+ * find the correct column positions. For example:
+ *
+ * {{{
+ * scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1")
+ * scala> Seq((3, 4)).toDF("j", "i").write.mode("append").saveAsTable("t1")
+ * scala> sql("select * from t1").show
+ * +---+---+
+ * | i| j|
+ * +---+---+
+ * | 1| 2|
+ * | 4| 3|
+ * +---+---+
+ * }}}
*
* When the DataFrame is created from a non-partitioned [[HadoopFsRelation]] with a single input
* path, and the data source provider can be mapped to an existing Hive builtin SerDe (i.e. ORC
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 1494341d58..3525111e46 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -192,6 +192,11 @@ case class CreateDataSourceTableAsSelectCommand(
EliminateSubqueryAliases(
sessionState.catalog.lookupRelation(tableIdent)) match {
case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
+ if (query.schema.size != l.schema.size) {
+ throw new AnalysisException(
+ s"The column number of the existing schema[${l.schema}] " +
+ s"doesn't match the data schema[${query.schema}]'s")
+ }
existingSchema = Some(l.schema)
case o =>
throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index b2a80e70be..676fbd0a39 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1038,6 +1038,49 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}
}
+ test("saveAsTable[append]: the column order doesn't matter") {
+ withTable("saveAsTable_column_order") {
+ Seq((1, 2)).toDF("i", "j").write.saveAsTable("saveAsTable_column_order")
+ Seq((3, 4)).toDF("j", "i").write.mode("append").saveAsTable("saveAsTable_column_order")
+ checkAnswer(
+ table("saveAsTable_column_order"),
+ Seq((1, 2), (4, 3)).toDF("i", "j"))
+ }
+ }
+
+ test("saveAsTable[append]: mismatch column names") {
+ withTable("saveAsTable_mismatch_column_names") {
+ Seq((1, 2)).toDF("i", "j").write.saveAsTable("saveAsTable_mismatch_column_names")
+ val e = intercept[AnalysisException] {
+ Seq((3, 4)).toDF("i", "k")
+ .write.mode("append").saveAsTable("saveAsTable_mismatch_column_names")
+ }
+ assert(e.getMessage.contains("cannot resolve"))
+ }
+ }
+
+ test("saveAsTable[append]: too many columns") {
+ withTable("saveAsTable_too_many_columns") {
+ Seq((1, 2)).toDF("i", "j").write.saveAsTable("saveAsTable_too_many_columns")
+ val e = intercept[AnalysisException] {
+ Seq((3, 4, 5)).toDF("i", "j", "k")
+ .write.mode("append").saveAsTable("saveAsTable_too_many_columns")
+ }
+ assert(e.getMessage.contains("doesn't match"))
+ }
+ }
+
+ test("saveAsTable[append]: less columns") {
+ withTable("saveAsTable_less_columns") {
+ Seq((1, 2)).toDF("i", "j").write.saveAsTable("saveAsTable_less_columns")
+ val e = intercept[AnalysisException] {
+ Seq((4)).toDF("j")
+ .write.mode("append").saveAsTable("saveAsTable_less_columns")
+ }
+ assert(e.getMessage.contains("doesn't match"))
+ }
+ }
+
test("SPARK-15025: create datasource table with path with select") {
withTempPath { dir =>
withTable("t") {