diff options
author | OopsOutOfMemory <victorshengli@126.com> | 2015-02-05 13:07:48 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-02-05 13:08:16 -0800 |
commit | 55cebcf5b3914c177c92ecfb11b8ebf7c405d74f (patch) | |
tree | c462287cf07b133e29b9ba519a40604fcaf960a1 /sql/core | |
parent | 785a2e3de147ef094931753d30fb49fbdb697f68 (diff) | |
download | spark-55cebcf5b3914c177c92ecfb11b8ebf7c405d74f.tar.gz spark-55cebcf5b3914c177c92ecfb11b8ebf7c405d74f.tar.bz2 spark-55cebcf5b3914c177c92ecfb11b8ebf7c405d74f.zip |
[SPARK-5135][SQL] Add support for describe table to DDL in SQLContext
Hi, rxin marmbrus
I considered your suggestion (in #4127) and now re-write it. This is now up-to-date.
Could u please review it ?
Author: OopsOutOfMemory <victorshengli@126.com>
Closes #4227 from OopsOutOfMemory/describe and squashes the following commits:
053826f [OopsOutOfMemory] describe
(cherry picked from commit 4d8d070c4f9f8211afb95d29036eb5e41796dcf2)
Signed-off-by: Reynold Xin <rxin@databricks.com>
Diffstat (limited to 'sql/core')
4 files changed, 156 insertions, 5 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 0c77d399b2..f06f5fd1fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -26,9 +26,10 @@ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation} import org.apache.spark.sql.parquet._ import org.apache.spark.sql.types._ +import org.apache.spark.sql.sources.{DescribeCommand => LogicalDescribeCommand} +import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand} import org.apache.spark.sql.sources._ - private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { self: SQLContext#SparkPlanner => @@ -337,6 +338,16 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case c: CreateTableUsingAsLogicalPlan if c.temporary && c.allowExisting => sys.error("allowExisting should be set to false when creating a temporary table.") + case LogicalDescribeCommand(table, isExtended) => + val resultPlan = self.sqlContext.executePlan(table).executedPlan + ExecutedCommand( + RunnableDescribeCommand(resultPlan, resultPlan.output, isExtended)) :: Nil + + case LogicalDescribeCommand(table, isExtended) => + val resultPlan = self.sqlContext.executePlan(table).executedPlan + ExecutedCommand( + RunnableDescribeCommand(resultPlan, resultPlan.output, isExtended)) :: Nil + case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 1bc53968c4..335757087d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions.{Row, Attribute} import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import scala.collection.mutable.ArrayBuffer /** * A logical command that is executed for its side-effects. `RunnableCommand`s are @@ -176,9 +177,14 @@ case class UncacheTableCommand(tableName: String) extends RunnableCommand { @DeveloperApi case class DescribeCommand( child: SparkPlan, - override val output: Seq[Attribute]) extends RunnableCommand { + override val output: Seq[Attribute], + isExtended: Boolean) extends RunnableCommand { override def run(sqlContext: SQLContext) = { - child.output.map(field => Row(field.name, field.dataType.toString, null)) + child.schema.fields.map { field => + val cmtKey = "comment" + val comment = if (field.metadata.contains(cmtKey)) field.metadata.getString(cmtKey) else "" + Row(field.name, field.dataType.simpleString, comment) + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index ead827728c..2ef740b3be 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -23,6 +23,8 @@ import org.apache.spark.Logging import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.AbstractSparkSQLParser +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -50,7 +52,6 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { } } - // Keyword is a convention with AbstractSparkSQLParser, which will scan all of the `Keyword` // properties via reflection the class in runtime for constructing the SqlLexical object protected val CREATE = Keyword("CREATE") @@ -61,6 +62,8 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { protected val EXISTS = Keyword("EXISTS") protected val USING = Keyword("USING") protected val OPTIONS = Keyword("OPTIONS") + protected val DESCRIBE = Keyword("DESCRIBE") + protected val EXTENDED = Keyword("EXTENDED") protected val AS = Keyword("AS") protected val COMMENT = Keyword("COMMENT") @@ -82,7 +85,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { protected val MAP = Keyword("MAP") protected val STRUCT = Keyword("STRUCT") - protected lazy val ddl: Parser[LogicalPlan] = createTable + protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable protected def start: Parser[LogicalPlan] = ddl @@ -136,6 +139,22 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")" + /* + * describe [extended] table avroTable + * This will display all columns of table `avroTable` includes column_name,column_type,nullable + */ + protected lazy val describeTable: Parser[LogicalPlan] = + (DESCRIBE ~> opt(EXTENDED)) ~ (ident <~ ".").? ~ ident ^^ { + case e ~ db ~ tbl => + val tblIdentifier = db match { + case Some(dbName) => + Seq(dbName, tbl) + case None => + Seq(tbl) + } + DescribeCommand(UnresolvedRelation(tblIdentifier, None), e.isDefined) + } + protected lazy val options: Parser[Map[String, String]] = "(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap } @@ -274,6 +293,22 @@ object ResolvedDataSource { private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation) +/** + * Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command. + * @param table The table to be described. + * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false. + * It is effective only when the table is a Hive table. + */ +private[sql] case class DescribeCommand( + table: LogicalPlan, + isExtended: Boolean) extends Command { + override def output = Seq( + // Column names are based on Hive. + AttributeReference("col_name", StringType, nullable = false)(), + AttributeReference("data_type", StringType, nullable = false)(), + AttributeReference("comment", StringType, nullable = false)()) +} + private[sql] case class CreateTableUsing( tableName: String, userSpecifiedSchema: Option[StructType], diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala new file mode 100644 index 0000000000..0ec756bfeb --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala @@ -0,0 +1,99 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql.sources + +import org.apache.spark.sql._ +import org.apache.spark.sql.types._ + +class DDLScanSource extends RelationProvider { + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String]): BaseRelation = { + SimpleDDLScan(parameters("from").toInt, parameters("TO").toInt)(sqlContext) + } +} + +case class SimpleDDLScan(from: Int, to: Int)(@transient val sqlContext: SQLContext) + extends TableScan { + + override def schema = + StructType(Seq( + StructField("intType", IntegerType, nullable = false, + new MetadataBuilder().putString("comment", "test comment").build()), + StructField("stringType", StringType, nullable = false), + StructField("dateType", DateType, nullable = false), + StructField("timestampType", TimestampType, nullable = false), + StructField("doubleType", DoubleType, nullable = false), + StructField("bigintType", LongType, nullable = false), + StructField("tinyintType", ByteType, nullable = false), + StructField("decimalType", DecimalType.Unlimited, nullable = false), + StructField("fixedDecimalType", DecimalType(5,1), nullable = false), + StructField("binaryType", BinaryType, nullable = false), + StructField("booleanType", BooleanType, nullable = false), + StructField("smallIntType", ShortType, nullable = false), + StructField("floatType", FloatType, nullable = false), + StructField("mapType", MapType(StringType, StringType)), + StructField("arrayType", ArrayType(StringType)), + StructField("structType", + StructType(StructField("f1",StringType) :: + (StructField("f2",IntegerType)) :: Nil + ) + ) + )) + + + override def buildScan() = sqlContext.sparkContext.parallelize(from to to). + map(e => Row(s"people$e", e * 2)) +} + +class DDLTestSuite extends DataSourceTest { + import caseInsensisitiveContext._ + + before { + sql( + """ + |CREATE TEMPORARY TABLE ddlPeople + |USING org.apache.spark.sql.sources.DDLScanSource + |OPTIONS ( + | From '1', + | To '10' + |) + """.stripMargin) + } + + sqlTest( + "describe ddlPeople", + Seq( + Row("intType", "int", "test comment"), + Row("stringType", "string", ""), + Row("dateType", "date", ""), + Row("timestampType", "timestamp", ""), + Row("doubleType", "double", ""), + Row("bigintType", "bigint", ""), + Row("tinyintType", "tinyint", ""), + Row("decimalType", "decimal(10,0)", ""), + Row("fixedDecimalType", "decimal(5,1)", ""), + Row("binaryType", "binary", ""), + Row("booleanType", "boolean", ""), + Row("smallIntType", "smallint", ""), + Row("floatType", "float", ""), + Row("mapType", "map<string,string>", ""), + Row("arrayType", "array<string>", ""), + Row("structType", "struct<f1:string,f2:int>", "") + )) +} |