author     Yin Huai <yhuai@databricks.com>    2016-06-20 20:17:47 +0800
committer  Cheng Lian <lian@databricks.com>   2016-06-20 20:17:47 +0800
commit     905f774b71f4b814d5a2412c7c35bd023c3dfdf8 (patch)
tree       80e2f1fa4c18d0d5625fbc4ff6c9528bab6c4690 /sql/hive/src/test
parent     6d0f921aedfdd3b7e8472b6776d0c7d8299190bd (diff)
[SPARK-16030][SQL] Allow specifying static partitions when inserting to data source tables
## What changes were proposed in this pull request?

This PR adds static partition support to the INSERT statement when the target table is a data source table.

## How was this patch tested?

New tests in InsertIntoHiveTableSuite and DataSourceAnalysisSuite.

**Note: This PR is based on https://github.com/apache/spark/pull/13766. The last commit is the actual change.**

Author: Yin Huai <yhuai@databricks.com>

Closes #13769 from yhuai/SPARK-16030-1.
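For orientation, here is a minimal sketch of what the change enables, mirroring the shape of the data source table used by the tests in this patch. The table name `t`, the session setup, and the literal values are illustrative, not taken from the patch itself:

import org.apache.spark.sql.SparkSession

object StaticPartitionInsertSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("static-partition-insert").getOrCreate()

    // A data source table partitioned by (b, c), like the suite's ds_table.
    spark.sql("CREATE TABLE t (a INT, b INT, c INT, d INT) USING PARQUET PARTITIONED BY (b, c)")

    // Fully static spec: the SELECT supplies only the data columns a and d.
    spark.sql("INSERT INTO TABLE t PARTITION (b = 2, c = 3) SELECT 1, 4")

    // Mixed spec: b is static, c is dynamic, so the SELECT supplies a, d, then c.
    spark.sql("INSERT INTO TABLE t PARTITION (b = 6, c) SELECT 5, 8, 7")

    // Expected rows: (1, 2, 3, 4) and (5, 6, 7, 8).
    spark.sql("SELECT a, b, c, d FROM t ORDER BY a").show()
    spark.stop()
  }
}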
Diffstat (limited to 'sql/hive/src/test')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala  97
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala    2
2 files changed, 88 insertions, 11 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index d4ebd051d2..46432512ba 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -19,11 +19,11 @@ package org.apache.spark.sql.hive
import java.io.File
-import org.apache.hadoop.hive.conf.HiveConf
import org.scalatest.BeforeAndAfter
import org.apache.spark.SparkException
import org.apache.spark.sql.{QueryTest, _}
+import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
@@ -331,7 +331,11 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
withTable(hiveTable) {
withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
- sql(s"CREATE TABLE $hiveTable (a INT) PARTITIONED BY (b INT, c INT) STORED AS TEXTFILE")
+ sql(
+ s"""
+ |CREATE TABLE $hiveTable (a INT, d INT)
+ |PARTITIONED BY (b INT, c INT) STORED AS TEXTFILE
+ """.stripMargin)
f(hiveTable)
}
}
@@ -343,7 +347,11 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
val dsTable = "ds_table"
withTable(dsTable) {
- sql(s"CREATE TABLE $dsTable (a INT, b INT, c INT) USING PARQUET PARTITIONED BY (b, c)")
+ sql(
+ s"""
+ |CREATE TABLE $dsTable (a INT, b INT, c INT, d INT)
+ |USING PARQUET PARTITIONED BY (b, c)
+ """.stripMargin)
f(dsTable)
}
}
@@ -356,7 +364,7 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
testPartitionedTable("partitionBy() can't be used together with insertInto()") { tableName =>
val cause = intercept[AnalysisException] {
- Seq((1, 2, 3)).toDF("a", "b", "c").write.partitionBy("b", "c").insertInto(tableName)
+ Seq((1, 2, 3, 4)).toDF("a", "b", "c", "d").write.partitionBy("b", "c").insertInto(tableName)
}
assert(cause.getMessage.contains("insertInto() can't be used together with partitionBy()."))
@@ -382,14 +390,83 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
assert(e.message.contains("the number of columns are different"))
}
- testPartitionedTable(
- "SPARK-16037: INSERT statement should match columns by position") {
+ testPartitionedTable("SPARK-16037: INSERT statement should match columns by position") {
tableName =>
withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
- sql(s"INSERT INTO TABLE $tableName SELECT 1, 2 AS c, 3 AS b")
- checkAnswer(sql(s"SELECT a, b, c FROM $tableName"), Row(1, 2, 3))
- sql(s"INSERT OVERWRITE TABLE $tableName SELECT 1, 2, 3")
- checkAnswer(sql(s"SELECT a, b, c FROM $tableName"), Row(1, 2, 3))
+ sql(s"INSERT INTO TABLE $tableName SELECT 1, 4, 2 AS c, 3 AS b")
+ checkAnswer(sql(s"SELECT a, b, c, d FROM $tableName"), Row(1, 2, 3, 4))
+ sql(s"INSERT OVERWRITE TABLE $tableName SELECT 1, 4, 2, 3")
+        checkAnswer(sql(s"SELECT a, b, c, d FROM $tableName"), Row(1, 2, 3, 4))
+ }
+ }
+
+ testPartitionedTable("INSERT INTO a partitioned table (semantic and error handling)") {
+ tableName =>
+ withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
+ sql(s"INSERT INTO TABLE $tableName PARTITION (b=2, c=3) SELECT 1, 4")
+
+ sql(s"INSERT INTO TABLE $tableName PARTITION (b=6, c=7) SELECT 5, 8")
+
+ sql(s"INSERT INTO TABLE $tableName PARTITION (c=11, b=10) SELECT 9, 12")
+
+      // c is defined twice. The parser will complain.
+ intercept[ParseException] {
+ sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c=15, c=16) SELECT 13")
+ }
+
+ // d is not a partitioning column.
+ intercept[AnalysisException] {
+ sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c=15, d=16) SELECT 13, 14")
+ }
+
+ // d is not a partitioning column. The total number of columns is correct.
+ intercept[AnalysisException] {
+ sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c=15, d=16) SELECT 13")
+ }
+
+ // The data is missing a column.
+ intercept[AnalysisException] {
+ sql(s"INSERT INTO TABLE $tableName PARTITION (c=15, b=16) SELECT 13")
+ }
+
+ // d is not a partitioning column.
+ intercept[AnalysisException] {
+ sql(s"INSERT INTO TABLE $tableName PARTITION (b=15, d=15) SELECT 13, 14")
+ }
+
+ // The statement is missing a column.
+ intercept[AnalysisException] {
+ sql(s"INSERT INTO TABLE $tableName PARTITION (b=15) SELECT 13, 14")
+ }
+
+      // Partition column c is missing from the spec (the column count alone would match).
+ intercept[AnalysisException] {
+ sql(s"INSERT INTO TABLE $tableName PARTITION (b=15) SELECT 13, 14, 16")
+ }
+
+ sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c) SELECT 13, 16, 15")
+
+ // Dynamic partitioning columns need to be after static partitioning columns.
+ intercept[AnalysisException] {
+ sql(s"INSERT INTO TABLE $tableName PARTITION (b, c=19) SELECT 17, 20, 18")
+ }
+
+ sql(s"INSERT INTO TABLE $tableName PARTITION (b, c) SELECT 17, 20, 18, 19")
+
+ sql(s"INSERT INTO TABLE $tableName PARTITION (c, b) SELECT 21, 24, 22, 23")
+
+ sql(s"INSERT INTO TABLE $tableName SELECT 25, 28, 26, 27")
+
+ checkAnswer(
+ sql(s"SELECT a, b, c, d FROM $tableName"),
+ Row(1, 2, 3, 4) ::
+ Row(5, 6, 7, 8) ::
+ Row(9, 10, 11, 12) ::
+ Row(13, 14, 15, 16) ::
+ Row(17, 18, 19, 20) ::
+ Row(21, 22, 23, 24) ::
+ Row(25, 26, 27, 28) :: Nil
+ )
}
}
}
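Taken together, the cases above pin down an arity rule: the SELECT must produce one value per data column plus one per dynamic partition column, static values come from the PARTITION spec, and static columns must precede dynamic ones. Below is a small standalone sketch of that bookkeeping. It is not Spark's analyzer; the names and error strings are illustrative, and it folds the "partition column missing from the spec" case into the same check:

object PartitionSpecSketch {
  // Computes the SELECT arity implied by a partition spec, or an error string
  // mirroring the AnalysisException cases in the tests above. A spec entry of
  // None marks a dynamic partition column; Some(v) marks a static value.
  def expectedSelectArity(
      dataCols: Seq[String],
      partCols: Seq[String],
      spec: Seq[(String, Option[Int])]): Either[String, Int] = {
    val names = spec.map(_._1)
    if (names.distinct.size != names.size) {
      Left("a partition column is defined twice")          // e.g. (b=14, c=15, c=16)
    } else if (names.exists(n => !partCols.contains(n))) {
      Left("not a partitioning column")                    // e.g. (b=14, c=15, d=16)
    } else if (!partCols.forall(names.contains)) {
      Left("a partition column is missing from the spec")  // e.g. (b=15)
    } else if (spec.dropWhile(_._2.isDefined).exists(_._2.isDefined)) {
      Left("a static partition follows a dynamic one")     // e.g. (b, c=19)
    } else {
      Right(dataCols.size + spec.count(_._2.isEmpty))
    }
  }

  def main(args: Array[String]): Unit = {
    val data = Seq("a", "d")
    val parts = Seq("b", "c")
    println(expectedSelectArity(data, parts, Seq("b" -> Some(2), "c" -> Some(3)))) // Right(2)
    println(expectedSelectArity(data, parts, Seq("b" -> Some(14), "c" -> None)))   // Right(3)
    println(expectedSelectArity(data, parts, Seq("b" -> None, "c" -> Some(19))))   // Left(...)
  }
}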
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 85b159e2a5..f8c55ec456 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -1006,7 +1006,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
sql("SET hive.exec.dynamic.partition.mode=nonstrict")
// Should throw when a static partition appears after a dynamic partition
- intercept[SparkException] {
+ intercept[AnalysisException] {
sql(
"""INSERT INTO TABLE dp_test PARTITION(dp, sp = 1)
|SELECT key, value, key % 5 FROM src
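The HiveQuerySuite change reflects that a static partition appearing after a dynamic one is now caught during analysis (AnalysisException) rather than surfacing later as an execution failure (SparkException). A minimal sketch of the same ordering rule against a data source table, with an illustrative table name and values:

import org.apache.spark.sql.{AnalysisException, SparkSession}

object StaticAfterDynamicSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("static-after-dynamic").getOrCreate()
    spark.sql("CREATE TABLE t (a INT, dp INT, sp INT) USING PARQUET PARTITIONED BY (dp, sp)")

    try {
      // dp is dynamic and sp is static: the static column trails the dynamic
      // one, so the INSERT is rejected while the plan is analyzed, before any
      // Spark job runs.
      spark.sql("INSERT INTO TABLE t PARTITION (dp, sp = 1) SELECT 2, 3")
    } catch {
      case e: AnalysisException =>
        println(s"rejected at analysis time: ${e.getMessage}")
    }
    spark.stop()
  }
}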