diff options
author | gatorsmile <gatorsmile@gmail.com> | 2016-06-16 22:54:02 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-06-16 22:54:02 -0700 |
commit | e5d703bca85c65ce329b1e202283cfa35d109146 (patch) | |
tree | e16efee0c26449d5f03fb4dd34098aa2959d4e47 | |
parent | 5ada606144c7bf38a797764619d7d1ff677802b3 (diff) | |
download | spark-e5d703bca85c65ce329b1e202283cfa35d109146.tar.gz spark-e5d703bca85c65ce329b1e202283cfa35d109146.tar.bz2 spark-e5d703bca85c65ce329b1e202283cfa35d109146.zip |
[SPARK-15706][SQL] Fix Wrong Answer when using IF NOT EXISTS in INSERT OVERWRITE for DYNAMIC PARTITION
#### What changes were proposed in this pull request?
`IF NOT EXISTS` in `INSERT OVERWRITE` should not support dynamic partitions. If we specify `IF NOT EXISTS`, the inserted statement is not shown in the table.
This PR is to issue an exception in this case, just like what Hive does. Also issue an exception if users specify `IF NOT EXISTS` if users do not specify any `PARTITION` specification.
#### How was this patch tested?
Added test cases into `PlanParserSuite` and `InsertIntoHiveTableSuite`
Author: gatorsmile <gatorsmile@gmail.com>
Closes #13447 from gatorsmile/insertIfNotExist.
5 files changed, 85 insertions, 5 deletions
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index b60319668c..23e925e4de 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -203,7 +203,7 @@ query ; insertInto - : INSERT OVERWRITE TABLE tableIdentifier partitionSpec? (IF NOT EXISTS)? + : INSERT OVERWRITE TABLE tableIdentifier (partitionSpec (IF NOT EXISTS)?)? | INSERT INTO TABLE? tableIdentifier partitionSpec? ; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index e380643f54..c7420a1c59 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -171,6 +171,12 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { val tableIdent = visitTableIdentifier(ctx.tableIdentifier) val partitionKeys = Option(ctx.partitionSpec).map(visitPartitionSpec).getOrElse(Map.empty) + val dynamicPartitionKeys = partitionKeys.filter(_._2.isEmpty) + if (ctx.EXISTS != null && dynamicPartitionKeys.nonEmpty) { + throw new ParseException(s"Dynamic partitions do not support IF NOT EXISTS. Specified " + + "partitions with value: " + dynamicPartitionKeys.keys.mkString("[", ",", "]"), ctx) + } + InsertIntoTable( UnresolvedRelation(tableIdent, None), partitionKeys, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 898784dab1..6c3eb3a5a2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -377,6 +377,7 @@ case class InsertIntoTable( } assert(overwrite || !ifNotExists) + assert(partition.values.forall(_.nonEmpty) || !ifNotExists) override lazy val resolved: Boolean = childrenResolved && table.resolved && expectedColumns.forall { expected => child.output.size == expected.size && child.output.zip(expected).forall { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index 77023cfd3d..456948d645 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -183,14 +183,12 @@ class PlanParserSuite extends PlanTest { // Single inserts assertEqual(s"insert overwrite table s $sql", insert(Map.empty, overwrite = true)) - assertEqual(s"insert overwrite table s if not exists $sql", - insert(Map.empty, overwrite = true, ifNotExists = true)) + assertEqual(s"insert overwrite table s partition (e = 1) if not exists $sql", + insert(Map("e" -> Option("1")), overwrite = true, ifNotExists = true)) assertEqual(s"insert into s $sql", insert(Map.empty)) assertEqual(s"insert into table s partition (c = 'd', e = 1) $sql", insert(Map("c" -> Option("d"), "e" -> Option("1")))) - assertEqual(s"insert overwrite table s partition (c = 'd', x) if not exists $sql", - insert(Map("c" -> Option("d"), "x" -> None), overwrite = true, ifNotExists = true)) // Multi insert val plan2 = table("t").where('x > 5).select(star()) @@ -201,6 +199,13 @@ class PlanParserSuite extends PlanTest { table("u"), Map.empty, plan2, overwrite = false, ifNotExists = false))) } + test ("insert with if not exists") { + val sql = "select * from t" + intercept(s"insert overwrite table s partition (e = 1, x) if not exists $sql", + "Dynamic partitions do not support IF NOT EXISTS. Specified partitions with value: [x]") + intercept[ParseException](parsePlan(s"insert overwrite table s if not exists $sql")) + } + test("aggregation") { val sql = "select a, b, sum(c) as c from d group by a, b" diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index fae59001b9..3bf45ced75 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -166,6 +166,74 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef sql("DROP TABLE tmp_table") } + test("INSERT OVERWRITE - partition IF NOT EXISTS") { + withTempDir { tmpDir => + val table = "table_with_partition" + withTable(table) { + val selQuery = s"select c1, p1, p2 from $table" + sql( + s""" + |CREATE TABLE $table(c1 string) + |PARTITIONED by (p1 string,p2 string) + |location '${tmpDir.toURI.toString}' + """.stripMargin) + sql( + s""" + |INSERT OVERWRITE TABLE $table + |partition (p1='a',p2='b') + |SELECT 'blarr' + """.stripMargin) + checkAnswer( + sql(selQuery), + Row("blarr", "a", "b")) + + sql( + s""" + |INSERT OVERWRITE TABLE $table + |partition (p1='a',p2='b') + |SELECT 'blarr2' + """.stripMargin) + checkAnswer( + sql(selQuery), + Row("blarr2", "a", "b")) + + var e = intercept[AnalysisException] { + sql( + s""" + |INSERT OVERWRITE TABLE $table + |partition (p1='a',p2) IF NOT EXISTS + |SELECT 'blarr3', 'newPartition' + """.stripMargin) + } + assert(e.getMessage.contains( + "Dynamic partitions do not support IF NOT EXISTS. Specified partitions with value: [p2]")) + + e = intercept[AnalysisException] { + sql( + s""" + |INSERT OVERWRITE TABLE $table + |partition (p1='a',p2) IF NOT EXISTS + |SELECT 'blarr3', 'b' + """.stripMargin) + } + assert(e.getMessage.contains( + "Dynamic partitions do not support IF NOT EXISTS. Specified partitions with value: [p2]")) + + // If the partition already exists, the insert will overwrite the data + // unless users specify IF NOT EXISTS + sql( + s""" + |INSERT OVERWRITE TABLE $table + |partition (p1='a',p2='b') IF NOT EXISTS + |SELECT 'blarr3' + """.stripMargin) + checkAnswer( + sql(selQuery), + Row("blarr2", "a", "b")) + } + } + } + test("Insert ArrayType.containsNull == false") { val schema = StructType(Seq( StructField("a", ArrayType(StringType, containsNull = false)))) |