From 77d847ddb22cc6c5f21f0794d10bdd73b6fac193 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Thu, 21 Apr 2016 00:24:24 -0700
Subject: [SPARK-14792][SQL] Move as many parsing rules as possible into SQL
 parser

## What changes were proposed in this pull request?
This patch moves as many parsing rules as possible into SQL parser. There are only three more left after this patch: (1) run native command, (2) analyze, and (3) script IO. These 3 will be dealt with in a follow-up PR.

## How was this patch tested?
No test change. This simply moves code around.

Author: Reynold Xin

Closes #12556 from rxin/SPARK-14792.
---
 .../scala/org/apache/spark/sql/SQLContext.scala    |  16 -
 .../spark/sql/execution/SparkSqlParser.scala       | 358 ++++++++++++++++++++-
 .../spark/sql/execution/command/resources.scala    |  48 +++
 .../spark/sql/execution/command/tables.scala       |  19 ++
 .../apache/spark/sql/execution/command/views.scala |  32 ++
 .../scala/org/apache/spark/sql/functions.scala     |   6 +-
 .../org/apache/spark/sql/internal/HiveSerDe.scala  |  80 +++++
 .../apache/spark/sql/internal/SessionState.scala   |  24 +-
 8 files changed, 552 insertions(+), 31 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index f3f84144ad..d85ddd5a98 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -207,22 +207,6 @@ class SQLContext private[sql](
     sessionState.addJar(path)
   }
 
-  /** A [[FunctionResourceLoader]] that can be used in SessionCatalog. */
-  @transient protected[sql] lazy val functionResourceLoader: FunctionResourceLoader = {
-    new FunctionResourceLoader {
-      override def loadResource(resource: FunctionResource): Unit = {
-        resource.resourceType match {
-          case JarResource => addJar(resource.uri)
-          case FileResource => sparkContext.addFile(resource.uri)
-          case ArchiveResource =>
-            throw new AnalysisException(
-              "Archive is not allowed to be loaded. If YARN mode is used, " +
-                "please use --archives options while calling spark-submit.")
-        }
-      }
-    }
-  }
-
   /**
    * :: Experimental ::
    * A collection of methods that are considered experimental, but can be used to hook into
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 8ed6ed21d0..ac12a72fc6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -14,29 +14,34 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/ + package org.apache.spark.sql.execution import scala.collection.JavaConverters._ +import org.antlr.v4.runtime.{ParserRuleContext, Token} + import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.parser._ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation} -import org.apache.spark.sql.execution.command.{DescribeCommand => _, _} +import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand, DescribeCommand => _, _} import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} /** * Concrete parser for Spark SQL statements. */ -object SparkSqlParser extends AbstractSqlParser{ - val astBuilder = new SparkSqlAstBuilder +class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser{ + val astBuilder = new SparkSqlAstBuilder(conf) } /** * Builder that converts an ANTLR ParseTree into a LogicalPlan/Expression/TableIdentifier. */ -class SparkSqlAstBuilder extends AstBuilder { +class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { import org.apache.spark.sql.catalyst.parser.ParserUtils._ /** @@ -520,7 +525,7 @@ class SparkSqlAstBuilder extends AstBuilder { } /** - * Create an [[AlterTableExchangePartition]] command. + * Create an (Hive's) AlterTableExchangePartition command. * * For example: * {{{ @@ -576,7 +581,7 @@ class SparkSqlAstBuilder extends AstBuilder { } /** - * Create an [[AlterTableArchivePartition]] command + * Create an (Hive's) AlterTableArchivePartition command * * For example: * {{{ @@ -590,7 +595,7 @@ class SparkSqlAstBuilder extends AstBuilder { } /** - * Create an [[AlterTableUnarchivePartition]] command + * Create an (Hive's) AlterTableUnarchivePartition command * * For example: * {{{ @@ -648,7 +653,7 @@ class SparkSqlAstBuilder extends AstBuilder { } /** - * Create an [[AlterTableTouch]] command + * Create an (Hive's) AlterTableTouch command * * For example: * {{{ @@ -660,7 +665,7 @@ class SparkSqlAstBuilder extends AstBuilder { } /** - * Create an [[AlterTableCompact]] command + * Create an (Hive's) AlterTableCompact command * * For example: * {{{ @@ -672,7 +677,7 @@ class SparkSqlAstBuilder extends AstBuilder { } /** - * Create an [[AlterTableMerge]] command + * Create an (Hive's) AlterTableMerge command * * For example: * {{{ @@ -789,4 +794,337 @@ class SparkSqlAstBuilder extends AstBuilder { override def visitConstantList(ctx: ConstantListContext): Seq[String] = withOrigin(ctx) { ctx.constant.asScala.map(visitStringConstant) } + + /** + * Fail an unsupported Hive native command. + */ + override def visitFailNativeCommand( + ctx: FailNativeCommandContext): LogicalPlan = withOrigin(ctx) { + val keywords = if (ctx.kws != null) { + Seq(ctx.kws.kw1, ctx.kws.kw2, ctx.kws.kw3).filter(_ != null).map(_.getText).mkString(" ") + } else { + // SET ROLE is the exception to the rule, because we handle this before other SET commands. + "SET ROLE" + } + throw new ParseException(s"Unsupported operation: $keywords", ctx) + } + + /** + * Create an [[AddJar]] or [[AddFile]] command depending on the requested resource. 
+ */ + override def visitAddResource(ctx: AddResourceContext): LogicalPlan = withOrigin(ctx) { + ctx.identifier.getText.toLowerCase match { + case "file" => AddFile(remainder(ctx.identifier).trim) + case "jar" => AddJar(remainder(ctx.identifier).trim) + case other => throw new ParseException(s"Unsupported resource type '$other'.", ctx) + } + } + + /** + * Create a table, returning either a [[CreateTable]] or a [[CreateTableAsSelectLogicalPlan]]. + * + * This is not used to create datasource tables, which is handled through + * "CREATE TABLE ... USING ...". + * + * Note: several features are currently not supported - temporary tables, bucketing, + * skewed columns and storage handlers (STORED BY). + * + * Expected format: + * {{{ + * CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name + * [(col1 data_type [COMMENT col_comment], ...)] + * [COMMENT table_comment] + * [PARTITIONED BY (col3 data_type [COMMENT col_comment], ...)] + * [CLUSTERED BY (col1, ...) [SORTED BY (col1 [ASC|DESC], ...)] INTO num_buckets BUCKETS] + * [SKEWED BY (col1, col2, ...) ON ((col_value, col_value, ...), ...) [STORED AS DIRECTORIES]] + * [ROW FORMAT row_format] + * [STORED AS file_format | STORED BY storage_handler_class [WITH SERDEPROPERTIES (...)]] + * [LOCATION path] + * [TBLPROPERTIES (property_name=property_value, ...)] + * [AS select_statement]; + * }}} + */ + override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) { + val (name, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader) + // TODO: implement temporary tables + if (temp) { + throw new ParseException( + "CREATE TEMPORARY TABLE is not supported yet. " + + "Please use registerTempTable as an alternative.", ctx) + } + if (ctx.skewSpec != null) { + throw new ParseException("Operation not allowed: CREATE TABLE ... SKEWED BY ...", ctx) + } + if (ctx.bucketSpec != null) { + throw new ParseException("Operation not allowed: CREATE TABLE ... CLUSTERED BY ...", ctx) + } + val tableType = if (external) { + CatalogTableType.EXTERNAL_TABLE + } else { + CatalogTableType.MANAGED_TABLE + } + val comment = Option(ctx.STRING).map(string) + val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns) + val cols = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns) + val properties = Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty) + val selectQuery = Option(ctx.query).map(plan) + + // Note: Hive requires partition columns to be distinct from the schema, so we need + // to include the partition columns here explicitly + val schema = cols ++ partitionCols + + // Storage format + val defaultStorage: CatalogStorageFormat = { + val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile") + val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, conf) + CatalogStorageFormat( + locationUri = None, + inputFormat = defaultHiveSerde.flatMap(_.inputFormat) + .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")), + outputFormat = defaultHiveSerde.flatMap(_.outputFormat) + .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")), + // Note: Keep this unspecified because we use the presence of the serde to decide + // whether to convert a table created by CTAS to a datasource table. 
+ serde = None, + serdeProperties = Map()) + } + val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat) + .getOrElse(EmptyStorageFormat) + val rowStorage = Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat) + val location = Option(ctx.locationSpec).map(visitLocationSpec) + val storage = CatalogStorageFormat( + locationUri = location, + inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat), + outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat), + serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde), + serdeProperties = rowStorage.serdeProperties ++ fileStorage.serdeProperties) + + // TODO support the sql text - have a proper location for this! + val tableDesc = CatalogTable( + identifier = name, + tableType = tableType, + storage = storage, + schema = schema, + partitionColumnNames = partitionCols.map(_.name), + properties = properties, + comment = comment) + + selectQuery match { + case Some(q) => CreateTableAsSelectLogicalPlan(tableDesc, q, ifNotExists) + case None => CreateTable(tableDesc, ifNotExists) + } + } + + /** + * Create a [[CreateTableLike]] command. + */ + override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan = withOrigin(ctx) { + val targetTable = visitTableIdentifier(ctx.target) + val sourceTable = visitTableIdentifier(ctx.source) + CreateTableLike(targetTable, sourceTable, ctx.EXISTS != null) + } + + /** + * Create a [[CatalogStorageFormat]] for creating tables. + */ + override def visitCreateFileFormat( + ctx: CreateFileFormatContext): CatalogStorageFormat = withOrigin(ctx) { + (ctx.fileFormat, ctx.storageHandler) match { + // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format + case (c: TableFileFormatContext, null) => + visitTableFileFormat(c) + // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO + case (c: GenericFileFormatContext, null) => + visitGenericFileFormat(c) + case (null, storageHandler) => + throw new ParseException("Operation not allowed: ... STORED BY storage_handler ...", ctx) + case _ => + throw new ParseException("expected either STORED AS or STORED BY, not both", ctx) + } + } + + /** Empty storage format for default values and copies. */ + private val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, Map.empty) + + /** + * Create a [[CatalogStorageFormat]]. + */ + override def visitTableFileFormat( + ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) { + EmptyStorageFormat.copy( + inputFormat = Option(string(ctx.inFmt)), + outputFormat = Option(string(ctx.outFmt)), + serde = Option(ctx.serdeCls).map(string) + ) + } + + /** + * Resolve a [[HiveSerDe]] based on the name given and return it as a [[CatalogStorageFormat]]. + */ + override def visitGenericFileFormat( + ctx: GenericFileFormatContext): CatalogStorageFormat = withOrigin(ctx) { + val source = ctx.identifier.getText + HiveSerDe.sourceToSerDe(source, conf) match { + case Some(s) => + EmptyStorageFormat.copy( + inputFormat = s.inputFormat, + outputFormat = s.outputFormat, + serde = s.serde) + case None => + throw new ParseException(s"Unrecognized file format in STORED AS clause: $source", ctx) + } + } + + /** + * Create a [[CatalogStorageFormat]] used for creating tables. 
+ * + * Example format: + * {{{ + * SERDE serde_name [WITH SERDEPROPERTIES (k1=v1, k2=v2, ...)] + * }}} + * + * OR + * + * {{{ + * DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]] + * [COLLECTION ITEMS TERMINATED BY char] + * [MAP KEYS TERMINATED BY char] + * [LINES TERMINATED BY char] + * [NULL DEFINED AS char] + * }}} + */ + private def visitRowFormat(ctx: RowFormatContext): CatalogStorageFormat = withOrigin(ctx) { + ctx match { + case serde: RowFormatSerdeContext => visitRowFormatSerde(serde) + case delimited: RowFormatDelimitedContext => visitRowFormatDelimited(delimited) + } + } + + /** + * Create SERDE row format name and properties pair. + */ + override def visitRowFormatSerde( + ctx: RowFormatSerdeContext): CatalogStorageFormat = withOrigin(ctx) { + import ctx._ + EmptyStorageFormat.copy( + serde = Option(string(name)), + serdeProperties = Option(tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty)) + } + + /** + * Create a delimited row format properties object. + */ + override def visitRowFormatDelimited( + ctx: RowFormatDelimitedContext): CatalogStorageFormat = withOrigin(ctx) { + // Collect the entries if any. + def entry(key: String, value: Token): Seq[(String, String)] = { + Option(value).toSeq.map(x => key -> string(x)) + } + // TODO we need proper support for the NULL format. + val entries = + entry("field.delim", ctx.fieldsTerminatedBy) ++ + entry("serialization.format", ctx.fieldsTerminatedBy) ++ + entry("escape.delim", ctx.escapedBy) ++ + entry("colelction.delim", ctx.collectionItemsTerminatedBy) ++ + entry("mapkey.delim", ctx.keysTerminatedBy) ++ + Option(ctx.linesSeparatedBy).toSeq.map { token => + val value = string(token) + assert( + value == "\n", + s"LINES TERMINATED BY only supports newline '\\n' right now: $value", + ctx) + "line.delim" -> value + } + EmptyStorageFormat.copy(serdeProperties = entries.toMap) + } + + /** + * Create or replace a view. This creates a [[CreateViewAsSelectLogicalCommand]] command. + * + * For example: + * {{{ + * CREATE VIEW [IF NOT EXISTS] [db_name.]view_name + * [(column_name [COMMENT column_comment], ...) ] + * [COMMENT view_comment] + * [TBLPROPERTIES (property_name = property_value, ...)] + * AS SELECT ...; + * }}} + */ + override def visitCreateView(ctx: CreateViewContext): LogicalPlan = withOrigin(ctx) { + if (ctx.identifierList != null) { + throw new ParseException(s"Operation not allowed: partitioned views", ctx) + } else { + val identifiers = Option(ctx.identifierCommentList).toSeq.flatMap(_.identifierComment.asScala) + val schema = identifiers.map { ic => + CatalogColumn(ic.identifier.getText, null, nullable = true, Option(ic.STRING).map(string)) + } + createView( + ctx, + ctx.tableIdentifier, + comment = Option(ctx.STRING).map(string), + schema, + ctx.query, + Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty), + ctx.EXISTS != null, + ctx.REPLACE != null + ) + } + } + + /** + * Alter the query of a view. This creates a [[CreateViewAsSelectLogicalCommand]] command. + */ + override def visitAlterViewQuery(ctx: AlterViewQueryContext): LogicalPlan = withOrigin(ctx) { + createView( + ctx, + ctx.tableIdentifier, + comment = None, + Seq.empty, + ctx.query, + Map.empty, + allowExist = false, + replace = true) + } + + /** + * Create a [[CreateViewAsSelectLogicalCommand]] command. 
+ */ + private def createView( + ctx: ParserRuleContext, + name: TableIdentifierContext, + comment: Option[String], + schema: Seq[CatalogColumn], + query: QueryContext, + properties: Map[String, String], + allowExist: Boolean, + replace: Boolean): LogicalPlan = { + val sql = Option(source(query)) + val tableDesc = CatalogTable( + identifier = visitTableIdentifier(name), + tableType = CatalogTableType.VIRTUAL_VIEW, + schema = schema, + storage = EmptyStorageFormat, + properties = properties, + viewOriginalText = sql, + viewText = sql, + comment = comment) + CreateViewAsSelectLogicalCommand(tableDesc, plan(query), allowExist, replace, command(ctx)) + } + + /** + * Create a sequence of [[CatalogColumn]]s from a column list + */ + private def visitCatalogColumns(ctx: ColTypeListContext): Seq[CatalogColumn] = withOrigin(ctx) { + ctx.colType.asScala.map { col => + CatalogColumn( + col.identifier.getText.toLowerCase, + // Note: for types like "STRUCT" we can't + // just convert the whole type string to lower case, otherwise the struct field names + // will no longer be case sensitive. Instead, we rely on our parser to get the proper + // case before passing it to Hive. + CatalystSqlParser.parseDataType(col.dataType.getText).simpleString, + nullable = true, + Option(col.STRING).map(string)) + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala new file mode 100644 index 0000000000..e7191e4bfe --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} + +/** + * Adds a jar to the current session so it can be used (for UDFs or serdes). + */ +case class AddJar(path: String) extends RunnableCommand { + override val output: Seq[Attribute] = { + val schema = StructType( + StructField("result", IntegerType, nullable = false) :: Nil) + schema.toAttributes + } + + override def run(sqlContext: SQLContext): Seq[Row] = { + sqlContext.addJar(path) + Seq(Row(0)) + } +} + +/** + * Adds a file to the current session so it can be used. 
+ */ +case class AddFile(path: String) extends RunnableCommand { + override def run(sqlContext: SQLContext): Seq[Row] = { + sqlContext.sparkContext.addFile(path) + Seq.empty[Row] + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 0b41985174..9a7c11ac33 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -20,6 +20,25 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql.{AnalysisException, Row, SQLContext} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode} + + +case class CreateTableAsSelectLogicalPlan( + tableDesc: CatalogTable, + child: LogicalPlan, + allowExisting: Boolean) extends UnaryNode with Command { + + override def output: Seq[Attribute] = Seq.empty[Attribute] + + override lazy val resolved: Boolean = + tableDesc.identifier.database.isDefined && + tableDesc.schema.nonEmpty && + tableDesc.storage.serde.isDefined && + tableDesc.storage.inputFormat.isDefined && + tableDesc.storage.outputFormat.isDefined && + childrenResolved +} /** * A command to create a table with the same definition of the given existing table. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala new file mode 100644 index 0000000000..aa6112c7f0 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command + +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode} + +case class CreateViewAsSelectLogicalCommand( + tableDesc: CatalogTable, + child: LogicalPlan, + allowExisting: Boolean, + replace: Boolean, + sql: String) extends UnaryNode with Command { + override def output: Seq[Attribute] = Seq.empty[Attribute] + override lazy val resolved: Boolean = false +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 8e2e94669b..f2448af991 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical.BroadcastHint import org.apache.spark.sql.execution.SparkSqlParser import org.apache.spark.sql.expressions.UserDefinedFunction +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -1175,7 +1176,10 @@ object functions { * @group normal_funcs */ def expr(expr: String): Column = { - Column(SparkSqlParser.parseExpression(expr)) + val parser = SQLContext.getActive().map(_.sessionState.sqlParser).getOrElse { + new SparkSqlParser(new SQLConf) + } + Column(parser.parseExpression(expr)) } ////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala new file mode 100644 index 0000000000..38317d46dd --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.internal + +case class HiveSerDe( + inputFormat: Option[String] = None, + outputFormat: Option[String] = None, + serde: Option[String] = None) + +object HiveSerDe { + /** + * Get the Hive SerDe information from the data source abbreviation string or classname. + * + * @param source Currently the source abbreviation can be one of the following: + * SequenceFile, RCFile, ORC, PARQUET, and case insensitive. 
+ * @param conf SQLConf + * @return HiveSerDe associated with the specified source + */ + def sourceToSerDe(source: String, conf: SQLConf): Option[HiveSerDe] = { + val serdeMap = Map( + "sequencefile" -> + HiveSerDe( + inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"), + outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat")), + + "rcfile" -> + HiveSerDe( + inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"), + outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"), + serde = Option(conf.getConfString("hive.default.rcfile.serde", + "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))), + + "orc" -> + HiveSerDe( + inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"), + outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"), + serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")), + + "parquet" -> + HiveSerDe( + inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"), + outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"), + serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")), + + "textfile" -> + HiveSerDe( + inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"), + outputFormat = Option("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")), + + "avro" -> + HiveSerDe( + inputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"), + outputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"), + serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))) + + val key = source.toLowerCase match { + case s if s.startsWith("org.apache.spark.sql.parquet") => "parquet" + case s if s.startsWith("org.apache.spark.sql.orc") => "orc" + case s => s + } + + serdeMap.get(key) + } +} + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index 42915d5887..08a99627bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -22,9 +22,9 @@ import java.util.Properties import scala.collection.JavaConverters._ import org.apache.spark.internal.config.ConfigEntry -import org.apache.spark.sql.{ContinuousQueryManager, ExperimentalMethods, SQLContext, UDFRegistration} +import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} -import org.apache.spark.sql.catalyst.catalog.SessionCatalog +import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, _} import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -56,13 +56,29 @@ private[sql] class SessionState(ctx: SQLContext) { */ lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy() + /** A [[FunctionResourceLoader]] that can be used in SessionCatalog. */ + lazy val functionResourceLoader: FunctionResourceLoader = { + new FunctionResourceLoader { + override def loadResource(resource: FunctionResource): Unit = { + resource.resourceType match { + case JarResource => addJar(resource.uri) + case FileResource => ctx.sparkContext.addFile(resource.uri) + case ArchiveResource => + throw new AnalysisException( + "Archive is not allowed to be loaded. 
If YARN mode is used, " +
+              "please use --archives options while calling spark-submit.")
+        }
+      }
+    }
+  }
+
   /**
    * Internal catalog for managing table and database states.
    */
   lazy val catalog = new SessionCatalog(
     ctx.externalCatalog,
-    ctx.functionResourceLoader,
+    functionResourceLoader,
     functionRegistry,
     conf)
@@ -93,7 +109,7 @@ private[sql] class SessionState(ctx: SQLContext) {
   /**
    * Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
    */
-  lazy val sqlParser: ParserInterface = SparkSqlParser
+  lazy val sqlParser: ParserInterface = new SparkSqlParser(conf)
 
   /**
    * Planner that converts optimized logical plans to physical plans.
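
## Usage sketches

The sketches below illustrate the pieces this patch touches. They are written against the 2.0-era APIs shown in the diff; identifiers such as `sqlContext` and all paths, database names, and table names are hypothetical.

`SparkSqlParser` changes from a singleton object into a class parameterized by `SQLConf`, so parsing can honor per-session settings such as `hive.default.fileformat`. The `functions.scala` hunk shows the intended fallback when no `SQLContext` is active; a minimal sketch of that pattern (note `SQLContext.getActive` is package-private, so this assumes code living in `org.apache.spark.sql`):

{{{
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.internal.SQLConf

// Prefer the session's configured parser; otherwise build one with defaults.
def currentParser(): ParserInterface =
  SQLContext.getActive().map(_.sessionState.sqlParser).getOrElse {
    new SparkSqlParser(new SQLConf)
  }

val parsed = currentParser().parseExpression("a + 1")
}}}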
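With `visitAddResource` in place, `ADD JAR` and `ADD FILE` are parsed directly into the new `AddJar`/`AddFile` runnable commands instead of being shipped to Hive as native commands. A sketch of the observable behavior, with hypothetical paths:

{{{
// AddJar reports a single row containing 0; AddFile returns no rows.
sqlContext.sql("ADD JAR /tmp/my-udfs.jar").collect()   // Array(Row(0))
sqlContext.sql("ADD FILE /tmp/lookup.txt").collect()   // Array()

// Anything other than "file" or "jar" fails at parse time, e.g.
// "ADD ARCHIVE /tmp/x.zip" => ParseException: Unsupported resource type 'archive'.
}}}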
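`visitCreateTable` assembles the table's `CatalogStorageFormat` by layering three sources: an explicit `ROW FORMAT`, an explicit file format (`STORED AS` or `INPUTFORMAT`/`OUTPUTFORMAT`), and the session default derived from `hive.default.fileformat`. A self-contained sketch of that `Option`-based precedence chain, using a stand-in case class rather than the real `CatalogStorageFormat`:

{{{
case class Storage(
    inputFormat: Option[String] = None,
    outputFormat: Option[String] = None,
    serde: Option[String] = None,
    serdeProperties: Map[String, String] = Map.empty)

def resolve(row: Storage, file: Storage, default: Storage): Storage =
  Storage(
    inputFormat = file.inputFormat.orElse(default.inputFormat),
    outputFormat = file.outputFormat.orElse(default.outputFormat),
    // ROW FORMAT SERDE beats STORED AS, which beats the session default
    // (the default serde is kept as None so CTAS output can later be
    // converted to a datasource table).
    serde = row.serde.orElse(file.serde).orElse(default.serde),
    serdeProperties = row.serdeProperties ++ file.serdeProperties)

val default = Storage(
  inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
  outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
resolve(Storage(), Storage(), default)  // a text table with no explicit serde
}}}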
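`visitCreateFileFormat` accepts either a named format, resolved through `HiveSerDe.sourceToSerDe`, or an explicit `INPUTFORMAT`/`OUTPUTFORMAT` pair, and rejects `STORED BY` storage handlers. Both accepted shapes, with hypothetical table names:

{{{
// Short form: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO.
sqlContext.sql("CREATE TABLE t1 (a INT) STORED AS ORC")

// Long form: classes spelled out; the serde still comes from
// ROW FORMAT SERDE or the session default.
sqlContext.sql("""
  CREATE TABLE t2 (a INT)
  STORED AS
    INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
""")
}}}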
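The same rule then branches on the optional `AS select_statement`: without it the parser emits a `CreateTable` command; with it, a `CreateTableAsSelectLogicalPlan`, whose `resolved` flag stays false until the database, a non-empty schema, and a full serde/input/output format triple are known. Hypothetical examples of the two paths:

{{{
// No trailing query => CreateTable.
sqlContext.sql(
  "CREATE TABLE logs (ts BIGINT, msg STRING) PARTITIONED BY (day STRING)")

// Trailing query => CreateTableAsSelectLogicalPlan.
sqlContext.sql(
  "CREATE TABLE recent_logs AS SELECT * FROM logs WHERE day >= '2016-04-20'")
}}}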
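`visitRowFormatDelimited` flattens the `DELIMITED` clauses into Hive serde properties. Note that `FIELDS TERMINATED BY` feeds both `field.delim` and `serialization.format`, and that `colelction.delim` is spelled that way on purpose: it matches the (historically misspelled) property name Hive itself uses. A stand-alone sketch of the mapping; where the original asserts on non-newline line delimiters, this sketch simply drops them:

{{{
def delimitedProps(
    fields: Option[String],
    escaped: Option[String],
    collectionItems: Option[String],
    mapKeys: Option[String],
    lines: Option[String]): Map[String, String] = {
  def entry(key: String, value: Option[String]): Seq[(String, String)] =
    value.toSeq.map(key -> _)
  (entry("field.delim", fields) ++
    entry("serialization.format", fields) ++
    entry("escape.delim", escaped) ++
    entry("colelction.delim", collectionItems) ++  // sic: Hive compatibility
    entry("mapkey.delim", mapKeys) ++
    entry("line.delim", lines.filter(_ == "\n"))).toMap
}

delimitedProps(Some(","), None, None, None, Some("\n"))
// => Map(field.delim -> ",", serialization.format -> ",", line.delim -> "\n")
}}}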
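`visitCreateView` rejects partitioned views up front and leaves the declared column types `null`; the types are filled in downstream from the query. A DDL shape the rule accepts, with hypothetical names:

{{{
sqlContext.sql("""
  CREATE VIEW IF NOT EXISTS mydb.active_users
    (id COMMENT 'user id', name)
  COMMENT 'users active in the last 30 days'
  TBLPROPERTIES ('created.by' = 'etl')
  AS SELECT id, name FROM mydb.users WHERE active = true
""")
}}}

`ALTER VIEW ... AS` reuses the same `createView` helper with `replace = true` and no comment, schema, or properties.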
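`visitCatalogColumns` lower-cases column types by round-tripping them through `CatalystSqlParser.parseDataType(...).simpleString` rather than lower-casing the raw text, which would also mangle struct field names. A quick check of the difference:

{{{
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

val raw = "STRUCT<CamelCaseField: BIGINT>"
raw.toLowerCase
// "struct<camelcasefield: bigint>" -- field name case lost
CatalystSqlParser.parseDataType(raw).simpleString
// something like "struct<CamelCaseField:bigint>" -- keywords lowered, names kept
}}}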
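`HiveSerDe.sourceToSerDe` normalizes its input before the lookup: names are case-insensitive, and the legacy `org.apache.spark.sql.parquet` / `org.apache.spark.sql.orc` class-name prefixes map back to their short keys. The RCFile entry additionally honors `hive.default.rcfile.serde`. Usage sketch:

{{{
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}

val conf = new SQLConf
HiveSerDe.sourceToSerDe("PARQUET", conf).flatMap(_.serde)
// => Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
HiveSerDe.sourceToSerDe("org.apache.spark.sql.parquet.SomeClass", conf).isDefined
// => true, resolved via the "parquet" key
HiveSerDe.sourceToSerDe("csv", conf)
// => None, which visitGenericFileFormat turns into a ParseException
}}}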
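Finally, the `FunctionResourceLoader` moves from `SQLContext` into `SessionState`, so the `SessionCatalog` is wired with it directly rather than reaching back through the context. A sketch of how a resource flows through it, with a hypothetical jar path (the loader is internal API):

{{{
import org.apache.spark.sql.catalyst.catalog.{FunctionResource, JarResource}

val resource = FunctionResource(JarResource, "/tmp/my-udfs.jar")
sqlContext.sessionState.functionResourceLoader.loadResource(resource)
// JarResource     -> sqlContext.addJar(uri)
// FileResource    -> sparkContext.addFile(uri)
// ArchiveResource -> AnalysisException (use --archives with spark-submit)
}}}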