-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala17
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala8
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala24
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala8
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala100
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala59
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala64
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala49
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala170
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala46
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala3
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala151
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala47
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala29
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala1
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala6
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala7
17 files changed, 417 insertions, 372 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 38f0bc2c4f..f7762e0f8a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -21,8 +21,7 @@ import java.util.Date
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.types.StructType
@@ -112,6 +111,8 @@ case class BucketSpec(
* Note that Hive's metastore also tracks skewed columns. We should consider adding that in the
* future once we have a better understanding of how we want to handle skewed columns.
*
+ * @param provider the name of the data source provider for this table, e.g. parquet, json, etc.
+ *                 Can be None if this table is a view; it should be "hive" for Hive serde tables.
* @param unsupportedFeatures is a list of string descriptions of features that are used by the
* underlying table but not supported by Spark SQL yet.
*/
@@ -120,6 +121,7 @@ case class CatalogTable(
tableType: CatalogTableType,
storage: CatalogStorageFormat,
schema: StructType,
+ provider: Option[String] = None,
partitionColumnNames: Seq[String] = Seq.empty,
bucketSpec: Option[BucketSpec] = None,
owner: String = "",
@@ -131,16 +133,6 @@ case class CatalogTable(
comment: Option[String] = None,
unsupportedFeatures: Seq[String] = Seq.empty) {
- // Verify that the provided columns are part of the schema
- private val colNames = schema.map(_.name).toSet
- private def requireSubsetOfSchema(cols: Seq[String], colType: String): Unit = {
- require(cols.toSet.subsetOf(colNames), s"$colType columns (${cols.mkString(", ")}) " +
- s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'")
- }
- requireSubsetOfSchema(partitionColumnNames, "partition")
- requireSubsetOfSchema(bucketSpec.map(_.sortColumnNames).getOrElse(Nil), "sort")
- requireSubsetOfSchema(bucketSpec.map(_.bucketColumnNames).getOrElse(Nil), "bucket")
-
/** schema of this table's partition columns */
def partitionSchema: StructType = StructType(schema.filter {
c => partitionColumnNames.contains(c.name)
@@ -189,6 +181,7 @@ case class CatalogTable(
s"Last Access: ${new Date(lastAccessTime).toString}",
s"Type: ${tableType.name}",
if (schema.nonEmpty) s"Schema: ${schema.mkString("[", ", ", "]")}" else "",
+ if (provider.isDefined) s"Provider: ${provider.get}" else "",
if (partitionColumnNames.nonEmpty) s"Partition Columns: $partitionColumns" else ""
) ++ bucketStrings ++ Seq(
viewOriginalText.map("Original View: " + _).getOrElse(""),
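
For reference, a minimal sketch (not part of this patch) of how a CatalogTable now carries the
provider; the values mirror the test fixtures updated below:

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
    import org.apache.spark.sql.types.StructType

    // provider is None for views, Some("hive") for Hive serde tables, or the data source
    // name (parquet, json, ...) for data source tables.
    val table = CatalogTable(
      identifier = TableIdentifier("my_table", Some("db1")),
      tableType = CatalogTableType.MANAGED,
      storage = CatalogStorageFormat.empty,
      schema = new StructType().add("a", "int").add("b", "string"),
      provider = Some("parquet"))
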
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 201d39a364..54365fd978 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -552,7 +552,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
identifier = TableIdentifier("my_table", Some("db1")),
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
- schema = new StructType().add("a", "int").add("b", "string")
+ schema = new StructType().add("a", "int").add("b", "string"),
+ provider = Some("hive")
)
catalog.createTable(table, ignoreIfExists = false)
@@ -571,7 +572,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
storage = CatalogStorageFormat(
Some(Utils.createTempDir().getAbsolutePath),
None, None, None, false, Map.empty),
- schema = new StructType().add("a", "int").add("b", "string")
+ schema = new StructType().add("a", "int").add("b", "string"),
+ provider = Some("hive")
)
catalog.createTable(externalTable, ignoreIfExists = false)
assert(!exists(db.locationUri, "external_table"))
@@ -589,6 +591,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
.add("col2", "string")
.add("a", "int")
.add("b", "string"),
+ provider = Some("hive"),
partitionColumnNames = Seq("a", "b")
)
catalog.createTable(table, ignoreIfExists = false)
@@ -692,6 +695,7 @@ abstract class CatalogTestUtils {
.add("col2", "string")
.add("a", "int")
.add("b", "string"),
+ provider = Some("hive"),
partitionColumnNames = Seq("a", "b"),
bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil)))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 44189881dd..6dbed26b0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -23,10 +23,11 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
-import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, DataSource, HadoopFsRelation}
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, HadoopFsRelation}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.types.StructType
/**
* Interface used to write a [[Dataset]] to external storage systems (e.g. file systems,
@@ -367,15 +368,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
throw new AnalysisException(s"Table $tableIdent already exists.")
case _ =>
- val cmd =
- CreateTableUsingAsSelect(
- tableIdent,
- source,
- partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
- getBucketSpec,
- mode,
- extraOptions.toMap,
- df.logicalPlan)
+ val tableDesc = CatalogTable(
+ identifier = tableIdent,
+ tableType = CatalogTableType.EXTERNAL,
+ storage = CatalogStorageFormat.empty.copy(properties = extraOptions.toMap),
+ schema = new StructType,
+ provider = Some(source),
+ partitionColumnNames = partitioningColumns.getOrElse(Nil),
+ bucketSpec = getBucketSpec
+ )
+ val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
df.sparkSession.sessionState.executePlan(cmd).toRdd
}
}
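
A hedged usage sketch (not from the patch; `spark` is an assumed SparkSession): existing
saveAsTable call sites keep working, but now plan a CreateTable node instead of
CreateTableUsingAsSelect:

    val df = spark.range(10).selectExpr("id", "id % 2 AS part")
    // Internally this builds CatalogTable(provider = Some("parquet"), ...) and plans
    // CreateTable(tableDesc, SaveMode.ErrorIfExists, Some(df.logicalPlan)).
    df.write
      .format("parquet")
      .partitionBy("part")
      .saveAsTable("ids_by_part")
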
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 263ee33742..9eef5cc5fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -24,7 +24,6 @@ import scala.language.implicitConversions
import scala.reflect.runtime.universe.TypeTag
import scala.util.control.NonFatal
-import com.fasterxml.jackson.core.JsonFactory
import org.apache.commons.lang3.StringUtils
import org.apache.spark.annotation.{DeveloperApi, Experimental}
@@ -35,18 +34,16 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.expressions.objects.Invoke
import org.apache.spark.sql.catalyst.optimizer.CombineUnions
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand}
-import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery}
@@ -174,8 +171,7 @@ class Dataset[T] private[sql](
@transient private[sql] val logicalPlan: LogicalPlan = {
def hasSideEffects(plan: LogicalPlan): Boolean = plan match {
case _: Command |
- _: InsertIntoTable |
- _: CreateTableUsingAsSelect => true
+ _: InsertIntoTable => true
case _ => false
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 22b1e07219..2bb686254c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema}
import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.{CreateTempViewUsing, _}
+import org.apache.spark.sql.execution.datasources.{CreateTable, CreateTempViewUsing, _}
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
import org.apache.spark.sql.types.{DataType, StructType}
@@ -310,7 +310,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
/**
- * Create a [[CreateTableUsing]] or a [[CreateTableUsingAsSelect]] logical plan.
+ * Create a [[CreateTable]] logical plan.
*/
override def visitCreateTableUsing(ctx: CreateTableUsingContext): LogicalPlan = withOrigin(ctx) {
val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
@@ -319,12 +319,31 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
val options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
val provider = ctx.tableProvider.qualifiedName.getText
+ val schema = Option(ctx.colTypeList()).map(createStructType)
val partitionColumnNames =
Option(ctx.partitionColumnNames)
.map(visitIdentifierList(_).toArray)
.getOrElse(Array.empty[String])
val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
+ val tableDesc = CatalogTable(
+ identifier = table,
+ // TODO: actually the table type may be EXTERNAL if we have `path` in options. However, the
+ // physical plan `CreateDataSourceTableCommand` doesn't take table type as parameter, but a
+ // boolean flag called `managedIfNoPath`. We set the table type to MANAGED here to simulate
+ // setting the `managedIfNoPath` flag. In the future we should refactor the physical plan and
+ // make it take `CatalogTable` directly.
+ tableType = CatalogTableType.MANAGED,
+ storage = CatalogStorageFormat.empty.copy(properties = options),
+ schema = schema.getOrElse(new StructType),
+ provider = Some(provider),
+ partitionColumnNames = partitionColumnNames,
+ bucketSpec = bucketSpec
+ )
+
+ // Determine the storage mode.
+ val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
+
if (ctx.query != null) {
// Get the backing query.
val query = plan(ctx.query)
@@ -333,32 +352,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
operationNotAllowed("CREATE TEMPORARY TABLE ... USING ... AS query", ctx)
}
- // Determine the storage mode.
- val mode = if (ifNotExists) {
- SaveMode.Ignore
- } else {
- SaveMode.ErrorIfExists
- }
-
- CreateTableUsingAsSelect(
- table, provider, partitionColumnNames, bucketSpec, mode, options, query)
+ CreateTable(tableDesc, mode, Some(query))
} else {
- val struct = Option(ctx.colTypeList()).map(createStructType)
- if (struct.isEmpty && bucketSpec.nonEmpty) {
- throw new ParseException(
- "Expected explicit specification of table schema when using CLUSTERED BY clause.", ctx)
- }
+ if (temp) {
+ if (ifNotExists) {
+ operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx)
+ }
- CreateTableUsing(
- table,
- struct,
- provider,
- temp,
- options,
- partitionColumnNames,
- bucketSpec,
- ifNotExists,
- managedIfNoPath = true)
+ logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, please use " +
+ "CREATE TEMPORARY VIEW ... USING ... instead")
+ CreateTempViewUsing(table, schema, replace = true, provider, options)
+ } else {
+ CreateTable(tableDesc, mode, None)
+ }
}
}
@@ -891,8 +897,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
/**
- * Create a table, returning either a [[CreateTableCommand]] or a
- * [[CreateHiveTableAsSelectLogicalPlan]].
+ * Create a table, returning a [[CreateTable]] logical plan.
*
* This is not used to create datasource tables, which is handled through
* "CREATE TABLE ... USING ...".
@@ -933,23 +938,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
val selectQuery = Option(ctx.query).map(plan)
- // Ensuring whether no duplicate name is used in table definition
- val colNames = dataCols.map(_.name)
- if (colNames.length != colNames.distinct.length) {
- val duplicateColumns = colNames.groupBy(identity).collect {
- case (x, ys) if ys.length > 1 => "\"" + x + "\""
- }
- operationNotAllowed(s"Duplicated column names found in table definition of $name: " +
- duplicateColumns.mkString("[", ",", "]"), ctx)
- }
-
- // For Hive tables, partition columns must not be part of the schema
- val badPartCols = partitionCols.map(_.name).toSet.intersect(colNames.toSet)
- if (badPartCols.nonEmpty) {
- operationNotAllowed(s"Partition columns may not be specified in the schema: " +
- badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
- }
-
// Note: Hive requires partition columns to be distinct from the schema, so we need
// to include the partition columns here explicitly
val schema = StructType(dataCols ++ partitionCols)
@@ -1001,10 +989,13 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
tableType = tableType,
storage = storage,
schema = schema,
+ provider = Some("hive"),
partitionColumnNames = partitionCols.map(_.name),
properties = properties,
comment = comment)
+ val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
+
selectQuery match {
case Some(q) =>
// Just use whatever is projected in the select statement as our schema
@@ -1025,7 +1016,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
val hasStorageProperties = (ctx.createFileFormat != null) || (ctx.rowFormat != null)
if (conf.convertCTAS && !hasStorageProperties) {
- val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
// At here, both rowStorage.serdeProperties and fileStorage.serdeProperties
// are empty Maps.
val optionsWithPath = if (location.isDefined) {
@@ -1033,19 +1023,17 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
} else {
Map.empty[String, String]
}
- CreateTableUsingAsSelect(
- tableIdent = tableDesc.identifier,
- provider = conf.defaultDataSourceName,
- partitionColumns = tableDesc.partitionColumnNames.toArray,
- bucketSpec = None,
- mode = mode,
- options = optionsWithPath,
- q
+
+ val newTableDesc = tableDesc.copy(
+ storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
+ provider = Some(conf.defaultDataSourceName)
)
+
+ CreateTable(newTableDesc, mode, Some(q))
} else {
- CreateHiveTableAsSelectLogicalPlan(tableDesc, q, ifNotExists)
+ CreateTable(tableDesc, mode, Some(q))
}
- case None => CreateTableCommand(tableDesc, ifNotExists)
+ case None => CreateTable(tableDesc, mode, None)
}
}
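
A rough illustration (not part of the patch) of how the different CREATE TABLE flavors now all
parse to the unified plan; assumes a Hive-enabled SparkSession `spark` and a hypothetical JSON
path:

    spark.sql("CREATE TABLE t1(a INT, b STRING) USING json")        // CreateTable, provider = Some("json")
    spark.sql("CREATE TABLE t2(a INT, b STRING) STORED AS parquet") // CreateTable, provider = Some("hive")
    // Deprecated form, rewritten by the parser into CreateTempViewUsing with a warning:
    spark.sql("CREATE TEMPORARY TABLE t3 USING json OPTIONS (path '/tmp/t3')")
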
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 52e19819f2..fb08e1228e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -19,15 +19,15 @@ package org.apache.spark.sql.execution
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, Strategy}
+import org.apache.spark.sql.{execution, SaveMode, Strategy}
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution
import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources._
@@ -420,45 +420,40 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
object DDLStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case c: CreateTableUsing if c.temporary && !c.allowExisting =>
- logWarning(
- s"CREATE TEMPORARY TABLE ${c.tableIdent.identifier} USING... is deprecated, " +
- s"please use CREATE TEMPORARY VIEW viewName USING... instead")
- ExecutedCommandExec(
- CreateTempViewUsing(
- c.tableIdent, c.userSpecifiedSchema, replace = true, c.provider, c.options)) :: Nil
-
- case c: CreateTableUsing if !c.temporary =>
+ case CreateTable(tableDesc, mode, None) if tableDesc.provider.get == "hive" =>
+ val cmd = CreateTableCommand(tableDesc, ifNotExists = mode == SaveMode.Ignore)
+ ExecutedCommandExec(cmd) :: Nil
+
+ case CreateTable(tableDesc, mode, None) =>
val cmd =
CreateDataSourceTableCommand(
- c.tableIdent,
- c.userSpecifiedSchema,
- c.provider,
- c.options,
- c.partitionColumns,
- c.bucketSpec,
- c.allowExisting,
- c.managedIfNoPath)
+ tableDesc.identifier,
+ if (tableDesc.schema.nonEmpty) Some(tableDesc.schema) else None,
+ tableDesc.provider.get,
+ tableDesc.storage.properties,
+ tableDesc.partitionColumnNames.toArray,
+ tableDesc.bucketSpec,
+ ignoreIfExists = mode == SaveMode.Ignore,
+ managedIfNoPath = tableDesc.tableType == CatalogTableType.MANAGED)
ExecutedCommandExec(cmd) :: Nil
- case c: CreateTableUsing if c.temporary && c.allowExisting =>
- throw new AnalysisException(
- "allowExisting should be set to false when creating a temporary table.")
+ // CREATE TABLE ... AS SELECT ... for hive serde tables is handled in the hive module,
+ // by the rule `CreateTables`
- case c: CreateTableUsingAsSelect =>
+ case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get != "hive" =>
val cmd =
CreateDataSourceTableAsSelectCommand(
- c.tableIdent,
- c.provider,
- c.partitionColumns,
- c.bucketSpec,
- c.mode,
- c.options,
- c.child)
+ tableDesc.identifier,
+ tableDesc.provider.get,
+ tableDesc.partitionColumnNames.toArray,
+ tableDesc.bucketSpec,
+ mode,
+ tableDesc.storage.properties,
+ query)
ExecutedCommandExec(cmd) :: Nil
- case c: CreateTempViewUsing =>
- ExecutedCommandExec(c) :: Nil
+ case c: CreateTempViewUsing => ExecutedCommandExec(c) :: Nil
+
case _ => Nil
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 93eb386ade..7b028e72ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.execution.command
-import java.util.regex.Pattern
-
import scala.collection.mutable
import scala.util.control.NonFatal
@@ -59,21 +57,6 @@ case class CreateDataSourceTableCommand(
extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
- // Since we are saving metadata to metastore, we need to check if metastore supports
- // the table name and database name we have for this query. MetaStoreUtils.validateName
- // is the method used by Hive to check if a table name or a database name is valid for
- // the metastore.
- if (!CreateDataSourceTableUtils.validateName(tableIdent.table)) {
- throw new AnalysisException(s"Table name ${tableIdent.table} is not a valid name for " +
- s"metastore. Metastore only accepts table name containing characters, numbers and _.")
- }
- if (tableIdent.database.isDefined &&
- !CreateDataSourceTableUtils.validateName(tableIdent.database.get)) {
- throw new AnalysisException(s"Database name ${tableIdent.database.get} is not a valid name " +
- s"for metastore. Metastore only accepts database name containing " +
- s"characters, numbers and _.")
- }
-
val tableName = tableIdent.unquotedString
val sessionState = sparkSession.sessionState
@@ -106,22 +89,12 @@ case class CreateDataSourceTableCommand(
val partitionColumns = if (userSpecifiedSchema.nonEmpty) {
userSpecifiedPartitionColumns
} else {
- val res = dataSource match {
+ // This is guaranteed in `PreprocessDDL`.
+ assert(userSpecifiedPartitionColumns.isEmpty)
+ dataSource match {
case r: HadoopFsRelation => r.partitionSchema.fieldNames
case _ => Array.empty[String]
}
- if (userSpecifiedPartitionColumns.length > 0) {
- // The table does not have a specified schema, which means that the schema will be inferred
- // when we load the table. So, we are not expecting partition columns and we will discover
- // partitions when we load the table. However, if there are specified partition columns,
- // we simply ignore them and provide a warning message.
- logWarning(
- s"Specified partition columns (${userSpecifiedPartitionColumns.mkString(",")}) will be " +
- s"ignored. The schema and partition columns of table $tableIdent are inferred. " +
- s"Schema: ${dataSource.schema.simpleString}; " +
- s"Partition columns: ${res.mkString("(", ", ", ")")}")
- }
- res
}
CreateDataSourceTableUtils.createDataSourceTable(
@@ -164,21 +137,6 @@ case class CreateDataSourceTableAsSelectCommand(
override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
override def run(sparkSession: SparkSession): Seq[Row] = {
- // Since we are saving metadata to metastore, we need to check if metastore supports
- // the table name and database name we have for this query. MetaStoreUtils.validateName
- // is the method used by Hive to check if a table name or a database name is valid for
- // the metastore.
- if (!CreateDataSourceTableUtils.validateName(tableIdent.table)) {
- throw new AnalysisException(s"Table name ${tableIdent.table} is not a valid name for " +
- s"metastore. Metastore only accepts table name containing characters, numbers and _.")
- }
- if (tableIdent.database.isDefined &&
- !CreateDataSourceTableUtils.validateName(tableIdent.database.get)) {
- throw new AnalysisException(s"Database name ${tableIdent.database.get} is not a valid name " +
- s"for metastore. Metastore only accepts database name containing " +
- s"characters, numbers and _.")
- }
-
val tableName = tableIdent.unquotedString
val sessionState = sparkSession.sessionState
var createMetastoreTable = false
@@ -311,20 +269,6 @@ object CreateDataSourceTableUtils extends Logging {
val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol."
val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol."
- /**
- * Checks if the given name conforms the Hive standard ("[a-zA-z_0-9]+"),
- * i.e. if this name only contains characters, numbers, and _.
- *
- * This method is intended to have the same behavior of
- * org.apache.hadoop.hive.metastore.MetaStoreUtils.validateName.
- */
- def validateName(name: String): Boolean = {
- val tpat = Pattern.compile("[\\w_]+")
- val matcher = tpat.matcher(name)
-
- matcher.matches()
- }
-
def createDataSourceTable(
sparkSession: SparkSession,
tableIdent: TableIdentifier,
@@ -396,6 +340,7 @@ object CreateDataSourceTableUtils extends Logging {
identifier = tableIdent,
tableType = tableType,
schema = new StructType,
+ provider = Some(provider),
storage = CatalogStorageFormat(
locationUri = None,
inputFormat = None,
@@ -425,6 +370,7 @@ object CreateDataSourceTableUtils extends Logging {
properties = options
),
schema = relation.schema,
+ provider = Some(provider),
properties = tableProperties.toMap,
viewText = None)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 18369b51b9..1b1e2123b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -19,50 +19,25 @@ package org.apache.spark.sql.execution.datasources
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.types._
+case class CreateTable(tableDesc: CatalogTable, mode: SaveMode, query: Option[LogicalPlan])
+ extends LogicalPlan with Command {
+ assert(tableDesc.provider.isDefined, "The table to be created must have a provider.")
-/**
- * Used to represent the operation of create table using a data source.
- *
- * @param allowExisting If it is true, we will do nothing when the table already exists.
- * If it is false, an exception will be thrown
- */
-case class CreateTableUsing(
- tableIdent: TableIdentifier,
- userSpecifiedSchema: Option[StructType],
- provider: String,
- temporary: Boolean,
- options: Map[String, String],
- partitionColumns: Array[String],
- bucketSpec: Option[BucketSpec],
- allowExisting: Boolean,
- managedIfNoPath: Boolean) extends LogicalPlan with logical.Command {
-
- override def output: Seq[Attribute] = Seq.empty
- override def children: Seq[LogicalPlan] = Seq.empty
-}
+ if (query.isEmpty) {
+ assert(
+ mode == SaveMode.ErrorIfExists || mode == SaveMode.Ignore,
+ "create table without data insertion can only use ErrorIfExists or Ignore as SaveMode.")
+ }
-/**
- * A node used to support CTAS statements and saveAsTable for the data source API.
- * This node is a [[logical.UnaryNode]] instead of a [[logical.Command]] because we want the
- * analyzer can analyze the logical plan that will be used to populate the table.
- * So, [[PreWriteCheck]] can detect cases that are not allowed.
- */
-case class CreateTableUsingAsSelect(
- tableIdent: TableIdentifier,
- provider: String,
- partitionColumns: Array[String],
- bucketSpec: Option[BucketSpec],
- mode: SaveMode,
- options: Map[String, String],
- child: LogicalPlan) extends logical.UnaryNode {
override def output: Seq[Attribute] = Seq.empty[Attribute]
+
+ override def children: Seq[LogicalPlan] = query.toSeq
}
case class CreateTempViewUsing(
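
For orientation, a hedged sketch (not in the patch; `describeDdl` is a made-up helper) of matching
on the unified node; a CTAS query, if present, is a child of CreateTable, so the analyzer resolves
it in place:

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.datasources.CreateTable

    def describeDdl(plan: LogicalPlan): String = plan match {
      case CreateTable(desc, mode, None)    => s"CREATE TABLE ${desc.identifier} with mode $mode"
      case CreateTable(desc, mode, Some(_)) => s"CREATE TABLE ${desc.identifier} AS SELECT with mode $mode"
      case _                                => "not a CREATE TABLE plan"
    }
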
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 15b9d14bd7..d5b92323d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -17,17 +17,21 @@
package org.apache.spark.sql.execution.datasources
+import java.util.regex.Pattern
+
import scala.util.control.NonFatal
import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, RowOrdering}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
+import org.apache.spark.sql.types.{AtomicType, StructType}
/**
* Try to replaces [[UnresolvedRelation]]s with [[ResolveDataSource]].
@@ -62,6 +66,130 @@ private[sql] class ResolveDataSource(sparkSession: SparkSession) extends Rule[Lo
}
/**
+ * Preprocess some DDL plans, e.g. [[CreateTable]], to do some normalization and checking.
+ */
+case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] {
+
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ // When we CREATE TABLE without specifying the table schema, we should fail the query if
+ // bucketing information is specified, as we can't infer bucketing from data files currently,
+ // and we should ignore partition columns if they are specified, as we will infer them at
+ // runtime.
+ case c @ CreateTable(tableDesc, _, None) if tableDesc.schema.isEmpty =>
+ if (tableDesc.bucketSpec.isDefined) {
+ failAnalysis("Cannot specify bucketing information if the table schema is not specified " +
+ "when creating and will be inferred at runtime")
+ }
+
+ val partitionColumnNames = tableDesc.partitionColumnNames
+ if (partitionColumnNames.nonEmpty) {
+ // The table does not have a specified schema, which means that the schema will be inferred
+ // at runtime. So, we are not expecting partition columns and we will discover partitions
+ // at runtime. However, if there are specified partition columns, we simply ignore them and
+ // provide a warning message.
+ logWarning(
+ s"Specified partition columns (${partitionColumnNames.mkString(",")}) will be " +
+ s"ignored. The schema and partition columns of table ${tableDesc.identifier} will " +
+ "be inferred.")
+ c.copy(tableDesc = tableDesc.copy(partitionColumnNames = Nil))
+ } else {
+ c
+ }
+
+ // Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity
+ // config, and do various checks:
+ // * column names in table definition can't be duplicated.
+ // * partition, bucket and sort column names must exist in table definition.
+ // * partition, bucket and sort column names can't be duplicated.
+ // * can't use all table columns as partition columns.
+ // * partition columns' type must be AtomicType.
+ // * sort columns' type must be orderable.
+ case c @ CreateTable(tableDesc, mode, query) if c.childrenResolved =>
+ val schema = if (query.isDefined) query.get.schema else tableDesc.schema
+ checkDuplication(schema.map(_.name), "table definition of " + tableDesc.identifier)
+
+ val partitionColsChecked = checkPartitionColumns(schema, tableDesc)
+ val bucketColsChecked = checkBucketColumns(schema, partitionColsChecked)
+ c.copy(tableDesc = bucketColsChecked)
+ }
+
+ private def checkPartitionColumns(schema: StructType, tableDesc: CatalogTable): CatalogTable = {
+ val normalizedPartitionCols = tableDesc.partitionColumnNames.map { colName =>
+ normalizeColumnName(tableDesc.identifier, schema, colName, "partition")
+ }
+ checkDuplication(normalizedPartitionCols, "partition")
+
+ if (schema.nonEmpty && normalizedPartitionCols.length == schema.length) {
+ if (tableDesc.provider.get == "hive") {
+ // When we hit this branch, it means the user didn't specify a schema for the table to be
+ // created, as we always include partition columns in the table schema for hive serde tables.
+ // The real schema will be inferred by the hive serde at the metastore, plus the given
+ // partition columns, so we should not fail the analysis here.
+ } else {
+ failAnalysis("Cannot use all columns for partition columns")
+ }
+
+ }
+
+ schema.filter(f => normalizedPartitionCols.contains(f.name)).map(_.dataType).foreach {
+ case _: AtomicType => // OK
+ case other => failAnalysis(s"Cannot use ${other.simpleString} for partition column")
+ }
+
+ tableDesc.copy(partitionColumnNames = normalizedPartitionCols)
+ }
+
+ private def checkBucketColumns(schema: StructType, tableDesc: CatalogTable): CatalogTable = {
+ tableDesc.bucketSpec match {
+ case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) =>
+ val normalizedBucketCols = bucketColumnNames.map { colName =>
+ normalizeColumnName(tableDesc.identifier, schema, colName, "bucket")
+ }
+ checkDuplication(normalizedBucketCols, "bucket")
+
+ val normalizedSortCols = sortColumnNames.map { colName =>
+ normalizeColumnName(tableDesc.identifier, schema, colName, "sort")
+ }
+ checkDuplication(normalizedSortCols, "sort")
+
+ schema.filter(f => normalizedSortCols.contains(f.name)).map(_.dataType).foreach {
+ case dt if RowOrdering.isOrderable(dt) => // OK
+ case other => failAnalysis(s"Cannot use ${other.simpleString} for sorting column")
+ }
+
+ tableDesc.copy(
+ bucketSpec = Some(BucketSpec(numBuckets, normalizedBucketCols, normalizedSortCols))
+ )
+
+ case None => tableDesc
+ }
+ }
+
+ private def checkDuplication(colNames: Seq[String], colType: String): Unit = {
+ if (colNames.distinct.length != colNames.length) {
+ val duplicateColumns = colNames.groupBy(identity).collect {
+ case (x, ys) if ys.length > 1 => x
+ }
+ failAnalysis(s"Found duplicate column(s) in $colType: ${duplicateColumns.mkString(", ")}")
+ }
+ }
+
+ private def normalizeColumnName(
+ tableIdent: TableIdentifier,
+ schema: StructType,
+ colName: String,
+ colType: String): String = {
+ val tableCols = schema.map(_.name)
+ tableCols.find(conf.resolver(_, colName)).getOrElse {
+ failAnalysis(s"$colType column $colName is not defined in table $tableIdent, " +
+ s"defined table columns are: ${tableCols.mkString(", ")}")
+ }
+ }
+
+ private def failAnalysis(msg: String) = throw new AnalysisException(msg)
+}
+
+/**
* Preprocess the [[InsertIntoTable]] plan. Throws exception if the number of columns mismatch, or
* specified partition columns are different from the existing partition columns in the target
* table. It also does data type casting and field renaming, to make sure that the columns to be
@@ -152,8 +280,25 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) }
+ // This regex is used to check if the table name and database name are valid for `CreateTable`.
+ private val validNameFormat = Pattern.compile("[\\w_]+")
+
def apply(plan: LogicalPlan): Unit = {
plan.foreach {
+ case c @ CreateTable(tableDesc, mode, query) if c.resolved =>
+ // Since we are saving table metadata to metastore, we should make sure the table name
+ // and database name don't break some common restrictions, e.g. special chars except
+ // underscore are not allowed.
+ val tblIdent = tableDesc.identifier
+ if (!validNameFormat.matcher(tblIdent.table).matches()) {
+ failAnalysis(s"Table name ${tblIdent.table} is not a valid name for " +
+ s"metastore. Metastore only accepts table name containing characters, numbers and _.")
+ }
+ if (tblIdent.database.exists(db => !validNameFormat.matcher(db).matches())) {
+ failAnalysis(s"Database name ${tblIdent.database.get} is not a valid name for " +
+ s"metastore. Metastore only accepts table name containing characters, numbers and _.")
+ }
+
case i @ logical.InsertIntoTable(
l @ LogicalRelation(t: InsertableRelation, _, _),
partition, query, overwrite, ifNotExists) =>
@@ -206,22 +351,22 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
// The relation in l is not an InsertableRelation.
failAnalysis(s"$l does not allow insertion.")
- case c: CreateTableUsingAsSelect =>
+ case CreateTable(tableDesc, mode, Some(query)) =>
// When the SaveMode is Overwrite, we need to check if the table is an input table of
// the query. If so, we will throw an AnalysisException to let users know it is not allowed.
- if (c.mode == SaveMode.Overwrite && catalog.tableExists(c.tableIdent)) {
+ if (mode == SaveMode.Overwrite && catalog.tableExists(tableDesc.identifier)) {
// Need to remove SubQuery operator.
- EliminateSubqueryAliases(catalog.lookupRelation(c.tableIdent)) match {
+ EliminateSubqueryAliases(catalog.lookupRelation(tableDesc.identifier)) match {
// Only do the check if the table is a data source table
// (the relation is a BaseRelation).
case l @ LogicalRelation(dest: BaseRelation, _, _) =>
// Get all input data source relations of the query.
- val srcRelations = c.child.collect {
+ val srcRelations = query.collect {
case LogicalRelation(src: BaseRelation, _, _) => src
}
if (srcRelations.contains(dest)) {
failAnalysis(
- s"Cannot overwrite table ${c.tableIdent} that is also being read from.")
+ s"Cannot overwrite table ${tableDesc.identifier} that is also being read from.")
} else {
// OK
}
@@ -232,19 +377,6 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
// OK
}
- PartitioningUtils.validatePartitionColumn(
- c.child.schema, c.partitionColumns, conf.caseSensitiveAnalysis)
-
- for {
- spec <- c.bucketSpec
- sortColumnName <- spec.sortColumnNames
- sortColumn <- c.child.schema.find(_.name == sortColumnName)
- } {
- if (!RowOrdering.isOrderable(sortColumn.dataType)) {
- failAnalysis(s"Cannot use ${sortColumn.dataType.simpleString} for sorting column.")
- }
- }
-
case _ => // OK
}
}
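
To illustrate the analysis-time checks added above (a sketch, not from the patch; assumes
spark.sql.caseSensitive=false and an existing SparkSession `spark`):

    // Partition/bucket/sort columns are normalized against the table schema via conf.resolver.
    spark.sql("CREATE TABLE t(col1 INT, col2 STRING) USING parquet PARTITIONED BY (COL2)")
    // The analyzed CatalogTable ends up with partitionColumnNames == Seq("col2").

    // Unknown columns are now rejected during analysis rather than at runtime:
    spark.sql("CREATE TABLE bad(a INT, b STRING) USING json PARTITIONED BY (c)")
    // AnalysisException: partition column c is not defined in table `bad`,
    // defined table columns are: a, b
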
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index f8f78723b9..1f87f0e73a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -21,13 +21,13 @@ import scala.collection.JavaConverters._
import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql._
import org.apache.spark.sql.catalog.{Catalog, Column, Database, Function, Table}
import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
-import org.apache.spark.sql.execution.datasources.CreateTableUsing
+import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.types.StructType
@@ -223,20 +223,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
tableName: String,
source: String,
options: Map[String, String]): DataFrame = {
- val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
- val cmd =
- CreateTableUsing(
- tableIdent,
- userSpecifiedSchema = None,
- source,
- temporary = false,
- options = options,
- partitionColumns = Array.empty[String],
- bucketSpec = None,
- allowExisting = false,
- managedIfNoPath = false)
- sparkSession.sessionState.executePlan(cmd).toRdd
- sparkSession.table(tableIdent)
+ createExternalTable(tableName, source, new StructType, options)
}
/**
@@ -271,19 +258,20 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
source: String,
schema: StructType,
options: Map[String, String]): DataFrame = {
+ if (source == "hive") {
+ throw new AnalysisException("Cannot create hive serde table with createExternalTable API.")
+ }
+
val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
- val cmd =
- CreateTableUsing(
- tableIdent,
- userSpecifiedSchema = Some(schema),
- source,
- temporary = false,
- options,
- partitionColumns = Array.empty[String],
- bucketSpec = None,
- allowExisting = false,
- managedIfNoPath = false)
- sparkSession.sessionState.executePlan(cmd).toRdd
+ val tableDesc = CatalogTable(
+ identifier = tableIdent,
+ tableType = CatalogTableType.EXTERNAL,
+ storage = CatalogStorageFormat.empty.copy(properties = options),
+ schema = schema,
+ provider = Some(source)
+ )
+ val plan = CreateTable(tableDesc, SaveMode.ErrorIfExists, None)
+ sparkSession.sessionState.executePlan(plan).toRdd
sparkSession.table(tableIdent)
}
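
A hedged usage sketch of the Catalog API path above (table name and path are made up; assumes an
existing `spark`):

    import org.apache.spark.sql.types.StructType

    val logSchema = new StructType().add("ts", "string").add("msg", "string")
    // Routed through CreateTable(tableDesc, SaveMode.ErrorIfExists, None) internally.
    spark.catalog.createExternalTable("logs_json", "json", logSchema, Map("path" -> "/tmp/logs"))

    // Hive serde tables are explicitly rejected on this path:
    // spark.catalog.createExternalTable("h", "hive", logSchema, Map.empty[String, String])
    // -> AnalysisException: Cannot create hive serde table with createExternalTable API.
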
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index a228566b6b..052bce0923 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.AnalyzeTableCommand
-import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, FindDataSourceTable, PreprocessTableInsertion, ResolveDataSource}
+import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryManager}
import org.apache.spark.sql.util.ExecutionListenerManager
@@ -111,6 +111,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
lazy val analyzer: Analyzer = {
new Analyzer(catalog, conf) {
override val extendedResolutionRules =
+ PreprocessDDL(conf) ::
PreprocessTableInsertion(conf) ::
new FindDataSourceTable(sparkSession) ::
DataSourceAnalysis(conf) ::
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 999afc9751..044fa5fb9a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -20,13 +20,12 @@ package org.apache.spark.sql.execution.command
import scala.reflect.{classTag, ClassTag}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTableType, FunctionResource}
-import org.apache.spark.sql.catalyst.catalog.FunctionResourceType
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.execution.SparkSqlParser
-import org.apache.spark.sql.execution.datasources.CreateTableUsing
+import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
@@ -243,12 +242,12 @@ class DDLCommandSuite extends PlanTest {
allSources.foreach { s =>
val query = s"CREATE TABLE my_tab STORED AS $s"
- val ct = parseAs[CreateTableCommand](query)
+ val ct = parseAs[CreateTable](query)
val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf)
assert(hiveSerde.isDefined)
- assert(ct.table.storage.serde == hiveSerde.get.serde)
- assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat)
- assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat)
+ assert(ct.tableDesc.storage.serde == hiveSerde.get.serde)
+ assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat)
+ assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat)
}
}
@@ -259,14 +258,14 @@ class DDLCommandSuite extends PlanTest {
val query2 = s"$createTableStart DELIMITED FIELDS TERMINATED BY ' ' $fileFormat"
// No conflicting serdes here, OK
- val parsed1 = parseAs[CreateTableCommand](query1)
- assert(parsed1.table.storage.serde == Some("anything"))
- assert(parsed1.table.storage.inputFormat == Some("inputfmt"))
- assert(parsed1.table.storage.outputFormat == Some("outputfmt"))
- val parsed2 = parseAs[CreateTableCommand](query2)
- assert(parsed2.table.storage.serde.isEmpty)
- assert(parsed2.table.storage.inputFormat == Some("inputfmt"))
- assert(parsed2.table.storage.outputFormat == Some("outputfmt"))
+ val parsed1 = parseAs[CreateTable](query1)
+ assert(parsed1.tableDesc.storage.serde == Some("anything"))
+ assert(parsed1.tableDesc.storage.inputFormat == Some("inputfmt"))
+ assert(parsed1.tableDesc.storage.outputFormat == Some("outputfmt"))
+ val parsed2 = parseAs[CreateTable](query2)
+ assert(parsed2.tableDesc.storage.serde.isEmpty)
+ assert(parsed2.tableDesc.storage.inputFormat == Some("inputfmt"))
+ assert(parsed2.tableDesc.storage.outputFormat == Some("outputfmt"))
}
test("create table - row format serde and generic file format") {
@@ -276,12 +275,12 @@ class DDLCommandSuite extends PlanTest {
allSources.foreach { s =>
val query = s"CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS $s"
if (supportedSources.contains(s)) {
- val ct = parseAs[CreateTableCommand](query)
+ val ct = parseAs[CreateTable](query)
val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf)
assert(hiveSerde.isDefined)
- assert(ct.table.storage.serde == Some("anything"))
- assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat)
- assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat)
+ assert(ct.tableDesc.storage.serde == Some("anything"))
+ assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat)
+ assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat)
} else {
assertUnsupported(query, Seq("row format serde", "incompatible", s))
}
@@ -295,12 +294,12 @@ class DDLCommandSuite extends PlanTest {
allSources.foreach { s =>
val query = s"CREATE TABLE my_tab ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS $s"
if (supportedSources.contains(s)) {
- val ct = parseAs[CreateTableCommand](query)
+ val ct = parseAs[CreateTable](query)
val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf)
assert(hiveSerde.isDefined)
- assert(ct.table.storage.serde == hiveSerde.get.serde)
- assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat)
- assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat)
+ assert(ct.tableDesc.storage.serde == hiveSerde.get.serde)
+ assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat)
+ assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat)
} else {
assertUnsupported(query, Seq("row format delimited", "only compatible with 'textfile'", s))
}
@@ -312,9 +311,9 @@ class DDLCommandSuite extends PlanTest {
sql = "CREATE EXTERNAL TABLE my_tab",
containsThesePhrases = Seq("create external table", "location"))
val query = "CREATE EXTERNAL TABLE my_tab LOCATION '/something/anything'"
- val ct = parseAs[CreateTableCommand](query)
- assert(ct.table.tableType == CatalogTableType.EXTERNAL)
- assert(ct.table.storage.locationUri == Some("/something/anything"))
+ val ct = parseAs[CreateTable](query)
+ assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL)
+ assert(ct.tableDesc.storage.locationUri == Some("/something/anything"))
}
test("create table - property values must be set") {
@@ -329,47 +328,29 @@ class DDLCommandSuite extends PlanTest {
test("create table - location implies external") {
val query = "CREATE TABLE my_tab LOCATION '/something/anything'"
- val ct = parseAs[CreateTableCommand](query)
- assert(ct.table.tableType == CatalogTableType.EXTERNAL)
- assert(ct.table.storage.locationUri == Some("/something/anything"))
- }
-
- test("create table - column repeated in partitioning columns") {
- val query = "CREATE TABLE tab1 (key INT, value STRING) PARTITIONED BY (key INT, hr STRING)"
- val e = intercept[ParseException] { parser.parsePlan(query) }
- assert(e.getMessage.contains(
- "Operation not allowed: Partition columns may not be specified in the schema: [\"key\"]"))
- }
-
- test("create table - duplicate column names in the table definition") {
- val query = "CREATE TABLE default.tab1 (key INT, key STRING)"
- val e = intercept[ParseException] { parser.parsePlan(query) }
- assert(e.getMessage.contains("Operation not allowed: Duplicated column names found in " +
- "table definition of `default`.`tab1`: [\"key\"]"))
+ val ct = parseAs[CreateTable](query)
+ assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL)
+ assert(ct.tableDesc.storage.locationUri == Some("/something/anything"))
}
test("create table using - with partitioned by") {
val query = "CREATE TABLE my_tab(a INT comment 'test', b STRING) " +
"USING parquet PARTITIONED BY (a)"
- val expected = CreateTableUsing(
- TableIdentifier("my_tab"),
- Some(new StructType()
+
+ val expectedTableDesc = CatalogTable(
+ identifier = TableIdentifier("my_tab"),
+ tableType = CatalogTableType.MANAGED,
+ storage = CatalogStorageFormat.empty,
+ schema = new StructType()
.add("a", IntegerType, nullable = true, "test")
- .add("b", StringType)),
- "parquet",
- false,
- Map.empty,
- null,
- None,
- false,
- true)
+ .add("b", StringType),
+ provider = Some("parquet"),
+ partitionColumnNames = Seq("a")
+ )
parser.parsePlan(query) match {
- case ct: CreateTableUsing =>
- // We can't compare array in `CreateTableUsing` directly, so here we compare
- // `partitionColumns` ahead, and make `partitionColumns` null before plan comparison.
- assert(Seq("a") == ct.partitionColumns.toSeq)
- comparePlans(ct.copy(partitionColumns = null), expected)
+ case CreateTable(tableDesc, _, None) =>
+ assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
case other =>
fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
s"got ${other.getClass.getName}: $query")
@@ -379,23 +360,19 @@ class DDLCommandSuite extends PlanTest {
test("create table using - with bucket") {
val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " +
"CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS"
- val expected = CreateTableUsing(
- TableIdentifier("my_tab"),
- Some(new StructType().add("a", IntegerType).add("b", StringType)),
- "parquet",
- false,
- Map.empty,
- null,
- Some(BucketSpec(5, Seq("a"), Seq("b"))),
- false,
- true)
+
+ val expectedTableDesc = CatalogTable(
+ identifier = TableIdentifier("my_tab"),
+ tableType = CatalogTableType.MANAGED,
+ storage = CatalogStorageFormat.empty,
+ schema = new StructType().add("a", IntegerType).add("b", StringType),
+ provider = Some("parquet"),
+ bucketSpec = Some(BucketSpec(5, Seq("a"), Seq("b")))
+ )
parser.parsePlan(query) match {
- case ct: CreateTableUsing =>
- // `Array.empty == Array.empty` returns false, here we set `partitionColumns` to null before
- // plan comparison.
- assert(ct.partitionColumns.isEmpty)
- comparePlans(ct.copy(partitionColumns = null), expected)
+ case CreateTable(tableDesc, _, None) =>
+ assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
case other =>
fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
s"got ${other.getClass.getName}: $query")
@@ -907,22 +884,20 @@ class DDLCommandSuite extends PlanTest {
|CREATE TABLE table_name USING json
|OPTIONS (a 1, b 0.1, c TRUE)
""".stripMargin
- val expected = CreateTableUsing(
- TableIdentifier("table_name"),
- None,
- "json",
- false,
- Map("a" -> "1", "b" -> "0.1", "c" -> "true"),
- null,
- None,
- false,
- true)
+
+ val expectedTableDesc = CatalogTable(
+ identifier = TableIdentifier("table_name"),
+ tableType = CatalogTableType.MANAGED,
+ storage = CatalogStorageFormat.empty.copy(
+ properties = Map("a" -> "1", "b" -> "0.1", "c" -> "true")
+ ),
+ schema = new StructType,
+ provider = Some("json")
+ )
parser.parsePlan(sql) match {
- case ct: CreateTableUsing =>
- // We can't compare array in `CreateTableUsing` directly, so here we explicitly
- // set partitionColumns to `null` and then compare it.
- comparePlans(ct.copy(partitionColumns = null), expected)
+ case CreateTable(tableDesc, _, None) =>
+ assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
case other =>
fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
s"got ${other.getClass.getName}: $sql")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 564fc73ee7..ca9b210125 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, Catal
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -94,6 +93,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
.add("col2", "string")
.add("a", "int")
.add("b", "int"),
+ provider = Some("parquet"),
partitionColumnNames = Seq("a", "b"),
createTime = 0L)
}
@@ -359,6 +359,43 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
+ test("create table - duplicate column names in the table definition") {
+ val e = intercept[AnalysisException] {
+ sql("CREATE TABLE tbl(a int, a string) USING json")
+ }
+ assert(e.message == "Found duplicate column(s) in table definition of `tbl`: a")
+ }
+
+ test("create table - partition column names not in table definition") {
+ val e = intercept[AnalysisException] {
+ sql("CREATE TABLE tbl(a int, b string) USING json PARTITIONED BY (c)")
+ }
+ assert(e.message == "partition column c is not defined in table `tbl`, " +
+ "defined table columns are: a, b")
+ }
+
+ test("create table - bucket column names not in table definition") {
+ val e = intercept[AnalysisException] {
+ sql("CREATE TABLE tbl(a int, b string) USING json CLUSTERED BY (c) INTO 4 BUCKETS")
+ }
+ assert(e.message == "bucket column c is not defined in table `tbl`, " +
+ "defined table columns are: a, b")
+ }
+
+ test("create table - column repeated in partition columns") {
+ val e = intercept[AnalysisException] {
+ sql("CREATE TABLE tbl(a int) USING json PARTITIONED BY (a, a)")
+ }
+ assert(e.message == "Found duplicate column(s) in partition: a")
+ }
+
+ test("create table - column repeated in bucket columns") {
+ val e = intercept[AnalysisException] {
+ sql("CREATE TABLE tbl(a int) USING json CLUSTERED BY (a, a) INTO 4 BUCKETS")
+ }
+ assert(e.message == "Found duplicate column(s) in bucket: a")
+ }
+
test("Describe Table with Corrupted Schema") {
import testImplicits._
@@ -1469,7 +1506,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
withTable("jsonTable") {
(("a", "b") :: Nil).toDF().write.json(tempDir.getCanonicalPath)
- val e = intercept[ParseException] {
+ val e = intercept[AnalysisException] {
sql(
s"""
|CREATE TABLE jsonTable
@@ -1479,9 +1516,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
|)
|CLUSTERED BY (inexistentColumnA) SORTED BY (inexistentColumnB) INTO 2 BUCKETS
""".stripMargin)
- }.getMessage
- assert(e.contains(
- "Expected explicit specification of table schema when using CLUSTERED BY clause"))
+ }
+ assert(e.message == "Cannot specify bucketing information if the table schema is not " +
+ "specified when creating and will be inferred at runtime")
}
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index db970785a7..c7c1acda25 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -23,15 +23,13 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
-import org.apache.spark.sql.execution.command.CreateHiveTableAsSelectLogicalPlan
import org.apache.spark.sql.execution.datasources.{Partition => _, _}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.hive.orc.OrcFileFormat
@@ -436,23 +434,30 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// Wait until children are resolved.
case p: LogicalPlan if !p.childrenResolved => p
- case p: LogicalPlan if p.resolved => p
- case p @ CreateHiveTableAsSelectLogicalPlan(table, child, allowExisting) =>
- val desc = if (table.storage.serde.isEmpty) {
+ case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" =>
+ val newTableDesc = if (tableDesc.storage.serde.isEmpty) {
// add default serde
- table.withNewStorage(
+ tableDesc.withNewStorage(
serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
} else {
- table
+ tableDesc
}
- val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
+ val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableDesc)
+
+ // Currently we will never hit this branch, as the SQL string API can only use the `Ignore` or
+ // `ErrorIfExists` modes, and `DataFrameWriter.saveAsTable` doesn't support hive serde
+ // tables yet.
+ if (mode == SaveMode.Append || mode == SaveMode.Overwrite) {
+ throw new AnalysisException("" +
+ "CTAS for hive serde tables does not support append or overwrite semantics.")
+ }
execution.CreateHiveTableAsSelectCommand(
- desc.copy(identifier = TableIdentifier(tblName, Some(dbName))),
- child,
- allowExisting)
+ newTableDesc.copy(identifier = TableIdentifier(tblName, Some(dbName))),
+ query,
+ mode == SaveMode.Ignore)
}
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 8773993d36..e01c053ab5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -65,6 +65,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
catalog.ParquetConversions ::
catalog.OrcConversions ::
catalog.CreateTables ::
+ PreprocessDDL(conf) ::
PreprocessTableInsertion(conf) ::
DataSourceAnalysis(conf) ::
(if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index e0c07db3b0..69a6884c7a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.hive
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.types.StructType
@@ -36,8 +37,7 @@ class HiveDDLCommandSuite extends PlanTest {
private def extractTableDesc(sql: String): (CatalogTable, Boolean) = {
parser.parsePlan(sql).collect {
- case c: CreateTableCommand => (c.table, c.ifNotExists)
- case c: CreateHiveTableAsSelectLogicalPlan => (c.tableDesc, c.allowExisting)
+ case CreateTable(tableDesc, mode, _) => (tableDesc, mode == SaveMode.Ignore)
}.head
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index d15e11a7ff..e078b58542 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -141,6 +141,13 @@ class HiveDDLSuite
}
}
+ test("create table: partition column names exist in table definition") {
+ val e = intercept[AnalysisException] {
+ sql("CREATE TABLE tbl(a int) PARTITIONED BY (a string)")
+ }
+ assert(e.message == "Found duplicate column(s) in table definition of `tbl`: a")
+ }
+
test("add/drop partitions - external table") {
val catalog = spark.sessionState.catalog
withTempDir { tmpDir =>