Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala                |  38
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala              | 107
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala                   |   4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala |   2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala       |  40
5 files changed, 188 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index a1862f59a0..ebc60edcba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -153,6 +153,44 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
/**
+ * A command for users to list the column names of a table.
+ * This function creates a [[ShowColumnsCommand]] logical plan.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ * SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database];
+ * }}}
+ */
+ override def visitShowColumns(ctx: ShowColumnsContext): LogicalPlan = withOrigin(ctx) {
+ val table = visitTableIdentifier(ctx.tableIdentifier)
+
+ val lookupTable = Option(ctx.db) match {
+ case None => table
+ case Some(db) if table.database.isDefined =>
+ throw new ParseException("Duplicate declaration of the database", ctx)
+ case Some(db) => TableIdentifier(table.identifier, Some(db.getText))
+ }
+ ShowColumnsCommand(lookupTable)
+ }
+
+ /**
+ * A command for users to list the partition names of a table. If a partition spec is given,
+ * only partitions matching the spec are returned; otherwise all partitions are returned.
+ *
+ * This function creates a [[ShowPartitionsCommand]] logical plan.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ * SHOW PARTITIONS table_identifier [PARTITION(partition_spec)];
+ * }}}
+ */
+ override def visitShowPartitions(ctx: ShowPartitionsContext): LogicalPlan = withOrigin(ctx) {
+ val table = visitTableIdentifier(ctx.tableIdentifier)
+ val partitionKeys = Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)
+ ShowPartitionsCommand(table, partitionKeys)
+ }
+
+ /**
* Create a [[RefreshTable]] logical plan.
*/
override def visitRefreshTable(ctx: RefreshTableContext): LogicalPlan = withOrigin(ctx) {
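For readers following along, the two parser hooks above map directly onto the new runnable commands. Below is a minimal sketch of exercising them through the parser; it assumes the 2.0-era `SparkSqlParser(conf: SQLConf)` constructor, consistent with the `SparkSqlAstBuilder(conf: SQLConf)` signature in this hunk:

```scala
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.internal.SQLConf

object ShowCommandParsingSketch {
  def main(args: Array[String]): Unit = {
    val parser = new SparkSqlParser(new SQLConf)

    // The optional (FROM | IN) database clause is folded into the
    // TableIdentifier carried by ShowColumnsCommand.
    println(parser.parsePlan("SHOW COLUMNS FROM t1 IN db1"))

    // The optional PARTITION clause becomes an Option[TablePartitionSpec].
    println(parser.parsePlan("SHOW PARTITIONS db1.t1 PARTITION(year='2016', month='04')"))

    // A database declared both ways is rejected with a ParseException:
    // parser.parsePlan("SHOW COLUMNS FROM db1.t1 IN db2")
  }
}
```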
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 7bb59b7803..6b1d413845 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -17,14 +17,19 @@
package org.apache.spark.sql.execution.command
+import java.io.File
+
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.execution.debug._
import org.apache.spark.sql.types._
@@ -112,3 +117,101 @@ case class ExplainCommand(
("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
}
}
+
+/**
+ * A command to list the column names of a table. The result contains one row per
+ * column, in the order of the table schema.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ * SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database];
+ * }}}
+ */
+case class ShowColumnsCommand(table: TableIdentifier) extends RunnableCommand {
+ // The result of SHOW COLUMNS has one column called 'result'
+ override val output: Seq[Attribute] = {
+ AttributeReference("result", StringType, nullable = false)() :: Nil
+ }
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ sparkSession.sessionState.catalog.getTableMetadata(table).schema.map { c =>
+ Row(c.name)
+ }
+ }
+}
+
+/**
+ * A command to list the partition names of a table. If a partition spec is specified,
+ * only partitions that match the spec are returned. An [[AnalysisException]] is thrown
+ * under the following conditions:
+ *
+ * 1. If the command is called for a non-partitioned table.
+ * 2. If the partition spec refers to columns that are not defined as partitioning columns.
+ *
+ * It also fails for temporary tables, views, index tables, and datasource tables.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ * SHOW PARTITIONS [db_name.]table_name [PARTITION(partition_spec)]
+ * }}}
+ */
+case class ShowPartitionsCommand(
+ table: TableIdentifier,
+ spec: Option[TablePartitionSpec]) extends RunnableCommand {
+ // The result of SHOW PARTITIONS has one column called 'result'
+ override val output: Seq[Attribute] = {
+ AttributeReference("result", StringType, nullable = false)() :: Nil
+ }
+
+ private def getPartName(spec: TablePartitionSpec, partColNames: Seq[String]): String = {
+ partColNames.map { name =>
+ PartitioningUtils.escapePathName(name) + "=" + PartitioningUtils.escapePathName(spec(name))
+ }.mkString(File.separator)
+ }
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
+ // The catalog resolves the table against the current database when none is given.
+ if (catalog.isTemporaryTable(table)) {
+ throw new AnalysisException("SHOW PARTITIONS is not allowed on a temporary table: " +
+ s"${table.unquotedString}")
+ } else {
+ val tab = catalog.getTableMetadata(table)
+ // Validates the table and throws an AnalysisException under any of the
+ // following conditions, in the order they are checked below:
+ //
+ //   1. The table is a view or an index table.
+ //   2. The table is not partitioned.
+ //   3. The table is a datasource table.
+ if (tab.tableType == CatalogTableType.VIRTUAL_VIEW ||
+ tab.tableType == CatalogTableType.INDEX_TABLE) {
+ throw new AnalysisException("SHOW PARTITIONS is not allowed on a view or index table: " +
+ s"${tab.qualifiedName}")
+ }
+ if (!DDLUtils.isTablePartitioned(tab)) {
+ throw new AnalysisException("SHOW PARTITIONS is not allowed on a table that is not " +
+ s"partitioned: ${tab.qualifiedName}")
+ }
+ if (DDLUtils.isDatasourceTable(tab)) {
+ throw new AnalysisException("SHOW PARTITIONS is not allowed on a datasource table: " +
+ s"${tab.qualifiedName}")
+ }
+ // Validates the partition spec by making sure all the referenced columns
+ // are defined as partitioning columns in the table definition. An
+ // AnalysisException is thrown if the spec refers to a column that is not a
+ // partitioning column; badColumns collects every offender so the error
+ // message can name them all at once.
+ if (spec.isDefined) {
+ val badColumns = spec.get.keySet.filterNot(tab.partitionColumns.map(_.name).contains)
+ if (badColumns.nonEmpty) {
+ throw new AnalysisException(
+ s"Non-partitioning column(s) [${badColumns.mkString(", ")}] are " +
+ s"specified for SHOW PARTITIONS")
+ }
+ }
+ val partNames =
+ catalog.listPartitions(table, spec).map(p => getPartName(p.spec, tab.partitionColumnNames))
+ partNames.map { p => Row(p) }
+ }
+ }
+}
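The private `getPartName` helper reproduces Hive's directory layout for a single partition. Here is a standalone sketch of the same formatting, with `escape` standing in for `PartitioningUtils.escapePathName` (which is not part of the public API):

```scala
import java.io.File

// Each partition column becomes an escaped "name=value" segment, and the
// segments are joined with the platform path separator, in the order of the
// table's partition columns (not the order of the spec).
def partName(spec: Map[String, String],
             partColNames: Seq[String],
             escape: String => String): String =
  partColNames.map(n => escape(n) + "=" + escape(spec(n))).mkString(File.separator)

// partName(Map("month" -> "04", "year" -> "2016"), Seq("year", "month"), identity)
// yields "year=2016/month=04" on POSIX systems.
```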
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index f5aa8fb6fa..ecde3320b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -536,5 +536,9 @@ private[sql] object DDLUtils {
case _ =>
})
}
+ def isTablePartitioned(table: CatalogTable): Boolean = {
+ table.partitionColumns.nonEmpty ||
+ table.properties.contains("spark.sql.sources.schema.numPartCols")
+ }
}
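`isTablePartitioned` needs both checks because Hive tables and datasource tables record partitioning differently: the former directly in `partitionColumns`, the latter only via table properties. A hedged sketch of the property layout this relies on; only `numPartCols` is the key the helper inspects, and the companion `partCol.N` keys are shown for context under the assumption they follow this Spark version's naming:

```scala
// Approximate property layout for a datasource table partitioned by
// (year, month); isTablePartitioned only looks for the numPartCols key.
val props = Map(
  "spark.sql.sources.schema.numPartCols" -> "2",
  "spark.sql.sources.schema.partCol.0" -> "year",
  "spark.sql.sources.schema.partCol.1" -> "month"
)
assert(props.contains("spark.sql.sources.schema.numPartCols"))
```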
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 1065bb1047..74f2993754 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -424,7 +424,7 @@ private[sql] object PartitioningUtils {
path.foreach { c =>
if (needsEscaping(c)) {
builder.append('%')
- builder.append(f"${c.asInstanceOf[Int]}%02x")
+ builder.append(f"${c.asInstanceOf[Int]}%02X")
} else {
builder.append(c)
}
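The one-character change above switches the escape sequence from lowercase to uppercase hex. Hive writes partition directories with uppercase escapes (e.g. `%3A` for `:`), and since partition names are compared as strings, `escapePathName` has to produce the identical form for the names generated by `SHOW PARTITIONS` to line up. A self-contained sketch of the rule:

```scala
// After the fix: an escaped character is '%' followed by the two
// uppercase hex digits of its code point.
def escapeChar(c: Char): String = f"%%${c.toInt}%02X"

assert(escapeChar(' ') == "%20")
assert(escapeChar(':') == "%3A") // was "%3a" before the fix
```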
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 934c6f362d..9db5ccbbd6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -678,4 +678,44 @@ class DDLCommandSuite extends PlanTest {
comparePlans(parsed3, expected3)
comparePlans(parsed4, expected4)
}
+
+ test("show columns") {
+ val sql1 = "SHOW COLUMNS FROM t1"
+ val sql2 = "SHOW COLUMNS IN db1.t1"
+ val sql3 = "SHOW COLUMNS FROM t1 IN db1"
+ val sql4 = "SHOW COLUMNS FROM db1.t1 IN db2"
+
+ val parsed1 = parser.parsePlan(sql1)
+ val expected1 = ShowColumnsCommand(TableIdentifier("t1", None))
+ val parsed2 = parser.parsePlan(sql2)
+ val expected2 = ShowColumnsCommand(TableIdentifier("t1", Some("db1")))
+ val parsed3 = parser.parsePlan(sql3)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ comparePlans(parsed3, expected2)
+ val message = intercept[ParseException] {
+ parser.parsePlan(sql4)
+ }.getMessage
+ assert(message.contains("Duplicate declaration of the database"))
+ }
+
+ test("show partitions") {
+ val sql1 = "SHOW PARTITIONS t1"
+ val sql2 = "SHOW PARTITIONS db1.t1"
+ val sql3 = "SHOW PARTITIONS t1 PARTITION(partcol1='partvalue', partcol2='partvalue')"
+
+ val parsed1 = parser.parsePlan(sql1)
+ val expected1 =
+ ShowPartitionsCommand(TableIdentifier("t1", None), None)
+ val parsed2 = parser.parsePlan(sql2)
+ val expected2 =
+ ShowPartitionsCommand(TableIdentifier("t1", Some("db1")), None)
+ val expected3 =
+ ShowPartitionsCommand(TableIdentifier("t1", None),
+ Some(Map("partcol1" -> "partvalue", "partcol2" -> "partvalue")))
+ val parsed3 = parser.parsePlan(sql3)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ comparePlans(parsed3, expected3)
+ }
}
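Taken together, the behavior the suite above exercises at the parser level looks like this from SQL. This is a usage sketch, assuming a Hive-enabled `SparkSession` named `spark` and a freshly created table:

```scala
spark.sql("CREATE TABLE sales (amount INT) PARTITIONED BY (year STRING, month STRING)")
spark.sql("ALTER TABLE sales ADD PARTITION (year='2016', month='04')")

// One row per column name, in schema order.
spark.sql("SHOW COLUMNS IN sales").show()

// One row per partition, e.g. "year=2016/month=04"; a PARTITION clause
// narrows the listing, and a spec that names a non-partitioning column
// fails with an AnalysisException.
spark.sql("SHOW PARTITIONS sales").show()
spark.sql("SHOW PARTITIONS sales PARTITION (year='2016')").show()
```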