aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g42
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala6
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala1
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala13
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala68
5 files changed, 85 insertions, 5 deletions
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index b60319668c..23e925e4de 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -203,7 +203,7 @@ query
;
insertInto
- : INSERT OVERWRITE TABLE tableIdentifier partitionSpec? (IF NOT EXISTS)?
+ : INSERT OVERWRITE TABLE tableIdentifier (partitionSpec (IF NOT EXISTS)?)?
| INSERT INTO TABLE? tableIdentifier partitionSpec?
;
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index e380643f54..c7420a1c59 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -171,6 +171,12 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
val tableIdent = visitTableIdentifier(ctx.tableIdentifier)
val partitionKeys = Option(ctx.partitionSpec).map(visitPartitionSpec).getOrElse(Map.empty)
+ val dynamicPartitionKeys = partitionKeys.filter(_._2.isEmpty)
+ if (ctx.EXISTS != null && dynamicPartitionKeys.nonEmpty) {
+ throw new ParseException(s"Dynamic partitions do not support IF NOT EXISTS. Specified " +
+ "partitions with value: " + dynamicPartitionKeys.keys.mkString("[", ",", "]"), ctx)
+ }
+
InsertIntoTable(
UnresolvedRelation(tableIdent, None),
partitionKeys,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 898784dab1..6c3eb3a5a2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -377,6 +377,7 @@ case class InsertIntoTable(
}
assert(overwrite || !ifNotExists)
+ assert(partition.values.forall(_.nonEmpty) || !ifNotExists)
override lazy val resolved: Boolean =
childrenResolved && table.resolved && expectedColumns.forall { expected =>
child.output.size == expected.size && child.output.zip(expected).forall {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 77023cfd3d..456948d645 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -183,14 +183,12 @@ class PlanParserSuite extends PlanTest {
// Single inserts
assertEqual(s"insert overwrite table s $sql",
insert(Map.empty, overwrite = true))
- assertEqual(s"insert overwrite table s if not exists $sql",
- insert(Map.empty, overwrite = true, ifNotExists = true))
+ assertEqual(s"insert overwrite table s partition (e = 1) if not exists $sql",
+ insert(Map("e" -> Option("1")), overwrite = true, ifNotExists = true))
assertEqual(s"insert into s $sql",
insert(Map.empty))
assertEqual(s"insert into table s partition (c = 'd', e = 1) $sql",
insert(Map("c" -> Option("d"), "e" -> Option("1"))))
- assertEqual(s"insert overwrite table s partition (c = 'd', x) if not exists $sql",
- insert(Map("c" -> Option("d"), "x" -> None), overwrite = true, ifNotExists = true))
// Multi insert
val plan2 = table("t").where('x > 5).select(star())
@@ -201,6 +199,13 @@ class PlanParserSuite extends PlanTest {
table("u"), Map.empty, plan2, overwrite = false, ifNotExists = false)))
}
+ test ("insert with if not exists") {
+ val sql = "select * from t"
+ intercept(s"insert overwrite table s partition (e = 1, x) if not exists $sql",
+ "Dynamic partitions do not support IF NOT EXISTS. Specified partitions with value: [x]")
+ intercept[ParseException](parsePlan(s"insert overwrite table s if not exists $sql"))
+ }
+
test("aggregation") {
val sql = "select a, b, sum(c) as c from d group by a, b"
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index fae59001b9..3bf45ced75 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -166,6 +166,74 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
sql("DROP TABLE tmp_table")
}
+ test("INSERT OVERWRITE - partition IF NOT EXISTS") {
+ withTempDir { tmpDir =>
+ val table = "table_with_partition"
+ withTable(table) {
+ val selQuery = s"select c1, p1, p2 from $table"
+ sql(
+ s"""
+ |CREATE TABLE $table(c1 string)
+ |PARTITIONED by (p1 string,p2 string)
+ |location '${tmpDir.toURI.toString}'
+ """.stripMargin)
+ sql(
+ s"""
+ |INSERT OVERWRITE TABLE $table
+ |partition (p1='a',p2='b')
+ |SELECT 'blarr'
+ """.stripMargin)
+ checkAnswer(
+ sql(selQuery),
+ Row("blarr", "a", "b"))
+
+ sql(
+ s"""
+ |INSERT OVERWRITE TABLE $table
+ |partition (p1='a',p2='b')
+ |SELECT 'blarr2'
+ """.stripMargin)
+ checkAnswer(
+ sql(selQuery),
+ Row("blarr2", "a", "b"))
+
+ var e = intercept[AnalysisException] {
+ sql(
+ s"""
+ |INSERT OVERWRITE TABLE $table
+ |partition (p1='a',p2) IF NOT EXISTS
+ |SELECT 'blarr3', 'newPartition'
+ """.stripMargin)
+ }
+ assert(e.getMessage.contains(
+ "Dynamic partitions do not support IF NOT EXISTS. Specified partitions with value: [p2]"))
+
+ e = intercept[AnalysisException] {
+ sql(
+ s"""
+ |INSERT OVERWRITE TABLE $table
+ |partition (p1='a',p2) IF NOT EXISTS
+ |SELECT 'blarr3', 'b'
+ """.stripMargin)
+ }
+ assert(e.getMessage.contains(
+ "Dynamic partitions do not support IF NOT EXISTS. Specified partitions with value: [p2]"))
+
+ // If the partition already exists, the insert will overwrite the data
+ // unless users specify IF NOT EXISTS
+ sql(
+ s"""
+ |INSERT OVERWRITE TABLE $table
+ |partition (p1='a',p2='b') IF NOT EXISTS
+ |SELECT 'blarr3'
+ """.stripMargin)
+ checkAnswer(
+ sql(selQuery),
+ Row("blarr2", "a", "b"))
+ }
+ }
+ }
+
test("Insert ArrayType.containsNull == false") {
val schema = StructType(Seq(
StructField("a", ArrayType(StringType, containsNull = false))))