author     OopsOutOfMemory <victorshengli@126.com>  2015-02-05 13:07:48 -0800
committer  Reynold Xin <rxin@databricks.com>  2015-02-05 13:07:48 -0800
commit     4d8d070c4f9f8211afb95d29036eb5e41796dcf2 (patch)
tree       c462287cf07b133e29b9ba519a40604fcaf960a1 /sql
parent     a83936e109087b5cae8b9734032f2f331fdad2e3 (diff)
[SPARK-5135][SQL] Add support for describe table to DDL in SQLContext
Hi rxin, marmbrus: I considered your suggestion (in #4127) and have now rewritten it. It is up to date. Could you please review it?

Author: OopsOutOfMemory <victorshengli@126.com>

Closes #4227 from OopsOutOfMemory/describe and squashes the following commits:

053826f [OopsOutOfMemory] describe
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala | 26
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala | 39
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala | 99
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 17
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala | 6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 4
10 files changed, 185 insertions, 28 deletions
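
What the change enables, as a minimal sketch: DESCRIBE [EXTENDED] now works against data-source tables registered in a plain SQLContext, without Hive. The application setup, data source and path below are illustrative assumptions, not part of the patch.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Minimal sketch of the new behaviour: DESCRIBE against a data-source table in
// a plain SQLContext. The JSON data source and path are illustrative assumptions.
object DescribeExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("describe-example").setMaster("local"))
    val sqlContext = new SQLContext(sc)

    sqlContext.sql(
      """CREATE TEMPORARY TABLE people
        |USING org.apache.spark.sql.json
        |OPTIONS (path 'people.json')
      """.stripMargin)

    // One Row per column: (col_name, data_type, comment)
    sqlContext.sql("DESCRIBE people").collect().foreach(println)

    sc.stop()
  }
}
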
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
index a6d6ddd905..be362be55b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
@@ -227,8 +227,9 @@ abstract class DataType {
def json: String = compact(render(jsonValue))
def prettyJson: String = pretty(render(jsonValue))
-}
+ def simpleString: String = typeName
+}
/**
* :: DeveloperApi ::
@@ -242,7 +243,6 @@ case object NullType extends DataType {
override def defaultSize: Int = 1
}
-
protected[sql] object NativeType {
val all = Seq(
IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType, StringType)
@@ -448,6 +448,8 @@ case object LongType extends IntegralType {
* The default size of a value of the LongType is 8 bytes.
*/
override def defaultSize: Int = 8
+
+ override def simpleString = "bigint"
}
@@ -470,6 +472,8 @@ case object IntegerType extends IntegralType {
* The default size of a value of the IntegerType is 4 bytes.
*/
override def defaultSize: Int = 4
+
+ override def simpleString = "int"
}
@@ -492,6 +496,8 @@ case object ShortType extends IntegralType {
* The default size of a value of the ShortType is 2 bytes.
*/
override def defaultSize: Int = 2
+
+ override def simpleString = "smallint"
}
@@ -514,6 +520,8 @@ case object ByteType extends IntegralType {
* The default size of a value of the ByteType is 1 byte.
*/
override def defaultSize: Int = 1
+
+ override def simpleString = "tinyint"
}
@@ -573,6 +581,11 @@ case class DecimalType(precisionInfo: Option[PrecisionInfo]) extends FractionalType {
* The default size of a value of the DecimalType is 4096 bytes.
*/
override def defaultSize: Int = 4096
+
+ override def simpleString = precisionInfo match {
+ case Some(PrecisionInfo(precision, scale)) => s"decimal($precision,$scale)"
+ case None => "decimal(10,0)"
+ }
}
@@ -695,6 +708,8 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType {
* (We assume that there are 100 elements).
*/
override def defaultSize: Int = 100 * elementType.defaultSize
+
+ override def simpleString = s"array<${elementType.simpleString}>"
}
@@ -870,6 +885,11 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {
* The default size of a value of the StructType is the total default sizes of all field types.
*/
override def defaultSize: Int = fields.map(_.dataType.defaultSize).sum
+
+ override def simpleString = {
+ val fieldTypes = fields.map(field => s"${field.name}:${field.dataType.simpleString}")
+ s"struct<${fieldTypes.mkString(",")}>"
+ }
}
@@ -920,6 +940,8 @@ case class MapType(
* (We assume that there are 100 elements).
*/
override def defaultSize: Int = 100 * (keyType.defaultSize + valueType.defaultSize)
+
+ override def simpleString = s"map<${keyType.simpleString},${valueType.simpleString}>"
}
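
The overrides above give each type a Hive-style short name. A small standalone sketch (not part of the patch) of the strings simpleString is expected to produce:

import org.apache.spark.sql.types._

// Expected simpleString renderings, mirroring the overrides in dataTypes.scala.
object SimpleStringDemo extends App {
  println(LongType.simpleString)                          // bigint
  println(DecimalType(5, 1).simpleString)                 // decimal(5,1)
  println(DecimalType.Unlimited.simpleString)             // decimal(10,0)
  println(ArrayType(StringType).simpleString)             // array<string>
  println(MapType(StringType, IntegerType).simpleString)  // map<string,int>
  println(StructType(Seq(
    StructField("f1", StringType),
    StructField("f2", IntegerType))).simpleString)        // struct<f1:string,f2:int>
}
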
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 0c77d399b2..f06f5fd1fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -26,9 +26,10 @@ import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
import org.apache.spark.sql.parquet._
import org.apache.spark.sql.types._
+import org.apache.spark.sql.sources.{DescribeCommand => LogicalDescribeCommand}
+import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
import org.apache.spark.sql.sources._
-
private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
self: SQLContext#SparkPlanner =>
@@ -337,6 +338,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case c: CreateTableUsingAsLogicalPlan if c.temporary && c.allowExisting =>
sys.error("allowExisting should be set to false when creating a temporary table.")
+ case LogicalDescribeCommand(table, isExtended) =>
+ val resultPlan = self.sqlContext.executePlan(table).executedPlan
+ ExecutedCommand(
+ RunnableDescribeCommand(resultPlan, resultPlan.output, isExtended)) :: Nil
+
case _ => Nil
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 1bc53968c4..335757087d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Row, Attribute}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import scala.collection.mutable.ArrayBuffer
/**
* A logical command that is executed for its side-effects. `RunnableCommand`s are
@@ -176,9 +177,14 @@ case class UncacheTableCommand(tableName: String) extends RunnableCommand {
@DeveloperApi
case class DescribeCommand(
child: SparkPlan,
- override val output: Seq[Attribute]) extends RunnableCommand {
+ override val output: Seq[Attribute],
+ isExtended: Boolean) extends RunnableCommand {
override def run(sqlContext: SQLContext) = {
- child.output.map(field => Row(field.name, field.dataType.toString, null))
+ child.schema.fields.map { field =>
+ val cmtKey = "comment"
+ val comment = if (field.metadata.contains(cmtKey)) field.metadata.getString(cmtKey) else ""
+ Row(field.name, field.dataType.simpleString, comment)
+ }
}
}
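
The new run implementation derives its rows purely from the child plan's schema. A standalone sketch of that row-building logic (the example schema is hypothetical):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Standalone sketch of DescribeCommand.run's row building: one Row per field,
// with the comment pulled from column metadata when present. The schema below
// is a hypothetical example.
object DescribeRowsSketch extends App {
  val schema = StructType(Seq(
    StructField("id", IntegerType, nullable = false,
      new MetadataBuilder().putString("comment", "primary key").build()),
    StructField("name", StringType)))

  val rows = schema.fields.map { field =>
    val comment =
      if (field.metadata.contains("comment")) field.metadata.getString("comment") else ""
    Row(field.name, field.dataType.simpleString, comment)
  }
  rows.foreach(println)  // [id,int,primary key] and [name,string,]
}
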
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index ead827728c..2ef740b3be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -23,6 +23,8 @@ import org.apache.spark.Logging
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -50,7 +52,6 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
}
}
-
// Keyword is a convention with AbstractSparkSQLParser, which will scan all of the `Keyword`
// properties via reflection the class in runtime for constructing the SqlLexical object
protected val CREATE = Keyword("CREATE")
@@ -61,6 +62,8 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
protected val EXISTS = Keyword("EXISTS")
protected val USING = Keyword("USING")
protected val OPTIONS = Keyword("OPTIONS")
+ protected val DESCRIBE = Keyword("DESCRIBE")
+ protected val EXTENDED = Keyword("EXTENDED")
protected val AS = Keyword("AS")
protected val COMMENT = Keyword("COMMENT")
@@ -82,7 +85,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
protected val MAP = Keyword("MAP")
protected val STRUCT = Keyword("STRUCT")
- protected lazy val ddl: Parser[LogicalPlan] = createTable
+ protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable
protected def start: Parser[LogicalPlan] = ddl
@@ -136,6 +139,22 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")"
+ /*
+ * describe [extended] avroTable
+ * This will display all columns of table `avroTable`, including column name, column type and comment
+ */
+ protected lazy val describeTable: Parser[LogicalPlan] =
+ (DESCRIBE ~> opt(EXTENDED)) ~ (ident <~ ".").? ~ ident ^^ {
+ case e ~ db ~ tbl =>
+ val tblIdentifier = db match {
+ case Some(dbName) =>
+ Seq(dbName, tbl)
+ case None =>
+ Seq(tbl)
+ }
+ DescribeCommand(UnresolvedRelation(tblIdentifier, None), e.isDefined)
+ }
+
protected lazy val options: Parser[Map[String, String]] =
"(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap }
@@ -274,6 +293,22 @@ object ResolvedDataSource {
private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)
+/**
+ * Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
+ * @param table The table to be described.
+ * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
+ * It is effective only when the table is a Hive table.
+ */
+private[sql] case class DescribeCommand(
+ table: LogicalPlan,
+ isExtended: Boolean) extends Command {
+ override def output = Seq(
+ // Column names are based on Hive.
+ AttributeReference("col_name", StringType, nullable = false)(),
+ AttributeReference("data_type", StringType, nullable = false)(),
+ AttributeReference("comment", StringType, nullable = false)())
+}
+
private[sql] case class CreateTableUsing(
tableName: String,
userSpecifiedSchema: Option[StructType],
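
The describeTable production above accepts an optional EXTENDED keyword and an optional database qualifier before the table name. A simplified, self-contained sketch of the same grammar shape, written against plain Scala parser combinators rather than Spark's AbstractSparkSQLParser (the Describe case class is a placeholder, not a Spark class):

import scala.util.parsing.combinator.JavaTokenParsers

// Simplified stand-in for the describeTable production: optional EXTENDED,
// optional "db." qualifier, then a table identifier.
object DescribeParserSketch extends JavaTokenParsers {
  case class Describe(extended: Boolean, db: Option[String], table: String)

  def describe: Parser[Describe] =
    ("DESCRIBE" ~> opt("EXTENDED")) ~ opt(ident <~ ".") ~ ident ^^ {
      case e ~ db ~ tbl => Describe(e.isDefined, db, tbl)
    }

  def main(args: Array[String]): Unit = {
    println(parseAll(describe, "DESCRIBE EXTENDED mydb.people"))  // Describe(true,Some(mydb),people)
    println(parseAll(describe, "DESCRIBE people"))                // Describe(false,None,people)
  }
}
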
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
new file mode 100644
index 0000000000..0ec756bfeb
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
@@ -0,0 +1,99 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.types._
+
+class DDLScanSource extends RelationProvider {
+ override def createRelation(
+ sqlContext: SQLContext,
+ parameters: Map[String, String]): BaseRelation = {
+ SimpleDDLScan(parameters("from").toInt, parameters("TO").toInt)(sqlContext)
+ }
+}
+
+case class SimpleDDLScan(from: Int, to: Int)(@transient val sqlContext: SQLContext)
+ extends TableScan {
+
+ override def schema =
+ StructType(Seq(
+ StructField("intType", IntegerType, nullable = false,
+ new MetadataBuilder().putString("comment", "test comment").build()),
+ StructField("stringType", StringType, nullable = false),
+ StructField("dateType", DateType, nullable = false),
+ StructField("timestampType", TimestampType, nullable = false),
+ StructField("doubleType", DoubleType, nullable = false),
+ StructField("bigintType", LongType, nullable = false),
+ StructField("tinyintType", ByteType, nullable = false),
+ StructField("decimalType", DecimalType.Unlimited, nullable = false),
+ StructField("fixedDecimalType", DecimalType(5,1), nullable = false),
+ StructField("binaryType", BinaryType, nullable = false),
+ StructField("booleanType", BooleanType, nullable = false),
+ StructField("smallIntType", ShortType, nullable = false),
+ StructField("floatType", FloatType, nullable = false),
+ StructField("mapType", MapType(StringType, StringType)),
+ StructField("arrayType", ArrayType(StringType)),
+ StructField("structType",
+ StructType(StructField("f1",StringType) ::
+ (StructField("f2",IntegerType)) :: Nil
+ )
+ )
+ ))
+
+
+ override def buildScan() = sqlContext.sparkContext.parallelize(from to to).
+ map(e => Row(s"people$e", e * 2))
+}
+
+class DDLTestSuite extends DataSourceTest {
+ import caseInsensisitiveContext._
+
+ before {
+ sql(
+ """
+ |CREATE TEMPORARY TABLE ddlPeople
+ |USING org.apache.spark.sql.sources.DDLScanSource
+ |OPTIONS (
+ | From '1',
+ | To '10'
+ |)
+ """.stripMargin)
+ }
+
+ sqlTest(
+ "describe ddlPeople",
+ Seq(
+ Row("intType", "int", "test comment"),
+ Row("stringType", "string", ""),
+ Row("dateType", "date", ""),
+ Row("timestampType", "timestamp", ""),
+ Row("doubleType", "double", ""),
+ Row("bigintType", "bigint", ""),
+ Row("tinyintType", "tinyint", ""),
+ Row("decimalType", "decimal(10,0)", ""),
+ Row("fixedDecimalType", "decimal(5,1)", ""),
+ Row("binaryType", "binary", ""),
+ Row("booleanType", "boolean", ""),
+ Row("smallIntType", "smallint", ""),
+ Row("floatType", "float", ""),
+ Row("mapType", "map<string,string>", ""),
+ Row("arrayType", "array<string>", ""),
+ Row("structType", "struct<f1:string,f2:int>", "")
+ ))
+}
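
The test attaches a column comment through StructField metadata. Comments declared in DDL via the parser's pre-existing COMMENT keyword should surface the same way; a hedged sketch, assuming a data source that accepts a user-specified schema (the built-in JSON source is used here; the path is illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hedged sketch: a column COMMENT given in the user-specified schema of a
// CREATE TEMPORARY TABLE statement should appear in DESCRIBE's comment column.
// The JSON path is an illustrative assumption.
object DescribeWithCommentsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("describe-comments").setMaster("local"))
    val sqlContext = new SQLContext(sc)

    sqlContext.sql(
      """CREATE TEMPORARY TABLE people (
        |  id int COMMENT 'numeric identifier',
        |  name string
        |)
        |USING org.apache.spark.sql.json
        |OPTIONS (path 'people.json')
      """.stripMargin)

    // Expected: [id,int,numeric identifier] and [name,string,]
    sqlContext.sql("DESCRIBE people").collect().foreach(println)

    sc.stop()
  }
}
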
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 1921bf6e5e..d2371d4a55 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -75,7 +75,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
DataFrame(this,
ddlParser(sqlText, exceptionOnError = false).getOrElse(HiveQl.parseSql(substituted)))
} else {
- sys.error(s"Unsupported SQL dialect: ${conf.dialect}. Try 'sql' or 'hiveql'")
+ sys.error(s"Unsupported SQL dialect: ${conf.dialect}. Try 'sql' or 'hiveql'")
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 62e9d92eac..c19a091719 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.ExplainCommand
+import org.apache.spark.sql.sources.DescribeCommand
import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DropTable, AnalyzeTable, HiveScriptIOSchema}
import org.apache.spark.sql.types._
@@ -47,22 +48,6 @@ import scala.collection.JavaConversions._
*/
private[hive] case object NativePlaceholder extends Command
-/**
- * Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
- * @param table The table to be described.
- * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
- * It is effective only when the table is a Hive table.
- */
-case class DescribeCommand(
- table: LogicalPlan,
- isExtended: Boolean) extends Command {
- override def output = Seq(
- // Column names are based on Hive.
- AttributeReference("col_name", StringType, nullable = false)(),
- AttributeReference("data_type", StringType, nullable = false)(),
- AttributeReference("comment", StringType, nullable = false)())
-}
-
/** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
private[hive] object HiveQl {
protected val nativeCommands = Seq(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index d89111094b..7857a0252e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.sources.DescribeCommand
import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive.execution._
@@ -240,8 +241,11 @@ private[hive] trait HiveStrategies {
case t: MetastoreRelation =>
ExecutedCommand(
DescribeHiveTableCommand(t, describe.output, describe.isExtended)) :: Nil
+
case o: LogicalPlan =>
- ExecutedCommand(RunnableDescribeCommand(planLater(o), describe.output)) :: Nil
+ val resultPlan = context.executePlan(o).executedPlan
+ ExecutedCommand(RunnableDescribeCommand(
+ resultPlan, describe.output, describe.isExtended)) :: Nil
}
case _ => Nil
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index f8a957d55d..a90bd1e257 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -22,8 +22,8 @@ import java.io._
import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen}
import org.apache.spark.Logging
+import org.apache.spark.sql.sources.DescribeCommand
import org.apache.spark.sql.execution.{SetCommand, ExplainCommand}
-import org.apache.spark.sql.hive.DescribeCommand
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util._
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 407d6058c3..bb73ff1ea7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -741,8 +741,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
assertResult(
Array(
- Row("a", "IntegerType", null),
- Row("b", "StringType", null))
+ Row("a", "int", ""),
+ Row("b", "string", ""))
) {
sql("DESCRIBE test_describe_commands2")
.select('col_name, 'data_type, 'comment)