author     Dilip Biswal <dbiswal@us.ibm.com>    2016-04-27 09:28:24 +0800
committer  Cheng Lian <lian@databricks.com>     2016-04-27 09:28:24 +0800
commit     d93976d8660b68eeef646d1fe687cfce01f50f9d (patch)
tree       e7b6b96ef5abd5972c161033d10112e58483b464
parent     bd2c9a6d48ef6d489c747d9db2642bdef6b1f728 (diff)
[SPARK-14445][SQL] Support native execution of SHOW COLUMNS and SHOW PARTITIONS
## What changes were proposed in this pull request?

This PR adds native execution of the SHOW COLUMNS and SHOW PARTITIONS commands.

Command syntax:

``` SQL
SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database]
```

``` SQL
SHOW PARTITIONS [db_name.]table_name [PARTITION(partition_spec)]
```

## How was this patch tested?

Added test cases in HiveCommandSuite to verify execution and in DDLCommandSuite to verify the parsed plans.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #12222 from dilipbiswal/dkb_show_columns.
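To make the new syntax concrete, here is a minimal usage sketch from a Spark SQL session; the SparkSession value `spark`, the table `sales`, and its partition columns are assumed for illustration and are not part of this patch.

```scala
// Minimal sketch, assuming a SparkSession `spark` and a Hive table `sales`
// partitioned by (year, month); table and column names are illustrative only.
spark.sql("SHOW COLUMNS IN sales").show()
// Returns one row per column, in a single string column named 'result'.

spark.sql("SHOW PARTITIONS sales").show()
// Returns one row per partition, e.g. "year=2015/month=1".

spark.sql("SHOW PARTITIONS sales PARTITION(year = 2015)").show()
// Returns only the partitions whose spec matches year=2015.
```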
-rw-r--r--  sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 | 4
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala | 7
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 13
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala | 16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala | 38
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala | 107
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala | 40
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala | 8
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala | 2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala | 21
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 19
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala | 145
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala | 4
16 files changed, 401 insertions(+), 31 deletions(-)
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 64f68c9e9e..9de313ae87 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -106,6 +106,9 @@ statement
| SHOW DATABASES (LIKE pattern=STRING)? #showDatabases
| SHOW TBLPROPERTIES table=tableIdentifier
('(' key=tablePropertyKey ')')? #showTblProperties
+ | SHOW COLUMNS (FROM | IN) tableIdentifier
+ ((FROM | IN) db=identifier)? #showColumns
+ | SHOW PARTITIONS tableIdentifier partitionSpec? #showPartitions
| SHOW FUNCTIONS (LIKE? (qualifiedName | pattern=STRING))? #showFunctions
| (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName #describeFunction
| (DESC | DESCRIBE) option=(EXTENDED | FORMATTED)?
@@ -128,7 +131,6 @@ hiveNativeCommands
: DELETE FROM tableIdentifier (WHERE booleanExpression)?
| TRUNCATE TABLE tableIdentifier partitionSpec?
(COLUMNS identifierList)?
- | SHOW COLUMNS (FROM | IN) tableIdentifier ((FROM|IN) identifier)?
| START TRANSACTION (transactionMode (',' transactionMode)*)?
| COMMIT WORK?
| ROLLBACK WORK?
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index b8f0e458fa..28a67067d0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -300,8 +300,13 @@ class InMemoryCatalog extends ExternalCatalog {
override def listPartitions(
db: String,
- table: String): Seq[CatalogTablePartition] = synchronized {
+ table: String,
+ partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = synchronized {
requireTableExists(db, table)
+ if (partialSpec.nonEmpty) {
+ throw new AnalysisException("listPartition does not support partition spec in " +
+ "InMemoryCatalog.")
+ }
catalog(db).tables(table).partitions.values.toSeq
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index b36a76a888..402aacfc1f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -471,13 +471,18 @@ class SessionCatalog(
}
/**
- * List all partitions in a table, assuming it exists.
- * If no database is specified, assume the table is in the current database.
+ * List the metadata of all partitions that belong to the specified table, assuming it exists.
+ *
+ * A partial partition spec may optionally be provided to filter the partitions returned.
+ * For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'),
+ * then a partial spec of (a='1') will return the first two only.
*/
- def listPartitions(tableName: TableIdentifier): Seq[CatalogTablePartition] = {
+ def listPartitions(
+ tableName: TableIdentifier,
+ partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = {
val db = tableName.database.getOrElse(currentDb)
val table = formatTableName(tableName.table)
- externalCatalog.listPartitions(db, table)
+ externalCatalog.listPartitions(db, table, partialSpec)
}
// ----------------------------------------------------------------------------
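A minimal sketch of calling the new `SessionCatalog.listPartitions` overload, assuming a catalog instance named `catalog` and a table `db.t` partitioned on columns `a` and `b` (the partial spec is a `Map` from partition column name to value):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

// Sketch only: `catalog` is an assumed SessionCatalog, and db.t is assumed
// to be partitioned on (a, b).
val all      = catalog.listPartitions(TableIdentifier("t", Some("db")))
val filtered = catalog.listPartitions(
  TableIdentifier("t", Some("db")),
  partialSpec = Some(Map("a" -> "1")))  // keeps only partitions with a='1'
```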
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index fd5bcad0f8..9e90987731 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -157,8 +157,20 @@ abstract class ExternalCatalog {
def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition
- // TODO: support listing by pattern
- def listPartitions(db: String, table: String): Seq[CatalogTablePartition]
+ /**
+ * List the metadata of all partitions that belong to the specified table, assuming it exists.
+ *
+ * A partial partition spec may optionally be provided to filter the partitions returned.
+ * For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'),
+ * then a partial spec of (a='1') will return the first two only.
+ * @param db database name
+ * @param table table name
+ * @param partialSpec partition spec
+ */
+ def listPartitions(
+ db: String,
+ table: String,
+ partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]
// --------------------------------------------------------------------------
// Functions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index a1862f59a0..ebc60edcba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -153,6 +153,44 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
/**
+ * A command for users to list the column names for a table.
+ * This function creates a [[ShowColumnsCommand]] logical plan.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ * SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database];
+ * }}}
+ */
+ override def visitShowColumns(ctx: ShowColumnsContext): LogicalPlan = withOrigin(ctx) {
+ val table = visitTableIdentifier(ctx.tableIdentifier)
+
+ val lookupTable = Option(ctx.db) match {
+ case None => table
+ case Some(db) if table.database.isDefined =>
+ throw new ParseException("Duplicates the declaration for database", ctx)
+ case Some(db) => TableIdentifier(table.identifier, Some(db.getText))
+ }
+ ShowColumnsCommand(lookupTable)
+ }
+
+ /**
+ * A command for users to list the partition names of a table. If a partition spec is specified,
+ * partitions that match the spec are returned; otherwise all partitions are returned.
+ *
+ * This function creates a [[ShowPartitionsCommand]] logical plan
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ * SHOW PARTITIONS table_identifier [partition_spec];
+ * }}}
+ */
+ override def visitShowPartitions(ctx: ShowPartitionsContext): LogicalPlan = withOrigin(ctx) {
+ val table = visitTableIdentifier(ctx.tableIdentifier)
+ val partitionKeys = Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)
+ ShowPartitionsCommand(table, partitionKeys)
+ }
+
+ /**
* Create a [[RefreshTable]] logical plan.
*/
override def visitRefreshTable(ctx: RefreshTableContext): LogicalPlan = withOrigin(ctx) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 7bb59b7803..6b1d413845 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -17,14 +17,19 @@
package org.apache.spark.sql.execution.command
+import java.io.File
+
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.execution.debug._
import org.apache.spark.sql.types._
@@ -112,3 +117,101 @@ case class ExplainCommand(
("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
}
}
+
+/**
+ * A command to list the column names for a table. This function creates a
+ * [[ShowColumnsCommand]] logical plan.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ * SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database];
+ * }}}
+ */
+case class ShowColumnsCommand(table: TableIdentifier) extends RunnableCommand {
+ // The result of SHOW COLUMNS has one column called 'result'
+ override val output: Seq[Attribute] = {
+ AttributeReference("result", StringType, nullable = false)() :: Nil
+ }
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ sparkSession.sessionState.catalog.getTableMetadata(table).schema.map { c =>
+ Row(c.name)
+ }
+ }
+}
+
+/**
+ * A command to list the partition names of a table. If the partition spec is specified,
+ * partitions that match the spec are returned. An [[AnalysisException]] is thrown under
+ * the following conditions:
+ *
+ * 1. If the command is called for a non partitioned table.
+ * 2. If the partition spec refers to the columns that are not defined as partitioning columns.
+ *
+ * This function creates a [[ShowPartitionsCommand]] logical plan
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ * SHOW PARTITIONS [db_name.]table_name [PARTITION(partition_spec)]
+ * }}}
+ */
+case class ShowPartitionsCommand(
+ table: TableIdentifier,
+ spec: Option[TablePartitionSpec]) extends RunnableCommand {
+ // The result of SHOW PARTITIONS has one column called 'result'
+ override val output: Seq[Attribute] = {
+ AttributeReference("result", StringType, nullable = false)() :: Nil
+ }
+
+ private def getPartName(spec: TablePartitionSpec, partColNames: Seq[String]): String = {
+ partColNames.map { name =>
+ PartitioningUtils.escapePathName(name) + "=" + PartitioningUtils.escapePathName(spec(name))
+ }.mkString(File.separator)
+ }
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
+ val db = table.database.getOrElse(catalog.getCurrentDatabase)
+ if (catalog.isTemporaryTable(table)) {
+ throw new AnalysisException("SHOW PARTITIONS is not allowed on a temporary table: " +
+ s"${table.unquotedString}")
+ } else {
+ val tab = catalog.getTableMetadata(table)
+ /**
+ * Validate and throw an [[AnalysisException]] under the following conditions:
+ * 1. If the table is not partitioned.
+ * 2. If it is a datasource table.
+ * 3. If it is a view or index table.
+ */
+ if (tab.tableType == CatalogTableType.VIRTUAL_VIEW ||
+ tab.tableType == CatalogTableType.INDEX_TABLE) {
+ throw new AnalysisException("SHOW PARTITIONS is not allowed on a view or index table: " +
+ s"${tab.qualifiedName}")
+ }
+ if (!DDLUtils.isTablePartitioned(tab)) {
+ throw new AnalysisException("SHOW PARTITIONS is not allowed on a table that is not " +
+ s"partitioned: ${tab.qualifiedName}")
+ }
+ if (DDLUtils.isDatasourceTable(tab)) {
+ throw new AnalysisException("SHOW PARTITIONS is not allowed on a datasource table: " +
+ s"${tab.qualifiedName}")
+ }
+ /**
+ * Validate the partitioning spec by making sure all the referenced columns are
+ * defined as partitioning columns in the table definition. An AnalysisException is
+ * thrown if the partitioning spec is invalid.
+ */
+ if (spec.isDefined) {
+ val badColumns = spec.get.keySet.filterNot(tab.partitionColumns.map(_.name).contains)
+ if (badColumns.nonEmpty) {
+ throw new AnalysisException(
+ s"Non-partitioning column(s) [${badColumns.mkString(", ")}] are " +
+ s"specified for SHOW PARTITIONS")
+ }
+ }
+ val partNames =
+ catalog.listPartitions(table, spec).map(p => getPartName(p.spec, tab.partitionColumnNames))
+ partNames.map { p => Row(p) }
+ }
+ }
+}
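For readers skimming the command above, a standalone sketch of the partition-name rendering done by `getPartName` (path escaping omitted for brevity; the spec and partition column names are assumed examples):

```scala
// Simplified stand-in for ShowPartitionsCommand.getPartName; the real code also
// escapes each name and value via PartitioningUtils.escapePathName.
def partName(spec: Map[String, String], partColNames: Seq[String]): String =
  partColNames.map(name => s"$name=${spec(name)}").mkString("/")

partName(Map("year" -> "2015", "month" -> "1"), Seq("year", "month"))
// => "year=2015/month=1" -- one such string is returned per partition row
```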
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index f5aa8fb6fa..ecde3320b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -536,5 +536,9 @@ private[sql] object DDLUtils {
case _ =>
})
}
+ def isTablePartitioned(table: CatalogTable): Boolean = {
+ table.partitionColumns.size > 0 ||
+ table.properties.contains("spark.sql.sources.schema.numPartCols")
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 1065bb1047..74f2993754 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -424,7 +424,7 @@ private[sql] object PartitioningUtils {
path.foreach { c =>
if (needsEscaping(c)) {
builder.append('%')
- builder.append(f"${c.asInstanceOf[Int]}%02x")
+ builder.append(f"${c.asInstanceOf[Int]}%02X")
} else {
builder.append(c)
}
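A hedged sketch of what the one-character change above does: escaped characters in partition path names are now rendered with uppercase hex digits (the sample character is assumed to be in the escape set, which is defined elsewhere in PartitioningUtils):

```scala
// Formatting change only: "%02x" (lowercase hex) became "%02X" (uppercase hex).
def escapeChar(c: Char): String = f"%%${c.toInt}%02X"

escapeChar(':')  // "%3A"; with the previous "%02x" format this rendered as "%3a"
```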
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 934c6f362d..9db5ccbbd6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -678,4 +678,44 @@ class DDLCommandSuite extends PlanTest {
comparePlans(parsed3, expected3)
comparePlans(parsed4, expected4)
}
+
+ test("show columns") {
+ val sql1 = "SHOW COLUMNS FROM t1"
+ val sql2 = "SHOW COLUMNS IN db1.t1"
+ val sql3 = "SHOW COLUMNS FROM t1 IN db1"
+ val sql4 = "SHOW COLUMNS FROM db1.t1 IN db2"
+
+ val parsed1 = parser.parsePlan(sql1)
+ val expected1 = ShowColumnsCommand(TableIdentifier("t1", None))
+ val parsed2 = parser.parsePlan(sql2)
+ val expected2 = ShowColumnsCommand(TableIdentifier("t1", Some("db1")))
+ val parsed3 = parser.parsePlan(sql3)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ comparePlans(parsed3, expected2)
+ val message = intercept[ParseException] {
+ parser.parsePlan(sql4)
+ }.getMessage
+ assert(message.contains("Duplicates the declaration for database"))
+ }
+
+ test("show partitions") {
+ val sql1 = "SHOW PARTITIONS t1"
+ val sql2 = "SHOW PARTITIONS db1.t1"
+ val sql3 = "SHOW PARTITIONS t1 PARTITION(partcol1='partvalue', partcol2='partvalue')"
+
+ val parsed1 = parser.parsePlan(sql1)
+ val expected1 =
+ ShowPartitionsCommand(TableIdentifier("t1", None), None)
+ val parsed2 = parser.parsePlan(sql2)
+ val expected2 =
+ ShowPartitionsCommand(TableIdentifier("t1", Some("db1")), None)
+ val expected3 =
+ ShowPartitionsCommand(TableIdentifier("t1", None),
+ Some(Map("partcol1" -> "partvalue", "partcol2" -> "partvalue")))
+ val parsed3 = parser.parsePlan(sql3)
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ comparePlans(parsed3, expected3)
+ }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index a92a94cae5..313093818f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -283,10 +283,14 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
client.getPartition(db, table, spec)
}
+ /**
+ * Returns the partition names from hive metastore for a given table in a database.
+ */
override def listPartitions(
db: String,
- table: String): Seq[CatalogTablePartition] = withClient {
- client.getAllPartitions(db, table)
+ table: String,
+ partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = withClient {
+ client.getPartitions(db, table, partialSpec)
}
// --------------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 0520e75306..367fcf13d2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -130,7 +130,7 @@ private[hive] case class MetastoreRelation(
// When metastore partition pruning is turned off, we cache the list of all partitions to
// mimic the behavior of Spark < 1.5
- private lazy val allPartitions: Seq[CatalogTablePartition] = client.getAllPartitions(catalogTable)
+ private lazy val allPartitions: Seq[CatalogTablePartition] = client.getPartitions(catalogTable)
def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
val rawPartitions = if (sparkSession.sessionState.conf.metastorePartitionPruning) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index ae719f86aa..ef08a39c17 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -163,13 +163,24 @@ private[hive] trait HiveClient {
table: CatalogTable,
spec: ExternalCatalog.TablePartitionSpec): Option[CatalogTablePartition]
- /** Returns all partitions for the given table. */
- final def getAllPartitions(db: String, table: String): Seq[CatalogTablePartition] = {
- getAllPartitions(getTable(db, table))
+ /**
+ * Returns the partitions for the given table that match the supplied partition spec.
+ * If no partition spec is specified, all partitions are returned.
+ */
+ final def getPartitions(
+ db: String,
+ table: String,
+ partialSpec: Option[ExternalCatalog.TablePartitionSpec]): Seq[CatalogTablePartition] = {
+ getPartitions(getTable(db, table), partialSpec)
}
- /** Returns all partitions for the given table. */
- def getAllPartitions(table: CatalogTable): Seq[CatalogTablePartition]
+ /**
+ * Returns the partitions for the given table that match the supplied partition spec.
+ * If no partition spec is specified, all partitions are returned.
+ */
+ def getPartitions(
+ table: CatalogTable,
+ partialSpec: Option[ExternalCatalog.TablePartitionSpec] = None): Seq[CatalogTablePartition]
/** Returns partitions filtered by predicates for the given table. */
def getPartitionsByFilter(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 6327431368..6a7345f758 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -28,8 +28,7 @@ import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.{PartitionDropOptions, TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Function => HiveFunction, FunctionType, PrincipalType, ResourceType, ResourceUri}
import org.apache.hadoop.hive.ql.Driver
-import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
-import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException}
+import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
@@ -41,6 +40,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException}
import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.util.{CircularBuffer, Utils}
@@ -422,15 +422,24 @@ private[hive] class HiveClientImpl(
override def getPartitionOption(
table: CatalogTable,
- spec: ExternalCatalog.TablePartitionSpec): Option[CatalogTablePartition] = withHiveState {
+ spec: TablePartitionSpec): Option[CatalogTablePartition] = withHiveState {
val hiveTable = toHiveTable(table)
val hivePartition = client.getPartition(hiveTable, spec.asJava, false)
Option(hivePartition).map(fromHivePartition)
}
- override def getAllPartitions(table: CatalogTable): Seq[CatalogTablePartition] = withHiveState {
+ /**
+ * Returns the partitions for the given table that match the supplied partition spec.
+ * If no partition spec is specified, all partitions are returned.
+ */
+ override def getPartitions(
+ table: CatalogTable,
+ spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState {
val hiveTable = toHiveTable(table)
- shim.getAllPartitions(client, hiveTable).map(fromHivePartition)
+ spec match {
+ case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition)
+ case Some(s) => client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition)
+ }
}
override def getPartitionsByFilter(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index e0288ff98f..916a470aa5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -189,7 +189,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
}
test(s"$version: getPartitions") {
- client.getAllPartitions(client.getTable("default", "src_part"))
+ client.getPartitions(client.getTable("default", "src_part"))
}
test(s"$version: getPartitionsByFilter") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
index 8b4e4dced8..a4c6d3c185 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
@@ -17,12 +17,15 @@
package org.apache.spark.sql.hive.execution
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
-import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton}
import org.apache.spark.sql.test.SQLTestUtils
class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
- protected override def beforeAll(): Unit = {
+ import testImplicits._
+
+ protected override def beforeAll(): Unit = {
super.beforeAll()
sql(
"""
@@ -30,18 +33,44 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
|USING org.apache.spark.sql.parquet.DefaultSource
""".stripMargin)
- sql(
+ sql(
"""
|CREATE EXTERNAL TABLE parquet_tab2 (c1 INT, c2 STRING)
|STORED AS PARQUET
|TBLPROPERTIES('prop1Key'="prop1Val", '`prop2Key`'="prop2Val")
""".stripMargin)
+ sql("CREATE TABLE parquet_tab3(col1 int, `col 2` int)")
+ sql("CREATE TABLE parquet_tab4 (price int, qty int) partitioned by (year int, month int)")
+ sql("INSERT INTO parquet_tab4 PARTITION(year = 2015, month = 1) SELECT 1, 1")
+ sql("INSERT INTO parquet_tab4 PARTITION(year = 2015, month = 2) SELECT 2, 2")
+ sql("INSERT INTO parquet_tab4 PARTITION(year = 2016, month = 2) SELECT 3, 3")
+ sql("INSERT INTO parquet_tab4 PARTITION(year = 2016, month = 3) SELECT 3, 3")
+ sql(
+ """
+ |CREATE TABLE parquet_tab5 (price int, qty int)
+ |PARTITIONED BY (year int, month int, hour int, minute int, sec int, extra int)
+ """.stripMargin)
+ sql(
+ """
+ |INSERT INTO parquet_tab5
+ |PARTITION(year = 2016, month = 3, hour = 10, minute = 10, sec = 10, extra = 1) SELECT 3, 3
+ """.stripMargin)
+ sql(
+ """
+ |INSERT INTO parquet_tab5
+ |PARTITION(year = 2016, month = 4, hour = 10, minute = 10, sec = 10, extra = 1) SELECT 3, 3
+ """.stripMargin)
+ sql("CREATE VIEW parquet_view1 as select * from parquet_tab4")
}
override protected def afterAll(): Unit = {
try {
sql("DROP TABLE IF EXISTS parquet_tab1")
sql("DROP TABLE IF EXISTS parquet_tab2")
+ sql("DROP TABLE IF EXISTS parquet_tab3")
+ sql("DROP VIEW IF EXISTS parquet_view1")
+ sql("DROP TABLE IF EXISTS parquet_tab4")
+ sql("DROP TABLE IF EXISTS parquet_tab5")
} finally {
super.afterAll()
}
@@ -247,4 +276,112 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
hiveContext.sessionState.hadoopConf.set("fs.default.name", originalFsName)
}
}
+
+ test("show columns") {
+ checkAnswer(
+ sql("SHOW COLUMNS IN parquet_tab3"),
+ Row("col1") :: Row("col 2") :: Nil)
+
+ checkAnswer(
+ sql("SHOW COLUMNS IN default.parquet_tab3"),
+ Row("col1") :: Row("col 2") :: Nil)
+
+ checkAnswer(
+ sql("SHOW COLUMNS IN parquet_tab3 FROM default"),
+ Row("col1") :: Row("col 2") :: Nil)
+
+ checkAnswer(
+ sql("SHOW COLUMNS IN parquet_tab4 IN default"),
+ Row("price") :: Row("qty") :: Row("year") :: Row("month") :: Nil)
+
+ val message = intercept[NoSuchTableException] {
+ sql("SHOW COLUMNS IN badtable FROM default")
+ }.getMessage
+ assert(message.contains("badtable not found in database"))
+ }
+
+ test("show partitions - show everything") {
+ checkAnswer(
+ sql("show partitions parquet_tab4"),
+ Row("year=2015/month=1") ::
+ Row("year=2015/month=2") ::
+ Row("year=2016/month=2") ::
+ Row("year=2016/month=3") :: Nil)
+
+ checkAnswer(
+ sql("show partitions default.parquet_tab4"),
+ Row("year=2015/month=1") ::
+ Row("year=2015/month=2") ::
+ Row("year=2016/month=2") ::
+ Row("year=2016/month=3") :: Nil)
+ }
+
+ test("show partitions - show everything more than 5 part keys") {
+ checkAnswer(
+ sql("show partitions parquet_tab5"),
+ Row("year=2016/month=3/hour=10/minute=10/sec=10/extra=1") ::
+ Row("year=2016/month=4/hour=10/minute=10/sec=10/extra=1") :: Nil)
+ }
+
+ test("show partitions - filter") {
+ checkAnswer(
+ sql("show partitions default.parquet_tab4 PARTITION(year=2015)"),
+ Row("year=2015/month=1") ::
+ Row("year=2015/month=2") :: Nil)
+
+ checkAnswer(
+ sql("show partitions default.parquet_tab4 PARTITION(year=2015, month=1)"),
+ Row("year=2015/month=1") :: Nil)
+
+ checkAnswer(
+ sql("show partitions default.parquet_tab4 PARTITION(month=2)"),
+ Row("year=2015/month=2") ::
+ Row("year=2016/month=2") :: Nil)
+ }
+
+ test("show partitions - empty row") {
+ withTempTable("parquet_temp") {
+ sql(
+ """
+ |CREATE TEMPORARY TABLE parquet_temp (c1 INT, c2 STRING)
+ |USING org.apache.spark.sql.parquet.DefaultSource
+ """.stripMargin)
+ // An empty sequence of row is returned for session temporary table.
+ val message1 = intercept[AnalysisException] {
+ sql("SHOW PARTITIONS parquet_temp")
+ }.getMessage
+ assert(message1.contains("is not allowed on a temporary table"))
+
+ val message2 = intercept[AnalysisException] {
+ sql("SHOW PARTITIONS parquet_tab3")
+ }.getMessage
+ assert(message2.contains("not allowed on a table that is not partitioned"))
+
+ val message3 = intercept[AnalysisException] {
+ sql("SHOW PARTITIONS parquet_tab4 PARTITION(abcd=2015, xyz=1)")
+ }.getMessage
+ assert(message3.contains("Non-partitioning column(s) [abcd, xyz] are specified"))
+
+ val message4 = intercept[AnalysisException] {
+ sql("SHOW PARTITIONS parquet_view1")
+ }.getMessage
+ assert(message4.contains("is not allowed on a view or index table"))
+ }
+ }
+
+ test("show partitions - datasource") {
+ withTable("part_datasrc") {
+ val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("a", "b", "c")
+ df.write
+ .partitionBy("a")
+ .format("parquet")
+ .mode(SaveMode.Overwrite)
+ .saveAsTable("part_datasrc")
+
+ val message1 = intercept[AnalysisException] {
+ sql("SHOW PARTITIONS part_datasrc")
+ }.getMessage
+ assert(message1.contains("is not allowed on a datasource table"))
+ }
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index aac5cc6d40..3a9c981be4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.SQLBuilder
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExplainCommand, HiveNativeCommand, SetCommand}
+import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExplainCommand, HiveNativeCommand, SetCommand, ShowColumnsCommand}
import org.apache.spark.sql.hive.{InsertIntoHiveTable => LogicalInsertIntoHiveTable}
import org.apache.spark.sql.hive.test.{TestHive, TestHiveQueryExecution}
@@ -175,7 +175,7 @@ abstract class HiveComparisonTest
.filterNot(_ == "")
case _: HiveNativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
case _: ExplainCommand => answer
- case _: DescribeTableCommand =>
+ case _: DescribeTableCommand | ShowColumnsCommand(_) =>
// Filter out non-deterministic lines and lines which do not have actual results but
// can introduce problems because of the way Hive formats these lines.
// Then, remove empty lines. Do not sort the results.