author     Ryan Blue <blue@apache.org>        2016-05-09 17:01:23 +0800
committer  Cheng Lian <lian@databricks.com>   2016-05-09 17:01:23 +0800
commit     652bbb1bf62722b08a062c7a2bf72019f85e179e (patch)
tree       5e4d588f2c9070a0c645c176b7bd42dd34fe04b5 /sql/hive/src/test
parent     16a503cf0af3e7c703d56a1a730e4f3a534f6b3c (diff)
[SPARK-14459][SQL] Detect relation partitioning and adjust the logical plan
## What changes were proposed in this pull request?

This detects a relation's partitioning and adds checks to the analyzer. If an InsertIntoTable node has no partitioning, it is replaced by the relation's partition scheme and the input columns are adjusted accordingly, placing the partition columns at the end in partition order. If an InsertIntoTable node has partitioning, it is checked against the table's reported partitions.

These changes required adding a PartitionedRelation trait to the catalog interface, because Hive's MetastoreRelation doesn't extend CatalogRelation.

This commit also fixes InsertIntoTable's resolved logic, which now verifies that all expected columns are present, including dynamic partition columns. Previously, the number of expected columns was not checked, so resolved could be true even when columns were missing.

## How was this patch tested?

This adds new tests to InsertIntoHiveTableSuite that are fixed by this PR.

Author: Ryan Blue <blue@apache.org>

Closes #12239 from rdblue/SPARK-14459-detect-hive-partitioning.
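To make the column adjustment concrete, here is a minimal, self-contained Scala sketch of the two behaviors described above: moving partition columns to the end of the input in the table's declared partition order, and the tightened resolved check that counts dynamic partition columns. All names here (`PartitioningSketch`, `adjustInputColumns`, `insertResolved`) are illustrative assumptions, not Spark's actual Analyzer or InsertIntoTable APIs.

```scala
// Hypothetical, simplified sketch -- not Spark's actual classes or rule.
object PartitioningSketch {
  case class Column(name: String)

  // Idea 1: when an INSERT carries no partition spec, keep the data columns
  // first and append the table's partition columns in partition order.
  def adjustInputColumns(
      inputCols: Seq[Column],
      tablePartitionCols: Seq[Column]): Seq[Column] = {
    val partNames = tablePartitionCols.map(_.name).toSet
    val dataCols = inputCols.filterNot(c => partNames.contains(c.name))
    dataCols ++ tablePartitionCols
  }

  // Idea 2: resolved should require *all* expected columns, including
  // dynamic partition columns, instead of accepting a short input.
  def insertResolved(
      inputColCount: Int,
      tableDataColCount: Int,
      dynamicPartitionCount: Int): Boolean =
    inputColCount == tableDataColCount + dynamicPartitionCount

  def main(args: Array[String]): Unit = {
    // Column layout from the "correct partition order" test in the diff below:
    // source (id, part2, part1, data), partitioned by (part1, part2).
    val input = Seq(Column("id"), Column("part2"), Column("part1"), Column("data"))
    val parts = Seq(Column("part1"), Column("part2"))
    println(adjustInputColumns(input, parts).map(_.name))
    // -> List(id, data, part1, part2), matching the test's expected ordering.

    // Two columns of data cannot resolve against (id, data) + one dynamic partition.
    println(insertResolved(inputColCount = 2, tableDataColCount = 2, dynamicPartitionCount = 1))
    // -> false, mirroring the "resolved should include dynamic partitions" test.
  }
}
```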
Diffstat (limited to 'sql/hive/src/test')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala | 83
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala  |  2
2 files changed, 80 insertions(+), 5 deletions(-)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index baf34d1cf0..52aba328de 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -22,9 +22,11 @@ import java.io.File
import org.apache.hadoop.hive.conf.HiveConf
import org.scalatest.BeforeAndAfter
+import org.apache.spark.SparkException
import org.apache.spark.sql.{QueryTest, _}
-import org.apache.spark.sql.execution.QueryExecutionException
+import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -32,11 +34,11 @@ case class TestData(key: Int, value: String)
case class ThreeCloumntable(key: Int, value: String, key1: String)
-class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter {
+class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
+ with SQLTestUtils {
import hiveContext.implicits._
- import hiveContext.sql
- val testData = hiveContext.sparkContext.parallelize(
+ override lazy val testData = hiveContext.sparkContext.parallelize(
(1 to 100).map(i => TestData(i, i.toString))).toDF()
before {
@@ -213,4 +215,77 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
sql("DROP TABLE hiveTableWithStructValue")
}
+
+ test("Reject partitioning that does not match table") {
+ withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
+ sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
+ val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd"))
+ .toDF("id", "data", "part")
+
+ intercept[AnalysisException] {
+ // cannot partition by 2 fields when there is only one in the table definition
+ data.write.partitionBy("part", "data").insertInto("partitioned")
+ }
+ }
+ }
+
+ test("Test partition mode = strict") {
+ withSQLConf(("hive.exec.dynamic.partition.mode", "strict")) {
+ sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
+ val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd"))
+ .toDF("id", "data", "part")
+
+ intercept[SparkException] {
+ data.write.insertInto("partitioned")
+ }
+ }
+ }
+
+ test("Detect table partitioning") {
+ withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
+ sql("CREATE TABLE source (id bigint, data string, part string)")
+ val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd")).toDF()
+
+ data.write.insertInto("source")
+ checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq)
+
+ sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
+ // this will pick up the output partitioning from the table definition
+ sqlContext.table("source").write.insertInto("partitioned")
+
+ checkAnswer(sql("SELECT * FROM partitioned"), data.collect().toSeq)
+ }
+ }
+
+ test("Detect table partitioning with correct partition order") {
+ withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
+ sql("CREATE TABLE source (id bigint, part2 string, part1 string, data string)")
+ val data = (1 to 10).map(i => (i, if ((i % 2) == 0) "even" else "odd", "p", s"data-$i"))
+ .toDF("id", "part2", "part1", "data")
+
+ data.write.insertInto("source")
+ checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq)
+
+ // the original data with part1 and part2 at the end
+ val expected = data.select("id", "data", "part1", "part2")
+
+ sql(
+ """CREATE TABLE partitioned (id bigint, data string)
+ |PARTITIONED BY (part1 string, part2 string)""".stripMargin)
+ sqlContext.table("source").write.insertInto("partitioned")
+
+ checkAnswer(sql("SELECT * FROM partitioned"), expected.collect().toSeq)
+ }
+ }
+
+ test("InsertIntoTable#resolved should include dynamic partitions") {
+ withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
+ sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
+ val data = (1 to 10).map(i => (i.toLong, s"data-$i")).toDF("id", "data")
+
+ val logical = InsertIntoTable(sqlContext.table("partitioned").logicalPlan,
+ Map("part" -> None), data.logicalPlan, overwrite = false, ifNotExists = false)
+ assert(!logical.resolved, "Should not resolve: missing partition data")
+ }
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 3bf0e84267..bbb775ef77 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -978,7 +978,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
sql("SET hive.exec.dynamic.partition.mode=strict")
// Should throw when using strict dynamic partition mode without any static partition
- intercept[SparkException] {
+ intercept[AnalysisException] {
sql(
"""INSERT INTO TABLE dp_test PARTITION(dp)
|SELECT key, value, key % 5 FROM src