From f152fae306dc75565cb4648ee1211416d7c0bb23 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Wed, 4 May 2016 16:44:09 +0800
Subject: [SPARK-14127][SQL] Native "DESC [EXTENDED | FORMATTED] <table>" DDL command

## What changes were proposed in this pull request?

This PR implements native `DESC [EXTENDED | FORMATTED] <table>
` DDL command. Sample output:

```
scala> spark.sql("desc extended src").show(100, truncate = false)
+----------------------------+---------------------------------+-------+
|col_name                    |data_type                        |comment|
+----------------------------+---------------------------------+-------+
|key                         |int                              |       |
|value                       |string                           |       |
|                            |                                 |       |
|# Detailed Table Information|CatalogTable(`default`.`src`, ...|       |
+----------------------------+---------------------------------+-------+

scala> spark.sql("desc formatted src").show(100, truncate = false)
+----------------------------+----------------------------------------------------------+-------+
|col_name                    |data_type                                                 |comment|
+----------------------------+----------------------------------------------------------+-------+
|key                         |int                                                       |       |
|value                       |string                                                    |       |
|                            |                                                          |       |
|# Detailed Table Information|                                                          |       |
|Database:                   |default                                                   |       |
|Owner:                      |lian                                                      |       |
|Create Time:                |Mon Jan 04 17:06:00 CST 2016                              |       |
|Last Access Time:           |Thu Jan 01 08:00:00 CST 1970                              |       |
|Location:                   |hdfs://localhost:9000/user/hive/warehouse_hive121/src     |       |
|Table Type:                 |MANAGED                                                   |       |
|Table Parameters:           |                                                          |       |
|  transient_lastDdlTime     |1451898360                                                |       |
|                            |                                                          |       |
|# Storage Information       |                                                          |       |
|SerDe Library:              |org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe        |       |
|InputFormat:                |org.apache.hadoop.mapred.TextInputFormat                  |       |
|OutputFormat:               |org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat|       |
|Num Buckets:                |-1                                                        |       |
|Bucket Columns:             |[]                                                        |       |
|Sort Columns:               |[]                                                        |       |
|Storage Desc Parameters:    |                                                          |       |
|  serialization.format      |1                                                         |       |
+----------------------------+----------------------------------------------------------+-------+
```

## How was this patch tested?

A test case is added to `HiveDDLSuite` to check command output.

Author: Cheng Lian

Closes #12844 from liancheng/spark-14127-desc-table.
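For reference, the three SQL forms correspond to the following `DescribeTableCommand` logical plans. This is only a rough sketch: the constructor and flag names are taken from the case class changed in this patch, and `src` is just an example table name.

```
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.DescribeTableCommand

// DESC src -> plain column listing (plus partition columns, if any)
val plain = DescribeTableCommand(TableIdentifier("src"), isExtended = false, isFormatted = false)

// DESC EXTENDED src -> columns plus a single "# Detailed Table Information" row
val extended = DescribeTableCommand(TableIdentifier("src"), isExtended = true, isFormatted = false)

// DESC FORMATTED src -> columns plus the detailed table and storage information sections
val formatted = DescribeTableCommand(TableIdentifier("src"), isExtended = false, isFormatted = true)
```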
---
 .../spark/sql/catalyst/catalog/interface.scala      |   5 +-
 .../catalyst/catalog/ExternalCatalogSuite.scala     |   1 +
 .../spark/sql/execution/SparkSqlParser.scala        |  14 ++-
 .../execution/command/createDataSourceTables.scala  |   2 +
 .../spark/sql/execution/command/tables.scala        | 104 +++++++++++++++++----
 .../spark/sql/execution/command/DDLSuite.scala      |   6 +-
 .../spark/sql/hive/client/HiveClientImpl.scala      |   4 +-
 .../sql/hive/execution/CreateTableAsSelect.scala    |   3 +-
 .../spark/sql/hive/HiveDDLCommandSuite.scala        |   2 +-
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala  |   1 +
 .../spark/sql/hive/client/VersionsSuite.scala       |   1 +
 .../spark/sql/hive/execution/HiveDDLSuite.scala     |  17 ++++
 12 files changed, 131 insertions(+), 29 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 3851e4c706..2c6e9f53b2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -48,6 +48,7 @@ case class CatalogStorageFormat(
     inputFormat: Option[String],
     outputFormat: Option[String],
     serde: Option[String],
+    compressed: Boolean,
     serdeProperties: Map[String, String])
@@ -89,6 +90,7 @@ case class CatalogTable(
     sortColumnNames: Seq[String] = Seq.empty,
     bucketColumnNames: Seq[String] = Seq.empty,
     numBuckets: Int = -1,
+    owner: String = "",
     createTime: Long = System.currentTimeMillis,
     lastAccessTime: Long = -1,
     properties: Map[String, String] = Map.empty,
@@ -123,10 +125,11 @@ case class CatalogTable(
       locationUri: Option[String] = storage.locationUri,
       inputFormat: Option[String] = storage.inputFormat,
       outputFormat: Option[String] = storage.outputFormat,
+      compressed: Boolean = false,
       serde: Option[String] = storage.serde,
       serdeProperties: Map[String, String] = storage.serdeProperties): CatalogTable = {
     copy(storage = CatalogStorageFormat(
-      locationUri, inputFormat, outputFormat, serde, serdeProperties))
+      locationUri, inputFormat, outputFormat, serde, compressed, serdeProperties))
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index d739b17743..ae7c503e65 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -507,6 +507,7 @@ abstract class CatalogTestUtils {
     inputFormat = Some(tableInputFormat),
     outputFormat = Some(tableOutputFormat),
     serde = None,
+    compressed = false,
     serdeProperties = Map.empty)
   lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat)
   lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index b000cc9953..60388df596 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -243,10 +243,13 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   override def visitDescribeTable(ctx: DescribeTableContext): LogicalPlan = withOrigin(ctx) {
     // FORMATTED and columns are not supported. Return null and let the parser decide what to do
     // with this (create an exception or pass it on to a different system).
-    if (ctx.describeColName != null || ctx.FORMATTED != null || ctx.partitionSpec != null) {
+    if (ctx.describeColName != null || ctx.partitionSpec != null) {
       null
     } else {
-      DescribeTableCommand(visitTableIdentifier(ctx.tableIdentifier), ctx.EXTENDED != null)
+      DescribeTableCommand(
+        visitTableIdentifier(ctx.tableIdentifier),
+        ctx.EXTENDED != null,
+        ctx.FORMATTED() != null)
     }
   }
@@ -766,6 +769,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         // Note: Keep this unspecified because we use the presence of the serde to decide
         // whether to convert a table created by CTAS to a datasource table.
         serde = None,
+        compressed = false,
         serdeProperties = Map())
     }
     val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
@@ -777,6 +781,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
       outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
       serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
+      compressed = false,
       serdeProperties = rowStorage.serdeProperties ++ fileStorage.serdeProperties)
     // TODO support the sql text - have a proper location for this!
@@ -830,7 +835,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }

   /** Empty storage format for default values and copies. */
-  private val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, Map.empty)
+  private val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, false, Map.empty)

   /**
    * Create a [[CatalogStorageFormat]].
@@ -911,6 +916,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         entry("field.delim", ctx.fieldsTerminatedBy) ++
         entry("serialization.format", ctx.fieldsTerminatedBy) ++
         entry("escape.delim", ctx.escapedBy) ++
+        // The following typo is inherited from Hive...
         entry("colelction.delim", ctx.collectionItemsTerminatedBy) ++
         entry("mapkey.delim", ctx.keysTerminatedBy) ++
         Option(ctx.linesSeparatedBy).toSeq.map { token =>
@@ -1051,7 +1057,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {

       case c: RowFormatSerdeContext =>
         // Use a serde format.
-        val CatalogStorageFormat(None, None, None, Some(name), props) = visitRowFormatSerde(c)
+        val CatalogStorageFormat(None, None, None, Some(name), _, props) = visitRowFormatSerde(c)

         // SPARK-10310: Special cases LazySimpleSerDe
         val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index f670f63472..e07ab99ef3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -349,6 +349,7 @@ object CreateDataSourceTableUtils extends Logging {
         inputFormat = None,
         outputFormat = None,
         serde = None,
+        compressed = false,
         serdeProperties = options
       ),
       properties = tableProperties.toMap)
@@ -368,6 +369,7 @@ object CreateDataSourceTableUtils extends Logging {
         inputFormat = serde.inputFormat,
         outputFormat = serde.outputFormat,
         serde = serde.serde,
+        compressed = false,
         serdeProperties = options
       ),
       schema = relation.schema.map { f =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 489c980c16..31c804f7a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -19,16 +19,17 @@ package org.apache.spark.sql.execution.command

 import java.io.File
 import java.net.URI
+import java.util.Date

 import scala.collection.mutable.ArrayBuffer

 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogRelation, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
-import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StringType}
+import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StringType, StructType}
 import org.apache.spark.util.Utils

 case class CreateTableAsSelectLogicalPlan(
@@ -269,10 +270,10 @@ case class LoadData(
 /**
  * Command that looks like
  * {{{
- *   DESCRIBE (EXTENDED) table_name;
+ *   DESCRIBE [EXTENDED|FORMATTED] table_name;
  * }}}
  */
-case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean)
+case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isFormatted: Boolean)
   extends RunnableCommand {

   override val output: Seq[Attribute] = Seq(
@@ -289,29 +290,92 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean)
     val result = new ArrayBuffer[Row]
     sparkSession.sessionState.catalog.lookupRelation(table) match {
       case catalogRelation: CatalogRelation =>
-        catalogRelation.catalogTable.schema.foreach { column =>
-          result += Row(column.name, column.dataType, column.comment.orNull)
-        }
-
-        if (catalogRelation.catalogTable.partitionColumns.nonEmpty) {
-          result += Row("# Partition Information", "", "")
-          result += Row(s"# ${output(0).name}", output(1).name, output(2).name)
-
-          catalogRelation.catalogTable.partitionColumns.foreach { col =>
-            result += Row(col.name, col.dataType, col.comment.orNull)
-          }
+        if (isExtended) {
+          describeExtended(catalogRelation, result)
+        } else if (isFormatted) {
+          describeFormatted(catalogRelation, result)
+        } else {
+          describe(catalogRelation, result)
         }

       case relation =>
-        relation.schema.fields.foreach { field =>
-          val comment =
-            if (field.metadata.contains("comment")) field.metadata.getString("comment") else ""
-          result += Row(field.name, field.dataType.simpleString, comment)
-        }
+        describeSchema(relation.schema, result)
     }

     result
   }
+
+  // Shows data columns and partitioned columns (if any)
+  private def describe(relation: CatalogRelation, buffer: ArrayBuffer[Row]): Unit = {
+    describeSchema(relation.catalogTable.schema, buffer)
+
+    if (relation.catalogTable.partitionColumns.nonEmpty) {
+      append(buffer, "# Partition Information", "", "")
+      append(buffer, s"# ${output(0).name}", output(1).name, output(2).name)
+      describeSchema(relation.catalogTable.partitionColumns, buffer)
+    }
+  }
+
+  private def describeExtended(relation: CatalogRelation, buffer: ArrayBuffer[Row]): Unit = {
+    describe(relation, buffer)
+
+    append(buffer, "", "", "")
+    append(buffer, "# Detailed Table Information", relation.catalogTable.toString, "")
+  }
+
+  private def describeFormatted(relation: CatalogRelation, buffer: ArrayBuffer[Row]): Unit = {
+    describe(relation, buffer)
+
+    val table = relation.catalogTable
+
+    append(buffer, "", "", "")
+    append(buffer, "# Detailed Table Information", "", "")
+    append(buffer, "Database:", table.database, "")
+    append(buffer, "Owner:", table.owner, "")
+    append(buffer, "Create Time:", new Date(table.createTime).toString, "")
+    append(buffer, "Last Access Time:", new Date(table.lastAccessTime).toString, "")
+    append(buffer, "Location:", table.storage.locationUri.getOrElse(""), "")
+    append(buffer, "Table Type:", table.tableType.name, "")
+
+    append(buffer, "Table Parameters:", "", "")
+    table.properties.foreach { case (key, value) =>
+      append(buffer, s"  $key", value, "")
+    }
+
+    append(buffer, "", "", "")
+    append(buffer, "# Storage Information", "", "")
+    table.storage.serde.foreach(serdeLib => append(buffer, "SerDe Library:", serdeLib, ""))
+    table.storage.inputFormat.foreach(format => append(buffer, "InputFormat:", format, ""))
+    table.storage.outputFormat.foreach(format => append(buffer, "OutputFormat:", format, ""))
+    append(buffer, "Compressed:", if (table.storage.compressed) "Yes" else "No", "")
+    append(buffer, "Num Buckets:", table.numBuckets.toString, "")
+    append(buffer, "Bucket Columns:", table.bucketColumnNames.mkString("[", ", ", "]"), "")
+    append(buffer, "Sort Columns:", table.sortColumnNames.mkString("[", ", ", "]"), "")
+
+    append(buffer, "Storage Desc Parameters:", "", "")
+    table.storage.serdeProperties.foreach { case (key, value) =>
+      append(buffer, s"  $key", value, "")
+    }
+  }
+
+  private def describeSchema(schema: StructType, buffer: ArrayBuffer[Row]): Unit = {
+    schema.foreach { column =>
+      val comment =
+        if (column.metadata.contains("comment")) column.metadata.getString("comment") else ""
+      append(buffer, column.name, column.dataType.simpleString, comment)
+    }
+  }
+
+  private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = {
+    schema.foreach { column =>
+      append(buffer, column.name, column.dataType.toLowerCase, column.comment.orNull)
+    }
+  }
+
+  private def append(
+      buffer: ArrayBuffer[Row], column: String, dataType: String, comment: String): Unit = {
+    buffer += Row(column, dataType, comment)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 12acb9f276..0ae099ecc2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -76,6 +76,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       inputFormat = None,
       outputFormat = None,
       serde = None,
+      compressed = false,
       serdeProperties = Map())
     catalog.createTable(CatalogTable(
       identifier = name,
@@ -89,7 +90,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       catalog: SessionCatalog,
       spec: TablePartitionSpec,
       tableName: TableIdentifier): Unit = {
-    val part = CatalogTablePartition(spec, CatalogStorageFormat(None, None, None, None, Map()))
+    val part = CatalogTablePartition(
+      spec, CatalogStorageFormat(None, None, None, None, false, Map()))
     catalog.createPartitions(tableName, Seq(part), ignoreIfExists = false)
   }
@@ -264,6 +266,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       inputFormat = None,
       outputFormat = None,
       serde = None,
+      compressed = false,
       serdeProperties = Map())

     val expectedTable = CatalogTable(
@@ -288,6 +291,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       inputFormat = None,
       outputFormat = None,
       serde = None,
+      compressed = false,
       serdeProperties = Map())

     val expectedTable = CatalogTable(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 47d9546c4f..cddc0b6e34 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -351,6 +351,7 @@ private[hive] class HiveClientImpl(
         sortColumnNames = Seq(), // TODO: populate this
         bucketColumnNames = h.getBucketCols.asScala,
         numBuckets = h.getNumBuckets,
+        owner = h.getOwner,
         createTime = h.getTTable.getCreateTime.toLong * 1000,
         lastAccessTime = h.getLastAccessTime.toLong * 1000,
         storage = CatalogStorageFormat(
@@ -358,6 +359,7 @@ private[hive] class HiveClientImpl(
           inputFormat = Option(h.getInputFormatClass).map(_.getName),
           outputFormat = Option(h.getOutputFormatClass).map(_.getName),
           serde = Option(h.getSerializationLib),
+          compressed = h.getTTable.getSd.isCompressed,
           serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap
         ),
         properties = h.getParameters.asScala.toMap,
@@ -788,7 +790,7 @@ private[hive] class HiveClientImpl(
         inputFormat = Option(apiPartition.getSd.getInputFormat),
         outputFormat = Option(apiPartition.getSd.getOutputFormat),
         serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),
+        compressed = apiPartition.getSd.isCompressed,
         serdeProperties = apiPartition.getSd.getSerdeInfo.getParameters.asScala.toMap))
   }
-
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 08d4b99d30..9dfbafae87 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -56,7 +56,8 @@ case class CreateTableAsSelect(
         outputFormat =
           tableDesc.storage.outputFormat
             .orElse(Some(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]].getName)),
-        serde = tableDesc.storage.serde.orElse(Some(classOf[LazySimpleSerDe].getName)))
+        serde = tableDesc.storage.serde.orElse(Some(classOf[LazySimpleSerDe].getName)),
+        compressed = tableDesc.storage.compressed)

     val withSchema = if (withFormat.schema.isEmpty) {
       // Hive doesn't support specifying the column list for target table in CTAS
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 8dc3c64353..c4ebc604dc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -579,7 +579,7 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(source2.table == "table2")
   }

-  test("load data") {
+  test("load data") {
     val v1 = "LOAD DATA INPATH 'path' INTO TABLE table1"
     val (table, path, isLocal, isOverwrite, partition) = parser.parsePlan(v1).collect {
       case LoadData(t, path, l, o, partition) => (t, path, l, o, partition)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index c3a9f2479c..4bdcb96feb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -732,6 +732,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
             inputFormat = None,
             outputFormat = None,
             serde = None,
+            compressed = false,
             serdeProperties = Map(
               "path" -> sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier(tableName)))
           ),
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 9341b3816f..a6a5ab3988 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -157,6 +157,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       outputFormat = Some(
         classOf[org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat[_, _]].getName),
       serde = Some(classOf[org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe].getName()),
+      compressed = false,
       serdeProperties = Map.empty
     ))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 687a4a7e51..373d1a1e0e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -348,4 +348,21 @@ class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
   }
+
+  test("desc table") {
+    withTable("tab1") {
+      val tabName = "tab1"
+      sql(s"CREATE TABLE $tabName(c1 int)")
+
+      assert(sql(s"DESC $tabName").collect().length == 1)
+
+      assert(
+        sql(s"DESC FORMATTED $tabName").collect()
+          .exists(_.getString(0) == "# Storage Information"))
+
+      assert(
+        sql(s"DESC EXTENDED $tabName").collect()
+          .exists(_.getString(0) == "# Detailed Table Information"))
+    }
+  }
 }
--
cgit v1.2.3