author    OopsOutOfMemory <victorshengli@126.com>    2015-02-05 13:07:48 -0800
committer Reynold Xin <rxin@databricks.com>          2015-02-05 13:07:48 -0800
commit    4d8d070c4f9f8211afb95d29036eb5e41796dcf2 (patch)
tree      c462287cf07b133e29b9ba519a40604fcaf960a1 /sql/core
parent    a83936e109087b5cae8b9734032f2f331fdad2e3 (diff)
[SPARK-5135][SQL] Add support for describe table to DDL in SQLContext
Hi rxin, marmbrus. I considered your suggestion (in #4127) and have rewritten it. This is now up to date. Could you please review it?

Author: OopsOutOfMemory <victorshengli@126.com>

Closes #4227 from OopsOutOfMemory/describe and squashes the following commits:

053826f [OopsOutOfMemory] describe
Diffstat (limited to 'sql/core')
 sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 13
 sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala        | 10
 sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala               | 39
 sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala      | 99
 4 files changed, 156 insertions(+), 5 deletions(-)
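
For context before the diff itself, here is a minimal usage sketch of the feature this patch adds. It is not part of the commit; it assumes an existing SparkContext named `sc` and reuses the DDLScanSource provider and ddlPeople table name from the DDLTestSuite at the end of this diff.

    // Sketch only, not part of this diff: exercising DESCRIBE through SQLContext.
    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)  // assumes an existing SparkContext `sc`

    // Register a temporary data-source table (same shape as the test suite below).
    sqlContext.sql(
      """
        |CREATE TEMPORARY TABLE ddlPeople
        |USING org.apache.spark.sql.sources.DDLScanSource
        |OPTIONS (From '1', To '10')
      """.stripMargin)

    // The new DDLParser rule parses this into sources.DescribeCommand; SparkStrategies
    // then plans it as ExecutedCommand(execution.DescribeCommand(...)). Each returned
    // row is (col_name, data_type, comment).
    sqlContext.sql("DESCRIBE ddlPeople").collect().foreach(println)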
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 0c77d399b2..f06f5fd1fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -26,9 +26,10 @@ import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
import org.apache.spark.sql.parquet._
import org.apache.spark.sql.types._
+import org.apache.spark.sql.sources.{DescribeCommand => LogicalDescribeCommand}
+import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
import org.apache.spark.sql.sources._
-
private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
self: SQLContext#SparkPlanner =>
@@ -337,6 +338,16 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case c: CreateTableUsingAsLogicalPlan if c.temporary && c.allowExisting =>
sys.error("allowExisting should be set to false when creating a temporary table.")
+ case LogicalDescribeCommand(table, isExtended) =>
+ val resultPlan = self.sqlContext.executePlan(table).executedPlan
+ ExecutedCommand(
+ RunnableDescribeCommand(resultPlan, resultPlan.output, isExtended)) :: Nil
+
+ case LogicalDescribeCommand(table, isExtended) =>
+ val resultPlan = self.sqlContext.executePlan(table).executedPlan
+ ExecutedCommand(
+ RunnableDescribeCommand(resultPlan, resultPlan.output, isExtended)) :: Nil
+
case _ => Nil
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 1bc53968c4..335757087d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Row, Attribute}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import scala.collection.mutable.ArrayBuffer
/**
* A logical command that is executed for its side-effects. `RunnableCommand`s are
@@ -176,9 +177,14 @@ case class UncacheTableCommand(tableName: String) extends RunnableCommand {
@DeveloperApi
case class DescribeCommand(
child: SparkPlan,
- override val output: Seq[Attribute]) extends RunnableCommand {
+ override val output: Seq[Attribute],
+ isExtended: Boolean) extends RunnableCommand {
override def run(sqlContext: SQLContext) = {
- child.output.map(field => Row(field.name, field.dataType.toString, null))
+ child.schema.fields.map { field =>
+ val cmtKey = "comment"
+ val comment = if (field.metadata.contains(cmtKey)) field.metadata.getString(cmtKey) else ""
+ Row(field.name, field.dataType.simpleString, comment)
+ }
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index ead827728c..2ef740b3be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -23,6 +23,8 @@ import org.apache.spark.Logging
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -50,7 +52,6 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
}
}
-
// Keyword is a convention with AbstractSparkSQLParser, which will scan all of the `Keyword`
// properties via reflection the class in runtime for constructing the SqlLexical object
protected val CREATE = Keyword("CREATE")
@@ -61,6 +62,8 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
protected val EXISTS = Keyword("EXISTS")
protected val USING = Keyword("USING")
protected val OPTIONS = Keyword("OPTIONS")
+ protected val DESCRIBE = Keyword("DESCRIBE")
+ protected val EXTENDED = Keyword("EXTENDED")
protected val AS = Keyword("AS")
protected val COMMENT = Keyword("COMMENT")
@@ -82,7 +85,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
protected val MAP = Keyword("MAP")
protected val STRUCT = Keyword("STRUCT")
- protected lazy val ddl: Parser[LogicalPlan] = createTable
+ protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable
protected def start: Parser[LogicalPlan] = ddl
@@ -136,6 +139,22 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")"
+ /*
+ * describe [extended] avroTable
+ * This will display all columns of table `avroTable`, including column_name, column_type and nullable.
+ */
+ protected lazy val describeTable: Parser[LogicalPlan] =
+ (DESCRIBE ~> opt(EXTENDED)) ~ (ident <~ ".").? ~ ident ^^ {
+ case e ~ db ~ tbl =>
+ val tblIdentifier = db match {
+ case Some(dbName) =>
+ Seq(dbName, tbl)
+ case None =>
+ Seq(tbl)
+ }
+ DescribeCommand(UnresolvedRelation(tblIdentifier, None), e.isDefined)
+ }
+
protected lazy val options: Parser[Map[String, String]] =
"(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap }
@@ -274,6 +293,22 @@ object ResolvedDataSource {
private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)
+/**
+ * Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
+ * @param table The table to be described.
+ * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
+ * It is effective only when the table is a Hive table.
+ */
+private[sql] case class DescribeCommand(
+ table: LogicalPlan,
+ isExtended: Boolean) extends Command {
+ override def output = Seq(
+ // Column names are based on Hive.
+ AttributeReference("col_name", StringType, nullable = false)(),
+ AttributeReference("data_type", StringType, nullable = false)(),
+ AttributeReference("comment", StringType, nullable = false)())
+}
+
private[sql] case class CreateTableUsing(
tableName: String,
userSpecifiedSchema: Option[StructType],
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
new file mode 100644
index 0000000000..0ec756bfeb
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
@@ -0,0 +1,99 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.types._
+
+class DDLScanSource extends RelationProvider {
+ override def createRelation(
+ sqlContext: SQLContext,
+ parameters: Map[String, String]): BaseRelation = {
+ SimpleDDLScan(parameters("from").toInt, parameters("TO").toInt)(sqlContext)
+ }
+}
+
+case class SimpleDDLScan(from: Int, to: Int)(@transient val sqlContext: SQLContext)
+ extends TableScan {
+
+ override def schema =
+ StructType(Seq(
+ StructField("intType", IntegerType, nullable = false,
+ new MetadataBuilder().putString("comment", "test comment").build()),
+ StructField("stringType", StringType, nullable = false),
+ StructField("dateType", DateType, nullable = false),
+ StructField("timestampType", TimestampType, nullable = false),
+ StructField("doubleType", DoubleType, nullable = false),
+ StructField("bigintType", LongType, nullable = false),
+ StructField("tinyintType", ByteType, nullable = false),
+ StructField("decimalType", DecimalType.Unlimited, nullable = false),
+ StructField("fixedDecimalType", DecimalType(5,1), nullable = false),
+ StructField("binaryType", BinaryType, nullable = false),
+ StructField("booleanType", BooleanType, nullable = false),
+ StructField("smallIntType", ShortType, nullable = false),
+ StructField("floatType", FloatType, nullable = false),
+ StructField("mapType", MapType(StringType, StringType)),
+ StructField("arrayType", ArrayType(StringType)),
+ StructField("structType",
+ StructType(StructField("f1",StringType) ::
+ (StructField("f2",IntegerType)) :: Nil
+ )
+ )
+ ))
+
+
+ override def buildScan() = sqlContext.sparkContext.parallelize(from to to).
+ map(e => Row(s"people$e", e * 2))
+}
+
+class DDLTestSuite extends DataSourceTest {
+ import caseInsensisitiveContext._
+
+ before {
+ sql(
+ """
+ |CREATE TEMPORARY TABLE ddlPeople
+ |USING org.apache.spark.sql.sources.DDLScanSource
+ |OPTIONS (
+ | From '1',
+ | To '10'
+ |)
+ """.stripMargin)
+ }
+
+ sqlTest(
+ "describe ddlPeople",
+ Seq(
+ Row("intType", "int", "test comment"),
+ Row("stringType", "string", ""),
+ Row("dateType", "date", ""),
+ Row("timestampType", "timestamp", ""),
+ Row("doubleType", "double", ""),
+ Row("bigintType", "bigint", ""),
+ Row("tinyintType", "tinyint", ""),
+ Row("decimalType", "decimal(10,0)", ""),
+ Row("fixedDecimalType", "decimal(5,1)", ""),
+ Row("binaryType", "binary", ""),
+ Row("booleanType", "boolean", ""),
+ Row("smallIntType", "smallint", ""),
+ Row("floatType", "float", ""),
+ Row("mapType", "map<string,string>", ""),
+ Row("arrayType", "array<string>", ""),
+ Row("structType", "struct<f1:string,f2:int>", "")
+ ))
+}
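
A small follow-up sketch, not part of this patch: the parser also accepts the EXTENDED keyword, but per the DescribeCommand scaladoc above, isExtended only takes effect for Hive tables, so for a data-source table like ddlPeople the output should match plain DESCRIBE. A hypothetical test of that behavior, if it were added inside DDLTestSuite, could look like this:

    // Hypothetical addition to DDLTestSuite (not part of this commit): the EXTENDED
    // form parses, and for a non-Hive, data-source table it yields the same
    // (col_name, data_type, comment) rows as plain DESCRIBE.
    test("describe extended on a data-source table") {
      assert(
        sql("DESCRIBE EXTENDED ddlPeople").collect().toSeq ==
          sql("DESCRIBE ddlPeople").collect().toSeq)
    }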