diff options
author | Wenchen Fan <wenchen@databricks.com> | 2016-12-14 11:30:34 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2016-12-14 11:30:34 +0800 |
commit | 3e307b4959ecdab3f9c16484d172403357e7d09b (patch) | |
tree | 231652eec3b96d4a0472eec0b6e1c2f963729bd7 /sql/catalyst/src | |
parent | f2ddabfa09fda26ff0391d026dd67545dab33e01 (diff) | |
download | spark-3e307b4959ecdab3f9c16484d172403357e7d09b.tar.gz spark-3e307b4959ecdab3f9c16484d172403357e7d09b.tar.bz2 spark-3e307b4959ecdab3f9c16484d172403357e7d09b.zip |
[SPARK-18566][SQL] remove OverwriteOptions
## What changes were proposed in this pull request?
`OverwriteOptions` was introduced in https://github.com/apache/spark/pull/15705, to carry the information of static partitions. However, after further refactor, this information becomes duplicated and we can remove `OverwriteOptions`.
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes #15995 from cloud-fan/overwrite.
Diffstat (limited to 'sql/catalyst/src')
4 files changed, 7 insertions, 37 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index e901683be6..66e52ca68a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -367,7 +367,7 @@ package object dsl { def insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan = InsertIntoTable( analysis.UnresolvedRelation(TableIdentifier(tableName)), - Map.empty, logicalPlan, OverwriteOptions(overwrite), false) + Map.empty, logicalPlan, overwrite, false) def as(alias: String): LogicalPlan = logicalPlan match { case UnresolvedRelation(tbl, _) => UnresolvedRelation(tbl, Option(alias)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 7b8badcf8c..3969fdb0ff 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -177,15 +177,12 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { throw new ParseException(s"Dynamic partitions do not support IF NOT EXISTS. Specified " + "partitions with value: " + dynamicPartitionKeys.keys.mkString("[", ",", "]"), ctx) } - val overwrite = ctx.OVERWRITE != null - val staticPartitionKeys: Map[String, String] = - partitionKeys.filter(_._2.nonEmpty).map(t => (t._1, t._2.get)) InsertIntoTable( UnresolvedRelation(tableIdent, None), partitionKeys, query, - OverwriteOptions(overwrite, if (overwrite) staticPartitionKeys else Map.empty), + ctx.OVERWRITE != null, ctx.EXISTS != null) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index c210b74e8a..b9bdd53dd1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -17,10 +17,8 @@ package org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.catalog.CatalogTypes import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans._ @@ -347,22 +345,6 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode { } /** - * Options for writing new data into a table. - * - * @param enabled whether to overwrite existing data in the table. - * @param staticPartitionKeys if non-empty, specifies that we only want to overwrite partitions - * that match this partial partition spec. If empty, all partitions - * will be overwritten. - */ -case class OverwriteOptions( - enabled: Boolean, - staticPartitionKeys: CatalogTypes.TablePartitionSpec = Map.empty) { - if (staticPartitionKeys.nonEmpty) { - assert(enabled, "Overwrite must be enabled when specifying specific partitions.") - } -} - -/** * Insert some data into a table. * * @param table the logical plan representing the table. In the future this should be a @@ -382,14 +364,14 @@ case class InsertIntoTable( table: LogicalPlan, partition: Map[String, Option[String]], child: LogicalPlan, - overwrite: OverwriteOptions, + overwrite: Boolean, ifNotExists: Boolean) extends LogicalPlan { override def children: Seq[LogicalPlan] = child :: Nil override def output: Seq[Attribute] = Seq.empty - assert(overwrite.enabled || !ifNotExists) + assert(overwrite || !ifNotExists) assert(partition.values.forall(_.nonEmpty) || !ifNotExists) override lazy val resolved: Boolean = childrenResolved && table.resolved diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index 8e4327c788..f408ba99d1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -180,16 +180,7 @@ class PlanParserSuite extends PlanTest { partition: Map[String, Option[String]], overwrite: Boolean = false, ifNotExists: Boolean = false): LogicalPlan = - InsertIntoTable( - table("s"), partition, plan, - OverwriteOptions( - overwrite, - if (overwrite && partition.nonEmpty) { - partition.map(kv => (kv._1, kv._2.get)) - } else { - Map.empty - }), - ifNotExists) + InsertIntoTable(table("s"), partition, plan, overwrite, ifNotExists) // Single inserts assertEqual(s"insert overwrite table s $sql", @@ -205,9 +196,9 @@ class PlanParserSuite extends PlanTest { val plan2 = table("t").where('x > 5).select(star()) assertEqual("from t insert into s select * limit 1 insert into u select * where x > 5", InsertIntoTable( - table("s"), Map.empty, plan.limit(1), OverwriteOptions(false), ifNotExists = false).union( + table("s"), Map.empty, plan.limit(1), false, ifNotExists = false).union( InsertIntoTable( - table("u"), Map.empty, plan2, OverwriteOptions(false), ifNotExists = false))) + table("u"), Map.empty, plan2, false, ifNotExists = false))) } test ("insert with if not exists") { |