diff options
Diffstat (limited to 'sql/core')
5 files changed, 188 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index a1862f59a0..ebc60edcba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -153,6 +153,44 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** + * A command for users to list the columm names for a table. + * This function creates a [[ShowColumnsCommand]] logical plan. + * + * The syntax of using this command in SQL is: + * {{{ + * SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database]; + * }}} + */ + override def visitShowColumns(ctx: ShowColumnsContext): LogicalPlan = withOrigin(ctx) { + val table = visitTableIdentifier(ctx.tableIdentifier) + + val lookupTable = Option(ctx.db) match { + case None => table + case Some(db) if table.database.isDefined => + throw new ParseException("Duplicates the declaration for database", ctx) + case Some(db) => TableIdentifier(table.identifier, Some(db.getText)) + } + ShowColumnsCommand(lookupTable) + } + + /** + * A command for users to list the partition names of a table. If partition spec is specified, + * partitions that match the spec are returned. Otherwise an empty result set is returned. + * + * This function creates a [[ShowPartitionsCommand]] logical plan + * + * The syntax of using this command in SQL is: + * {{{ + * SHOW PARTITIONS table_identifier [partition_spec]; + * }}} + */ + override def visitShowPartitions(ctx: ShowPartitionsContext): LogicalPlan = withOrigin(ctx) { + val table = visitTableIdentifier(ctx.tableIdentifier) + val partitionKeys = Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec) + ShowPartitionsCommand(table, partitionKeys) + } + + /** * Create a [[RefreshTable]] logical plan. */ override def visitRefreshTable(ctx: RefreshTableContext): LogicalPlan = withOrigin(ctx) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 7bb59b7803..6b1d413845 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -17,14 +17,19 @@ package org.apache.spark.sql.execution.command +import java.io.File + import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Row, SparkSession} -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.catalog.CatalogTableType +import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.spark.sql.execution.debug._ import org.apache.spark.sql.types._ @@ -112,3 +117,101 @@ case class ExplainCommand( ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_)) } } + +/** + * A command to list the column names for a table. This function creates a + * [[ShowColumnsCommand]] logical plan. + * + * The syntax of using this command in SQL is: + * {{{ + * SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database]; + * }}} + */ +case class ShowColumnsCommand(table: TableIdentifier) extends RunnableCommand { + // The result of SHOW COLUMNS has one column called 'result' + override val output: Seq[Attribute] = { + AttributeReference("result", StringType, nullable = false)() :: Nil + } + + override def run(sparkSession: SparkSession): Seq[Row] = { + sparkSession.sessionState.catalog.getTableMetadata(table).schema.map { c => + Row(c.name) + } + } +} + +/** + * A command to list the partition names of a table. If the partition spec is specified, + * partitions that match the spec are returned. [[AnalysisException]] exception is thrown under + * the following conditions: + * + * 1. If the command is called for a non partitioned table. + * 2. If the partition spec refers to the columns that are not defined as partitioning columns. + * + * This function creates a [[ShowPartitionsCommand]] logical plan + * + * The syntax of using this command in SQL is: + * {{{ + * SHOW PARTITIONS [db_name.]table_name [PARTITION(partition_spec)] + * }}} + */ +case class ShowPartitionsCommand( + table: TableIdentifier, + spec: Option[TablePartitionSpec]) extends RunnableCommand { + // The result of SHOW PARTITIONS has one column called 'result' + override val output: Seq[Attribute] = { + AttributeReference("result", StringType, nullable = false)() :: Nil + } + + private def getPartName(spec: TablePartitionSpec, partColNames: Seq[String]): String = { + partColNames.map { name => + PartitioningUtils.escapePathName(name) + "=" + PartitioningUtils.escapePathName(spec(name)) + }.mkString(File.separator) + } + + override def run(sparkSession: SparkSession): Seq[Row] = { + val catalog = sparkSession.sessionState.catalog + val db = table.database.getOrElse(catalog.getCurrentDatabase) + if (catalog.isTemporaryTable(table)) { + throw new AnalysisException("SHOW PARTITIONS is not allowed on a temporary table: " + + s"${table.unquotedString}") + } else { + val tab = catalog.getTableMetadata(table) + /** + * Validate and throws an [[AnalysisException]] exception under the following conditions: + * 1. If the table is not partitioned. + * 2. If it is a datasource table. + * 3. If it is a view or index table. + */ + if (tab.tableType == CatalogTableType.VIRTUAL_VIEW || + tab.tableType == CatalogTableType.INDEX_TABLE) { + throw new AnalysisException("SHOW PARTITIONS is not allowed on a view or index table: " + + s"${tab.qualifiedName}") + } + if (!DDLUtils.isTablePartitioned(tab)) { + throw new AnalysisException("SHOW PARTITIONS is not allowed on a table that is not " + + s"partitioned: ${tab.qualifiedName}") + } + if (DDLUtils.isDatasourceTable(tab)) { + throw new AnalysisException("SHOW PARTITIONS is not allowed on a datasource table: " + + s"${tab.qualifiedName}") + } + /** + * Validate the partitioning spec by making sure all the referenced columns are + * defined as partitioning columns in table definition. An AnalysisException exception is + * thrown if the partitioning spec is invalid. + */ + if (spec.isDefined) { + val badColumns = spec.get.keySet.filterNot(tab.partitionColumns.map(_.name).contains) + if (badColumns.nonEmpty) { + throw new AnalysisException( + s"Non-partitioning column(s) [${badColumns.mkString(", ")}] are " + + s"specified for SHOW PARTITIONS") + } + } + val partNames = + catalog.listPartitions(table, spec).map(p => getPartName(p.spec, tab.partitionColumnNames)) + partNames.map { p => Row(p) } + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index f5aa8fb6fa..ecde3320b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -536,5 +536,9 @@ private[sql] object DDLUtils { case _ => }) } + def isTablePartitioned(table: CatalogTable): Boolean = { + table.partitionColumns.size > 0 || + table.properties.contains("spark.sql.sources.schema.numPartCols") + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 1065bb1047..74f2993754 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -424,7 +424,7 @@ private[sql] object PartitioningUtils { path.foreach { c => if (needsEscaping(c)) { builder.append('%') - builder.append(f"${c.asInstanceOf[Int]}%02x") + builder.append(f"${c.asInstanceOf[Int]}%02X") } else { builder.append(c) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 934c6f362d..9db5ccbbd6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -678,4 +678,44 @@ class DDLCommandSuite extends PlanTest { comparePlans(parsed3, expected3) comparePlans(parsed4, expected4) } + + test("show columns") { + val sql1 = "SHOW COLUMNS FROM t1" + val sql2 = "SHOW COLUMNS IN db1.t1" + val sql3 = "SHOW COLUMNS FROM t1 IN db1" + val sql4 = "SHOW COLUMNS FROM db1.t1 IN db2" + + val parsed1 = parser.parsePlan(sql1) + val expected1 = ShowColumnsCommand(TableIdentifier("t1", None)) + val parsed2 = parser.parsePlan(sql2) + val expected2 = ShowColumnsCommand(TableIdentifier("t1", Some("db1"))) + val parsed3 = parser.parsePlan(sql3) + comparePlans(parsed1, expected1) + comparePlans(parsed2, expected2) + comparePlans(parsed3, expected2) + val message = intercept[ParseException] { + parser.parsePlan(sql4) + }.getMessage + assert(message.contains("Duplicates the declaration for database")) + } + + test("show partitions") { + val sql1 = "SHOW PARTITIONS t1" + val sql2 = "SHOW PARTITIONS db1.t1" + val sql3 = "SHOW PARTITIONS t1 PARTITION(partcol1='partvalue', partcol2='partvalue')" + + val parsed1 = parser.parsePlan(sql1) + val expected1 = + ShowPartitionsCommand(TableIdentifier("t1", None), None) + val parsed2 = parser.parsePlan(sql2) + val expected2 = + ShowPartitionsCommand(TableIdentifier("t1", Some("db1")), None) + val expected3 = + ShowPartitionsCommand(TableIdentifier("t1", None), + Some(Map("partcol1" -> "partvalue", "partcol2" -> "partvalue"))) + val parsed3 = parser.parsePlan(sql3) + comparePlans(parsed1, expected1) + comparePlans(parsed2, expected2) + comparePlans(parsed3, expected3) + } } |