diff options
author | Wenchen Fan <wenchen@databricks.com> | 2016-06-18 10:32:27 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-06-18 10:32:27 -0700 |
commit | 3d010c837582c23b5ddf65602213e3772b418f08 (patch) | |
tree | dde1266b582292269c8f3d2380c797687d2a799d /sql/hive/src/test/scala | |
parent | e574c9973ddbef023c066ccd6f771ab01cbf2d88 (diff) | |
download | spark-3d010c837582c23b5ddf65602213e3772b418f08.tar.gz spark-3d010c837582c23b5ddf65602213e3772b418f08.tar.bz2 spark-3d010c837582c23b5ddf65602213e3772b418f08.zip |
[SPARK-16036][SPARK-16037][SQL] fix various table insertion problems
## What changes were proposed in this pull request?
The current table insertion has some weird behaviours:
1. inserting into a partitioned table with mismatched columns produces a confusing error message for Hive tables, and a wrong result for datasource tables
2. inserting into a partitioned table without a partition list produces a wrong result for Hive tables.
This PR fixes these 2 problems.
## How was this patch tested?
new test in hive `SQLQuerySuite`
Author: Wenchen Fan <wenchen@databricks.com>
Closes #13754 from cloud-fan/insert2.
Diffstat (limited to 'sql/hive/src/test/scala')
4 files changed, 33 insertions, 56 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index b890b4bffd..c48735142d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -325,27 +325,6 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef } } - test("Detect table partitioning with correct partition order") { - withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) { - sql("CREATE TABLE source (id bigint, part2 string, part1 string, data string)") - val data = (1 to 10).map(i => (i, if ((i % 2) == 0) "even" else "odd", "p", s"data-$i")) - .toDF("id", "part2", "part1", "data") - - data.write.insertInto("source") - checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq) - - // the original data with part1 and part2 at the end - val expected = data.select("id", "data", "part1", "part2") - - sql( - """CREATE TABLE partitioned (id bigint, data string) - |PARTITIONED BY (part1 string, part2 string)""".stripMargin) - spark.table("source").write.insertInto("partitioned") - - checkAnswer(sql("SELECT * FROM partitioned"), expected.collect().toSeq) - } - } - private def testPartitionedHiveSerDeTable(testName: String)(f: String => Unit): Unit = { test(s"Hive SerDe table - $testName") { val hiveTable = "hive_table" diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index a846711b84..f5d2f02d51 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -348,6 +348,7 @@ abstract class HiveComparisonTest queryString.replace("../../data", testDataPath)) val 
containsCommands = originalQuery.analyzed.collectFirst { case _: Command => () + case _: InsertIntoTable => () case _: LogicalInsertIntoHiveTable => () }.nonEmpty diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index e0f6ccf04d..a16b5b2e23 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -1033,41 +1033,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { sql("SELECT * FROM boom").queryExecution.analyzed } - test("SPARK-3810: PreInsertionCasts static partitioning support") { - val analyzedPlan = { - loadTestTable("srcpart") - sql("DROP TABLE IF EXISTS withparts") - sql("CREATE TABLE withparts LIKE srcpart") - sql("INSERT INTO TABLE withparts PARTITION(ds='1', hr='2') SELECT key, value FROM src") - .queryExecution.analyzed - } - - assertResult(1, "Duplicated project detected\n" + analyzedPlan) { - analyzedPlan.collect { - case _: Project => () - }.size - } - } - - test("SPARK-3810: PreInsertionCasts dynamic partitioning support") { - val analyzedPlan = { - loadTestTable("srcpart") - sql("DROP TABLE IF EXISTS withparts") - sql("CREATE TABLE withparts LIKE srcpart") - sql("SET hive.exec.dynamic.partition.mode=nonstrict") - - sql("CREATE TABLE IF NOT EXISTS withparts LIKE srcpart") - sql("INSERT INTO TABLE withparts PARTITION(ds, hr) SELECT key, value FROM src") - .queryExecution.analyzed - } - - assertResult(1, "Duplicated project detected\n" + analyzedPlan) { - analyzedPlan.collect { - case _: Project => () - }.size - } - } - test("parse HQL set commands") { // Adapted from its SQL counterpart. 
val testKey = "spark.sql.key.usedfortestonly" diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 9c1f218253..46a77dd917 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -1684,4 +1684,36 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { ) } } + + test("SPARK-16036: better error message when insert into a table with mismatch schema") { + withTable("hive_table", "datasource_table") { + sql("CREATE TABLE hive_table(a INT) PARTITIONED BY (b INT, c INT)") + sql("CREATE TABLE datasource_table(a INT, b INT, c INT) USING parquet PARTITIONED BY (b, c)") + val e1 = intercept[AnalysisException] { + sql("INSERT INTO TABLE hive_table PARTITION(b=1, c=2) SELECT 1, 2, 3") + } + assert(e1.message.contains("the number of columns are different")) + val e2 = intercept[AnalysisException] { + sql("INSERT INTO TABLE datasource_table PARTITION(b=1, c=2) SELECT 1, 2, 3") + } + assert(e2.message.contains("the number of columns are different")) + } + } + + test("SPARK-16037: INSERT statement should match columns by position") { + withTable("hive_table", "datasource_table") { + sql("CREATE TABLE hive_table(a INT) PARTITIONED BY (b INT, c INT)") + sql("CREATE TABLE datasource_table(a INT, b INT, c INT) USING parquet PARTITIONED BY (b, c)") + + withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") { + sql("INSERT INTO TABLE hive_table SELECT 1, 2 AS c, 3 AS b") + checkAnswer(sql("SELECT a, b, c FROM hive_table"), Row(1, 2, 3)) + sql("INSERT OVERWRITE TABLE hive_table SELECT 1, 2, 3") + checkAnswer(sql("SELECT a, b, c FROM hive_table"), Row(1, 2, 3)) + } + + sql("INSERT INTO TABLE datasource_table SELECT 1, 2 AS c, 3 AS b") + checkAnswer(sql("SELECT a, b, c FROM datasource_table"), Row(1, 2, 3)) 
+ } + } } |