Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala                           78
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala  167
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala               164
3 files changed, 212 insertions, 197 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 9c5660a378..405f38ad49 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -23,11 +23,12 @@ import scala.collection.JavaConverters._
import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
+import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.StructType
/**
@@ -364,7 +365,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
throw new AnalysisException("Cannot create hive serde table with saveAsTable API")
}
- val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent)
+ val catalog = df.sparkSession.sessionState.catalog
+ val tableExists = catalog.tableExists(tableIdent)
+ val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase)
+ val tableIdentWithDB = tableIdent.copy(database = Some(db))
+ val tableName = tableIdentWithDB.unquotedString
(tableExists, mode) match {
case (true, SaveMode.Ignore) =>
@@ -373,39 +378,48 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
case (true, SaveMode.ErrorIfExists) =>
throw new AnalysisException(s"Table $tableIdent already exists.")
- case _ =>
- val existingTable = if (tableExists) {
- Some(df.sparkSession.sessionState.catalog.getTableMetadata(tableIdent))
- } else {
- None
+ case (true, SaveMode.Overwrite) =>
+ // Get all input data source relations of the query.
+ val srcRelations = df.logicalPlan.collect {
+ case LogicalRelation(src: BaseRelation, _, _) => src
}
- val storage = if (tableExists) {
- existingTable.get.storage
- } else {
- DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
- }
- val tableType = if (tableExists) {
- existingTable.get.tableType
- } else if (storage.locationUri.isDefined) {
- CatalogTableType.EXTERNAL
- } else {
- CatalogTableType.MANAGED
+ EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match {
+ // Only do the check if the table is a data source table (the relation is a BaseRelation).
+ case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
+ throw new AnalysisException(
+ s"Cannot overwrite table $tableName that is also being read from")
+ case _ => // OK
}
- val tableDesc = CatalogTable(
- identifier = tableIdent,
- tableType = tableType,
- storage = storage,
- schema = new StructType,
- provider = Some(source),
- partitionColumnNames = partitioningColumns.getOrElse(Nil),
- bucketSpec = getBucketSpec
- )
- df.sparkSession.sessionState.executePlan(
- CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd
+ // Drop the existing table
+ catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false)
+ createTable(tableIdent)
+
+ case _ => createTable(tableIdent)
}
}
+ private def createTable(tableIdent: TableIdentifier): Unit = {
+ val storage = DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
+ val tableType = if (storage.locationUri.isDefined) {
+ CatalogTableType.EXTERNAL
+ } else {
+ CatalogTableType.MANAGED
+ }
+
+ val tableDesc = CatalogTable(
+ identifier = tableIdent,
+ tableType = tableType,
+ storage = storage,
+ schema = new StructType,
+ provider = Some(source),
+ partitionColumnNames = partitioningColumns.getOrElse(Nil),
+ bucketSpec = getBucketSpec
+ )
+ df.sparkSession.sessionState.executePlan(
+ CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd
+ }
+
/**
* Saves the content of the `DataFrame` to an external database table via JDBC. In the case the
* table already exists in the external database, behavior of this function depends on the
@@ -441,7 +455,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
assertNotPartitioned("jdbc")
assertNotBucketed("jdbc")
// connectionProperties should override settings in extraOptions.
- this.extraOptions = this.extraOptions ++ connectionProperties.asScala
+ this.extraOptions ++= connectionProperties.asScala
// explicit url and dbtable should override all
this.extraOptions += ("url" -> url, "dbtable" -> table)
format("jdbc").save()
@@ -588,7 +602,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
private var mode: SaveMode = SaveMode.ErrorIfExists
- private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+ private val extraOptions = new scala.collection.mutable.HashMap[String, String]
private var partitioningColumns: Option[Seq[String]] = None
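The rewritten saveAsTable above now performs the "cannot overwrite a table that is also being read from" check itself (it previously lived in PreWriteCheck, whose case is removed at the end of this diff), and for Overwrite it drops the existing table before re-creating it through the new createTable helper. A minimal sketch of the resulting user-facing behaviour, assuming a SparkSession named `spark` and an existing data source table `t` (both names are illustrative):

    // Reading a data source table and overwriting that same table in one query fails fast.
    val df = spark.table("t")                        // `t` becomes a LogicalRelation input of the query
    df.write.mode("overwrite").saveAsTable("t")      // AnalysisException: Cannot overwrite table ... that is also being read from

    // Overwriting a different table is allowed: the existing table is dropped
    // (ignoreIfNotExists = true, purge = false) and re-created from the query result.
    df.write.mode("overwrite").saveAsTable("t_copy")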
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 81c20475a3..c64c7ad943 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.command
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.BaseRelation
@@ -134,139 +133,31 @@ case class CreateDataSourceTableAsSelectCommand(
assert(table.provider.isDefined)
assert(table.schema.isEmpty)
- val provider = table.provider.get
val sessionState = sparkSession.sessionState
val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentWithDB = table.identifier.copy(database = Some(db))
val tableName = tableIdentWithDB.unquotedString
- var createMetastoreTable = false
- // We may need to reorder the columns of the query to match the existing table.
- var reorderedColumns = Option.empty[Seq[NamedExpression]]
- if (sessionState.catalog.tableExists(tableIdentWithDB)) {
- // Check if we need to throw an exception or just return.
- mode match {
- case SaveMode.ErrorIfExists =>
- throw new AnalysisException(s"Table $tableName already exists. " +
- s"If you are using saveAsTable, you can set SaveMode to SaveMode.Append to " +
- s"insert data into the table or set SaveMode to SaveMode.Overwrite to overwrite" +
- s"the existing data. " +
- s"Or, if you are using SQL CREATE TABLE, you need to drop $tableName first.")
- case SaveMode.Ignore =>
- // Since the table already exists and the save mode is Ignore, we will just return.
- return Seq.empty[Row]
- case SaveMode.Append =>
- val existingTable = sessionState.catalog.getTableMetadata(tableIdentWithDB)
+ val result = if (sessionState.catalog.tableExists(tableIdentWithDB)) {
+ assert(mode != SaveMode.Overwrite,
+ s"Expect the table $tableName has been dropped when the save mode is Overwrite")
- if (existingTable.provider.get == DDLUtils.HIVE_PROVIDER) {
- throw new AnalysisException(s"Saving data in the Hive serde table $tableName is " +
- "not supported yet. Please use the insertInto() API as an alternative.")
- }
-
- // Check if the specified data source match the data source of the existing table.
- val existingProvider = DataSource.lookupDataSource(existingTable.provider.get)
- val specifiedProvider = DataSource.lookupDataSource(table.provider.get)
- // TODO: Check that options from the resolved relation match the relation that we are
- // inserting into (i.e. using the same compression).
- if (existingProvider != specifiedProvider) {
- throw new AnalysisException(s"The format of the existing table $tableName is " +
- s"`${existingProvider.getSimpleName}`. It doesn't match the specified format " +
- s"`${specifiedProvider.getSimpleName}`.")
- }
-
- if (query.schema.length != existingTable.schema.length) {
- throw new AnalysisException(
- s"The column number of the existing table $tableName" +
- s"(${existingTable.schema.catalogString}) doesn't match the data schema" +
- s"(${query.schema.catalogString})")
- }
-
- val resolver = sessionState.conf.resolver
- val tableCols = existingTable.schema.map(_.name)
-
- reorderedColumns = Some(existingTable.schema.map { f =>
- query.resolve(Seq(f.name), resolver).getOrElse {
- val inputColumns = query.schema.map(_.name).mkString(", ")
- throw new AnalysisException(
- s"cannot resolve '${f.name}' given input columns: [$inputColumns]")
- }
- })
-
- // In `AnalyzeCreateTable`, we verified the consistency between the user-specified table
- // definition(partition columns, bucketing) and the SELECT query, here we also need to
- // verify the the consistency between the user-specified table definition and the existing
- // table definition.
-
- // Check if the specified partition columns match the existing table.
- val specifiedPartCols = CatalogUtils.normalizePartCols(
- tableName, tableCols, table.partitionColumnNames, resolver)
- if (specifiedPartCols != existingTable.partitionColumnNames) {
- throw new AnalysisException(
- s"""
- |Specified partitioning does not match that of the existing table $tableName.
- |Specified partition columns: [${specifiedPartCols.mkString(", ")}]
- |Existing partition columns: [${existingTable.partitionColumnNames.mkString(", ")}]
- """.stripMargin)
- }
-
- // Check if the specified bucketing match the existing table.
- val specifiedBucketSpec = table.bucketSpec.map { bucketSpec =>
- CatalogUtils.normalizeBucketSpec(tableName, tableCols, bucketSpec, resolver)
- }
- if (specifiedBucketSpec != existingTable.bucketSpec) {
- val specifiedBucketString =
- specifiedBucketSpec.map(_.toString).getOrElse("not bucketed")
- val existingBucketString =
- existingTable.bucketSpec.map(_.toString).getOrElse("not bucketed")
- throw new AnalysisException(
- s"""
- |Specified bucketing does not match that of the existing table $tableName.
- |Specified bucketing: $specifiedBucketString
- |Existing bucketing: $existingBucketString
- """.stripMargin)
- }
-
- case SaveMode.Overwrite =>
- sessionState.catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false)
- // Need to create the table again.
- createMetastoreTable = true
+ if (mode == SaveMode.ErrorIfExists) {
+ throw new AnalysisException(s"Table $tableName already exists. You need to drop it first.")
+ }
+ if (mode == SaveMode.Ignore) {
+ // Since the table already exists and the save mode is Ignore, we will just return.
+ return Seq.empty
}
- } else {
- // The table does not exist. We need to create it in metastore.
- createMetastoreTable = true
- }
-
- val data = Dataset.ofRows(sparkSession, query)
- val df = reorderedColumns match {
- // Reorder the columns of the query to match the existing table.
- case Some(cols) => data.select(cols.map(Column(_)): _*)
- case None => data
- }
- val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
- Some(sessionState.catalog.defaultTablePath(table.identifier))
+ saveDataIntoTable(sparkSession, table, table.storage.locationUri, query, mode)
} else {
- table.storage.locationUri
- }
-
- // Create the relation based on the data of df.
- val pathOption = tableLocation.map("path" -> _)
- val dataSource = DataSource(
- sparkSession,
- className = provider,
- partitionColumns = table.partitionColumnNames,
- bucketSpec = table.bucketSpec,
- options = table.storage.properties ++ pathOption,
- catalogTable = Some(table))
-
- val result = try {
- dataSource.write(mode, df)
- } catch {
- case ex: AnalysisException =>
- logError(s"Failed to write to table $tableName in $mode mode", ex)
- throw ex
- }
- if (createMetastoreTable) {
+ val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
+ Some(sessionState.catalog.defaultTablePath(table.identifier))
+ } else {
+ table.storage.locationUri
+ }
+ val result = saveDataIntoTable(sparkSession, table, tableLocation, query, mode)
val newTable = table.copy(
storage = table.storage.copy(locationUri = tableLocation),
// We will use the schema of resolved.relation as the schema of the table (instead of
@@ -274,6 +165,7 @@ case class CreateDataSourceTableAsSelectCommand(
// provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
schema = result.schema)
sessionState.catalog.createTable(newTable, ignoreIfExists = false)
+ result
}
result match {
@@ -289,4 +181,29 @@ case class CreateDataSourceTableAsSelectCommand(
sessionState.catalog.refreshTable(tableIdentWithDB)
Seq.empty[Row]
}
+
+ private def saveDataIntoTable(
+ session: SparkSession,
+ table: CatalogTable,
+ tableLocation: Option[String],
+ data: LogicalPlan,
+ mode: SaveMode): BaseRelation = {
+ // Create the relation based on the input logical plan: `data`.
+ val pathOption = tableLocation.map("path" -> _)
+ val dataSource = DataSource(
+ session,
+ className = table.provider.get,
+ partitionColumns = table.partitionColumnNames,
+ bucketSpec = table.bucketSpec,
+ options = table.storage.properties ++ pathOption,
+ catalogTable = Some(table))
+
+ try {
+ dataSource.write(mode, Dataset.ofRows(session, query))
+ } catch {
+ case ex: AnalysisException =>
+ logError(s"Failed to write to table ${table.identifier.unquotedString}", ex)
+ throw ex
+ }
+ }
}
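With the per-mode branches above collapsed, the command behaves as follows for an existing table: Overwrite is asserted impossible here (DataFrameWriter has already dropped the table), ErrorIfExists throws, Ignore returns early, and Append calls saveDataIntoTable against the table's existing location; for a non-existing table the data is written first and the catalog entry is created with the schema of the returned relation. A hedged example of these modes through the public writer API, assuming a SparkSession `spark` and that the illustrative table name does not exist yet:

    import org.apache.spark.sql.SaveMode

    val ids = spark.range(10).toDF("id")
    ids.write.saveAsTable("ids_tbl")                        // table absent: write data, then register it with the result schema
    ids.write.mode(SaveMode.Append).saveAsTable("ids_tbl")  // table exists: saveDataIntoTable appends at the table's location
    ids.write.mode(SaveMode.Ignore).saveAsTable("ids_tbl")  // table exists: returns without writing anything
    // ids.write.mode(SaveMode.ErrorIfExists).saveAsTable("ids_tbl")
    //   would throw: "Table ... already exists. You need to drop it first."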
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 2b2fbddd12..07b16671f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -21,7 +21,7 @@ import scala.util.control.NonFatal
import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogUtils, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, RowOrdering}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
@@ -86,6 +86,108 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
}
c
+ // When we append data to an existing table, check if the given provider, partition columns,
+ // bucket spec, etc. match the existing table, and adjust the columns order of the given query
+ // if necessary.
+ case c @ CreateTable(tableDesc, SaveMode.Append, Some(query))
+ if sparkSession.sessionState.catalog.tableExists(tableDesc.identifier) =>
+ // This is guaranteed by the parser and `DataFrameWriter`
+ assert(tableDesc.schema.isEmpty && tableDesc.provider.isDefined)
+
+ // Analyze the query in CTAS and then we can do the normalization and checking.
+ val qe = sparkSession.sessionState.executePlan(query)
+ qe.assertAnalyzed()
+ val analyzedQuery = qe.analyzed
+
+ val catalog = sparkSession.sessionState.catalog
+ val db = tableDesc.identifier.database.getOrElse(catalog.getCurrentDatabase)
+ val tableIdentWithDB = tableDesc.identifier.copy(database = Some(db))
+ val tableName = tableIdentWithDB.unquotedString
+ val existingTable = catalog.getTableMetadata(tableIdentWithDB)
+
+ if (existingTable.tableType == CatalogTableType.VIEW) {
+ throw new AnalysisException("Saving data into a view is not allowed.")
+ }
+
+ if (existingTable.provider.get == DDLUtils.HIVE_PROVIDER) {
+ throw new AnalysisException(s"Saving data in the Hive serde table $tableName is " +
+ "not supported yet. Please use the insertInto() API as an alternative.")
+ }
+
+ // Check if the specified data source match the data source of the existing table.
+ val existingProvider = DataSource.lookupDataSource(existingTable.provider.get)
+ val specifiedProvider = DataSource.lookupDataSource(tableDesc.provider.get)
+ // TODO: Check that options from the resolved relation match the relation that we are
+ // inserting into (i.e. using the same compression).
+ if (existingProvider != specifiedProvider) {
+ throw new AnalysisException(s"The format of the existing table $tableName is " +
+ s"`${existingProvider.getSimpleName}`. It doesn't match the specified format " +
+ s"`${specifiedProvider.getSimpleName}`.")
+ }
+
+ if (analyzedQuery.schema.length != existingTable.schema.length) {
+ throw new AnalysisException(
+ s"The column number of the existing table $tableName" +
+ s"(${existingTable.schema.catalogString}) doesn't match the data schema" +
+ s"(${query.schema.catalogString})")
+ }
+
+ val resolver = sparkSession.sessionState.conf.resolver
+ val tableCols = existingTable.schema.map(_.name)
+
+ // As we are inserting into an existing table, we should respect the existing schema and
+ // adjust the column order of the given dataframe according to it, or throw exception
+ // if the column names do not match.
+ val adjustedColumns = tableCols.map { col =>
+ analyzedQuery.resolve(Seq(col), resolver).getOrElse {
+ val inputColumns = analyzedQuery.schema.map(_.name).mkString(", ")
+ throw new AnalysisException(
+ s"cannot resolve '$col' given input columns: [$inputColumns]")
+ }
+ }
+
+ // Check if the specified partition columns match the existing table.
+ val specifiedPartCols = CatalogUtils.normalizePartCols(
+ tableName, tableCols, tableDesc.partitionColumnNames, resolver)
+ if (specifiedPartCols != existingTable.partitionColumnNames) {
+ val existingPartCols = existingTable.partitionColumnNames.mkString(", ")
+ throw new AnalysisException(
+ s"""
+ |Specified partitioning does not match that of the existing table $tableName.
+ |Specified partition columns: [${specifiedPartCols.mkString(", ")}]
+ |Existing partition columns: [$existingPartCols]
+ """.stripMargin)
+ }
+
+ // Check if the specified bucketing match the existing table.
+ val specifiedBucketSpec = tableDesc.bucketSpec.map { bucketSpec =>
+ CatalogUtils.normalizeBucketSpec(tableName, tableCols, bucketSpec, resolver)
+ }
+ if (specifiedBucketSpec != existingTable.bucketSpec) {
+ val specifiedBucketString =
+ specifiedBucketSpec.map(_.toString).getOrElse("not bucketed")
+ val existingBucketString =
+ existingTable.bucketSpec.map(_.toString).getOrElse("not bucketed")
+ throw new AnalysisException(
+ s"""
+ |Specified bucketing does not match that of the existing table $tableName.
+ |Specified bucketing: $specifiedBucketString
+ |Existing bucketing: $existingBucketString
+ """.stripMargin)
+ }
+
+ val newQuery = if (adjustedColumns != analyzedQuery.output) {
+ Project(adjustedColumns, analyzedQuery)
+ } else {
+ analyzedQuery
+ }
+
+ c.copy(
+ // trust everything from the existing table, except schema as we assume it's empty in a lot
+ // of places, when we do CTAS.
+ tableDesc = existingTable.copy(schema = new StructType()),
+ query = Some(newQuery))
+
// Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity
// config, and do various checks:
// * column names in table definition can't be duplicated.
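The Append-to-existing-table case added above resolves the query's columns against the existing table by name (inserting a Project when the order differs) and rejects mismatched providers, column counts, partitioning and bucketing. A hedged sketch of the resulting behaviour, assuming a SparkSession `spark`; table and column names are illustrative:

    import spark.implicits._

    Seq((1, "a")).toDF("id", "name").write.saveAsTable("people")

    // Columns supplied in a different order are matched by name and reordered
    // with a Project before the write, so this append lines up with the table schema (id, name).
    Seq(("b", 2)).toDF("name", "id").write.mode("append").saveAsTable("people")

    // A column that cannot be resolved against the table fails the analysis:
    // "cannot resolve 'id' given input columns: [name, value]"
    // Seq(("c", 3)).toDF("name", "value").write.mode("append").saveAsTable("people")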
@@ -94,7 +196,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
// * can't use all table columns as partition columns.
// * partition columns' type must be AtomicType.
// * sort columns' type must be orderable.
- case c @ CreateTable(tableDesc, mode, query) =>
+ case c @ CreateTable(tableDesc, _, query) =>
val analyzedQuery = query.map { q =>
// Analyze the query in CTAS and then we can do the normalization and checking.
val qe = sparkSession.sessionState.executePlan(q)
@@ -106,6 +208,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
} else {
tableDesc.schema
}
+
val columnNames = if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
schema.map(_.name)
} else {
@@ -113,22 +216,24 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
}
checkDuplication(columnNames, "table definition of " + tableDesc.identifier)
- val partitionColsChecked = checkPartitionColumns(schema, tableDesc)
- val bucketColsChecked = checkBucketColumns(schema, partitionColsChecked)
- c.copy(tableDesc = bucketColsChecked, query = analyzedQuery)
+ val normalizedTable = tableDesc.copy(
+ partitionColumnNames = normalizePartitionColumns(schema, tableDesc),
+ bucketSpec = normalizeBucketSpec(schema, tableDesc))
+
+ c.copy(tableDesc = normalizedTable, query = analyzedQuery)
}
- private def checkPartitionColumns(schema: StructType, tableDesc: CatalogTable): CatalogTable = {
+ private def normalizePartitionColumns(schema: StructType, table: CatalogTable): Seq[String] = {
val normalizedPartitionCols = CatalogUtils.normalizePartCols(
- tableName = tableDesc.identifier.unquotedString,
+ tableName = table.identifier.unquotedString,
tableCols = schema.map(_.name),
- partCols = tableDesc.partitionColumnNames,
+ partCols = table.partitionColumnNames,
resolver = sparkSession.sessionState.conf.resolver)
checkDuplication(normalizedPartitionCols, "partition")
if (schema.nonEmpty && normalizedPartitionCols.length == schema.length) {
- if (tableDesc.provider.get == DDLUtils.HIVE_PROVIDER) {
+ if (table.provider.get == DDLUtils.HIVE_PROVIDER) {
// When we hit this branch, it means users didn't specify schema for the table to be
// created, as we always include partition columns in table schema for hive serde tables.
// The real schema will be inferred at hive metastore by hive serde, plus the given
@@ -144,28 +249,28 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
case other => failAnalysis(s"Cannot use ${other.simpleString} for partition column")
}
- tableDesc.copy(partitionColumnNames = normalizedPartitionCols)
+ normalizedPartitionCols
}
- private def checkBucketColumns(schema: StructType, tableDesc: CatalogTable): CatalogTable = {
- tableDesc.bucketSpec match {
+ private def normalizeBucketSpec(schema: StructType, table: CatalogTable): Option[BucketSpec] = {
+ table.bucketSpec match {
case Some(bucketSpec) =>
- val normalizedBucketing = CatalogUtils.normalizeBucketSpec(
- tableName = tableDesc.identifier.unquotedString,
+ val normalizedBucketSpec = CatalogUtils.normalizeBucketSpec(
+ tableName = table.identifier.unquotedString,
tableCols = schema.map(_.name),
bucketSpec = bucketSpec,
resolver = sparkSession.sessionState.conf.resolver)
- checkDuplication(normalizedBucketing.bucketColumnNames, "bucket")
- checkDuplication(normalizedBucketing.sortColumnNames, "sort")
+ checkDuplication(normalizedBucketSpec.bucketColumnNames, "bucket")
+ checkDuplication(normalizedBucketSpec.sortColumnNames, "sort")
- normalizedBucketing.sortColumnNames.map(schema(_)).map(_.dataType).foreach {
+ normalizedBucketSpec.sortColumnNames.map(schema(_)).map(_.dataType).foreach {
case dt if RowOrdering.isOrderable(dt) => // OK
case other => failAnalysis(s"Cannot use ${other.simpleString} for sorting column")
}
- tableDesc.copy(bucketSpec = Some(normalizedBucketing))
+ Some(normalizedBucketSpec)
- case None => tableDesc
+ case None => None
}
}
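normalizePartitionColumns and normalizeBucketSpec now return the normalized column names and bucket spec instead of a copied CatalogTable; the normalization itself is unchanged: names are matched against the table schema with the session's resolver, duplicates are rejected, using every column as a partition column is disallowed, partition columns must be of atomic types and sort columns must be orderable. A small illustration through the public writer API, assuming a SparkSession `spark` with the default case-insensitive resolver (table names are hypothetical):

    import spark.implicits._

    val df = Seq((1, "us", 2016)).toDF("id", "country", "year")

    // "YEAR" and "Country" are normalized to the schema's spelling ("year", "country")
    // before the table is created.
    df.write.partitionBy("YEAR", "Country").saveAsTable("pt_tbl")

    // Rejected by the checks above:
    // df.write.partitionBy("year", "year").saveAsTable("pt_dup")                     // duplicate partition columns
    // df.write.partitionBy("id", "country", "year").saveAsTable("pt_all")            // all columns used as partition columns
    // df.write.bucketBy(4, "id").sortBy("country", "country").saveAsTable("bk_dup")  // duplicate sort columns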
@@ -294,27 +399,6 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
def apply(plan: LogicalPlan): Unit = {
plan.foreach {
- case c @ CreateTable(tableDesc, mode, query) if c.resolved =>
- if (query.isDefined &&
- mode == SaveMode.Overwrite &&
- catalog.tableExists(tableDesc.identifier)) {
- // Need to remove SubQuery operator.
- EliminateSubqueryAliases(catalog.lookupRelation(tableDesc.identifier)) match {
- // Only do the check if the table is a data source table
- // (the relation is a BaseRelation).
- case LogicalRelation(dest: BaseRelation, _, _) =>
- // Get all input data source relations of the query.
- val srcRelations = query.get.collect {
- case LogicalRelation(src: BaseRelation, _, _) => src
- }
- if (srcRelations.contains(dest)) {
- failAnalysis(
- s"Cannot overwrite table ${tableDesc.identifier} that is also being read from")
- }
- case _ => // OK
- }
- }
-
case logical.InsertIntoTable(
l @ LogicalRelation(t: InsertableRelation, _, _), partition, query, _, _) =>
// Right now, we do not support insert into a data source table with partition specs.