author    Reynold Xin <rxin@databricks.com>  2016-03-14 23:09:10 -0700
committer Reynold Xin <rxin@databricks.com>  2016-03-14 23:09:10 -0700
commit    e64958001cb95d53c441131f8c7a92556f49fd7d
tree      b2b1471353c076d8f9c0cdc50700072910867030
parent    f72743d971a38d3d08984ef4b66e0955945d2f58
[SPARK-13884][SQL] Remove DescribeCommand's dependency on LogicalPlan
## What changes were proposed in this pull request?

This patch removes DescribeCommand's dependency on LogicalPlan. After this patch, DescribeCommand simply accepts a TableIdentifier. This minimizes the dependency and unblocks my next patch, which removes the SQLContext dependency from SparkPlanner.

## How was this patch tested?

Should be covered by existing unit tests and the Hive compatibility tests that run describe table.

Author: Reynold Xin <rxin@databricks.com>

Closes #11710 from rxin/SPARK-13884.
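In essence, the logical DescribeCommand goes from carrying a plan subtree to carrying just a name. A minimal sketch of the shape change (the Sketch suffix marks a hypothetical stand-in; TableIdentifier is the real catalyst class):

    import org.apache.spark.sql.catalyst.TableIdentifier

    // Before this patch (simplified): the command held a LogicalPlan child,
    // so describing a table forced eager analysis of that subtree.
    //   case class DescribeCommand(table: LogicalPlan, isExtended: Boolean)

    // After (simplified): only an identifier; the table is looked up in the
    // session catalog when the command actually runs.
    case class DescribeCommandSketch(table: TableIdentifier, isExtended: Boolean)

    val cmd = DescribeCommandSketch(TableIdentifier("events", Some("logs")), isExtended = true)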
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala                       |  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala               |  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala              |  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala               |  5
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala                     | 14
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala | 57
6 files changed, 49 insertions(+), 46 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
index 8dde308f96..11391bd12a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
@@ -271,15 +271,14 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends CatalystQl
// issue.
val tableIdent = TableIdentifier(
cleanIdentifier(tableName), Some(cleanIdentifier(dbName)))
- datasources.DescribeCommand(
- UnresolvedRelation(tableIdent, None), isExtended = extended.isDefined)
+ datasources.DescribeCommand(tableIdent, isExtended = extended.isDefined)
case Token(dbName, Nil) :: Token(tableName, Nil) :: Token(colName, Nil) :: Nil =>
// It is describing a column with the format like "describe db.table column".
nodeToDescribeFallback(node)
case tableName :: Nil =>
// It is describing a table with the format like "describe table".
datasources.DescribeCommand(
- UnresolvedRelation(TableIdentifier(cleanIdentifier(tableName.text)), None),
+ TableIdentifier(cleanIdentifier(tableName.text)),
isExtended = extended.isDefined)
case _ =>
nodeToDescribeFallback(node)
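For context, a hedged sketch of what this parser branch now produces; cleanIdentifier is SparkQl's backtick-stripping helper, re-implemented here as a plausible stand-in:

    import org.apache.spark.sql.catalyst.TableIdentifier

    // Plausible stand-in for SparkQl.cleanIdentifier (assumption: it strips
    // surrounding backticks from a parsed identifier token).
    def cleanIdentifier(ident: String): String =
      if (ident.startsWith("`") && ident.endsWith("`")) ident.substring(1, ident.length - 1)
      else ident

    // "DESCRIBE logs.events" now maps straight to an identifier, with no
    // UnresolvedRelation wrapper for the analyzer to resolve up front.
    val tableIdent = TableIdentifier(cleanIdentifier("`events`"), Some(cleanIdentifier("`logs`")))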
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index bae0750788..6352c48c76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -398,11 +398,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
case describe @ LogicalDescribeCommand(table, isExtended) =>
- val resultPlan = self.sqlContext.executePlan(table).executedPlan
- ExecutedCommand(
- RunnableDescribeCommand(resultPlan, describe.output, isExtended)) :: Nil
+ ExecutedCommand(RunnableDescribeCommand(table, describe.output, isExtended)) :: Nil
- case logical.ShowFunctions(db, pattern) => ExecutedCommand(ShowFunctions(db, pattern)) :: Nil
+ case logical.ShowFunctions(db, pattern) =>
+ ExecutedCommand(ShowFunctions(db, pattern)) :: Nil
case logical.DescribeFunction(function, extended) =>
ExecutedCommand(DescribeFunction(function, extended)) :: Nil
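What disappears in this hunk is a nested planning pass (executePlan(table).executedPlan) inside strategy application. A standalone sketch with stub plan types (all Stub names are hypothetical) showing that planning a describe is now a pure wrapping step:

    import org.apache.spark.sql.catalyst.TableIdentifier

    // Hypothetical stand-ins for the physical-plan nodes in the diff:
    sealed trait SparkPlanStub
    case class RunnableDescribeStub(table: TableIdentifier, isExtended: Boolean) extends SparkPlanStub
    case class ExecutedCommandStub(cmd: SparkPlanStub) extends SparkPlanStub

    // Planning no longer re-enters the planner; it just wraps the identifier
    // and defers all resolution to the command's run() method.
    def planDescribe(table: TableIdentifier, isExtended: Boolean): Seq[SparkPlanStub] =
      ExecutedCommandStub(RunnableDescribeStub(table, isExtended)) :: Nil

    planDescribe(TableIdentifier("events", Some("logs")), isExtended = false)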
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 54cdcb10ac..6e36a15a6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -22,7 +22,7 @@ import java.util.NoSuchElementException
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row, SQLContext}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical
@@ -293,13 +293,14 @@ case object ClearCacheCommand extends RunnableCommand {
case class DescribeCommand(
- child: SparkPlan,
+ table: TableIdentifier,
override val output: Seq[Attribute],
isExtended: Boolean)
extends RunnableCommand {
override def run(sqlContext: SQLContext): Seq[Row] = {
- child.schema.fields.map { field =>
+ val relation = sqlContext.sessionState.catalog.lookupRelation(table)
+ relation.schema.fields.map { field =>
val cmtKey = "comment"
val comment = if (field.metadata.contains(cmtKey)) field.metadata.getString(cmtKey) else ""
Row(field.name, field.dataType.simpleString, comment)
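The mapping from a resolved relation's schema to DESCRIBE rows can be tried standalone. A sketch using real spark-sql types, with the catalog lookup replaced by a hand-built schema (the schema itself is made up):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StringType, StructField, StructType}

    // Stand-in for what catalog.lookupRelation(table) would hand back:
    val schema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("name", StringType,
        metadata = new MetadataBuilder().putString("comment", "display name").build())))

    // Same per-field logic as the new DescribeCommand.run:
    val rows: Seq[Row] = schema.fields.map { field =>
      val cmtKey = "comment"
      val comment = if (field.metadata.contains(cmtKey)) field.metadata.getString(cmtKey) else ""
      Row(field.name, field.dataType.simpleString, comment)
    }
    // rows == Seq(Row("id", "int", ""), Row("name", "string", "display name"))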
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 903c9913ac..04e51735c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -33,8 +33,9 @@ import org.apache.spark.sql.types._
* It is effective only when the table is a Hive table.
*/
case class DescribeCommand(
- table: LogicalPlan,
- isExtended: Boolean) extends LogicalPlan with logical.Command {
+ table: TableIdentifier,
+ isExtended: Boolean)
+ extends LogicalPlan with logical.Command {
override def children: Seq[LogicalPlan] = Seq.empty
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index a19dc21638..f44937ec6f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -102,18 +102,8 @@ private[hive] trait HiveStrategies {
case class HiveCommandStrategy(context: HiveContext) extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case describe: DescribeCommand =>
- val resolvedTable = context.executePlan(describe.table).analyzed
- resolvedTable match {
- case t: MetastoreRelation =>
- ExecutedCommand(
- DescribeHiveTableCommand(t, describe.output, describe.isExtended)) :: Nil
-
- case o: LogicalPlan =>
- val resultPlan = context.executePlan(o).executedPlan
- ExecutedCommand(RunnableDescribeCommand(
- resultPlan, describe.output, describe.isExtended)) :: Nil
- }
-
+ ExecutedCommand(
+ DescribeHiveTableCommand(describe.table, describe.output, describe.isExtended)) :: Nil
case _ => Nil
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
index 57293fce97..8481324086 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
@@ -22,8 +22,10 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.{DescribeCommand, RunnableCommand}
import org.apache.spark.sql.hive.MetastoreRelation
/**
@@ -31,33 +33,44 @@ import org.apache.spark.sql.hive.MetastoreRelation
*/
private[hive]
case class DescribeHiveTableCommand(
- table: MetastoreRelation,
+ tableId: TableIdentifier,
override val output: Seq[Attribute],
isExtended: Boolean) extends RunnableCommand {
override def run(sqlContext: SQLContext): Seq[Row] = {
- // Trying to mimic the format of Hive's output. But not exactly the same.
- var results: Seq[(String, String, String)] = Nil
-
- val columns: Seq[FieldSchema] = table.hiveQlTable.getCols.asScala
- val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols.asScala
- results ++= columns.map(field => (field.getName, field.getType, field.getComment))
- if (partitionColumns.nonEmpty) {
- val partColumnInfo =
- partitionColumns.map(field => (field.getName, field.getType, field.getComment))
- results ++=
- partColumnInfo ++
- Seq(("# Partition Information", "", "")) ++
- Seq((s"# ${output(0).name}", output(1).name, output(2).name)) ++
- partColumnInfo
- }
+ // There are two modes here:
+ // For metastore tables, create an output similar to Hive's.
+ // For other tables, delegate to DescribeCommand.
- if (isExtended) {
- results ++= Seq(("Detailed Table Information", table.hiveQlTable.getTTable.toString, ""))
- }
+ // In the future, we will consolidate the two and simply report what the catalog reports.
+ sqlContext.sessionState.catalog.lookupRelation(tableId) match {
+ case table: MetastoreRelation =>
+ // Trying to mimic the format of Hive's output. But not exactly the same.
+ var results: Seq[(String, String, String)] = Nil
+
+ val columns: Seq[FieldSchema] = table.hiveQlTable.getCols.asScala
+ val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols.asScala
+ results ++= columns.map(field => (field.getName, field.getType, field.getComment))
+ if (partitionColumns.nonEmpty) {
+ val partColumnInfo =
+ partitionColumns.map(field => (field.getName, field.getType, field.getComment))
+ results ++=
+ partColumnInfo ++
+ Seq(("# Partition Information", "", "")) ++
+ Seq((s"# ${output(0).name}", output(1).name, output(2).name)) ++
+ partColumnInfo
+ }
+
+ if (isExtended) {
+ results ++= Seq(("Detailed Table Information", table.hiveQlTable.getTTable.toString, ""))
+ }
+
+ results.map { case (name, dataType, comment) =>
+ Row(name, dataType, comment)
+ }
- results.map { case (name, dataType, comment) =>
- Row(name, dataType, comment)
+ case o: LogicalPlan =>
+ DescribeCommand(tableId, output, isExtended).run(sqlContext)
}
}
}
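Finally, the planner-side match that used to live in HiveCommandStrategy reappears above as a run-time dispatch on what the catalog returns. A compressed sketch (bodies elided to comments; describeDispatch is a hypothetical name for the logic inside DescribeHiveTableCommand.run):

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.hive.MetastoreRelation

    def describeDispatch(sqlContext: SQLContext, tableId: TableIdentifier): Unit =
      sqlContext.sessionState.catalog.lookupRelation(tableId) match {
        case t: MetastoreRelation =>
          // Hive-style rows built from t.hiveQlTable.getCols / getPartCols,
          // plus "Detailed Table Information" when isExtended is set.
          ()
        case _: LogicalPlan =>
          // Any other relation falls back to the generic DescribeCommand.
          ()
      }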