diff options
author | Reynold Xin <rxin@databricks.com> | 2016-04-21 15:59:37 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-04-21 15:59:37 -0700 |
commit | 1a95397bb6a4e7e7a06ac450bf556fa3aa47b8cd (patch) | |
tree | 035e8e5dadc2488955b2224cc37a539bd792ab02 /sql/core/src/main/scala/org/apache | |
parent | ef6be7bedd9918c3398d510bb1eb1584908581c8 (diff) | |
download | spark-1a95397bb6a4e7e7a06ac450bf556fa3aa47b8cd.tar.gz spark-1a95397bb6a4e7e7a06ac450bf556fa3aa47b8cd.tar.bz2 spark-1a95397bb6a4e7e7a06ac450bf556fa3aa47b8cd.zip |
[SPARK-14798][SQL] Move native command and script transformation parsing into SparkSqlAstBuilder
## What changes were proposed in this pull request?
This patch moves native command and script transformation into SparkSqlAstBuilder. This builds on #12561. See the last commit for diff.
## How was this patch tested?
Updated test cases to reflect this.
Author: Reynold Xin <rxin@databricks.com>
Closes #12564 from rxin/SPARK-14798.
Diffstat (limited to 'sql/core/src/main/scala/org/apache')
3 files changed, 128 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index ac12a72fc6..05fb1ef631 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution import scala.collection.JavaConverters._ +import scala.util.Try import org.antlr.v4.runtime.{ParserRuleContext, Token} @@ -26,16 +27,27 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.parser._ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema} import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand, DescribeCommand => _, _} import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} +import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution} + /** * Concrete parser for Spark SQL statements. */ -class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser{ +class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser { val astBuilder = new SparkSqlAstBuilder(conf) + + private val substitutor = new VariableSubstitution(conf) + + protected override def parse[T](command: String)(toResult: SqlBaseParser => T): T = { + super.parse(substitutor.substitute(command))(toResult) + } + + protected override def nativeCommand(sqlText: String): LogicalPlan = { + HiveNativeCommand(substitutor.substitute(sqlText)) + } } /** @@ -45,6 +57,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { import org.apache.spark.sql.catalyst.parser.ParserUtils._ /** + * Pass a command to Hive using a [[HiveNativeCommand]]. + */ + override def visitExecuteNativeCommand( + ctx: ExecuteNativeCommandContext): LogicalPlan = withOrigin(ctx) { + HiveNativeCommand(command(ctx)) + } + + /** * Create a [[SetCommand]] logical plan. * * Note that we assume that everything after the SET keyword is assumed to be a part of the @@ -1127,4 +1147,73 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { Option(col.STRING).map(string)) } } + + /** + * Create a [[ScriptInputOutputSchema]]. + */ + override protected def withScriptIOSchema( + ctx: QuerySpecificationContext, + inRowFormat: RowFormatContext, + recordWriter: Token, + outRowFormat: RowFormatContext, + recordReader: Token, + schemaLess: Boolean): ScriptInputOutputSchema = { + if (recordWriter != null || recordReader != null) { + throw new ParseException( + "Unsupported operation: Used defined record reader/writer classes.", ctx) + } + + // Decode and input/output format. + type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String]) + def format(fmt: RowFormatContext, configKey: String): Format = fmt match { + case c: RowFormatDelimitedContext => + // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema + // expects a seq of pairs in which the old parsers' token names are used as keys. + // Transforming the result of visitRowFormatDelimited would be quite a bit messier than + // retrieving the key value pairs ourselves. + def entry(key: String, value: Token): Seq[(String, String)] = { + Option(value).map(t => key -> t.getText).toSeq + } + val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++ + entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++ + entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++ + entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++ + entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs) + + (entries, None, Seq.empty, None) + + case c: RowFormatSerdeContext => + // Use a serde format. + val CatalogStorageFormat(None, None, None, Some(name), props) = visitRowFormatSerde(c) + + // SPARK-10310: Special cases LazySimpleSerDe + val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") { + Try(conf.getConfString(configKey)).toOption + } else { + None + } + (Seq.empty, Option(name), props.toSeq, recordHandler) + + case null => + // Use default (serde) format. + val name = conf.getConfString("hive.script.serde", + "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") + val props = Seq("field.delim" -> "\t") + val recordHandler = Try(conf.getConfString(configKey)).toOption + (Nil, Option(name), props, recordHandler) + } + + val (inFormat, inSerdeClass, inSerdeProps, reader) = + format(inRowFormat, "hive.script.recordreader") + + val (outFormat, outSerdeClass, outSerdeProps, writer) = + format(outRowFormat, "hive.script.recordwriter") + + ScriptInputOutputSchema( + inFormat, outFormat, + inSerdeClass, outSerdeClass, + inSerdeProps, outSerdeProps, + reader, writer, + schemaLess) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala new file mode 100644 index 0000000000..39e441f1c3 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.types.StringType + +/** + * A command that we delegate to Hive. Eventually we should remove this. + */ +case class HiveNativeCommand(sql: String) extends RunnableCommand { + + override def output: Seq[AttributeReference] = + Seq(AttributeReference("result", StringType, nullable = false)()) + + override def run(sqlContext: SQLContext): Seq[Row] = { + sqlContext.sessionState.runNativeSql(sql).map(Row(_)) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index 8563dc3d5a..e1be4b882f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -167,7 +167,7 @@ private[sql] class SessionState(ctx: SQLContext) { } def runNativeSql(sql: String): Seq[String] = { - throw new UnsupportedOperationException + throw new AnalysisException("Unsupported query: " + sql) } } |