aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2016-12-14 11:30:34 +0800
committerWenchen Fan <wenchen@databricks.com>2016-12-14 11:30:34 +0800
commit3e307b4959ecdab3f9c16484d172403357e7d09b (patch)
tree231652eec3b96d4a0472eec0b6e1c2f963729bd7 /sql/catalyst/src
parentf2ddabfa09fda26ff0391d026dd67545dab33e01 (diff)
downloadspark-3e307b4959ecdab3f9c16484d172403357e7d09b.tar.gz
spark-3e307b4959ecdab3f9c16484d172403357e7d09b.tar.bz2
spark-3e307b4959ecdab3f9c16484d172403357e7d09b.zip
[SPARK-18566][SQL] remove OverwriteOptions
## What changes were proposed in this pull request?

`OverwriteOptions` was introduced in https://github.com/apache/spark/pull/15705, to carry the information of static partitions. However, after further refactoring, this information becomes duplicated and we can remove `OverwriteOptions`.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15995 from cloud-fan/overwrite.
Diffstat (limited to 'sql/catalyst/src')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala2
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala5
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala22
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala15
4 files changed, 7 insertions, 37 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index e901683be6..66e52ca68a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -367,7 +367,7 @@ package object dsl {
def insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan =
InsertIntoTable(
analysis.UnresolvedRelation(TableIdentifier(tableName)),
- Map.empty, logicalPlan, OverwriteOptions(overwrite), false)
+ Map.empty, logicalPlan, overwrite, false)
def as(alias: String): LogicalPlan = logicalPlan match {
case UnresolvedRelation(tbl, _) => UnresolvedRelation(tbl, Option(alias))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 7b8badcf8c..3969fdb0ff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -177,15 +177,12 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
throw new ParseException(s"Dynamic partitions do not support IF NOT EXISTS. Specified " +
"partitions with value: " + dynamicPartitionKeys.keys.mkString("[", ",", "]"), ctx)
}
- val overwrite = ctx.OVERWRITE != null
- val staticPartitionKeys: Map[String, String] =
- partitionKeys.filter(_._2.nonEmpty).map(t => (t._1, t._2.get))
InsertIntoTable(
UnresolvedRelation(tableIdent, None),
partitionKeys,
query,
- OverwriteOptions(overwrite, if (overwrite) staticPartitionKeys else Map.empty),
+ ctx.OVERWRITE != null,
ctx.EXISTS != null)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index c210b74e8a..b9bdd53dd1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -17,10 +17,8 @@
package org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
@@ -347,22 +345,6 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
}
/**
- * Options for writing new data into a table.
- *
- * @param enabled whether to overwrite existing data in the table.
- * @param staticPartitionKeys if non-empty, specifies that we only want to overwrite partitions
- * that match this partial partition spec. If empty, all partitions
- * will be overwritten.
- */
-case class OverwriteOptions(
- enabled: Boolean,
- staticPartitionKeys: CatalogTypes.TablePartitionSpec = Map.empty) {
- if (staticPartitionKeys.nonEmpty) {
- assert(enabled, "Overwrite must be enabled when specifying specific partitions.")
- }
-}
-
-/**
* Insert some data into a table.
*
* @param table the logical plan representing the table. In the future this should be a
@@ -382,14 +364,14 @@ case class InsertIntoTable(
table: LogicalPlan,
partition: Map[String, Option[String]],
child: LogicalPlan,
- overwrite: OverwriteOptions,
+ overwrite: Boolean,
ifNotExists: Boolean)
extends LogicalPlan {
override def children: Seq[LogicalPlan] = child :: Nil
override def output: Seq[Attribute] = Seq.empty
- assert(overwrite.enabled || !ifNotExists)
+ assert(overwrite || !ifNotExists)
assert(partition.values.forall(_.nonEmpty) || !ifNotExists)
override lazy val resolved: Boolean = childrenResolved && table.resolved
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 8e4327c788..f408ba99d1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -180,16 +180,7 @@ class PlanParserSuite extends PlanTest {
partition: Map[String, Option[String]],
overwrite: Boolean = false,
ifNotExists: Boolean = false): LogicalPlan =
- InsertIntoTable(
- table("s"), partition, plan,
- OverwriteOptions(
- overwrite,
- if (overwrite && partition.nonEmpty) {
- partition.map(kv => (kv._1, kv._2.get))
- } else {
- Map.empty
- }),
- ifNotExists)
+ InsertIntoTable(table("s"), partition, plan, overwrite, ifNotExists)
// Single inserts
assertEqual(s"insert overwrite table s $sql",
@@ -205,9 +196,9 @@ class PlanParserSuite extends PlanTest {
val plan2 = table("t").where('x > 5).select(star())
assertEqual("from t insert into s select * limit 1 insert into u select * where x > 5",
InsertIntoTable(
- table("s"), Map.empty, plan.limit(1), OverwriteOptions(false), ifNotExists = false).union(
+ table("s"), Map.empty, plan.limit(1), false, ifNotExists = false).union(
InsertIntoTable(
- table("u"), Map.empty, plan2, OverwriteOptions(false), ifNotExists = false)))
+ table("u"), Map.empty, plan2, false, ifNotExists = false)))
}
test ("insert with if not exists") {