aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorEric Liang <ekl@databricks.com>2016-11-02 14:15:10 +0800
committerWenchen Fan <wenchen@databricks.com>2016-11-02 14:15:10 +0800
commitabefe2ec428dc24a4112c623fb6fbe4b2ca60a2b (patch)
treec37c599640ae00418499d9467a025a47daebe43d /sql/catalyst
parent620da3b4828b3580c7ed7339b2a07938e6be1bb1 (diff)
downloadspark-abefe2ec428dc24a4112c623fb6fbe4b2ca60a2b.tar.gz
spark-abefe2ec428dc24a4112c623fb6fbe4b2ca60a2b.tar.bz2
spark-abefe2ec428dc24a4112c623fb6fbe4b2ca60a2b.zip
[SPARK-18183][SPARK-18184] Fix INSERT [INTO|OVERWRITE] TABLE ... PARTITION for Datasource tables
## What changes were proposed in this pull request? There are a couple of issues with the current 2.1 behavior when inserting into Datasource tables with partitions managed by Hive. (1) OVERWRITE TABLE ... PARTITION will actually overwrite the entire table instead of just the specified partition. (2) INSERT|OVERWRITE does not work with partitions that have custom locations. This PR fixes both of these issues for Datasource tables managed by Hive. The behavior for legacy tables or when `manageFilesourcePartitions = false` is unchanged. There is one other issue in that INSERT OVERWRITE with dynamic partitions will overwrite the entire table instead of just the updated partitions, but this behavior is pretty complicated to implement for Datasource tables. We should address that in a future release. ## How was this patch tested? Unit tests. Author: Eric Liang <ekl@databricks.com> Closes #15705 from ericl/sc-4942.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala2
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala9
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala19
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala15
4 files changed, 38 insertions, 7 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 66e52ca68a..e901683be6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -367,7 +367,7 @@ package object dsl {
def insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan =
InsertIntoTable(
analysis.UnresolvedRelation(TableIdentifier(tableName)),
- Map.empty, logicalPlan, overwrite, false)
+ Map.empty, logicalPlan, OverwriteOptions(overwrite), false)
def as(alias: String): LogicalPlan = logicalPlan match {
case UnresolvedRelation(tbl, _) => UnresolvedRelation(tbl, Option(alias))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 38e9bb6c16..ac1577b3ab 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -177,12 +177,19 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
throw new ParseException(s"Dynamic partitions do not support IF NOT EXISTS. Specified " +
"partitions with value: " + dynamicPartitionKeys.keys.mkString("[", ",", "]"), ctx)
}
+ val overwrite = ctx.OVERWRITE != null
+ val overwritePartition =
+ if (overwrite && partitionKeys.nonEmpty && dynamicPartitionKeys.isEmpty) {
+ Some(partitionKeys.map(t => (t._1, t._2.get)))
+ } else {
+ None
+ }
InsertIntoTable(
UnresolvedRelation(tableIdent, None),
partitionKeys,
query,
- ctx.OVERWRITE != null,
+ OverwriteOptions(overwrite, overwritePartition),
ctx.EXISTS != null)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index a48974c632..7a15c2285d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
@@ -345,18 +346,32 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
override lazy val statistics: Statistics = super.statistics.copy(isBroadcastable = true)
}
+/**
+ * Options for writing new data into a table.
+ *
+ * @param enabled whether to overwrite existing data in the table.
+ * @param specificPartition only data in the specified partition will be overwritten.
+ */
+case class OverwriteOptions(
+ enabled: Boolean,
+ specificPartition: Option[CatalogTypes.TablePartitionSpec] = None) {
+ if (specificPartition.isDefined) {
+ assert(enabled, "Overwrite must be enabled when specifying a partition to overwrite.")
+ }
+}
+
case class InsertIntoTable(
table: LogicalPlan,
partition: Map[String, Option[String]],
child: LogicalPlan,
- overwrite: Boolean,
+ overwrite: OverwriteOptions,
ifNotExists: Boolean)
extends LogicalPlan {
override def children: Seq[LogicalPlan] = child :: Nil
override def output: Seq[Attribute] = Seq.empty
- assert(overwrite || !ifNotExists)
+ assert(overwrite.enabled || !ifNotExists)
assert(partition.values.forall(_.nonEmpty) || !ifNotExists)
override lazy val resolved: Boolean = childrenResolved && table.resolved
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index ca86304d4d..7400f3430e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -180,7 +180,16 @@ class PlanParserSuite extends PlanTest {
partition: Map[String, Option[String]],
overwrite: Boolean = false,
ifNotExists: Boolean = false): LogicalPlan =
- InsertIntoTable(table("s"), partition, plan, overwrite, ifNotExists)
+ InsertIntoTable(
+ table("s"), partition, plan,
+ OverwriteOptions(
+ overwrite,
+ if (overwrite && partition.nonEmpty) {
+ Some(partition.map(kv => (kv._1, kv._2.get)))
+ } else {
+ None
+ }),
+ ifNotExists)
// Single inserts
assertEqual(s"insert overwrite table s $sql",
@@ -196,9 +205,9 @@ class PlanParserSuite extends PlanTest {
val plan2 = table("t").where('x > 5).select(star())
assertEqual("from t insert into s select * limit 1 insert into u select * where x > 5",
InsertIntoTable(
- table("s"), Map.empty, plan.limit(1), overwrite = false, ifNotExists = false).union(
+ table("s"), Map.empty, plan.limit(1), OverwriteOptions(false), ifNotExists = false).union(
InsertIntoTable(
- table("u"), Map.empty, plan2, overwrite = false, ifNotExists = false)))
+ table("u"), Map.empty, plan2, OverwriteOptions(false), ifNotExists = false)))
}
test ("insert with if not exists") {