-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala     12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala          184
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala         10
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala         185
4 files changed, 333 insertions, 58 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index d21565526e..d4f5cbb625 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -79,6 +79,12 @@ case class CatalogTablePartition(
*
* Note that Hive's metastore also tracks skewed columns. We should consider adding that in the
* future once we have a better understanding of how we want to handle skewed columns.
+ *
+ * @param hasUnsupportedFeatures indicates whether all table metadata entries retrieved from the
+ * underlying external catalog (e.g. the Hive metastore) are supported by Spark SQL. For example,
+ * if the underlying Hive table has skewed columns, that information can't be mapped to
+ * [[CatalogTable]] because Spark SQL doesn't handle skewed columns yet. In this case
+ * `hasUnsupportedFeatures` is set to true. By default, it is false.
*/
case class CatalogTable(
identifier: TableIdentifier,
@@ -95,7 +101,8 @@ case class CatalogTable(
properties: Map[String, String] = Map.empty,
viewOriginalText: Option[String] = None,
viewText: Option[String] = None,
- comment: Option[String] = None) {
+ comment: Option[String] = None,
+ hasUnsupportedFeatures: Boolean = false) {
// Verify that the provided columns are part of the schema
private val colNames = schema.map(_.name).toSet
@@ -200,6 +207,7 @@ case class SimpleCatalogRelation(
}
}
- require(metadata.identifier.database == Some(databaseName),
+ require(
+ metadata.identifier.database.contains(databaseName),
"provided database does not match the one specified in the table definition")
}
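A minimal sketch of how the new flag could be set by code that converts external-catalog metadata into a CatalogTable; the helper name here is hypothetical and not part of this change:

    import org.apache.spark.sql.catalyst.catalog.CatalogTable

    // Hypothetical helper: mark metadata whose source table uses features
    // (e.g. skewed columns) that Spark SQL cannot represent yet.
    def markIfUnsupported(table: CatalogTable, hasSkewedColumns: Boolean): CatalogTable =
      if (hasSkewedColumns) table.copy(hasUnsupportedFeatures = true) else table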
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index bb4f1ff4f7..1fc02d1d4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -626,40 +626,149 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
val stmt = if (DDLUtils.isDatasourceTable(tableMetadata)) {
showCreateDataSourceTable(tableMetadata)
} else {
- throw new UnsupportedOperationException(
- "SHOW CREATE TABLE only supports Spark SQL data source tables.")
+ showCreateHiveTable(tableMetadata)
}
Seq(Row(stmt))
}
+ private def showCreateHiveTable(metadata: CatalogTable): String = {
+ def reportUnsupportedError(): Unit = {
+ throw new UnsupportedOperationException(
+ s"Failed to execute SHOW CREATE TABLE against table ${metadata.identifier.quotedString}, " +
+ "because it contains table structure(s) (e.g. skewed columns) that Spark SQL doesn't " +
+ "support yet."
+ )
+ }
+
+ if (metadata.hasUnsupportedFeatures) {
+ reportUnsupportedError()
+ }
+
+ val builder = StringBuilder.newBuilder
+
+ val tableTypeString = metadata.tableType match {
+ case EXTERNAL => " EXTERNAL TABLE"
+ case VIEW => " VIEW"
+ case MANAGED => " TABLE"
+ case INDEX => reportUnsupportedError()
+ }
+
+ builder ++= s"CREATE$tableTypeString ${table.quotedString}"
+
+ if (metadata.tableType == VIEW) {
+ if (metadata.schema.nonEmpty) {
+ builder ++= metadata.schema.map(_.name).mkString("(", ", ", ")")
+ }
+ builder ++= metadata.viewText.mkString(" AS\n", "", "\n")
+ } else {
+ showHiveTableHeader(metadata, builder)
+ showHiveTableNonDataColumns(metadata, builder)
+ showHiveTableStorageInfo(metadata, builder)
+ showHiveTableProperties(metadata, builder)
+ }
+
+ builder.toString()
+ }
+
+ private def showHiveTableHeader(metadata: CatalogTable, builder: StringBuilder): Unit = {
+ val columns = metadata.schema.filterNot { column =>
+ metadata.partitionColumnNames.contains(column.name)
+ }.map(columnToDDLFragment)
+
+ if (columns.nonEmpty) {
+ builder ++= columns.mkString("(", ", ", ")\n")
+ }
+
+ metadata
+ .comment
+ .map("COMMENT '" + escapeSingleQuotedString(_) + "'\n")
+ .foreach(builder.append)
+ }
+
+ private def columnToDDLFragment(column: CatalogColumn): String = {
+ val comment = column.comment.map(escapeSingleQuotedString).map(" COMMENT '" + _ + "'")
+ s"${quoteIdentifier(column.name)} ${column.dataType}${comment.getOrElse("")}"
+ }
+
+ private def showHiveTableNonDataColumns(metadata: CatalogTable, builder: StringBuilder): Unit = {
+ if (metadata.partitionColumns.nonEmpty) {
+ val partCols = metadata.partitionColumns.map(columnToDDLFragment)
+ builder ++= partCols.mkString("PARTITIONED BY (", ", ", ")\n")
+ }
+
+ if (metadata.bucketColumnNames.nonEmpty) {
+ throw new UnsupportedOperationException(
+ "Creating Hive table with bucket spec is not supported yet.")
+ }
+ }
+
+ private def showHiveTableStorageInfo(metadata: CatalogTable, builder: StringBuilder): Unit = {
+ val storage = metadata.storage
+
+ storage.serde.foreach { serde =>
+ builder ++= s"ROW FORMAT SERDE '$serde'\n"
+
+ val serdeProps = metadata.storage.serdeProperties.map {
+ case (key, value) =>
+ s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'"
+ }
+
+ builder ++= serdeProps.mkString("WITH SERDEPROPERTIES (", ",\n ", "\n)\n")
+ }
+
+ if (storage.inputFormat.isDefined || storage.outputFormat.isDefined) {
+ builder ++= "STORED AS\n"
+
+ storage.inputFormat.foreach { format =>
+ builder ++= s" INPUTFORMAT '${escapeSingleQuotedString(format)}'\n"
+ }
+
+ storage.outputFormat.foreach { format =>
+ builder ++= s" OUTPUTFORMAT '${escapeSingleQuotedString(format)}'\n"
+ }
+ }
+
+ if (metadata.tableType == EXTERNAL) {
+ storage.locationUri.foreach { uri =>
+ builder ++= s"LOCATION '$uri'\n"
+ }
+ }
+ }
+
+ private def showHiveTableProperties(metadata: CatalogTable, builder: StringBuilder): Unit = {
+ if (metadata.properties.nonEmpty) {
+ val filteredProps = metadata.properties.filterNot {
+ // Skips "EXTERNAL" property for external tables
+ case (key, _) => key == "EXTERNAL" && metadata.tableType == EXTERNAL
+ }
+
+ val props = filteredProps.map { case (key, value) =>
+ s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'"
+ }
+
+ if (props.nonEmpty) {
+ builder ++= props.mkString("TBLPROPERTIES (", ",\n ", ")\n")
+ }
+ }
+ }
+
private def showCreateDataSourceTable(metadata: CatalogTable): String = {
val builder = StringBuilder.newBuilder
builder ++= s"CREATE TABLE ${table.quotedString} "
- showDataSourceTableDataCols(metadata, builder)
+ showDataSourceTableDataColumns(metadata, builder)
showDataSourceTableOptions(metadata, builder)
showDataSourceTableNonDataColumns(metadata, builder)
builder.toString()
}
- private def showDataSourceTableDataCols(metadata: CatalogTable, builder: StringBuilder): Unit = {
- val props = metadata.properties
- val schemaParts = for {
- numParts <- props.get("spark.sql.sources.schema.numParts").toSeq
- index <- 0 until numParts.toInt
- } yield props.getOrElse(
- s"spark.sql.sources.schema.part.$index",
- throw new AnalysisException(
- s"Corrupted schema in catalog: $numParts parts expected, but part $index is missing."
- )
- )
-
- if (schemaParts.nonEmpty) {
- val fields = DataType.fromJson(schemaParts.mkString).asInstanceOf[StructType].fields
- val colTypeList = fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}")
- builder ++= colTypeList.mkString("(", ", ", ")")
+ private def showDataSourceTableDataColumns(
+ metadata: CatalogTable, builder: StringBuilder): Unit = {
+ DDLUtils.getSchemaFromTableProperties(metadata).foreach { schema =>
+ val columns = schema.fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}")
+ builder ++= columns.mkString("(", ", ", ")")
}
builder ++= "\n"
@@ -688,40 +797,21 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
private def showDataSourceTableNonDataColumns(
metadata: CatalogTable, builder: StringBuilder): Unit = {
- val props = metadata.properties
-
- def getColumnNamesByType(colType: String, typeName: String): Seq[String] = {
- (for {
- numCols <- props.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").toSeq
- index <- 0 until numCols.toInt
- } yield props.getOrElse(
- s"spark.sql.sources.schema.${colType}Col.$index",
- throw new AnalysisException(
- s"Corrupted $typeName in catalog: $numCols parts expected, but part $index is missing."
- )
- )).map(quoteIdentifier)
- }
-
- val partCols = getColumnNamesByType("part", "partitioning columns")
+ val partCols = DDLUtils.getPartitionColumnsFromTableProperties(metadata)
if (partCols.nonEmpty) {
builder ++= s"PARTITIONED BY ${partCols.mkString("(", ", ", ")")}\n"
}
- val bucketCols = getColumnNamesByType("bucket", "bucketing columns")
- if (bucketCols.nonEmpty) {
- builder ++= s"CLUSTERED BY ${bucketCols.mkString("(", ", ", ")")}\n"
-
- val sortCols = getColumnNamesByType("sort", "sorting columns")
- if (sortCols.nonEmpty) {
- builder ++= s"SORTED BY ${sortCols.mkString("(", ", ", ")")}\n"
- }
+ DDLUtils.getBucketSpecFromTableProperties(metadata).foreach { spec =>
+ if (spec.bucketColumnNames.nonEmpty) {
+ builder ++= s"CLUSTERED BY ${spec.bucketColumnNames.mkString("(", ", ", ")")}\n"
- val numBuckets = props.getOrElse(
- "spark.sql.sources.schema.numBuckets",
- throw new AnalysisException("Corrupted bucket spec in catalog: missing bucket number")
- )
+ if (spec.sortColumnNames.nonEmpty) {
+ builder ++= s"SORTED BY ${spec.sortColumnNames.mkString("(", ", ", ")")}\n"
+ }
- builder ++= s"INTO $numBuckets BUCKETS\n"
+ builder ++= s"INTO ${spec.numBuckets} BUCKETS\n"
+ }
}
}
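A hedged usage sketch of the new Hive code path, assuming a SparkSession named `spark` and an existing Hive table `default.t1`; the emitted DDL is meant to round-trip, so replaying it re-creates an equivalent table:

    // Generate the CREATE TABLE statement for a Hive table, then replay it.
    val ddl = spark.sql("SHOW CREATE TABLE default.t1").head().getString(0)
    spark.sql("DROP TABLE default.t1")
    spark.sql(ddl)  // re-creates t1 with equivalent metadata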
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 78c457b6c2..a4e9f03b43 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -339,6 +339,13 @@ private[hive] class HiveClientImpl(
// partition columns are part of the schema
val partCols = h.getPartCols.asScala.map(fromHiveColumn)
val schema = h.getCols.asScala.map(fromHiveColumn) ++ partCols
+
+ // Skew spec, storage handler, and bucketing info can't be mapped to CatalogTable (yet)
+ val hasUnsupportedFeatures =
+ !h.getSkewedColNames.isEmpty ||
+ h.getStorageHandler != null ||
+ !h.getBucketCols.isEmpty
+
CatalogTable(
identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
tableType = h.getTableType match {
@@ -365,7 +372,8 @@ private[hive] class HiveClientImpl(
),
properties = h.getParameters.asScala.toMap,
viewOriginalText = Option(h.getViewOriginalText),
- viewText = Option(h.getViewExpandedText))
+ viewText = Option(h.getViewExpandedText),
+ hasUnsupportedFeatures = hasUnsupportedFeatures)
}
}
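An illustrative check of the propagated flag, assuming a HiveClient instance named `client` and a table created outside Spark with skewed columns, a storage handler, or Hive bucketing:

    // Such tables are flagged, so SHOW CREATE TABLE can reject them with an explicit error
    // instead of emitting DDL that silently drops the unsupported parts.
    val metadata = client.getTable("default", "skewed_table")
    assert(metadata.hasUnsupportedFeatures)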
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
index 12a1ad8987..3b8068d3bc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
@@ -116,24 +116,177 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
.bucketBy(2, "c", "d")
.saveAsTable("ddl_test5")
- checkCreateTable(TableIdentifier("ddl_test5", Some("default")))
+ checkCreateTable("ddl_test5")
}
}
+ test("simple hive table") {
+ withTable("t1") {
+ sql(
+ s"""CREATE TABLE t1 (
+ | c1 INT COMMENT 'bla',
+ | c2 STRING
+ |)
+ |TBLPROPERTIES (
+ | 'prop1' = 'value1',
+ | 'prop2' = 'value2'
+ |)
+ """.stripMargin
+ )
+
+ checkCreateTable("t1")
+ }
+ }
+
+ test("simple external hive table") {
+ withTempDir { dir =>
+ withTable("t1") {
+ sql(
+ s"""CREATE TABLE t1 (
+ | c1 INT COMMENT 'bla',
+ | c2 STRING
+ |)
+ |LOCATION '$dir'
+ |TBLPROPERTIES (
+ | 'prop1' = 'value1',
+ | 'prop2' = 'value2'
+ |)
+ """.stripMargin
+ )
+
+ checkCreateTable("t1")
+ }
+ }
+ }
+
+ test("partitioned hive table") {
+ withTable("t1") {
+ sql(
+ s"""CREATE TABLE t1 (
+ | c1 INT COMMENT 'bla',
+ | c2 STRING
+ |)
+ |COMMENT 'bla'
+ |PARTITIONED BY (
+ | p1 BIGINT COMMENT 'bla',
+ | p2 STRING
+ |)
+ """.stripMargin
+ )
+
+ checkCreateTable("t1")
+ }
+ }
+
+ test("hive table with explicit storage info") {
+ withTable("t1") {
+ sql(
+ s"""CREATE TABLE t1 (
+ | c1 INT COMMENT 'bla',
+ | c2 STRING
+ |)
+ |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+ |COLLECTION ITEMS TERMINATED BY '@'
+ |MAP KEYS TERMINATED BY '#'
+ |NULL DEFINED AS 'NaN'
+ """.stripMargin
+ )
+
+ checkCreateTable("t1")
+ }
+ }
+
+ test("hive table with STORED AS clause") {
+ withTable("t1") {
+ sql(
+ s"""CREATE TABLE t1 (
+ | c1 INT COMMENT 'bla',
+ | c2 STRING
+ |)
+ |STORED AS PARQUET
+ """.stripMargin
+ )
+
+ checkCreateTable("t1")
+ }
+ }
+
+ test("hive table with serde info") {
+ withTable("t1") {
+ sql(
+ s"""CREATE TABLE t1 (
+ | c1 INT COMMENT 'bla',
+ | c2 STRING
+ |)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |WITH SERDEPROPERTIES (
+ | 'mapkey.delim' = ',',
+ | 'field.delim' = ','
+ |)
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ """.stripMargin
+ )
+
+ checkCreateTable("t1")
+ }
+ }
+
+ test("hive view") {
+ withView("v1") {
+ sql("CREATE VIEW v1 AS SELECT 1 AS a")
+ checkCreateView("v1")
+ }
+ }
+
+ test("hive view with output columns") {
+ withView("v1") {
+ sql("CREATE VIEW v1 (b) AS SELECT 1 AS a")
+ checkCreateView("v1")
+ }
+ }
+
+ test("hive bucketing not supported") {
+ withTable("t1") {
+ createRawHiveTable(
+ s"""CREATE TABLE t1 (a INT, b STRING)
+ |CLUSTERED BY (a)
+ |SORTED BY (b)
+ |INTO 2 BUCKETS
+ """.stripMargin
+ )
+
+ intercept[UnsupportedOperationException] {
+ sql("SHOW CREATE TABLE t1")
+ }
+ }
+ }
+
+ private def createRawHiveTable(ddl: String): Unit = {
+ hiveContext.sharedState.metadataHive.runSqlHive(ddl)
+ }
+
private def checkCreateTable(table: String): Unit = {
- checkCreateTable(TableIdentifier(table, Some("default")))
+ checkCreateTableOrView(TableIdentifier(table, Some("default")), "TABLE")
+ }
+
+ private def checkCreateView(table: String): Unit = {
+ checkCreateTableOrView(TableIdentifier(table, Some("default")), "VIEW")
}
- private def checkCreateTable(table: TableIdentifier): Unit = {
+ private def checkCreateTableOrView(table: TableIdentifier, checkType: String): Unit = {
val db = table.database.getOrElse("default")
val expected = spark.externalCatalog.getTable(db, table.table)
val shownDDL = sql(s"SHOW CREATE TABLE ${table.quotedString}").head().getString(0)
- sql(s"DROP TABLE ${table.quotedString}")
+ sql(s"DROP $checkType ${table.quotedString}")
- withTable(table.table) {
+ try {
sql(shownDDL)
val actual = spark.externalCatalog.getTable(db, table.table)
checkCatalogTables(expected, actual)
+ } finally {
+ sql(s"DROP $checkType IF EXISTS ${table.table}")
}
}
@@ -155,15 +308,31 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
"totalSize",
"totalNumberFiles",
"maxFileSize",
- "minFileSize"
+ "minFileSize",
+      // EXTERNAL is not a non-deterministic property, but SHOW CREATE TABLE filters it out for
+      // external tables, so exclude it from the comparison as well.
+ "EXTERNAL"
)
table.copy(
createTime = 0L,
lastAccessTime = 0L,
- properties = table.properties.filterKeys(!nondeterministicProps.contains(_)))
+ properties = table.properties.filterKeys(!nondeterministicProps.contains(_)),
+ // View texts are checked separately
+ viewOriginalText = None,
+ viewText = None
+ )
+ }
+
+ // Normalizes attributes auto-generated by Spark SQL for views
+ def normalizeGeneratedAttributes(str: String): String = {
+ str.replaceAll("gen_attr_[0-9]+", "gen_attr_0")
+ }
+
+    // The re-created view's original text should match the expanded canonical view text of the
+    // original view.
+ assertResult(expected.viewText.map(normalizeGeneratedAttributes)) {
+ actual.viewOriginalText.map(normalizeGeneratedAttributes)
}
- assert(normalize(expected) == normalize(actual))
+ assert(normalize(actual) == normalize(expected))
}
}
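A small illustration of the view-text normalization used above (the input string is made up): generated attribute names differ between the original view and the one re-created from the emitted DDL, so both sides are canonicalized before comparison.

    normalizeGeneratedAttributes("SELECT `gen_attr_42` AS `a`")
    // => "SELECT `gen_attr_0` AS `a`"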