author    Wenchen Fan <wenchen@databricks.com>    2016-12-14 11:30:34 +0800
committer Wenchen Fan <wenchen@databricks.com>    2016-12-14 11:30:34 +0800
commit    3e307b4959ecdab3f9c16484d172403357e7d09b
tree      231652eec3b96d4a0472eec0b6e1c2f963729bd7
parent    f2ddabfa09fda26ff0391d026dd67545dab33e01
[SPARK-18566][SQL] remove OverwriteOptions
## What changes were proposed in this pull request?
`OverwriteOptions` was introduced in https://github.com/apache/spark/pull/15705 to carry the static-partition information for an insert. However, after further refactoring this information became duplicated, so `OverwriteOptions` can be removed.
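To illustrate the redundancy, here is a small, self-contained Scala sketch (illustrative names only, not Spark's actual classes): the `partition` map that `InsertIntoTable` already carries maps each partition column to an optional static value, so the static-partition spec can be re-derived from it wherever it is needed, which is what `DataSourceAnalysis` does after this change.

```scala
// Minimal sketch of why the static-partition spec is redundant (hypothetical names).
object StaticPartitionSketch {
  // What the parser produces for: INSERT OVERWRITE TABLE t PARTITION (b = 2, c) SELECT ...
  // Each partition column maps to Some(staticValue) or None (dynamic).
  val partition: Map[String, Option[String]] = Map("b" -> Some("2"), "c" -> None)

  // The spec that OverwriteOptions used to carry alongside the boolean flag;
  // it can always be recomputed from the partition map on demand.
  def staticPartitions(parts: Map[String, Option[String]]): Map[String, String] =
    parts.filter(_._2.nonEmpty).map { case (k, v) => k -> v.get }

  def main(args: Array[String]): Unit = {
    // Prints Map(b -> 2): the same information InsertIntoTable already holds,
    // so a plain Boolean overwrite flag suffices.
    println(staticPartitions(partition))
  }
}
```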
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes #15995 from cloud-fan/overwrite.
11 files changed, 91 insertions, 115 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index e901683be6..66e52ca68a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -367,7 +367,7 @@ package object dsl {
     def insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan =
       InsertIntoTable(
         analysis.UnresolvedRelation(TableIdentifier(tableName)),
-        Map.empty, logicalPlan, OverwriteOptions(overwrite), false)
+        Map.empty, logicalPlan, overwrite, false)
 
     def as(alias: String): LogicalPlan = logicalPlan match {
       case UnresolvedRelation(tbl, _) => UnresolvedRelation(tbl, Option(alias))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 7b8badcf8c..3969fdb0ff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -177,15 +177,12 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
       throw new ParseException(s"Dynamic partitions do not support IF NOT EXISTS. Specified " +
         "partitions with value: " + dynamicPartitionKeys.keys.mkString("[", ",", "]"), ctx)
     }
-    val overwrite = ctx.OVERWRITE != null
-    val staticPartitionKeys: Map[String, String] =
-      partitionKeys.filter(_._2.nonEmpty).map(t => (t._1, t._2.get))
 
     InsertIntoTable(
       UnresolvedRelation(tableIdent, None),
       partitionKeys,
       query,
-      OverwriteOptions(overwrite, if (overwrite) staticPartitionKeys else Map.empty),
+      ctx.OVERWRITE != null,
       ctx.EXISTS != null)
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index c210b74e8a..b9bdd53dd1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -17,10 +17,8 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
@@ -347,22 +345,6 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
 }
 
 /**
- * Options for writing new data into a table.
- *
- * @param enabled whether to overwrite existing data in the table.
- * @param staticPartitionKeys if non-empty, specifies that we only want to overwrite partitions
- *                            that match this partial partition spec. If empty, all partitions
- *                            will be overwritten.
- */
-case class OverwriteOptions(
-    enabled: Boolean,
-    staticPartitionKeys: CatalogTypes.TablePartitionSpec = Map.empty) {
-  if (staticPartitionKeys.nonEmpty) {
-    assert(enabled, "Overwrite must be enabled when specifying specific partitions.")
-  }
-}
-
-/**
  * Insert some data into a table.
  *
  * @param table the logical plan representing the table. In the future this should be a
@@ -382,14 +364,14 @@ case class InsertIntoTable(
     table: LogicalPlan,
     partition: Map[String, Option[String]],
     child: LogicalPlan,
-    overwrite: OverwriteOptions,
+    overwrite: Boolean,
     ifNotExists: Boolean)
   extends LogicalPlan {
 
   override def children: Seq[LogicalPlan] = child :: Nil
   override def output: Seq[Attribute] = Seq.empty
 
-  assert(overwrite.enabled || !ifNotExists)
+  assert(overwrite || !ifNotExists)
   assert(partition.values.forall(_.nonEmpty) || !ifNotExists)
 
   override lazy val resolved: Boolean = childrenResolved && table.resolved
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 8e4327c788..f408ba99d1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -180,16 +180,7 @@ class PlanParserSuite extends PlanTest {
         partition: Map[String, Option[String]],
         overwrite: Boolean = false,
         ifNotExists: Boolean = false): LogicalPlan =
-      InsertIntoTable(
-        table("s"), partition, plan,
-        OverwriteOptions(
-          overwrite,
-          if (overwrite && partition.nonEmpty) {
-            partition.map(kv => (kv._1, kv._2.get))
-          } else {
-            Map.empty
-          }),
-        ifNotExists)
+      InsertIntoTable(table("s"), partition, plan, overwrite, ifNotExists)
 
     // Single inserts
     assertEqual(s"insert overwrite table s $sql",
@@ -205,9 +196,9 @@ class PlanParserSuite extends PlanTest {
     val plan2 = table("t").where('x > 5).select(star())
     assertEqual("from t insert into s select * limit 1 insert into u select * where x > 5",
       InsertIntoTable(
-        table("s"), Map.empty, plan.limit(1), OverwriteOptions(false), ifNotExists = false).union(
+        table("s"), Map.empty, plan.limit(1), false, ifNotExists = false).union(
         InsertIntoTable(
-          table("u"), Map.empty, plan2, OverwriteOptions(false), ifNotExists = false)))
+          table("u"), Map.empty, plan2, false, ifNotExists = false)))
   }
 
   test ("insert with if not exists") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index fa8e8cb985..9d28e2704b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,7 +25,7 @@ import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, OverwriteOptions}
+import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
 import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, DDLUtils}
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, HadoopFsRelation}
 import org.apache.spark.sql.types.StructType
@@ -259,7 +259,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         table = UnresolvedRelation(tableIdent),
         partition = Map.empty[String, Option[String]],
         child = df.logicalPlan,
-        overwrite = OverwriteOptions(mode == SaveMode.Overwrite),
+        overwrite = mode == SaveMode.Overwrite,
         ifNotExists = false)).toRdd
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index eaccdf27da..9250d0e237 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -478,7 +478,7 @@ case class DataSource(
       val plan =
         InsertIntoHadoopFsRelationCommand(
           outputPath = outputPath,
-          staticPartitionKeys = Map.empty,
+          staticPartitions = Map.empty,
           customPartitionLocations = Map.empty,
           partitionColumns = columns,
           bucketSpec = bucketSpec,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 03eed25176..61f0d43f24 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, SimpleCatalogRelation}
@@ -32,8 +32,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
@@ -100,7 +99,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
         None
       } else if (potentialSpecs.size == 1) {
         val partValue = potentialSpecs.head._2
-        Some(Alias(Cast(Literal(partValue), field.dataType), "_staticPart")())
+        Some(Alias(Cast(Literal(partValue), field.dataType), field.name)())
       } else {
         throw new AnalysisException(
           s"Partition column ${field.name} have multiple values specified, " +
@@ -128,61 +127,75 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
     projectList
   }
 
+  /**
+   * Returns true if the [[InsertIntoTable]] plan has already been preprocessed by analyzer rule
+   * [[PreprocessTableInsertion]]. It is important that this rule([[DataSourceAnalysis]]) has to
+   * be run after [[PreprocessTableInsertion]], to normalize the column names in partition spec and
+   * fix the schema mismatch by adding Cast.
+   */
+  private def hasBeenPreprocessed(
+      tableOutput: Seq[Attribute],
+      partSchema: StructType,
+      partSpec: Map[String, Option[String]],
+      query: LogicalPlan): Boolean = {
+    val partColNames = partSchema.map(_.name).toSet
+    query.resolved && partSpec.keys.forall(partColNames.contains) && {
+      val staticPartCols = partSpec.filter(_._2.isDefined).keySet
+      val expectedColumns = tableOutput.filterNot(a => staticPartCols.contains(a.name))
+      expectedColumns.toStructType.sameType(query.schema)
+    }
+  }
+
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    // If the InsertIntoTable command is for a partitioned HadoopFsRelation and
-    // the user has specified static partitions, we add a Project operator on top of the query
-    // to include those constant column values in the query result.
-    //
-    // Example:
-    // Let's say that we have a table "t", which is created by
-    // CREATE TABLE t (a INT, b INT, c INT) USING parquet PARTITIONED BY (b, c)
-    // The statement of "INSERT INTO TABLE t PARTITION (b=2, c) SELECT 1, 3"
-    // will be converted to "INSERT INTO TABLE t PARTITION (b, c) SELECT 1, 2, 3".
-    //
-    // Basically, we will put those partition columns having a assigned value back
-    // to the SELECT clause. The output of the SELECT clause is organized as
-    // normal_columns static_partitioning_columns dynamic_partitioning_columns.
-    // static_partitioning_columns are partitioning columns having assigned
-    // values in the PARTITION clause (e.g. b in the above example).
-    // dynamic_partitioning_columns are partitioning columns that do not assigned
-    // values in the PARTITION clause (e.g. c in the above example).
-    case insert @ logical.InsertIntoTable(
-      relation @ LogicalRelation(t: HadoopFsRelation, _, _), parts, query, overwrite, false)
-      if query.resolved && parts.exists(_._2.isDefined) =>
-
-      val projectList = convertStaticPartitions(
-        sourceAttributes = query.output,
-        providedPartitions = parts,
-        targetAttributes = relation.output,
-        targetPartitionSchema = t.partitionSchema)
-
-      // We will remove all assigned values to static partitions because they have been
-      // moved to the projectList.
-      insert.copy(partition = parts.map(p => (p._1, None)), child = Project(projectList, query))
-
-
-    case logical.InsertIntoTable(
-        l @ LogicalRelation(t: HadoopFsRelation, _, table), _, query, overwrite, false)
-        if query.resolved && t.schema.sameType(query.schema) =>
-
-      // Sanity checks
+    case InsertIntoTable(
+        l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, false)
+        if hasBeenPreprocessed(l.output, t.partitionSchema, parts, query) =>
+
+      // If the InsertIntoTable command is for a partitioned HadoopFsRelation and
+      // the user has specified static partitions, we add a Project operator on top of the query
+      // to include those constant column values in the query result.
+      //
+      // Example:
+      // Let's say that we have a table "t", which is created by
+      // CREATE TABLE t (a INT, b INT, c INT) USING parquet PARTITIONED BY (b, c)
+      // The statement of "INSERT INTO TABLE t PARTITION (b=2, c) SELECT 1, 3"
+      // will be converted to "INSERT INTO TABLE t PARTITION (b, c) SELECT 1, 2, 3".
+      //
+      // Basically, we will put those partition columns having a assigned value back
+      // to the SELECT clause. The output of the SELECT clause is organized as
+      // normal_columns static_partitioning_columns dynamic_partitioning_columns.
+      // static_partitioning_columns are partitioning columns having assigned
+      // values in the PARTITION clause (e.g. b in the above example).
+      // dynamic_partitioning_columns are partitioning columns that do not assigned
+      // values in the PARTITION clause (e.g. c in the above example).
+      val actualQuery = if (parts.exists(_._2.isDefined)) {
+        val projectList = convertStaticPartitions(
+          sourceAttributes = query.output,
+          providedPartitions = parts,
+          targetAttributes = l.output,
+          targetPartitionSchema = t.partitionSchema)
+        Project(projectList, query)
+      } else {
+        query
+      }
+
+      // Sanity check
       if (t.location.rootPaths.size != 1) {
-        throw new AnalysisException(
-          "Can only write data to relations with a single path.")
+        throw new AnalysisException("Can only write data to relations with a single path.")
       }
 
       val outputPath = t.location.rootPaths.head
-      val inputPaths = query.collect {
+      val inputPaths = actualQuery.collect {
         case LogicalRelation(r: HadoopFsRelation, _, _) => r.location.rootPaths
       }.flatten
 
-      val mode = if (overwrite.enabled) SaveMode.Overwrite else SaveMode.Append
-      if (overwrite.enabled && inputPaths.contains(outputPath)) {
+      val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
+      if (overwrite && inputPaths.contains(outputPath)) {
        throw new AnalysisException(
          "Cannot overwrite a path that is also being read from.")
       }
 
-      val partitionSchema = query.resolve(
+      val partitionSchema = actualQuery.resolve(
         t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
       val partitionsTrackedByCatalog =
         t.sparkSession.sessionState.conf.manageFilesourcePartitions &&
@@ -192,19 +205,13 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
       var initialMatchingPartitions: Seq[TablePartitionSpec] = Nil
       var customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty
 
-      val staticPartitionKeys: TablePartitionSpec = if (overwrite.enabled) {
-        overwrite.staticPartitionKeys.map { case (k, v) =>
-          (partitionSchema.map(_.name).find(_.equalsIgnoreCase(k)).get, v)
-        }
-      } else {
-        Map.empty
-      }
+      val staticPartitions = parts.filter(_._2.nonEmpty).map { case (k, v) => k -> v.get }
 
       // When partitions are tracked by the catalog, compute all custom partition locations that
       // may be relevant to the insertion job.
       if (partitionsTrackedByCatalog) {
         val matchingPartitions = t.sparkSession.sessionState.catalog.listPartitions(
-          l.catalogTable.get.identifier, Some(staticPartitionKeys))
+          l.catalogTable.get.identifier, Some(staticPartitions))
         initialMatchingPartitions = matchingPartitions.map(_.spec)
         customPartitionLocations = getCustomPartitionLocations(
           t.sparkSession, l.catalogTable.get, outputPath, matchingPartitions)
@@ -220,7 +227,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
             l.catalogTable.get.identifier, newPartitions.toSeq.map(p => (p, None)),
             ifNotExists = true).run(t.sparkSession)
         }
-        if (overwrite.enabled) {
+        if (overwrite) {
           val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
           if (deletedPartitions.nonEmpty) {
             AlterTableDropPartitionCommand(
@@ -235,14 +242,14 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
 
       val insertCmd = InsertIntoHadoopFsRelationCommand(
         outputPath,
-        staticPartitionKeys,
+        staticPartitions,
         customPartitionLocations,
         partitionSchema,
         t.bucketSpec,
         t.fileFormat,
         refreshPartitionsCallback,
         t.options,
-        query,
+        actualQuery,
         mode,
         table)
 
@@ -305,7 +312,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case i @ logical.InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
+    case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
         if DDLUtils.isDatasourceTable(s.metadata) =>
       i.copy(table = readDataSourceTable(sparkSession, s))
 
@@ -351,7 +358,7 @@ object DataSourceStrategy extends Strategy with Logging {
         Map.empty,
         None) :: Nil
 
-    case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _),
+    case InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _),
       part, query, overwrite, false) if part.isEmpty =>
       ExecutedCommandExec(InsertIntoDataSourceCommand(l, query, overwrite)) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
index 2eba1e9986..b2ff68a833 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OverwriteOptions}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.sources.InsertableRelation
 
@@ -30,7 +30,7 @@ import org.apache.spark.sql.sources.InsertableRelation
 case class InsertIntoDataSourceCommand(
     logicalRelation: LogicalRelation,
     query: LogicalPlan,
-    overwrite: OverwriteOptions)
+    overwrite: Boolean)
   extends RunnableCommand {
 
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
@@ -40,7 +40,7 @@ case class InsertIntoDataSourceCommand(
     val data = Dataset.ofRows(sparkSession, query)
     // Apply the schema of the existing table to the new data.
     val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
-    relation.insert(df, overwrite.enabled)
+    relation.insert(df, overwrite)
 
     // Invalidate the cache.
     sparkSession.sharedState.cacheManager.invalidateCache(logicalRelation)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index a9bde903b3..53c884c22b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -33,10 +33,10 @@ import org.apache.spark.sql.execution.command.RunnableCommand
  * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
  * Writing to dynamic partitions is also supported.
  *
- * @param staticPartitionKeys partial partitioning spec for write. This defines the scope of
- *                            partition overwrites: when the spec is empty, all partitions are
- *                            overwritten. When it covers a prefix of the partition keys, only
- *                            partitions matching the prefix are overwritten.
+ * @param staticPartitions partial partitioning spec for write. This defines the scope of partition
+ *                         overwrites: when the spec is empty, all partitions are overwritten.
+ *                         When it covers a prefix of the partition keys, only partitions matching
+ *                         the prefix are overwritten.
  * @param customPartitionLocations mapping of partition specs to their custom locations. The
  *                                 caller should guarantee that exactly those table partitions
  *                                 falling under the specified static partition keys are contained
@@ -44,7 +44,7 @@ import org.apache.spark.sql.execution.command.RunnableCommand
  */
 case class InsertIntoHadoopFsRelationCommand(
     outputPath: Path,
-    staticPartitionKeys: TablePartitionSpec,
+    staticPartitions: TablePartitionSpec,
     customPartitionLocations: Map[TablePartitionSpec, String],
     partitionColumns: Seq[Attribute],
     bucketSpec: Option[BucketSpec],
@@ -122,9 +122,9 @@ case class InsertIntoHadoopFsRelationCommand(
    * locations are also cleared based on the custom locations map given to this class.
    */
   private def deleteMatchingPartitions(fs: FileSystem, qualifiedOutputPath: Path): Unit = {
-    val staticPartitionPrefix = if (staticPartitionKeys.nonEmpty) {
+    val staticPartitionPrefix = if (staticPartitions.nonEmpty) {
       "/" + partitionColumns.flatMap { p =>
-        staticPartitionKeys.get(p.name) match {
+        staticPartitions.get(p.name) match {
           case Some(value) =>
             Some(escapePathName(p.name) + "=" + escapePathName(value))
           case None =>
@@ -143,7 +143,7 @@ case class InsertIntoHadoopFsRelationCommand(
     // now clear all custom partition locations (e.g. /custom/dir/where/foo=2/bar=4)
     for ((spec, customLoc) <- customPartitionLocations) {
       assert(
-        (staticPartitionKeys.toSet -- spec).isEmpty,
+        (staticPartitions.toSet -- spec).isEmpty,
         "Custom partition location did not match static partitioning keys")
       val path = new Path(customLoc)
       if (fs.exists(path) && !fs.delete(path, true)) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index ce1e3eb1a5..773c4a39d8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -47,7 +47,7 @@ private[hive] trait HiveStrategies {
       case logical.InsertIntoTable(
           table: MetastoreRelation, partition, child, overwrite, ifNotExists) =>
         InsertIntoHiveTable(
-          table, partition, planLater(child), overwrite.enabled, ifNotExists) :: Nil
+          table, partition, planLater(child), overwrite, ifNotExists) :: Nil
 
       case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" =>
         val newTableDesc = if (tableDesc.storage.serde.isEmpty) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index cac43597ae..ef5a5a001f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -21,7 +21,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, OverwriteOptions}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.hive.MetastoreRelation
 
@@ -88,8 +88,7 @@ case class CreateHiveTableAsSelectCommand(
     } else {
       try {
         sparkSession.sessionState.executePlan(InsertIntoTable(
-          metastoreRelation, Map(), query, overwrite = OverwriteOptions(true),
-          ifNotExists = false)).toRdd
+          metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd
       } catch {
         case NonFatal(e) =>
           // drop the created table.