diff options
author | Yin Huai <yhuai@databricks.com> | 2016-06-20 20:17:47 +0800 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2016-06-20 20:17:47 +0800 |
commit | 905f774b71f4b814d5a2412c7c35bd023c3dfdf8 (patch) | |
tree | 80e2f1fa4c18d0d5625fbc4ff6c9528bab6c4690 /sql/hive/src/test | |
parent | 6d0f921aedfdd3b7e8472b6776d0c7d8299190bd (diff) | |
download | spark-905f774b71f4b814d5a2412c7c35bd023c3dfdf8.tar.gz spark-905f774b71f4b814d5a2412c7c35bd023c3dfdf8.tar.bz2 spark-905f774b71f4b814d5a2412c7c35bd023c3dfdf8.zip |
[SPARK-16030][SQL] Allow specifying static partitions when inserting to data source tables
## What changes were proposed in this pull request?
This PR adds the static partition support to INSERT statement when the target table is a data source table.
## How was this patch tested?
New tests in InsertIntoHiveTableSuite and DataSourceAnalysisSuite.
**Note: This PR is based on https://github.com/apache/spark/pull/13766. The last commit is the actual change.**
Author: Yin Huai <yhuai@databricks.com>
Closes #13769 from yhuai/SPARK-16030-1.
Diffstat (limited to 'sql/hive/src/test')
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala | 97 | ||||
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 2 |
2 files changed, 88 insertions, 11 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index d4ebd051d2..46432512ba 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -19,11 +19,11 @@ package org.apache.spark.sql.hive import java.io.File -import org.apache.hadoop.hive.conf.HiveConf import org.scalatest.BeforeAndAfter import org.apache.spark.SparkException import org.apache.spark.sql.{QueryTest, _} +import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils @@ -331,7 +331,11 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef withTable(hiveTable) { withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") { - sql(s"CREATE TABLE $hiveTable (a INT) PARTITIONED BY (b INT, c INT) STORED AS TEXTFILE") + sql( + s""" + |CREATE TABLE $hiveTable (a INT, d INT) + |PARTITIONED BY (b INT, c INT) STORED AS TEXTFILE + """.stripMargin) f(hiveTable) } } @@ -343,7 +347,11 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef val dsTable = "ds_table" withTable(dsTable) { - sql(s"CREATE TABLE $dsTable (a INT, b INT, c INT) USING PARQUET PARTITIONED BY (b, c)") + sql( + s""" + |CREATE TABLE $dsTable (a INT, b INT, c INT, d INT) + |USING PARQUET PARTITIONED BY (b, c) + """.stripMargin) f(dsTable) } } @@ -356,7 +364,7 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef testPartitionedTable("partitionBy() can't be used together with insertInto()") { tableName => val cause = intercept[AnalysisException] { - Seq((1, 2, 3)).toDF("a", "b", "c").write.partitionBy("b", "c").insertInto(tableName) + Seq((1, 2, 3, 4)).toDF("a", "b", "c", "d").write.partitionBy("b", "c").insertInto(tableName) } assert(cause.getMessage.contains("insertInto() can't be used together with partitionBy().")) @@ -382,14 +390,83 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef assert(e.message.contains("the number of columns are different")) } - testPartitionedTable( - "SPARK-16037: INSERT statement should match columns by position") { + testPartitionedTable("SPARK-16037: INSERT statement should match columns by position") { tableName => withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") { - sql(s"INSERT INTO TABLE $tableName SELECT 1, 2 AS c, 3 AS b") - checkAnswer(sql(s"SELECT a, b, c FROM $tableName"), Row(1, 2, 3)) - sql(s"INSERT OVERWRITE TABLE $tableName SELECT 1, 2, 3") - checkAnswer(sql(s"SELECT a, b, c FROM $tableName"), Row(1, 2, 3)) + sql(s"INSERT INTO TABLE $tableName SELECT 1, 4, 2 AS c, 3 AS b") + checkAnswer(sql(s"SELECT a, b, c, d FROM $tableName"), Row(1, 2, 3, 4)) + sql(s"INSERT OVERWRITE TABLE $tableName SELECT 1, 4, 2, 3") + checkAnswer(sql(s"SELECT a, b, c, 4 FROM $tableName"), Row(1, 2, 3, 4)) + } + } + + testPartitionedTable("INSERT INTO a partitioned table (semantic and error handling)") { + tableName => + withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) { + sql(s"INSERT INTO TABLE $tableName PARTITION (b=2, c=3) SELECT 1, 4") + + sql(s"INSERT INTO TABLE $tableName PARTITION (b=6, c=7) SELECT 5, 8") + + sql(s"INSERT INTO TABLE $tableName PARTITION (c=11, b=10) SELECT 9, 12") + + // c is defined twice. Parser will complain. + intercept[ParseException] { + sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c=15, c=16) SELECT 13") + } + + // d is not a partitioning column. + intercept[AnalysisException] { + sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c=15, d=16) SELECT 13, 14") + } + + // d is not a partitioning column. The total number of columns is correct. + intercept[AnalysisException] { + sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c=15, d=16) SELECT 13") + } + + // The data is missing a column. + intercept[AnalysisException] { + sql(s"INSERT INTO TABLE $tableName PARTITION (c=15, b=16) SELECT 13") + } + + // d is not a partitioning column. + intercept[AnalysisException] { + sql(s"INSERT INTO TABLE $tableName PARTITION (b=15, d=15) SELECT 13, 14") + } + + // The statement is missing a column. + intercept[AnalysisException] { + sql(s"INSERT INTO TABLE $tableName PARTITION (b=15) SELECT 13, 14") + } + + // The statement is missing a column. + intercept[AnalysisException] { + sql(s"INSERT INTO TABLE $tableName PARTITION (b=15) SELECT 13, 14, 16") + } + + sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c) SELECT 13, 16, 15") + + // Dynamic partitioning columns need to be after static partitioning columns. + intercept[AnalysisException] { + sql(s"INSERT INTO TABLE $tableName PARTITION (b, c=19) SELECT 17, 20, 18") + } + + sql(s"INSERT INTO TABLE $tableName PARTITION (b, c) SELECT 17, 20, 18, 19") + + sql(s"INSERT INTO TABLE $tableName PARTITION (c, b) SELECT 21, 24, 22, 23") + + sql(s"INSERT INTO TABLE $tableName SELECT 25, 28, 26, 27") + + checkAnswer( + sql(s"SELECT a, b, c, d FROM $tableName"), + Row(1, 2, 3, 4) :: + Row(5, 6, 7, 8) :: + Row(9, 10, 11, 12) :: + Row(13, 14, 15, 16) :: + Row(17, 18, 19, 20) :: + Row(21, 22, 23, 24) :: + Row(25, 26, 27, 28) :: Nil + ) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 85b159e2a5..f8c55ec456 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -1006,7 +1006,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { sql("SET hive.exec.dynamic.partition.mode=nonstrict") // Should throw when a static partition appears after a dynamic partition - intercept[SparkException] { + intercept[AnalysisException] { sql( """INSERT INTO TABLE dp_test PARTITION(dp, sp = 1) |SELECT key, value, key % 5 FROM src |