diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-05-10 23:53:55 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-05-10 23:53:55 -0700 |
commit | 875ef764280428acd095aec1834fee0ddad08611 (patch) | |
tree | 82ccd9556e78d06e826403aacb366e5ba00f6c14 | |
parent | 007882c7ee06de37ba309424fced1e4c6b408572 (diff) | |
download | spark-875ef764280428acd095aec1834fee0ddad08611.tar.gz spark-875ef764280428acd095aec1834fee0ddad08611.tar.bz2 spark-875ef764280428acd095aec1834fee0ddad08611.zip |
[SPARK-15231][SQL] Document the semantic of saveAsTable and insertInto and don't drop columns silently
## What changes were proposed in this pull request?
This PR adds documents about the different behaviors between `insertInto` and `saveAsTable`, and throws an exception when the user try to add too man columns using `saveAsTable with append`.
## How was this patch tested?
Unit tests added in this PR.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #13013 from zsxwing/SPARK-15231.
3 files changed, 82 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index da9d25443e..a9e8329c4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -361,6 +361,23 @@ final class DataFrameWriter private[sql](df: DataFrame) { * Inserts the content of the [[DataFrame]] to the specified table. It requires that * the schema of the [[DataFrame]] is the same as the schema of the table. * + * Note: Unlike `saveAsTable`, `insertInto` ignores the column names and just uses position-based + * resolution. For example: + * + * {{{ + * scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1") + * scala> Seq((3, 4)).toDF("j", "i").write.insertInto("t1") + * scala> Seq((5, 6)).toDF("a", "b").write.insertInto("t1") + * scala> sql("select * from t1").show + * +---+---+ + * | i| j| + * +---+---+ + * | 5| 6| + * | 3| 4| + * | 1| 2| + * +---+---+ + * }}} + * * Because it inserts data to an existing table, format or options will be ignored. * * @since 1.4.0 @@ -454,8 +471,23 @@ final class DataFrameWriter private[sql](df: DataFrame) { * save mode, specified by the `mode` function (default to throwing an exception). * When `mode` is `Overwrite`, the schema of the [[DataFrame]] does not need to be * the same as that of the existing table. - * When `mode` is `Append`, the schema of the [[DataFrame]] need to be - * the same as that of the existing table, and format or options will be ignored. + * + * When `mode` is `Append`, if there is an existing table, we will use the format and options of + * the existing table. The column order in the schema of the [[DataFrame]] doesn't need to be same + * as that of the existing table. Unlike `insertInto`, `saveAsTable` will use the column names to + * find the correct column positions. For example: + * + * {{{ + * scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1") + * scala> Seq((3, 4)).toDF("j", "i").write.mode("append").saveAsTable("t1") + * scala> sql("select * from t1").show + * +---+---+ + * | i| j| + * +---+---+ + * | 1| 2| + * | 4| 3| + * +---+---+ + * }}} * * When the DataFrame is created from a non-partitioned [[HadoopFsRelation]] with a single input * path, and the data source provider can be mapped to an existing Hive builtin SerDe (i.e. ORC diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 1494341d58..3525111e46 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -192,6 +192,11 @@ case class CreateDataSourceTableAsSelectCommand( EliminateSubqueryAliases( sessionState.catalog.lookupRelation(tableIdent)) match { case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) => + if (query.schema.size != l.schema.size) { + throw new AnalysisException( + s"The column number of the existing schema[${l.schema}] " + + s"doesn't match the data schema[${query.schema}]'s") + } existingSchema = Some(l.schema) case o => throw new AnalysisException(s"Saving data in ${o.toString} is not supported.") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index b2a80e70be..676fbd0a39 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -1038,6 +1038,49 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv } } + test("saveAsTable[append]: the column order doesn't matter") { + withTable("saveAsTable_column_order") { + Seq((1, 2)).toDF("i", "j").write.saveAsTable("saveAsTable_column_order") + Seq((3, 4)).toDF("j", "i").write.mode("append").saveAsTable("saveAsTable_column_order") + checkAnswer( + table("saveAsTable_column_order"), + Seq((1, 2), (4, 3)).toDF("i", "j")) + } + } + + test("saveAsTable[append]: mismatch column names") { + withTable("saveAsTable_mismatch_column_names") { + Seq((1, 2)).toDF("i", "j").write.saveAsTable("saveAsTable_mismatch_column_names") + val e = intercept[AnalysisException] { + Seq((3, 4)).toDF("i", "k") + .write.mode("append").saveAsTable("saveAsTable_mismatch_column_names") + } + assert(e.getMessage.contains("cannot resolve")) + } + } + + test("saveAsTable[append]: too many columns") { + withTable("saveAsTable_too_many_columns") { + Seq((1, 2)).toDF("i", "j").write.saveAsTable("saveAsTable_too_many_columns") + val e = intercept[AnalysisException] { + Seq((3, 4, 5)).toDF("i", "j", "k") + .write.mode("append").saveAsTable("saveAsTable_too_many_columns") + } + assert(e.getMessage.contains("doesn't match")) + } + } + + test("saveAsTable[append]: less columns") { + withTable("saveAsTable_less_columns") { + Seq((1, 2)).toDF("i", "j").write.saveAsTable("saveAsTable_less_columns") + val e = intercept[AnalysisException] { + Seq((4)).toDF("j") + .write.mode("append").saveAsTable("saveAsTable_less_columns") + } + assert(e.getMessage.contains("doesn't match")) + } + } + test("SPARK-15025: create datasource table with path with select") { withTempPath { dir => withTable("t") { |