author     Reynold Xin <rxin@databricks.com>  2016-04-21 00:24:24 -0700
committer  Reynold Xin <rxin@databricks.com>  2016-04-21 00:24:24 -0700
commit     77d847ddb22cc6c5f21f0794d10bdd73b6fac193 (patch)
tree       1ce982a430bee2277195629056afab8dbd6928b0
parent     cfe472a34ea8bbf2f7a04acbf0c6ab6c48d732ff (diff)
download   spark-77d847ddb22cc6c5f21f0794d10bdd73b6fac193.tar.gz
           spark-77d847ddb22cc6c5f21f0794d10bdd73b6fac193.tar.bz2
           spark-77d847ddb22cc6c5f21f0794d10bdd73b6fac193.zip
[SPARK-14792][SQL] Move as many parsing rules as possible into SQL parser
## What changes were proposed in this pull request?

This patch moves as many parsing rules as possible into the SQL parser. Only three are left after this patch: (1) run native command, (2) analyze, and (3) script IO. These three will be dealt with in a follow-up PR.

## How was this patch tested?

No test change. This patch simply moves code around.

Author: Reynold Xin <rxin@databricks.com>

Closes #12556 from rxin/SPARK-14792.
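For illustration only (not part of this patch), here is a minimal sketch of the resulting API change: `SparkSqlParser` is no longer a singleton object but a class parameterized by a `SQLConf`, so configuration-dependent rules (e.g. the default file format for `CREATE TABLE ... STORED AS`) can consult the session configuration. The table name and SQL text below are made up for the example.

```scala
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.internal.SQLConf

// Before this patch: SparkSqlParser.parsePlan(sql)            (singleton object)
// After this patch:  new SparkSqlParser(conf).parsePlan(sql)  (per-session instance)
val parser = new SparkSqlParser(new SQLConf)

// A DDL statement whose parsing rule now lives in SparkSqlAstBuilder.
val plan = parser.parsePlan("CREATE TABLE t (id INT) STORED AS PARQUET")
```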
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala | 358
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala | 48
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala | 19
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala | 32
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/functions.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala | 80
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala | 24
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala | 3
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 96
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala | 2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala | 341
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala | 26
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala | 6
14 files changed, 568 insertions(+), 489 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index f3f84144ad..d85ddd5a98 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -207,22 +207,6 @@ class SQLContext private[sql](
sessionState.addJar(path)
}
- /** A [[FunctionResourceLoader]] that can be used in SessionCatalog. */
- @transient protected[sql] lazy val functionResourceLoader: FunctionResourceLoader = {
- new FunctionResourceLoader {
- override def loadResource(resource: FunctionResource): Unit = {
- resource.resourceType match {
- case JarResource => addJar(resource.uri)
- case FileResource => sparkContext.addFile(resource.uri)
- case ArchiveResource =>
- throw new AnalysisException(
- "Archive is not allowed to be loaded. If YARN mode is used, " +
- "please use --archives options while calling spark-submit.")
- }
- }
- }
- }
-
/**
* :: Experimental ::
* A collection of methods that are considered experimental, but can be used to hook into
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 8ed6ed21d0..ac12a72fc6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -14,29 +14,34 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.spark.sql.execution
import scala.collection.JavaConverters._
+import org.antlr.v4.runtime.{ParserRuleContext, Token}
+
import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
-import org.apache.spark.sql.execution.command.{DescribeCommand => _, _}
+import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand, DescribeCommand => _, _}
import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
/**
* Concrete parser for Spark SQL statements.
*/
-object SparkSqlParser extends AbstractSqlParser{
- val astBuilder = new SparkSqlAstBuilder
+class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser {
+ val astBuilder = new SparkSqlAstBuilder(conf)
}
/**
* Builder that converts an ANTLR ParseTree into a LogicalPlan/Expression/TableIdentifier.
*/
-class SparkSqlAstBuilder extends AstBuilder {
+class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
import org.apache.spark.sql.catalyst.parser.ParserUtils._
/**
@@ -520,7 +525,7 @@ class SparkSqlAstBuilder extends AstBuilder {
}
/**
- * Create an [[AlterTableExchangePartition]] command.
+ * Create a Hive-specific AlterTableExchangePartition command.
*
* For example:
* {{{
@@ -576,7 +581,7 @@ class SparkSqlAstBuilder extends AstBuilder {
}
/**
- * Create an [[AlterTableArchivePartition]] command
+ * Create a Hive-specific AlterTableArchivePartition command.
*
* For example:
* {{{
@@ -590,7 +595,7 @@ class SparkSqlAstBuilder extends AstBuilder {
}
/**
- * Create an [[AlterTableUnarchivePartition]] command
+ * Create a Hive-specific AlterTableUnarchivePartition command.
*
* For example:
* {{{
@@ -648,7 +653,7 @@ class SparkSqlAstBuilder extends AstBuilder {
}
/**
- * Create an [[AlterTableTouch]] command
+ * Create a Hive-specific AlterTableTouch command.
*
* For example:
* {{{
@@ -660,7 +665,7 @@ class SparkSqlAstBuilder extends AstBuilder {
}
/**
- * Create an [[AlterTableCompact]] command
+ * Create a Hive-specific AlterTableCompact command.
*
* For example:
* {{{
@@ -672,7 +677,7 @@ class SparkSqlAstBuilder extends AstBuilder {
}
/**
- * Create an [[AlterTableMerge]] command
+ * Create a Hive-specific AlterTableMerge command.
*
* For example:
* {{{
@@ -789,4 +794,337 @@ class SparkSqlAstBuilder extends AstBuilder {
override def visitConstantList(ctx: ConstantListContext): Seq[String] = withOrigin(ctx) {
ctx.constant.asScala.map(visitStringConstant)
}
+
+ /**
+ * Fail an unsupported Hive native command.
+ */
+ override def visitFailNativeCommand(
+ ctx: FailNativeCommandContext): LogicalPlan = withOrigin(ctx) {
+ val keywords = if (ctx.kws != null) {
+ Seq(ctx.kws.kw1, ctx.kws.kw2, ctx.kws.kw3).filter(_ != null).map(_.getText).mkString(" ")
+ } else {
+ // SET ROLE is the exception to the rule, because we handle this before other SET commands.
+ "SET ROLE"
+ }
+ throw new ParseException(s"Unsupported operation: $keywords", ctx)
+ }
+
+ /**
+ * Create an [[AddJar]] or [[AddFile]] command depending on the requested resource.
+ */
+ override def visitAddResource(ctx: AddResourceContext): LogicalPlan = withOrigin(ctx) {
+ ctx.identifier.getText.toLowerCase match {
+ case "file" => AddFile(remainder(ctx.identifier).trim)
+ case "jar" => AddJar(remainder(ctx.identifier).trim)
+ case other => throw new ParseException(s"Unsupported resource type '$other'.", ctx)
+ }
+ }
+
+ /**
+ * Create a table, returning either a [[CreateTable]] or a [[CreateTableAsSelectLogicalPlan]].
+ *
+ * This is not used to create datasource tables, which is handled through
+ * "CREATE TABLE ... USING ...".
+ *
+ * Note: several features are currently not supported - temporary tables, bucketing,
+ * skewed columns and storage handlers (STORED BY).
+ *
+ * Expected format:
+ * {{{
+ * CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
+ * [(col1 data_type [COMMENT col_comment], ...)]
+ * [COMMENT table_comment]
+ * [PARTITIONED BY (col3 data_type [COMMENT col_comment], ...)]
+ * [CLUSTERED BY (col1, ...) [SORTED BY (col1 [ASC|DESC], ...)] INTO num_buckets BUCKETS]
+ * [SKEWED BY (col1, col2, ...) ON ((col_value, col_value, ...), ...) [STORED AS DIRECTORIES]]
+ * [ROW FORMAT row_format]
+ * [STORED AS file_format | STORED BY storage_handler_class [WITH SERDEPROPERTIES (...)]]
+ * [LOCATION path]
+ * [TBLPROPERTIES (property_name=property_value, ...)]
+ * [AS select_statement];
+ * }}}
+ */
+ override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
+ val (name, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
+ // TODO: implement temporary tables
+ if (temp) {
+ throw new ParseException(
+ "CREATE TEMPORARY TABLE is not supported yet. " +
+ "Please use registerTempTable as an alternative.", ctx)
+ }
+ if (ctx.skewSpec != null) {
+ throw new ParseException("Operation not allowed: CREATE TABLE ... SKEWED BY ...", ctx)
+ }
+ if (ctx.bucketSpec != null) {
+ throw new ParseException("Operation not allowed: CREATE TABLE ... CLUSTERED BY ...", ctx)
+ }
+ val tableType = if (external) {
+ CatalogTableType.EXTERNAL_TABLE
+ } else {
+ CatalogTableType.MANAGED_TABLE
+ }
+ val comment = Option(ctx.STRING).map(string)
+ val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns)
+ val cols = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns)
+ val properties = Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty)
+ val selectQuery = Option(ctx.query).map(plan)
+
+ // Note: Hive requires partition columns to be distinct from the schema, so we need
+ // to include the partition columns here explicitly
+ val schema = cols ++ partitionCols
+
+ // Storage format
+ val defaultStorage: CatalogStorageFormat = {
+ val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile")
+ val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, conf)
+ CatalogStorageFormat(
+ locationUri = None,
+ inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
+ .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
+ outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
+ .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
+ // Note: Keep this unspecified because we use the presence of the serde to decide
+ // whether to convert a table created by CTAS to a datasource table.
+ serde = None,
+ serdeProperties = Map())
+ }
+ val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
+ .getOrElse(EmptyStorageFormat)
+ val rowStorage = Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat)
+ val location = Option(ctx.locationSpec).map(visitLocationSpec)
+ val storage = CatalogStorageFormat(
+ locationUri = location,
+ inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
+ outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
+ serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
+ serdeProperties = rowStorage.serdeProperties ++ fileStorage.serdeProperties)
+
+ // TODO support the sql text - have a proper location for this!
+ val tableDesc = CatalogTable(
+ identifier = name,
+ tableType = tableType,
+ storage = storage,
+ schema = schema,
+ partitionColumnNames = partitionCols.map(_.name),
+ properties = properties,
+ comment = comment)
+
+ selectQuery match {
+ case Some(q) => CreateTableAsSelectLogicalPlan(tableDesc, q, ifNotExists)
+ case None => CreateTable(tableDesc, ifNotExists)
+ }
+ }
+
+ /**
+ * Create a [[CreateTableLike]] command.
+ */
+ override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan = withOrigin(ctx) {
+ val targetTable = visitTableIdentifier(ctx.target)
+ val sourceTable = visitTableIdentifier(ctx.source)
+ CreateTableLike(targetTable, sourceTable, ctx.EXISTS != null)
+ }
+
+ /**
+ * Create a [[CatalogStorageFormat]] for creating tables.
+ */
+ override def visitCreateFileFormat(
+ ctx: CreateFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
+ (ctx.fileFormat, ctx.storageHandler) match {
+ // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format
+ case (c: TableFileFormatContext, null) =>
+ visitTableFileFormat(c)
+ // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO
+ case (c: GenericFileFormatContext, null) =>
+ visitGenericFileFormat(c)
+ case (null, storageHandler) =>
+ throw new ParseException("Operation not allowed: ... STORED BY storage_handler ...", ctx)
+ case _ =>
+ throw new ParseException("expected either STORED AS or STORED BY, not both", ctx)
+ }
+ }
+
+ /** Empty storage format for default values and copies. */
+ private val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, Map.empty)
+
+ /**
+ * Create a [[CatalogStorageFormat]].
+ */
+ override def visitTableFileFormat(
+ ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
+ EmptyStorageFormat.copy(
+ inputFormat = Option(string(ctx.inFmt)),
+ outputFormat = Option(string(ctx.outFmt)),
+ serde = Option(ctx.serdeCls).map(string)
+ )
+ }
+
+ /**
+ * Resolve a [[HiveSerDe]] based on the name given and return it as a [[CatalogStorageFormat]].
+ */
+ override def visitGenericFileFormat(
+ ctx: GenericFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
+ val source = ctx.identifier.getText
+ HiveSerDe.sourceToSerDe(source, conf) match {
+ case Some(s) =>
+ EmptyStorageFormat.copy(
+ inputFormat = s.inputFormat,
+ outputFormat = s.outputFormat,
+ serde = s.serde)
+ case None =>
+ throw new ParseException(s"Unrecognized file format in STORED AS clause: $source", ctx)
+ }
+ }
+
+ /**
+ * Create a [[CatalogStorageFormat]] used for creating tables.
+ *
+ * Example format:
+ * {{{
+ * SERDE serde_name [WITH SERDEPROPERTIES (k1=v1, k2=v2, ...)]
+ * }}}
+ *
+ * OR
+ *
+ * {{{
+ * DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]]
+ * [COLLECTION ITEMS TERMINATED BY char]
+ * [MAP KEYS TERMINATED BY char]
+ * [LINES TERMINATED BY char]
+ * [NULL DEFINED AS char]
+ * }}}
+ */
+ private def visitRowFormat(ctx: RowFormatContext): CatalogStorageFormat = withOrigin(ctx) {
+ ctx match {
+ case serde: RowFormatSerdeContext => visitRowFormatSerde(serde)
+ case delimited: RowFormatDelimitedContext => visitRowFormatDelimited(delimited)
+ }
+ }
+
+ /**
+ * Create SERDE row format name and properties pair.
+ */
+ override def visitRowFormatSerde(
+ ctx: RowFormatSerdeContext): CatalogStorageFormat = withOrigin(ctx) {
+ import ctx._
+ EmptyStorageFormat.copy(
+ serde = Option(string(name)),
+ serdeProperties = Option(tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty))
+ }
+
+ /**
+ * Create a delimited row format properties object.
+ */
+ override def visitRowFormatDelimited(
+ ctx: RowFormatDelimitedContext): CatalogStorageFormat = withOrigin(ctx) {
+ // Collect the entries if any.
+ def entry(key: String, value: Token): Seq[(String, String)] = {
+ Option(value).toSeq.map(x => key -> string(x))
+ }
+ // TODO we need proper support for the NULL format.
+ val entries =
+ entry("field.delim", ctx.fieldsTerminatedBy) ++
+ entry("serialization.format", ctx.fieldsTerminatedBy) ++
+ entry("escape.delim", ctx.escapedBy) ++
+ entry("colelction.delim", ctx.collectionItemsTerminatedBy) ++
+ entry("mapkey.delim", ctx.keysTerminatedBy) ++
+ Option(ctx.linesSeparatedBy).toSeq.map { token =>
+ val value = string(token)
+ assert(
+ value == "\n",
+ s"LINES TERMINATED BY only supports newline '\\n' right now: $value",
+ ctx)
+ "line.delim" -> value
+ }
+ EmptyStorageFormat.copy(serdeProperties = entries.toMap)
+ }
+
+ /**
+ * Create or replace a view. This creates a [[CreateViewAsSelectLogicalCommand]] command.
+ *
+ * For example:
+ * {{{
+ * CREATE VIEW [IF NOT EXISTS] [db_name.]view_name
+ * [(column_name [COMMENT column_comment], ...) ]
+ * [COMMENT view_comment]
+ * [TBLPROPERTIES (property_name = property_value, ...)]
+ * AS SELECT ...;
+ * }}}
+ */
+ override def visitCreateView(ctx: CreateViewContext): LogicalPlan = withOrigin(ctx) {
+ if (ctx.identifierList != null) {
+ throw new ParseException(s"Operation not allowed: partitioned views", ctx)
+ } else {
+ val identifiers = Option(ctx.identifierCommentList).toSeq.flatMap(_.identifierComment.asScala)
+ val schema = identifiers.map { ic =>
+ CatalogColumn(ic.identifier.getText, null, nullable = true, Option(ic.STRING).map(string))
+ }
+ createView(
+ ctx,
+ ctx.tableIdentifier,
+ comment = Option(ctx.STRING).map(string),
+ schema,
+ ctx.query,
+ Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty),
+ ctx.EXISTS != null,
+ ctx.REPLACE != null
+ )
+ }
+ }
+
+ /**
+ * Alter the query of a view. This creates a [[CreateViewAsSelectLogicalCommand]] command.
+ */
+ override def visitAlterViewQuery(ctx: AlterViewQueryContext): LogicalPlan = withOrigin(ctx) {
+ createView(
+ ctx,
+ ctx.tableIdentifier,
+ comment = None,
+ Seq.empty,
+ ctx.query,
+ Map.empty,
+ allowExist = false,
+ replace = true)
+ }
+
+ /**
+ * Create a [[CreateViewAsSelectLogicalCommand]] command.
+ */
+ private def createView(
+ ctx: ParserRuleContext,
+ name: TableIdentifierContext,
+ comment: Option[String],
+ schema: Seq[CatalogColumn],
+ query: QueryContext,
+ properties: Map[String, String],
+ allowExist: Boolean,
+ replace: Boolean): LogicalPlan = {
+ val sql = Option(source(query))
+ val tableDesc = CatalogTable(
+ identifier = visitTableIdentifier(name),
+ tableType = CatalogTableType.VIRTUAL_VIEW,
+ schema = schema,
+ storage = EmptyStorageFormat,
+ properties = properties,
+ viewOriginalText = sql,
+ viewText = sql,
+ comment = comment)
+ CreateViewAsSelectLogicalCommand(tableDesc, plan(query), allowExist, replace, command(ctx))
+ }
+
+ /**
+ * Create a sequence of [[CatalogColumn]]s from a column list
+ */
+ private def visitCatalogColumns(ctx: ColTypeListContext): Seq[CatalogColumn] = withOrigin(ctx) {
+ ctx.colType.asScala.map { col =>
+ CatalogColumn(
+ col.identifier.getText.toLowerCase,
+ // Note: for types like "STRUCT<myFirstName: STRING, myLastName: STRING>" we can't
+ // just convert the whole type string to lower case, otherwise the struct field names
+ // will no longer be case sensitive. Instead, we rely on our parser to get the proper
+ // case before passing it to Hive.
+ CatalystSqlParser.parseDataType(col.dataType.getText).simpleString,
+ nullable = true,
+ Option(col.STRING).map(string))
+ }
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
new file mode 100644
index 0000000000..e7191e4bfe
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
+/**
+ * Adds a jar to the current session so it can be used (for UDFs or serdes).
+ */
+case class AddJar(path: String) extends RunnableCommand {
+ override val output: Seq[Attribute] = {
+ val schema = StructType(
+ StructField("result", IntegerType, nullable = false) :: Nil)
+ schema.toAttributes
+ }
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ sqlContext.addJar(path)
+ Seq(Row(0))
+ }
+}
+
+/**
+ * Adds a file to the current session so it can be used.
+ */
+case class AddFile(path: String) extends RunnableCommand {
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ sqlContext.sparkContext.addFile(path)
+ Seq.empty[Row]
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 0b41985174..9a7c11ac33 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -20,6 +20,25 @@ package org.apache.spark.sql.execution.command
import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
+
+
+case class CreateTableAsSelectLogicalPlan(
+ tableDesc: CatalogTable,
+ child: LogicalPlan,
+ allowExisting: Boolean) extends UnaryNode with Command {
+
+ override def output: Seq[Attribute] = Seq.empty[Attribute]
+
+ override lazy val resolved: Boolean =
+ tableDesc.identifier.database.isDefined &&
+ tableDesc.schema.nonEmpty &&
+ tableDesc.storage.serde.isDefined &&
+ tableDesc.storage.inputFormat.isDefined &&
+ tableDesc.storage.outputFormat.isDefined &&
+ childrenResolved
+}
/**
* A command to create a table with the same definition of the given existing table.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
new file mode 100644
index 0000000000..aa6112c7f0
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
+
+case class CreateViewAsSelectLogicalCommand(
+ tableDesc: CatalogTable,
+ child: LogicalPlan,
+ allowExisting: Boolean,
+ replace: Boolean,
+ sql: String) extends UnaryNode with Command {
+ override def output: Seq[Attribute] = Seq.empty[Attribute]
+ override lazy val resolved: Boolean = false
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 8e2e94669b..f2448af991 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical.BroadcastHint
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.expressions.UserDefinedFunction
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -1175,7 +1176,10 @@ object functions {
* @group normal_funcs
*/
def expr(expr: String): Column = {
- Column(SparkSqlParser.parseExpression(expr))
+ val parser = SQLContext.getActive().map(_.sessionState.sqlParser).getOrElse {
+ new SparkSqlParser(new SQLConf)
+ }
+ Column(parser.parseExpression(expr))
}
//////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
new file mode 100644
index 0000000000..38317d46dd
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+case class HiveSerDe(
+ inputFormat: Option[String] = None,
+ outputFormat: Option[String] = None,
+ serde: Option[String] = None)
+
+object HiveSerDe {
+ /**
+ * Get the Hive SerDe information from the data source abbreviation string or classname.
+ *
+ * @param source The data source abbreviation (case insensitive) or classname. Currently one of:
+ * SequenceFile, RCFile, ORC, Parquet, TextFile, or Avro.
+ * @param conf SQLConf
+ * @return HiveSerDe associated with the specified source
+ */
+ def sourceToSerDe(source: String, conf: SQLConf): Option[HiveSerDe] = {
+ val serdeMap = Map(
+ "sequencefile" ->
+ HiveSerDe(
+ inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
+ outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat")),
+
+ "rcfile" ->
+ HiveSerDe(
+ inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
+ outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
+ serde = Option(conf.getConfString("hive.default.rcfile.serde",
+ "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))),
+
+ "orc" ->
+ HiveSerDe(
+ inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
+ outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
+ serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")),
+
+ "parquet" ->
+ HiveSerDe(
+ inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
+ outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
+ serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")),
+
+ "textfile" ->
+ HiveSerDe(
+ inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
+ outputFormat = Option("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
+
+ "avro" ->
+ HiveSerDe(
+ inputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"),
+ outputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"),
+ serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe")))
+
+ val key = source.toLowerCase match {
+ case s if s.startsWith("org.apache.spark.sql.parquet") => "parquet"
+ case s if s.startsWith("org.apache.spark.sql.orc") => "orc"
+ case s => s
+ }
+
+ serdeMap.get(key)
+ }
+}
+
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 42915d5887..08a99627bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -22,9 +22,9 @@ import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.spark.internal.config.ConfigEntry
-import org.apache.spark.sql.{ContinuousQueryManager, ExperimentalMethods, SQLContext, UDFRegistration}
+import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, _}
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -56,13 +56,29 @@ private[sql] class SessionState(ctx: SQLContext) {
*/
lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy()
+ /** A [[FunctionResourceLoader]] that can be used in SessionCatalog. */
+ lazy val functionResourceLoader: FunctionResourceLoader = {
+ new FunctionResourceLoader {
+ override def loadResource(resource: FunctionResource): Unit = {
+ resource.resourceType match {
+ case JarResource => addJar(resource.uri)
+ case FileResource => ctx.sparkContext.addFile(resource.uri)
+ case ArchiveResource =>
+ throw new AnalysisException(
+ "Archive is not allowed to be loaded. If YARN mode is used, " +
+ "please use --archives options while calling spark-submit.")
+ }
+ }
+ }
+ }
+
/**
* Internal catalog for managing table and database states.
*/
lazy val catalog =
new SessionCatalog(
ctx.externalCatalog,
- ctx.functionResourceLoader,
+ functionResourceLoader,
functionRegistry,
conf)
@@ -93,7 +109,7 @@ private[sql] class SessionState(ctx: SQLContext) {
/**
* Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
*/
- lazy val sqlParser: ParserInterface = SparkSqlParser
+ lazy val sqlParser: ParserInterface = new SparkSqlParser(conf)
/**
* Planner that converts optimized logical plans to physical plans.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index d6ccaf9348..70765158b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -23,11 +23,12 @@ import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.execution.SparkSqlParser
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
// TODO: merge this with DDLSuite (SPARK-14441)
class DDLCommandSuite extends PlanTest {
- private val parser = SparkSqlParser
+ private val parser = new SparkSqlParser(new SQLConf)
private def assertUnsupported(sql: String): Unit = {
val e = intercept[AnalysisException] {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c20b022e84..3eea6c06ac 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -40,75 +40,15 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.execution.FileRelation
+import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand}
import org.apache.spark.sql.execution.datasources.{Partition => _, _}
import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation}
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.hive.execution.HiveNativeCommand
import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.types._
-private[hive] case class HiveSerDe(
- inputFormat: Option[String] = None,
- outputFormat: Option[String] = None,
- serde: Option[String] = None)
-
-private[hive] object HiveSerDe {
- /**
- * Get the Hive SerDe information from the data source abbreviation string or classname.
- *
- * @param source Currently the source abbreviation can be one of the following:
- * SequenceFile, RCFile, ORC, PARQUET, and case insensitive.
- * @param conf SQLConf
- * @return HiveSerDe associated with the specified source
- */
- def sourceToSerDe(source: String, conf: SQLConf): Option[HiveSerDe] = {
- val serdeMap = Map(
- "sequencefile" ->
- HiveSerDe(
- inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
- outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat")),
-
- "rcfile" ->
- HiveSerDe(
- inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
- outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
- serde = Option(conf.getConfString("hive.default.rcfile.serde",
- "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))),
-
- "orc" ->
- HiveSerDe(
- inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
- outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
- serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")),
-
- "parquet" ->
- HiveSerDe(
- inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
- outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
- serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")),
-
- "textfile" ->
- HiveSerDe(
- inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
- outputFormat = Option("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
-
- "avro" ->
- HiveSerDe(
- inputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"),
- outputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"),
- serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe")))
-
- val key = source.toLowerCase match {
- case s if s.startsWith("org.apache.spark.sql.parquet") => "parquet"
- case s if s.startsWith("org.apache.spark.sql.orc") => "orc"
- case s => s
- }
-
- serdeMap.get(key)
- }
-}
-
/**
* Legacy catalog for interacting with the Hive metastore.
@@ -699,7 +639,8 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
case p: LogicalPlan if !p.childrenResolved => p
case p: LogicalPlan if p.resolved => p
- case CreateViewAsSelect(table, child, allowExisting, replace, sql) if conf.nativeView =>
+ case CreateViewAsSelectLogicalCommand(table, child, allowExisting, replace, sql)
+ if conf.nativeView =>
if (allowExisting && replace) {
throw new AnalysisException(
"It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
@@ -713,10 +654,10 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
allowExisting,
replace)
- case CreateViewAsSelect(table, child, allowExisting, replace, sql) =>
+ case CreateViewAsSelectLogicalCommand(table, child, allowExisting, replace, sql) =>
HiveNativeCommand(sql)
- case p @ CreateTableAsSelect(table, child, allowExisting) =>
+ case p @ CreateTableAsSelectLogicalPlan(table, child, allowExisting) =>
val schema = if (table.schema.nonEmpty) {
table.schema
} else {
@@ -1081,28 +1022,3 @@ private[hive] object HiveMetastoreTypes {
case udt: UserDefinedType[_] => toMetastoreType(udt.sqlType)
}
}
-
-private[hive] case class CreateTableAsSelect(
- tableDesc: CatalogTable,
- child: LogicalPlan,
- allowExisting: Boolean) extends UnaryNode with Command {
-
- override def output: Seq[Attribute] = Seq.empty[Attribute]
- override lazy val resolved: Boolean =
- tableDesc.identifier.database.isDefined &&
- tableDesc.schema.nonEmpty &&
- tableDesc.storage.serde.isDefined &&
- tableDesc.storage.inputFormat.isDefined &&
- tableDesc.storage.outputFormat.isDefined &&
- childrenResolved
-}
-
-private[hive] case class CreateViewAsSelect(
- tableDesc: CatalogTable,
- child: LogicalPlan,
- allowExisting: Boolean,
- replace: Boolean,
- sql: String) extends UnaryNode with Command {
- override def output: Seq[Attribute] = Seq.empty[Attribute]
- override lazy val resolved: Boolean = false
-}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 2c360cb7ca..171def43b5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -82,7 +82,7 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
sharedState.externalCatalog,
metadataHive,
ctx,
- ctx.functionResourceLoader,
+ ctx.sessionState.functionResourceLoader,
functionRegistry,
conf,
hiveconf)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
index 90f10d5ebd..00f829d850 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
@@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.spark.sql.hive.execution
-import scala.collection.JavaConverters._
import scala.util.Try
-import org.antlr.v4.runtime.{ParserRuleContext, Token}
+import org.antlr.v4.runtime.Token
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.parse.VariableSubstitution
import org.apache.hadoop.hive.serde.serdeConstants
@@ -29,8 +29,6 @@ import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.{CreateTable, CreateTableLike}
-import org.apache.spark.sql.hive.{CreateTableAsSelect => CTAS, CreateViewAsSelect => CreateView, HiveSerDe}
import org.apache.spark.sql.internal.SQLConf
/**
@@ -54,7 +52,8 @@ class HiveSqlParser(conf: SQLConf, hiveconf: HiveConf) extends AbstractSqlParser
/**
* Builder that converts an ANTLR ParseTree into a LogicalPlan/Expression/TableIdentifier.
*/
-class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder {
+class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
+
import ParserUtils._
/**
@@ -66,31 +65,6 @@ class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder {
}
/**
- * Fail an unsupported Hive native command.
- */
- override def visitFailNativeCommand(
- ctx: FailNativeCommandContext): LogicalPlan = withOrigin(ctx) {
- val keywords = if (ctx.kws != null) {
- Seq(ctx.kws.kw1, ctx.kws.kw2, ctx.kws.kw3).filter(_ != null).map(_.getText).mkString(" ")
- } else {
- // SET ROLE is the exception to the rule, because we handle this before other SET commands.
- "SET ROLE"
- }
- throw new ParseException(s"Unsupported operation: $keywords", ctx)
- }
-
- /**
- * Create an [[AddJar]] or [[AddFile]] command depending on the requested resource.
- */
- override def visitAddResource(ctx: AddResourceContext): LogicalPlan = withOrigin(ctx) {
- ctx.identifier.getText.toLowerCase match {
- case "file" => AddFile(remainder(ctx.identifier).trim)
- case "jar" => AddJar(remainder(ctx.identifier).trim)
- case other => throw new ParseException(s"Unsupported resource type '$other'.", ctx)
- }
- }
-
- /**
* Create an [[AnalyzeTable]] command. This currently only implements the NOSCAN option (other
* options are passed on to Hive) e.g.:
* {{{
@@ -108,202 +82,6 @@ class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder {
}
/**
- * Create a [[CatalogStorageFormat]] for creating tables.
- */
- override def visitCreateFileFormat(
- ctx: CreateFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
- (ctx.fileFormat, ctx.storageHandler) match {
- // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format
- case (c: TableFileFormatContext, null) =>
- visitTableFileFormat(c)
- // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO
- case (c: GenericFileFormatContext, null) =>
- visitGenericFileFormat(c)
- case (null, storageHandler) =>
- throw new ParseException("Operation not allowed: ... STORED BY storage_handler ...", ctx)
- case _ =>
- throw new ParseException("expected either STORED AS or STORED BY, not both", ctx)
- }
- }
-
- /**
- * Create a table, returning either a [[CreateTable]] or a [[CreateTableAsSelect]].
- *
- * This is not used to create datasource tables, which is handled through
- * "CREATE TABLE ... USING ...".
- *
- * Note: several features are currently not supported - temporary tables, bucketing,
- * skewed columns and storage handlers (STORED BY).
- *
- * Expected format:
- * {{{
- * CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
- * [(col1 data_type [COMMENT col_comment], ...)]
- * [COMMENT table_comment]
- * [PARTITIONED BY (col3 data_type [COMMENT col_comment], ...)]
- * [CLUSTERED BY (col1, ...) [SORTED BY (col1 [ASC|DESC], ...)] INTO num_buckets BUCKETS]
- * [SKEWED BY (col1, col2, ...) ON ((col_value, col_value, ...), ...) [STORED AS DIRECTORIES]]
- * [ROW FORMAT row_format]
- * [STORED AS file_format | STORED BY storage_handler_class [WITH SERDEPROPERTIES (...)]]
- * [LOCATION path]
- * [TBLPROPERTIES (property_name=property_value, ...)]
- * [AS select_statement];
- * }}}
- */
- override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
- val (name, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
- // TODO: implement temporary tables
- if (temp) {
- throw new ParseException(
- "CREATE TEMPORARY TABLE is not supported yet. " +
- "Please use registerTempTable as an alternative.", ctx)
- }
- if (ctx.skewSpec != null) {
- throw new ParseException("Operation not allowed: CREATE TABLE ... SKEWED BY ...", ctx)
- }
- if (ctx.bucketSpec != null) {
- throw new ParseException("Operation not allowed: CREATE TABLE ... CLUSTERED BY ...", ctx)
- }
- val tableType = if (external) {
- CatalogTableType.EXTERNAL_TABLE
- } else {
- CatalogTableType.MANAGED_TABLE
- }
- val comment = Option(ctx.STRING).map(string)
- val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns)
- val cols = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns)
- val properties = Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty)
- val selectQuery = Option(ctx.query).map(plan)
-
- // Note: Hive requires partition columns to be distinct from the schema, so we need
- // to include the partition columns here explicitly
- val schema = cols ++ partitionCols
-
- // Storage format
- val defaultStorage: CatalogStorageFormat = {
- val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile")
- val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, conf)
- CatalogStorageFormat(
- locationUri = None,
- inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
- .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
- outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
- .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
- // Note: Keep this unspecified because we use the presence of the serde to decide
- // whether to convert a table created by CTAS to a datasource table.
- serde = None,
- serdeProperties = Map())
- }
- val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
- .getOrElse(EmptyStorageFormat)
- val rowStorage = Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat)
- val location = Option(ctx.locationSpec).map(visitLocationSpec)
- val storage = CatalogStorageFormat(
- locationUri = location,
- inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
- outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
- serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
- serdeProperties = rowStorage.serdeProperties ++ fileStorage.serdeProperties)
-
- // TODO support the sql text - have a proper location for this!
- val tableDesc = CatalogTable(
- identifier = name,
- tableType = tableType,
- storage = storage,
- schema = schema,
- partitionColumnNames = partitionCols.map(_.name),
- properties = properties,
- comment = comment)
-
- selectQuery match {
- case Some(q) => CTAS(tableDesc, q, ifNotExists)
- case None => CreateTable(tableDesc, ifNotExists)
- }
- }
-
- /**
- * Create a [[CreateTableLike]] command.
- */
- override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan = withOrigin(ctx) {
- val targetTable = visitTableIdentifier(ctx.target)
- val sourceTable = visitTableIdentifier(ctx.source)
- CreateTableLike(targetTable, sourceTable, ctx.EXISTS != null)
- }
-
- /**
- * Create or replace a view. This creates a [[CreateViewAsSelect]] command.
- *
- * For example:
- * {{{
- * CREATE VIEW [IF NOT EXISTS] [db_name.]view_name
- * [(column_name [COMMENT column_comment], ...) ]
- * [COMMENT view_comment]
- * [TBLPROPERTIES (property_name = property_value, ...)]
- * AS SELECT ...;
- * }}}
- */
- override def visitCreateView(ctx: CreateViewContext): LogicalPlan = withOrigin(ctx) {
- if (ctx.identifierList != null) {
- throw new ParseException(s"Operation not allowed: partitioned views", ctx)
- } else {
- val identifiers = Option(ctx.identifierCommentList).toSeq.flatMap(_.identifierComment.asScala)
- val schema = identifiers.map { ic =>
- CatalogColumn(ic.identifier.getText, null, nullable = true, Option(ic.STRING).map(string))
- }
- createView(
- ctx,
- ctx.tableIdentifier,
- comment = Option(ctx.STRING).map(string),
- schema,
- ctx.query,
- Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty),
- ctx.EXISTS != null,
- ctx.REPLACE != null
- )
- }
- }
-
- /**
- * Alter the query of a view. This creates a [[CreateViewAsSelect]] command.
- */
- override def visitAlterViewQuery(ctx: AlterViewQueryContext): LogicalPlan = withOrigin(ctx) {
- createView(
- ctx,
- ctx.tableIdentifier,
- comment = None,
- Seq.empty,
- ctx.query,
- Map.empty,
- allowExist = false,
- replace = true)
- }
-
- /**
- * Create a [[CreateViewAsSelect]] command.
- */
- private def createView(
- ctx: ParserRuleContext,
- name: TableIdentifierContext,
- comment: Option[String],
- schema: Seq[CatalogColumn],
- query: QueryContext,
- properties: Map[String, String],
- allowExist: Boolean,
- replace: Boolean): LogicalPlan = {
- val sql = Option(source(query))
- val tableDesc = CatalogTable(
- identifier = visitTableIdentifier(name),
- tableType = CatalogTableType.VIRTUAL_VIEW,
- schema = schema,
- storage = EmptyStorageFormat,
- properties = properties,
- viewOriginalText = sql,
- viewText = sql,
- comment = comment)
- CreateView(tableDesc, plan(query), allowExist, replace, command(ctx))
- }
-
- /**
* Create a [[HiveScriptIOSchema]].
*/
override protected def withScriptIOSchema(
@@ -371,115 +149,4 @@ class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder {
reader, writer,
schemaLess)
}
-
- /** Empty storage format for default values and copies. */
- private val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, Map.empty)
-
- /**
- * Create a [[CatalogStorageFormat]].
- */
- override def visitTableFileFormat(
- ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
- EmptyStorageFormat.copy(
- inputFormat = Option(string(ctx.inFmt)),
- outputFormat = Option(string(ctx.outFmt)),
- serde = Option(ctx.serdeCls).map(string)
- )
- }
-
- /**
- * Resolve a [[HiveSerDe]] based on the name given and return it as a [[CatalogStorageFormat]].
- */
- override def visitGenericFileFormat(
- ctx: GenericFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
- val source = ctx.identifier.getText
- HiveSerDe.sourceToSerDe(source, conf) match {
- case Some(s) =>
- EmptyStorageFormat.copy(
- inputFormat = s.inputFormat,
- outputFormat = s.outputFormat,
- serde = s.serde)
- case None =>
- throw new ParseException(s"Unrecognized file format in STORED AS clause: $source", ctx)
- }
- }
-
- /**
- * Create a [[RowFormat]] used for creating tables.
- *
- * Example format:
- * {{{
- * SERDE serde_name [WITH SERDEPROPERTIES (k1=v1, k2=v2, ...)]
- * }}}
- *
- * OR
- *
- * {{{
- * DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]]
- * [COLLECTION ITEMS TERMINATED BY char]
- * [MAP KEYS TERMINATED BY char]
- * [LINES TERMINATED BY char]
- * [NULL DEFINED AS char]
- * }}}
- */
- private def visitRowFormat(ctx: RowFormatContext): CatalogStorageFormat = withOrigin(ctx) {
- ctx match {
- case serde: RowFormatSerdeContext => visitRowFormatSerde(serde)
- case delimited: RowFormatDelimitedContext => visitRowFormatDelimited(delimited)
- }
- }
-
- /**
- * Create SERDE row format name and properties pair.
- */
- override def visitRowFormatSerde(
- ctx: RowFormatSerdeContext): CatalogStorageFormat = withOrigin(ctx) {
- import ctx._
- EmptyStorageFormat.copy(
- serde = Option(string(name)),
- serdeProperties = Option(tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty))
- }
-
- /**
- * Create a delimited row format properties object.
- */
- override def visitRowFormatDelimited(
- ctx: RowFormatDelimitedContext): CatalogStorageFormat = withOrigin(ctx) {
- // Collect the entries if any.
- def entry(key: String, value: Token): Seq[(String, String)] = {
- Option(value).toSeq.map(x => key -> string(x))
- }
- // TODO we need proper support for the NULL format.
- val entries = entry(serdeConstants.FIELD_DELIM, ctx.fieldsTerminatedBy) ++
- entry(serdeConstants.SERIALIZATION_FORMAT, ctx.fieldsTerminatedBy) ++
- entry(serdeConstants.ESCAPE_CHAR, ctx.escapedBy) ++
- entry(serdeConstants.COLLECTION_DELIM, ctx.collectionItemsTerminatedBy) ++
- entry(serdeConstants.MAPKEY_DELIM, ctx.keysTerminatedBy) ++
- Option(ctx.linesSeparatedBy).toSeq.map { token =>
- val value = string(token)
- assert(
- value == "\n",
- s"LINES TERMINATED BY only supports newline '\\n' right now: $value",
- ctx)
- serdeConstants.LINE_DELIM -> value
- }
- EmptyStorageFormat.copy(serdeProperties = entries.toMap)
- }
-
- /**
- * Create a sequence of [[CatalogColumn]]s from a column list
- */
- private def visitCatalogColumns(ctx: ColTypeListContext): Seq[CatalogColumn] = withOrigin(ctx) {
- ctx.colType.asScala.map { col =>
- CatalogColumn(
- col.identifier.getText.toLowerCase,
- // Note: for types like "STRUCT<myFirstName: STRING, myLastName: STRING>" we can't
- // just convert the whole type string to lower case, otherwise the struct field names
- // will no longer be case sensitive. Instead, we rely on our parser to get the proper
- // case before passing it to Hive.
- CatalystSqlParser.parseDataType(col.dataType.getText).simpleString,
- nullable = true,
- Option(col.STRING).map(string))
- }
- }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index b5ee9a6295..78f8bfe59f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -124,32 +124,6 @@ case class AnalyzeTable(tableName: String) extends RunnableCommand {
}
private[hive]
-case class AddJar(path: String) extends RunnableCommand {
-
- override val output: Seq[Attribute] = {
- val schema = StructType(
- StructField("result", IntegerType, false) :: Nil)
- schema.toAttributes
- }
-
- override def run(sqlContext: SQLContext): Seq[Row] = {
- sqlContext.addJar(path)
-
- Seq(Row(0))
- }
-}
-
-private[hive]
-case class AddFile(path: String) extends RunnableCommand {
-
- override def run(sqlContext: SQLContext): Seq[Row] = {
- sqlContext.sessionState.runNativeSql(s"ADD FILE $path")
- sqlContext.sparkContext.addFile(path)
- Seq.empty[Row]
- }
-}
-
-private[hive]
case class CreateMetastoreDataSource(
tableIdent: TableIdentifier,
userSpecifiedSchema: Option[StructType],
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 484cf528e6..4c90dbeb1b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.JsonTuple
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
-import org.apache.spark.sql.execution.command.{CreateTable, CreateTableLike}
+import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewAsSelectLogicalCommand}
import org.apache.spark.sql.hive.execution.HiveNativeCommand
import org.apache.spark.sql.hive.test.TestHive
@@ -39,8 +39,8 @@ class HiveDDLCommandSuite extends PlanTest {
private def extractTableDesc(sql: String): (CatalogTable, Boolean) = {
parser.parsePlan(sql).collect {
case CreateTable(desc, allowExisting) => (desc, allowExisting)
- case CreateTableAsSelect(desc, _, allowExisting) => (desc, allowExisting)
- case CreateViewAsSelect(desc, _, allowExisting, _, _) => (desc, allowExisting)
+ case CreateTableAsSelectLogicalPlan(desc, _, allowExisting) => (desc, allowExisting)
+ case CreateViewAsSelectLogicalCommand(desc, _, allowExisting, _, _) => (desc, allowExisting)
}.head
}