author    Wenchen Fan <wenchen@databricks.com>    2016-12-14 11:30:34 +0800
committer Wenchen Fan <wenchen@databricks.com>    2016-12-14 11:30:34 +0800
commit    3e307b4959ecdab3f9c16484d172403357e7d09b (patch)
tree      231652eec3b96d4a0472eec0b6e1c2f963729bd7
parent    f2ddabfa09fda26ff0391d026dd67545dab33e01 (diff)
[SPARK-18566][SQL] remove OverwriteOptions
## What changes were proposed in this pull request?

`OverwriteOptions` was introduced in https://github.com/apache/spark/pull/15705 to carry the information about static partitions. However, after further refactoring, this information became duplicated, so we can remove `OverwriteOptions`.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15995 from cloud-fan/overwrite.
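For orientation, here is a minimal standalone sketch (plain Scala, not the actual Spark classes; the `*Sketch` names are hypothetical) of why the wrapper is redundant: the partition spec in `InsertIntoTable` already distinguishes static keys (`Some(value)`) from dynamic keys (`None`), so the static-partition map that `OverwriteOptions` carried can be re-derived wherever it is needed, which is what `DataSourceAnalysis` now does in the diff below.

```scala
// Standalone sketch: models the shape of the plan node after this patch.
object OverwriteOptionsSketch {

  // Simplified stand-in for the Catalyst plan node: `overwrite` is now a plain
  // Boolean instead of the removed OverwriteOptions wrapper.
  final case class InsertIntoTableSketch(
      table: String,
      partition: Map[String, Option[String]],
      overwrite: Boolean,
      ifNotExists: Boolean)

  // Mirrors the new derivation in DataSourceAnalysis:
  //   parts.filter(_._2.nonEmpty).map { case (k, v) => k -> v.get }
  def staticPartitions(parts: Map[String, Option[String]]): Map[String, String] =
    parts.collect { case (k, Some(v)) => k -> v }

  def main(args: Array[String]): Unit = {
    // INSERT OVERWRITE TABLE t PARTITION (b=2, c) SELECT ...
    val insert = InsertIntoTableSketch(
      table = "t",
      partition = Map("b" -> Some("2"), "c" -> None),
      overwrite = true,
      ifNotExists = false)

    // The information that OverwriteOptions used to carry is still recoverable:
    println(staticPartitions(insert.partition)) // Map(b -> 2)
  }
}
```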
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala  2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala  5
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala  22
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala  15
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala  127
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala  16
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala  5
11 files changed, 91 insertions, 115 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index e901683be6..66e52ca68a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -367,7 +367,7 @@ package object dsl {
def insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan =
InsertIntoTable(
analysis.UnresolvedRelation(TableIdentifier(tableName)),
- Map.empty, logicalPlan, OverwriteOptions(overwrite), false)
+ Map.empty, logicalPlan, overwrite, false)
def as(alias: String): LogicalPlan = logicalPlan match {
case UnresolvedRelation(tbl, _) => UnresolvedRelation(tbl, Option(alias))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 7b8badcf8c..3969fdb0ff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -177,15 +177,12 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
throw new ParseException(s"Dynamic partitions do not support IF NOT EXISTS. Specified " +
"partitions with value: " + dynamicPartitionKeys.keys.mkString("[", ",", "]"), ctx)
}
- val overwrite = ctx.OVERWRITE != null
- val staticPartitionKeys: Map[String, String] =
- partitionKeys.filter(_._2.nonEmpty).map(t => (t._1, t._2.get))
InsertIntoTable(
UnresolvedRelation(tableIdent, None),
partitionKeys,
query,
- OverwriteOptions(overwrite, if (overwrite) staticPartitionKeys else Map.empty),
+ ctx.OVERWRITE != null,
ctx.EXISTS != null)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index c210b74e8a..b9bdd53dd1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -17,10 +17,8 @@
package org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
@@ -347,22 +345,6 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
}
/**
- * Options for writing new data into a table.
- *
- * @param enabled whether to overwrite existing data in the table.
- * @param staticPartitionKeys if non-empty, specifies that we only want to overwrite partitions
- * that match this partial partition spec. If empty, all partitions
- * will be overwritten.
- */
-case class OverwriteOptions(
- enabled: Boolean,
- staticPartitionKeys: CatalogTypes.TablePartitionSpec = Map.empty) {
- if (staticPartitionKeys.nonEmpty) {
- assert(enabled, "Overwrite must be enabled when specifying specific partitions.")
- }
-}
-
-/**
* Insert some data into a table.
*
* @param table the logical plan representing the table. In the future this should be a
@@ -382,14 +364,14 @@ case class InsertIntoTable(
table: LogicalPlan,
partition: Map[String, Option[String]],
child: LogicalPlan,
- overwrite: OverwriteOptions,
+ overwrite: Boolean,
ifNotExists: Boolean)
extends LogicalPlan {
override def children: Seq[LogicalPlan] = child :: Nil
override def output: Seq[Attribute] = Seq.empty
- assert(overwrite.enabled || !ifNotExists)
+ assert(overwrite || !ifNotExists)
assert(partition.values.forall(_.nonEmpty) || !ifNotExists)
override lazy val resolved: Boolean = childrenResolved && table.resolved
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 8e4327c788..f408ba99d1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -180,16 +180,7 @@ class PlanParserSuite extends PlanTest {
partition: Map[String, Option[String]],
overwrite: Boolean = false,
ifNotExists: Boolean = false): LogicalPlan =
- InsertIntoTable(
- table("s"), partition, plan,
- OverwriteOptions(
- overwrite,
- if (overwrite && partition.nonEmpty) {
- partition.map(kv => (kv._1, kv._2.get))
- } else {
- Map.empty
- }),
- ifNotExists)
+ InsertIntoTable(table("s"), partition, plan, overwrite, ifNotExists)
// Single inserts
assertEqual(s"insert overwrite table s $sql",
@@ -205,9 +196,9 @@ class PlanParserSuite extends PlanTest {
val plan2 = table("t").where('x > 5).select(star())
assertEqual("from t insert into s select * limit 1 insert into u select * where x > 5",
InsertIntoTable(
- table("s"), Map.empty, plan.limit(1), OverwriteOptions(false), ifNotExists = false).union(
+ table("s"), Map.empty, plan.limit(1), false, ifNotExists = false).union(
InsertIntoTable(
- table("u"), Map.empty, plan2, OverwriteOptions(false), ifNotExists = false)))
+ table("u"), Map.empty, plan2, false, ifNotExists = false)))
}
test ("insert with if not exists") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index fa8e8cb985..9d28e2704b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,7 +25,7 @@ import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, OverwriteOptions}
+import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, DDLUtils}
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, HadoopFsRelation}
import org.apache.spark.sql.types.StructType
@@ -259,7 +259,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
table = UnresolvedRelation(tableIdent),
partition = Map.empty[String, Option[String]],
child = df.logicalPlan,
- overwrite = OverwriteOptions(mode == SaveMode.Overwrite),
+ overwrite = mode == SaveMode.Overwrite,
ifNotExists = false)).toRdd
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index eaccdf27da..9250d0e237 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -478,7 +478,7 @@ case class DataSource(
val plan =
InsertIntoHadoopFsRelationCommand(
outputPath = outputPath,
- staticPartitionKeys = Map.empty,
+ staticPartitions = Map.empty,
customPartitionLocations = Map.empty,
partitionColumns = columns,
bucketSpec = bucketSpec,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 03eed25176..61f0d43f24 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, SimpleCatalogRelation}
@@ -32,8 +32,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
@@ -100,7 +99,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
None
} else if (potentialSpecs.size == 1) {
val partValue = potentialSpecs.head._2
- Some(Alias(Cast(Literal(partValue), field.dataType), "_staticPart")())
+ Some(Alias(Cast(Literal(partValue), field.dataType), field.name)())
} else {
throw new AnalysisException(
s"Partition column ${field.name} have multiple values specified, " +
@@ -128,61 +127,75 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
projectList
}
+ /**
+ * Returns true if the [[InsertIntoTable]] plan has already been preprocessed by analyzer rule
+ * [[PreprocessTableInsertion]]. It is important that this rule([[DataSourceAnalysis]]) has to
+ * be run after [[PreprocessTableInsertion]], to normalize the column names in partition spec and
+ * fix the schema mismatch by adding Cast.
+ */
+ private def hasBeenPreprocessed(
+ tableOutput: Seq[Attribute],
+ partSchema: StructType,
+ partSpec: Map[String, Option[String]],
+ query: LogicalPlan): Boolean = {
+ val partColNames = partSchema.map(_.name).toSet
+ query.resolved && partSpec.keys.forall(partColNames.contains) && {
+ val staticPartCols = partSpec.filter(_._2.isDefined).keySet
+ val expectedColumns = tableOutput.filterNot(a => staticPartCols.contains(a.name))
+ expectedColumns.toStructType.sameType(query.schema)
+ }
+ }
+
override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- // If the InsertIntoTable command is for a partitioned HadoopFsRelation and
- // the user has specified static partitions, we add a Project operator on top of the query
- // to include those constant column values in the query result.
- //
- // Example:
- // Let's say that we have a table "t", which is created by
- // CREATE TABLE t (a INT, b INT, c INT) USING parquet PARTITIONED BY (b, c)
- // The statement of "INSERT INTO TABLE t PARTITION (b=2, c) SELECT 1, 3"
- // will be converted to "INSERT INTO TABLE t PARTITION (b, c) SELECT 1, 2, 3".
- //
- // Basically, we will put those partition columns having a assigned value back
- // to the SELECT clause. The output of the SELECT clause is organized as
- // normal_columns static_partitioning_columns dynamic_partitioning_columns.
- // static_partitioning_columns are partitioning columns having assigned
- // values in the PARTITION clause (e.g. b in the above example).
- // dynamic_partitioning_columns are partitioning columns that do not assigned
- // values in the PARTITION clause (e.g. c in the above example).
- case insert @ logical.InsertIntoTable(
- relation @ LogicalRelation(t: HadoopFsRelation, _, _), parts, query, overwrite, false)
- if query.resolved && parts.exists(_._2.isDefined) =>
-
- val projectList = convertStaticPartitions(
- sourceAttributes = query.output,
- providedPartitions = parts,
- targetAttributes = relation.output,
- targetPartitionSchema = t.partitionSchema)
-
- // We will remove all assigned values to static partitions because they have been
- // moved to the projectList.
- insert.copy(partition = parts.map(p => (p._1, None)), child = Project(projectList, query))
-
-
- case logical.InsertIntoTable(
- l @ LogicalRelation(t: HadoopFsRelation, _, table), _, query, overwrite, false)
- if query.resolved && t.schema.sameType(query.schema) =>
-
- // Sanity checks
+ case InsertIntoTable(
+ l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, false)
+ if hasBeenPreprocessed(l.output, t.partitionSchema, parts, query) =>
+
+ // If the InsertIntoTable command is for a partitioned HadoopFsRelation and
+ // the user has specified static partitions, we add a Project operator on top of the query
+ // to include those constant column values in the query result.
+ //
+ // Example:
+ // Let's say that we have a table "t", which is created by
+ // CREATE TABLE t (a INT, b INT, c INT) USING parquet PARTITIONED BY (b, c)
+ // The statement of "INSERT INTO TABLE t PARTITION (b=2, c) SELECT 1, 3"
+ // will be converted to "INSERT INTO TABLE t PARTITION (b, c) SELECT 1, 2, 3".
+ //
+ // Basically, we will put those partition columns having a assigned value back
+ // to the SELECT clause. The output of the SELECT clause is organized as
+ // normal_columns static_partitioning_columns dynamic_partitioning_columns.
+ // static_partitioning_columns are partitioning columns having assigned
+ // values in the PARTITION clause (e.g. b in the above example).
+ // dynamic_partitioning_columns are partitioning columns that do not assigned
+ // values in the PARTITION clause (e.g. c in the above example).
+ val actualQuery = if (parts.exists(_._2.isDefined)) {
+ val projectList = convertStaticPartitions(
+ sourceAttributes = query.output,
+ providedPartitions = parts,
+ targetAttributes = l.output,
+ targetPartitionSchema = t.partitionSchema)
+ Project(projectList, query)
+ } else {
+ query
+ }
+
+ // Sanity check
if (t.location.rootPaths.size != 1) {
- throw new AnalysisException(
- "Can only write data to relations with a single path.")
+ throw new AnalysisException("Can only write data to relations with a single path.")
}
val outputPath = t.location.rootPaths.head
- val inputPaths = query.collect {
+ val inputPaths = actualQuery.collect {
case LogicalRelation(r: HadoopFsRelation, _, _) => r.location.rootPaths
}.flatten
- val mode = if (overwrite.enabled) SaveMode.Overwrite else SaveMode.Append
- if (overwrite.enabled && inputPaths.contains(outputPath)) {
+ val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
+ if (overwrite && inputPaths.contains(outputPath)) {
throw new AnalysisException(
"Cannot overwrite a path that is also being read from.")
}
- val partitionSchema = query.resolve(
+ val partitionSchema = actualQuery.resolve(
t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
val partitionsTrackedByCatalog =
t.sparkSession.sessionState.conf.manageFilesourcePartitions &&
@@ -192,19 +205,13 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
var initialMatchingPartitions: Seq[TablePartitionSpec] = Nil
var customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty
- val staticPartitionKeys: TablePartitionSpec = if (overwrite.enabled) {
- overwrite.staticPartitionKeys.map { case (k, v) =>
- (partitionSchema.map(_.name).find(_.equalsIgnoreCase(k)).get, v)
- }
- } else {
- Map.empty
- }
+ val staticPartitions = parts.filter(_._2.nonEmpty).map { case (k, v) => k -> v.get }
// When partitions are tracked by the catalog, compute all custom partition locations that
// may be relevant to the insertion job.
if (partitionsTrackedByCatalog) {
val matchingPartitions = t.sparkSession.sessionState.catalog.listPartitions(
- l.catalogTable.get.identifier, Some(staticPartitionKeys))
+ l.catalogTable.get.identifier, Some(staticPartitions))
initialMatchingPartitions = matchingPartitions.map(_.spec)
customPartitionLocations = getCustomPartitionLocations(
t.sparkSession, l.catalogTable.get, outputPath, matchingPartitions)
@@ -220,7 +227,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
l.catalogTable.get.identifier, newPartitions.toSeq.map(p => (p, None)),
ifNotExists = true).run(t.sparkSession)
}
- if (overwrite.enabled) {
+ if (overwrite) {
val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
if (deletedPartitions.nonEmpty) {
AlterTableDropPartitionCommand(
@@ -235,14 +242,14 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
val insertCmd = InsertIntoHadoopFsRelationCommand(
outputPath,
- staticPartitionKeys,
+ staticPartitions,
customPartitionLocations,
partitionSchema,
t.bucketSpec,
t.fileFormat,
refreshPartitionsCallback,
t.options,
- query,
+ actualQuery,
mode,
table)
@@ -305,7 +312,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
}
override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case i @ logical.InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
+ case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
if DDLUtils.isDatasourceTable(s.metadata) =>
i.copy(table = readDataSourceTable(sparkSession, s))
@@ -351,7 +358,7 @@ object DataSourceStrategy extends Strategy with Logging {
Map.empty,
None) :: Nil
- case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _),
+ case InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _),
part, query, overwrite, false) if part.isEmpty =>
ExecutedCommandExec(InsertIntoDataSourceCommand(l, query, overwrite)) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
index 2eba1e9986..b2ff68a833 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OverwriteOptions}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.sources.InsertableRelation
@@ -30,7 +30,7 @@ import org.apache.spark.sql.sources.InsertableRelation
case class InsertIntoDataSourceCommand(
logicalRelation: LogicalRelation,
query: LogicalPlan,
- overwrite: OverwriteOptions)
+ overwrite: Boolean)
extends RunnableCommand {
override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
@@ -40,7 +40,7 @@ case class InsertIntoDataSourceCommand(
val data = Dataset.ofRows(sparkSession, query)
// Apply the schema of the existing table to the new data.
val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
- relation.insert(df, overwrite.enabled)
+ relation.insert(df, overwrite)
// Invalidate the cache.
sparkSession.sharedState.cacheManager.invalidateCache(logicalRelation)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index a9bde903b3..53c884c22b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -33,10 +33,10 @@ import org.apache.spark.sql.execution.command.RunnableCommand
* A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
* Writing to dynamic partitions is also supported.
*
- * @param staticPartitionKeys partial partitioning spec for write. This defines the scope of
- * partition overwrites: when the spec is empty, all partitions are
- * overwritten. When it covers a prefix of the partition keys, only
- * partitions matching the prefix are overwritten.
+ * @param staticPartitions partial partitioning spec for write. This defines the scope of partition
+ * overwrites: when the spec is empty, all partitions are overwritten.
+ * When it covers a prefix of the partition keys, only partitions matching
+ * the prefix are overwritten.
* @param customPartitionLocations mapping of partition specs to their custom locations. The
* caller should guarantee that exactly those table partitions
* falling under the specified static partition keys are contained
@@ -44,7 +44,7 @@ import org.apache.spark.sql.execution.command.RunnableCommand
*/
case class InsertIntoHadoopFsRelationCommand(
outputPath: Path,
- staticPartitionKeys: TablePartitionSpec,
+ staticPartitions: TablePartitionSpec,
customPartitionLocations: Map[TablePartitionSpec, String],
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
@@ -122,9 +122,9 @@ case class InsertIntoHadoopFsRelationCommand(
* locations are also cleared based on the custom locations map given to this class.
*/
private def deleteMatchingPartitions(fs: FileSystem, qualifiedOutputPath: Path): Unit = {
- val staticPartitionPrefix = if (staticPartitionKeys.nonEmpty) {
+ val staticPartitionPrefix = if (staticPartitions.nonEmpty) {
"/" + partitionColumns.flatMap { p =>
- staticPartitionKeys.get(p.name) match {
+ staticPartitions.get(p.name) match {
case Some(value) =>
Some(escapePathName(p.name) + "=" + escapePathName(value))
case None =>
@@ -143,7 +143,7 @@ case class InsertIntoHadoopFsRelationCommand(
// now clear all custom partition locations (e.g. /custom/dir/where/foo=2/bar=4)
for ((spec, customLoc) <- customPartitionLocations) {
assert(
- (staticPartitionKeys.toSet -- spec).isEmpty,
+ (staticPartitions.toSet -- spec).isEmpty,
"Custom partition location did not match static partitioning keys")
val path = new Path(customLoc)
if (fs.exists(path) && !fs.delete(path, true)) {
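As a supplement to the `deleteMatchingPartitions` hunk above, here is a small standalone sketch (hypothetical names; `escapePathName` is a simplified placeholder for Spark's real path-escaping helper) of how the renamed `staticPartitions` spec scopes an overwrite: only paths under the prefix built from the static keys are cleared, e.g. `/b=2` for `PARTITION (b=2, c)`.

```scala
// Standalone sketch of the static-partition path prefix used to scope deletes.
object StaticPartitionPrefixSketch {

  // Simplified placeholder for Spark's path-escaping helper; the real one
  // escapes many more characters.
  private def escapePathName(s: String): String =
    s.flatMap {
      case c @ ('/' | '=' | '%') => f"%%${c.toInt}%02X"
      case c                     => c.toString
    }

  // Builds e.g. "/b=2" when only column b has a static value; dynamic columns
  // contribute nothing, so everything under <output>/b=2 becomes eligible for deletion.
  def staticPartitionPrefix(
      partitionColumns: Seq[String],
      staticPartitions: Map[String, String]): String =
    if (staticPartitions.isEmpty) ""
    else "/" + partitionColumns.flatMap { col =>
      staticPartitions.get(col).map(v => escapePathName(col) + "=" + escapePathName(v))
    }.mkString("/")

  def main(args: Array[String]): Unit = {
    println(staticPartitionPrefix(Seq("b", "c"), Map("b" -> "2"))) // prints "/b=2"
  }
}
```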
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index ce1e3eb1a5..773c4a39d8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -47,7 +47,7 @@ private[hive] trait HiveStrategies {
case logical.InsertIntoTable(
table: MetastoreRelation, partition, child, overwrite, ifNotExists) =>
InsertIntoHiveTable(
- table, partition, planLater(child), overwrite.enabled, ifNotExists) :: Nil
+ table, partition, planLater(child), overwrite, ifNotExists) :: Nil
case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" =>
val newTableDesc = if (tableDesc.storage.serde.isEmpty) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index cac43597ae..ef5a5a001f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -21,7 +21,7 @@ import scala.util.control.NonFatal
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, OverwriteOptions}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hive.MetastoreRelation
@@ -88,8 +88,7 @@ case class CreateHiveTableAsSelectCommand(
} else {
try {
sparkSession.sessionState.executePlan(InsertIntoTable(
- metastoreRelation, Map(), query, overwrite = OverwriteOptions(true),
- ifNotExists = false)).toRdd
+ metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd
} catch {
case NonFatal(e) =>
// drop the created table.