author     Cheng Lian <lian@databricks.com>  2016-05-11 20:44:04 -0700
committer  Yin Huai <yhuai@databricks.com>   2016-05-11 20:44:04 -0700
commit     f036dd7ce727b40877337da66d687214786c4f14 (patch)
tree       043e375be809ac10a01340b18003d64bebdecaca
parent     ff92eb2e80f2f38d10ac524ced82bb3f94b5b2bf (diff)
download   spark-f036dd7ce727b40877337da66d687214786c4f14.tar.gz
           spark-f036dd7ce727b40877337da66d687214786c4f14.tar.bz2
           spark-f036dd7ce727b40877337da66d687214786c4f14.zip
[SPARK-14346] SHOW CREATE TABLE for data source tables
## What changes were proposed in this pull request?

This PR adds a native `SHOW CREATE TABLE` DDL command for data source tables. Support for Hive tables will be added in follow-up PR(s).

To show table creation DDL for data source tables created by CTAS statements, this PR also adds partitioning and bucketing support to the normal `CREATE TABLE ... USING ...` syntax.

## How was this patch tested?

A new test suite, `ShowCreateTableSuite`, is added in the sql/hive package to test the new feature.

Author: Cheng Lian <lian@databricks.com>

Closes #12781 from liancheng/spark-14346-show-create-table.
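For illustration, a minimal sketch of the syntax this patch enables, assuming a running `spark` session; the table name, columns, and path below are hypothetical:

```scala
// CREATE TABLE ... USING ... now accepts PARTITIONED BY and CLUSTERED BY ... INTO ... BUCKETS
// (clause order follows the updated SqlBase.g4 rule: OPTIONS, then PARTITIONED BY, then bucket spec).
spark.sql(
  """CREATE TABLE events (id BIGINT, category STRING, ts TIMESTAMP)
    |USING parquet
    |OPTIONS (path '/tmp/events')
    |PARTITIONED BY (category)
    |CLUSTERED BY (id) SORTED BY (ts) INTO 4 BUCKETS
  """.stripMargin)

// SHOW CREATE TABLE now works for data source tables and returns a single
// "createtab_stmt" column holding the reconstructed DDL.
spark.sql("SHOW CREATE TABLE events").show(truncate = false)
```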
-rw-r--r--  sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 | 5
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala | 28
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala | 107
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala | 243
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala | 6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala | 1
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 7
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala | 169
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 1
14 files changed, 458 insertions, 127 deletions
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index ffb7a097ee..06ac37b7f8 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -45,7 +45,9 @@ statement
| ALTER DATABASE identifier SET DBPROPERTIES tablePropertyList #setDatabaseProperties
| DROP DATABASE (IF EXISTS)? identifier (RESTRICT | CASCADE)? #dropDatabase
| createTableHeader ('(' colTypeList ')')? tableProvider
- (OPTIONS tablePropertyList)? #createTableUsing
+ (OPTIONS tablePropertyList)?
+ (PARTITIONED BY partitionColumnNames=identifierList)?
+ bucketSpec? #createTableUsing
| createTableHeader tableProvider
(OPTIONS tablePropertyList)?
(PARTITIONED BY partitionColumnNames=identifierList)?
@@ -102,6 +104,7 @@ statement
((FROM | IN) db=identifier)? #showColumns
| SHOW PARTITIONS tableIdentifier partitionSpec? #showPartitions
| SHOW FUNCTIONS (LIKE? (qualifiedName | pattern=STRING))? #showFunctions
+ | SHOW CREATE TABLE tableIdentifier #showCreateTable
| (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName #describeFunction
| (DESC | DESCRIBE) option=(EXTENDED | FORMATTED)?
tableIdentifier partitionSpec? describeColName? #describeTable
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
index 7d0584511f..d7b48ceca5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
@@ -44,7 +44,7 @@ sealed trait IdentifierWithDatabase {
/**
* Identifies a table in a database.
* If `database` is not defined, the current database is used.
- * When we register a permenent function in the FunctionRegistry, we use
+ * When we register a permanent function in the FunctionRegistry, we use
* unquotedString as the function name.
*/
case class TableIdentifier(table: String, database: Option[String])
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 53aba1f206..b6e074bf59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -182,6 +182,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
/**
+ * Creates a [[ShowCreateTableCommand]]
+ */
+ override def visitShowCreateTable(ctx: ShowCreateTableContext): LogicalPlan = withOrigin(ctx) {
+ val table = visitTableIdentifier(ctx.tableIdentifier())
+ ShowCreateTableCommand(table)
+ }
+
+ /**
* Create a [[RefreshTable]] logical plan.
*/
override def visitRefreshTable(ctx: RefreshTableContext): LogicalPlan = withOrigin(ctx) {
@@ -287,6 +295,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
val options = Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty)
val provider = ctx.tableProvider.qualifiedName.getText
+ val partitionColumnNames =
+ Option(ctx.partitionColumnNames)
+ .map(visitIdentifierList(_).toArray)
+ .getOrElse(Array.empty[String])
val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
if (ctx.query != null) {
@@ -302,16 +314,20 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
SaveMode.ErrorIfExists
}
- val partitionColumnNames =
- Option(ctx.partitionColumnNames)
- .map(visitIdentifierList(_).toArray)
- .getOrElse(Array.empty[String])
-
CreateTableUsingAsSelect(
table, provider, temp, partitionColumnNames, bucketSpec, mode, options, query)
} else {
val struct = Option(ctx.colTypeList()).map(createStructType)
- CreateTableUsing(table, struct, provider, temp, options, ifNotExists, managedIfNoPath = true)
+ CreateTableUsing(
+ table,
+ struct,
+ provider,
+ temp,
+ options,
+ partitionColumnNames,
+ bucketSpec,
+ ifNotExists,
+ managedIfNoPath = true)
}
}
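The parser change above moves partition-column handling out of the CTAS-only branch so that the plain `CREATE TABLE ... USING ...` path also captures `PARTITIONED BY` and the bucket spec. A hedged sketch of the effect, using the same `SparkSqlParser` construction as `DDLCommandSuite`; the table and column names are hypothetical:

```scala
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.execution.datasources.CreateTableUsing
import org.apache.spark.sql.internal.SQLConf

// Parse the extended non-CTAS syntax and inspect the resulting logical plan.
val parser = new SparkSqlParser(new SQLConf)
val plan = parser.parsePlan(
  """CREATE TABLE t (a INT, b STRING, c DOUBLE)
    |USING json
    |PARTITIONED BY (b)
    |CLUSTERED BY (a) SORTED BY (c) INTO 2 BUCKETS
  """.stripMargin)

val create = plan.asInstanceOf[CreateTableUsing]
assert(create.partitionColumns.toSeq == Seq("b")) // captured by the new grammar alternative
assert(create.bucketSpec.isDefined)               // CLUSTERED BY ... INTO ... BUCKETS
```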
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 9747e58f43..faf359f548 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -372,10 +372,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
object DDLStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case CreateTableUsing(tableIdent, userSpecifiedSchema, provider, true, opts, false, _) =>
+ case c: CreateTableUsing if c.temporary && !c.allowExisting =>
ExecutedCommandExec(
CreateTempTableUsing(
- tableIdent, userSpecifiedSchema, provider, opts)) :: Nil
+ c.tableIdent, c.userSpecifiedSchema, c.provider, c.options)) :: Nil
case c: CreateTableUsing if !c.temporary =>
val cmd =
@@ -384,6 +384,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
c.userSpecifiedSchema,
c.provider,
c.options,
+ c.partitionColumns,
+ c.bucketSpec,
c.allowExisting,
c.managedIfNoPath)
ExecutedCommandExec(cmd) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 576e12a94b..d5aaccc4bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -17,19 +17,14 @@
package org.apache.spark.sql.execution.command
-import java.io.File
-
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.execution.debug._
import org.apache.spark.sql.types._
@@ -117,101 +112,3 @@ case class ExplainCommand(
("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
}
}
-
-/**
- * A command to list the column names for a table. This function creates a
- * [[ShowColumnsCommand]] logical plan.
- *
- * The syntax of using this command in SQL is:
- * {{{
- * SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database];
- * }}}
- */
-case class ShowColumnsCommand(table: TableIdentifier) extends RunnableCommand {
- // The result of SHOW COLUMNS has one column called 'result'
- override val output: Seq[Attribute] = {
- AttributeReference("result", StringType, nullable = false)() :: Nil
- }
-
- override def run(sparkSession: SparkSession): Seq[Row] = {
- sparkSession.sessionState.catalog.getTableMetadata(table).schema.map { c =>
- Row(c.name)
- }
- }
-}
-
-/**
- * A command to list the partition names of a table. If the partition spec is specified,
- * partitions that match the spec are returned. [[AnalysisException]] exception is thrown under
- * the following conditions:
- *
- * 1. If the command is called for a non partitioned table.
- * 2. If the partition spec refers to the columns that are not defined as partitioning columns.
- *
- * This function creates a [[ShowPartitionsCommand]] logical plan
- *
- * The syntax of using this command in SQL is:
- * {{{
- * SHOW PARTITIONS [db_name.]table_name [PARTITION(partition_spec)]
- * }}}
- */
-case class ShowPartitionsCommand(
- table: TableIdentifier,
- spec: Option[TablePartitionSpec]) extends RunnableCommand {
- // The result of SHOW PARTITIONS has one column called 'result'
- override val output: Seq[Attribute] = {
- AttributeReference("result", StringType, nullable = false)() :: Nil
- }
-
- private def getPartName(spec: TablePartitionSpec, partColNames: Seq[String]): String = {
- partColNames.map { name =>
- PartitioningUtils.escapePathName(name) + "=" + PartitioningUtils.escapePathName(spec(name))
- }.mkString(File.separator)
- }
-
- override def run(sparkSession: SparkSession): Seq[Row] = {
- val catalog = sparkSession.sessionState.catalog
- val db = table.database.getOrElse(catalog.getCurrentDatabase)
- if (catalog.isTemporaryTable(table)) {
- throw new AnalysisException("SHOW PARTITIONS is not allowed on a temporary table: " +
- s"${table.unquotedString}")
- } else {
- val tab = catalog.getTableMetadata(table)
- /**
- * Validate and throws an [[AnalysisException]] exception under the following conditions:
- * 1. If the table is not partitioned.
- * 2. If it is a datasource table.
- * 3. If it is a view or index table.
- */
- if (tab.tableType == CatalogTableType.VIEW ||
- tab.tableType == CatalogTableType.INDEX) {
- throw new AnalysisException("SHOW PARTITIONS is not allowed on a view or index table: " +
- s"${tab.qualifiedName}")
- }
- if (!DDLUtils.isTablePartitioned(tab)) {
- throw new AnalysisException("SHOW PARTITIONS is not allowed on a table that is not " +
- s"partitioned: ${tab.qualifiedName}")
- }
- if (DDLUtils.isDatasourceTable(tab)) {
- throw new AnalysisException("SHOW PARTITIONS is not allowed on a datasource table: " +
- s"${tab.qualifiedName}")
- }
- /**
- * Validate the partitioning spec by making sure all the referenced columns are
- * defined as partitioning columns in table definition. An AnalysisException exception is
- * thrown if the partitioning spec is invalid.
- */
- if (spec.isDefined) {
- val badColumns = spec.get.keySet.filterNot(tab.partitionColumns.map(_.name).contains)
- if (badColumns.nonEmpty) {
- throw new AnalysisException(
- s"Non-partitioning column(s) [${badColumns.mkString(", ")}] are " +
- s"specified for SHOW PARTITIONS")
- }
- }
- val partNames =
- catalog.listPartitions(table, spec).map(p => getPartName(p.spec, tab.partitionColumnNames))
- partNames.map { p => Row(p) }
- }
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 3525111e46..de3c868176 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -51,6 +51,8 @@ case class CreateDataSourceTableCommand(
userSpecifiedSchema: Option[StructType],
provider: String,
options: Map[String, String],
+ partitionColumns: Array[String],
+ bucketSpec: Option[BucketSpec],
ignoreIfExists: Boolean,
managedIfNoPath: Boolean)
extends RunnableCommand {
@@ -103,8 +105,8 @@ case class CreateDataSourceTableCommand(
sparkSession = sparkSession,
tableIdent = tableIdent,
userSpecifiedSchema = userSpecifiedSchema,
- partitionColumns = Array.empty[String],
- bucketSpec = None,
+ partitionColumns = partitionColumns,
+ bucketSpec = bucketSpec,
provider = provider,
options = optionsWithPath,
isExternal = isExternal)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 0f90715a90..e6dcd1ee95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -26,10 +26,13 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
-import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StringType, StructType}
+import org.apache.spark.sql.catalyst.util.quoteIdentifier
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
case class CreateTableAsSelectLogicalPlan(
@@ -490,3 +493,241 @@ case class ShowTablePropertiesCommand(table: TableIdentifier, propertyKey: Optio
}
}
}
+
+/**
+ * A command to list the column names for a table. This function creates a
+ * [[ShowColumnsCommand]] logical plan.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ * SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database];
+ * }}}
+ */
+case class ShowColumnsCommand(table: TableIdentifier) extends RunnableCommand {
+ // The result of SHOW COLUMNS has one column called 'result'
+ override val output: Seq[Attribute] = {
+ AttributeReference("result", StringType, nullable = false)() :: Nil
+ }
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ sparkSession.sessionState.catalog.getTableMetadata(table).schema.map { c =>
+ Row(c.name)
+ }
+ }
+}
+
+/**
+ * A command to list the partition names of a table. If the partition spec is specified,
+ * partitions that match the spec are returned. [[AnalysisException]] exception is thrown under
+ * the following conditions:
+ *
+ * 1. If the command is called for a non partitioned table.
+ * 2. If the partition spec refers to the columns that are not defined as partitioning columns.
+ *
+ * This function creates a [[ShowPartitionsCommand]] logical plan
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ * SHOW PARTITIONS [db_name.]table_name [PARTITION(partition_spec)]
+ * }}}
+ */
+case class ShowPartitionsCommand(
+ table: TableIdentifier,
+ spec: Option[TablePartitionSpec]) extends RunnableCommand {
+ // The result of SHOW PARTITIONS has one column called 'result'
+ override val output: Seq[Attribute] = {
+ AttributeReference("result", StringType, nullable = false)() :: Nil
+ }
+
+ private def getPartName(spec: TablePartitionSpec, partColNames: Seq[String]): String = {
+ partColNames.map { name =>
+ PartitioningUtils.escapePathName(name) + "=" + PartitioningUtils.escapePathName(spec(name))
+ }.mkString(File.separator)
+ }
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
+
+ if (catalog.isTemporaryTable(table)) {
+ throw new AnalysisException(
+ s"SHOW PARTITIONS is not allowed on a temporary table: ${table.unquotedString}")
+ }
+
+ val tab = catalog.getTableMetadata(table)
+
+ /**
+ * Validate and throws an [[AnalysisException]] exception under the following conditions:
+ * 1. If the table is not partitioned.
+ * 2. If it is a datasource table.
+ * 3. If it is a view or index table.
+ */
+ if (tab.tableType == VIEW ||
+ tab.tableType == INDEX) {
+ throw new AnalysisException(
+ s"SHOW PARTITIONS is not allowed on a view or index table: ${tab.qualifiedName}")
+ }
+
+ if (!DDLUtils.isTablePartitioned(tab)) {
+ throw new AnalysisException(
+ s"SHOW PARTITIONS is not allowed on a table that is not partitioned: ${tab.qualifiedName}")
+ }
+
+ if (DDLUtils.isDatasourceTable(tab)) {
+ throw new AnalysisException(
+ s"SHOW PARTITIONS is not allowed on a datasource table: ${tab.qualifiedName}")
+ }
+
+ /**
+ * Validate the partitioning spec by making sure all the referenced columns are
+ * defined as partitioning columns in table definition. An AnalysisException exception is
+ * thrown if the partitioning spec is invalid.
+ */
+ if (spec.isDefined) {
+ val badColumns = spec.get.keySet.filterNot(tab.partitionColumns.map(_.name).contains)
+ if (badColumns.nonEmpty) {
+ val badCols = badColumns.mkString("[", ", ", "]")
+ throw new AnalysisException(
+ s"Non-partitioning column(s) $badCols are specified for SHOW PARTITIONS")
+ }
+ }
+
+ val partNames = catalog.listPartitions(table, spec).map { p =>
+ getPartName(p.spec, tab.partitionColumnNames)
+ }
+
+ partNames.map(Row(_))
+ }
+}
+
+case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableCommand {
+ override val output: Seq[Attribute] = Seq(
+ AttributeReference("createtab_stmt", StringType, nullable = false)()
+ )
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
+
+ if (catalog.isTemporaryTable(table)) {
+ throw new AnalysisException(
+ s"SHOW CREATE TABLE cannot be applied to temporary table")
+ }
+
+ if (!catalog.tableExists(table)) {
+ throw new AnalysisException(s"Table $table doesn't exist")
+ }
+
+ val tableMetadata = catalog.getTableMetadata(table)
+
+ val stmt = if (DDLUtils.isDatasourceTable(tableMetadata)) {
+ showCreateDataSourceTable(tableMetadata)
+ } else {
+ throw new UnsupportedOperationException(
+ "SHOW CREATE TABLE only supports Spark SQL data source tables.")
+ }
+
+ Seq(Row(stmt))
+ }
+
+ private def showCreateDataSourceTable(metadata: CatalogTable): String = {
+ val builder = StringBuilder.newBuilder
+
+ builder ++= s"CREATE TABLE ${table.quotedString} "
+ showDataSourceTableDataCols(metadata, builder)
+ showDataSourceTableOptions(metadata, builder)
+ showDataSourceTableNonDataColumns(metadata, builder)
+
+ builder.toString()
+ }
+
+ private def showDataSourceTableDataCols(metadata: CatalogTable, builder: StringBuilder): Unit = {
+ val props = metadata.properties
+ val schemaParts = for {
+ numParts <- props.get("spark.sql.sources.schema.numParts").toSeq
+ index <- 0 until numParts.toInt
+ } yield props.getOrElse(
+ s"spark.sql.sources.schema.part.$index",
+ throw new AnalysisException(
+ s"Corrupted schema in catalog: $numParts parts expected, but part $index is missing."
+ )
+ )
+
+ if (schemaParts.nonEmpty) {
+ val fields = DataType.fromJson(schemaParts.mkString).asInstanceOf[StructType].fields
+ val colTypeList = fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}")
+ builder ++= colTypeList.mkString("(", ", ", ")")
+ }
+
+ builder ++= "\n"
+ }
+
+ private def showDataSourceTableOptions(metadata: CatalogTable, builder: StringBuilder): Unit = {
+ val props = metadata.properties
+
+ builder ++= s"USING ${props("spark.sql.sources.provider")}\n"
+
+ val dataSourceOptions = metadata.storage.serdeProperties.filterNot {
+ case (key, value) =>
+ // If it's a managed table, omit PATH option. Spark SQL always creates external table
+ // when the table creation DDL contains the PATH option.
+ key.toLowerCase == "path" && metadata.tableType == MANAGED
+ }.map {
+ case (key, value) => s"${quoteIdentifier(key)} '${escapeSingleQuotedString(value)}'"
+ }
+
+ if (dataSourceOptions.nonEmpty) {
+ builder ++= "OPTIONS (\n"
+ builder ++= dataSourceOptions.mkString(" ", ",\n ", "\n")
+ builder ++= ")\n"
+ }
+ }
+
+ private def showDataSourceTableNonDataColumns(
+ metadata: CatalogTable, builder: StringBuilder): Unit = {
+ val props = metadata.properties
+
+ def getColumnNamesByType(colType: String, typeName: String): Seq[String] = {
+ (for {
+ numCols <- props.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").toSeq
+ index <- 0 until numCols.toInt
+ } yield props.getOrElse(
+ s"spark.sql.sources.schema.${colType}Col.$index",
+ throw new AnalysisException(
+ s"Corrupted $typeName in catalog: $numCols parts expected, but part $index is missing."
+ )
+ )).map(quoteIdentifier)
+ }
+
+ val partCols = getColumnNamesByType("part", "partitioning columns")
+ if (partCols.nonEmpty) {
+ builder ++= s"PARTITIONED BY ${partCols.mkString("(", ", ", ")")}\n"
+ }
+
+ val bucketCols = getColumnNamesByType("bucket", "bucketing columns")
+ if (bucketCols.nonEmpty) {
+ builder ++= s"CLUSTERED BY ${bucketCols.mkString("(", ", ", ")")}\n"
+
+ val sortCols = getColumnNamesByType("sort", "sorting columns")
+ if (sortCols.nonEmpty) {
+ builder ++= s"SORTED BY ${sortCols.mkString("(", ", ", ")")}\n"
+ }
+
+ val numBuckets = props.getOrElse(
+ "spark.sql.sources.schema.numBuckets",
+ throw new AnalysisException("Corrupted bucket spec in catalog: missing bucket number")
+ )
+
+ builder ++= s"INTO $numBuckets BUCKETS\n"
+ }
+ }
+
+ private def escapeSingleQuotedString(str: String): String = {
+ val builder = StringBuilder.newBuilder
+
+ str.foreach {
+ case '\'' => builder ++= s"\\\'"
+ case ch => builder += ch
+ }
+
+ builder.toString()
+ }
+}
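To make the property-based schema handling in `showDataSourceTableDataCols` above concrete, here is a self-contained sketch with a hypothetical, hand-split property map; it mirrors how the command reassembles a data source table's schema from the `spark.sql.sources.schema.numParts` / `spark.sql.sources.schema.part.N` catalog properties (the schema JSON is split into parts because metastore property values are length-limited):

```scala
import org.apache.spark.sql.types.{DataType, StructType}

// Hypothetical catalog properties for a one-column table, with the schema JSON split in two.
val props = Map(
  "spark.sql.sources.schema.numParts" -> "2",
  "spark.sql.sources.schema.part.0" ->
    """{"type":"struct","fields":[{"name":"a","type":"integer","nullable":""",
  "spark.sql.sources.schema.part.1" -> """true,"metadata":{}}]}""")

// Concatenate the parts in order and parse the JSON back into a StructType.
val numParts = props("spark.sql.sources.schema.numParts").toInt
val json = (0 until numParts).map(i => props(s"spark.sql.sources.schema.part.$i")).mkString
val schema = DataType.fromJson(json).asInstanceOf[StructType]

// Render each field roughly the way the command does, e.g. `a` INT.
schema.fields.foreach(f => println(s"`${f.name}` ${f.dataType.sql}"))
```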
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 7d0a3d9756..3863be5768 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -38,6 +38,8 @@ case class CreateTableUsing(
provider: String,
temporary: Boolean,
options: Map[String, String],
+ partitionColumns: Array[String],
+ bucketSpec: Option[BucketSpec],
allowExisting: Boolean,
managedIfNoPath: Boolean) extends LogicalPlan with logical.Command {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index d08dca32c0..fdfb188b38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -225,7 +225,9 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
userSpecifiedSchema = None,
source,
temporary = false,
- options,
+ options = options,
+ partitionColumns = Array.empty[String],
+ bucketSpec = None,
allowExisting = false,
managedIfNoPath = false)
sparkSession.executePlan(cmd).toRdd
@@ -272,6 +274,8 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
source,
temporary = false,
options,
+ partitionColumns = Array.empty[String],
+ bucketSpec = None,
allowExisting = false,
managedIfNoPath = false)
sparkSession.executePlan(cmd).toRdd
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index aeb613acb5..13df4493e2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.internal.SQLConf
-
// TODO: merge this with DDLSuite (SPARK-14441)
class DDLCommandSuite extends PlanTest {
private val parser = new SparkSqlParser(new SQLConf)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index b0a3a803d2..8cfcec79cd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive
import scala.collection.JavaConverters._
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewCommand}
+import org.apache.spark.sql.execution.command.CreateTableAsSelectLogicalPlan
import org.apache.spark.sql.execution.datasources.{Partition => _, _}
import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation}
import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource}
@@ -44,7 +44,6 @@ import org.apache.spark.sql.types._
* cleaned up to integrate more nicely with [[HiveExternalCatalog]].
*/
private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging {
- private val conf = sparkSession.conf
private val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
private val client = sparkSession.sharedState.asInstanceOf[HiveSharedState].metadataHive
@@ -110,7 +109,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
schemaString.map(s => DataType.fromJson(s).asInstanceOf[StructType])
// We only need names at here since userSpecifiedSchema we loaded from the metastore
- // contains partition columns. We can always get datatypes of partitioning columns
+ // contains partition columns. We can always get data types of partitioning columns
// from userSpecifiedSchema.
val partitionColumns = getColumnNames("part")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 6d418c1dcf..2f6aa36f95 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -25,10 +25,8 @@ import scala.collection.mutable
import scala.language.implicitConversions
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.exec.FunctionRegistry
-import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.spark.{SparkConf, SparkContext}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
new file mode 100644
index 0000000000..12a1ad8987
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.util.Utils
+
+class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+ import testImplicits._
+
+ test("data source table with user specified schema") {
+ withTable("ddl_test1") {
+ val jsonFilePath = Utils.getSparkClassLoader.getResource("sample.json").getFile
+
+ sql(
+ s"""CREATE TABLE ddl_test1 (
+ | a STRING,
+ | b STRING,
+ | `extra col` ARRAY<INT>,
+ | `<another>` STRUCT<x: INT, y: ARRAY<BOOLEAN>>
+ |)
+ |USING json
+ |OPTIONS (
+ | PATH '$jsonFilePath'
+ |)
+ """.stripMargin
+ )
+
+ checkCreateTable("ddl_test1")
+ }
+ }
+
+ test("data source table CTAS") {
+ withTable("ddl_test2") {
+ sql(
+ s"""CREATE TABLE ddl_test2
+ |USING json
+ |AS SELECT 1 AS a, "foo" AS b
+ """.stripMargin
+ )
+
+ checkCreateTable("ddl_test2")
+ }
+ }
+
+ test("partitioned data source table") {
+ withTable("ddl_test3") {
+ sql(
+ s"""CREATE TABLE ddl_test3
+ |USING json
+ |PARTITIONED BY (b)
+ |AS SELECT 1 AS a, "foo" AS b
+ """.stripMargin
+ )
+
+ checkCreateTable("ddl_test3")
+ }
+ }
+
+ test("bucketed data source table") {
+ withTable("ddl_test3") {
+ sql(
+ s"""CREATE TABLE ddl_test3
+ |USING json
+ |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
+ |AS SELECT 1 AS a, "foo" AS b
+ """.stripMargin
+ )
+
+ checkCreateTable("ddl_test3")
+ }
+ }
+
+ test("partitioned bucketed data source table") {
+ withTable("ddl_test4") {
+ sql(
+ s"""CREATE TABLE ddl_test4
+ |USING json
+ |PARTITIONED BY (c)
+ |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
+ |AS SELECT 1 AS a, "foo" AS b, 2.5 AS c
+ """.stripMargin
+ )
+
+ checkCreateTable("ddl_test4")
+ }
+ }
+
+ test("data source table using Dataset API") {
+ withTable("ddl_test5") {
+ spark
+ .range(3)
+ .select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd, 'id as 'e)
+ .write
+ .mode("overwrite")
+ .partitionBy("a", "b")
+ .bucketBy(2, "c", "d")
+ .saveAsTable("ddl_test5")
+
+ checkCreateTable(TableIdentifier("ddl_test5", Some("default")))
+ }
+ }
+
+ private def checkCreateTable(table: String): Unit = {
+ checkCreateTable(TableIdentifier(table, Some("default")))
+ }
+
+ private def checkCreateTable(table: TableIdentifier): Unit = {
+ val db = table.database.getOrElse("default")
+ val expected = spark.externalCatalog.getTable(db, table.table)
+ val shownDDL = sql(s"SHOW CREATE TABLE ${table.quotedString}").head().getString(0)
+ sql(s"DROP TABLE ${table.quotedString}")
+
+ withTable(table.table) {
+ sql(shownDDL)
+ val actual = spark.externalCatalog.getTable(db, table.table)
+ checkCatalogTables(expected, actual)
+ }
+ }
+
+ private def checkCatalogTables(expected: CatalogTable, actual: CatalogTable): Unit = {
+ def normalize(table: CatalogTable): CatalogTable = {
+ val nondeterministicProps = Set(
+ "CreateTime",
+ "transient_lastDdlTime",
+ "grantTime",
+ "lastUpdateTime",
+ "last_modified_by",
+ "last_modified_time",
+ "Owner:",
+ "COLUMN_STATS_ACCURATE",
+ // The following are hive specific schema parameters which we do not need to match exactly.
+ "numFiles",
+ "numRows",
+ "rawDataSize",
+ "totalSize",
+ "totalNumberFiles",
+ "maxFileSize",
+ "minFileSize"
+ )
+
+ table.copy(
+ createTime = 0L,
+ lastAccessTime = 0L,
+ properties = table.properties.filterKeys(!nondeterministicProps.contains(_)))
+ }
+
+ assert(normalize(expected) == normalize(actual))
+ }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index bbb775ef77..19f8cb3877 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -1166,7 +1166,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
}
test("some show commands are not supported") {
- assertUnsupportedFeature { sql("SHOW CREATE TABLE my_table") }
assertUnsupportedFeature { sql("SHOW COMPACTIONS") }
assertUnsupportedFeature { sql("SHOW TRANSACTIONS") }
assertUnsupportedFeature { sql("SHOW INDEXES ON my_table") }