diff options
author | Reynold Xin <rxin@databricks.com> | 2016-04-21 15:59:37 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-04-21 15:59:37 -0700 |
commit | 1a95397bb6a4e7e7a06ac450bf556fa3aa47b8cd (patch) | |
tree | 035e8e5dadc2488955b2224cc37a539bd792ab02 /sql/hive/src/main/scala/org | |
parent | ef6be7bedd9918c3398d510bb1eb1584908581c8 (diff) | |
download | spark-1a95397bb6a4e7e7a06ac450bf556fa3aa47b8cd.tar.gz spark-1a95397bb6a4e7e7a06ac450bf556fa3aa47b8cd.tar.bz2 spark-1a95397bb6a4e7e7a06ac450bf556fa3aa47b8cd.zip |
[SPARK-14798][SQL] Move native command and script transformation parsing into SparkSqlAstBuilder
## What changes were proposed in this pull request?
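This patch moves the parsing of native commands and script transformations from HiveSqlAstBuilder into SparkSqlAstBuilder, and relocates HiveNativeCommand to the org.apache.spark.sql.execution.command package accordingly. This builds on #12561. See the last commit for the diff.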
## How was this patch tested?
Updated test cases to reflect this.
Author: Reynold Xin <rxin@databricks.com>
Closes #12564 from rxin/SPARK-14798.
Diffstat (limited to 'sql/hive/src/main/scala/org')
8 files changed, 30 insertions, 182 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 8732285dac..ca397910c6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -32,10 +32,9 @@ import org.apache.spark.sql.catalyst.parser.DataTypeParser import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ -import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand} +import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand, HiveNativeCommand} import org.apache.spark.sql.execution.datasources.{Partition => _, _} import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation} -import org.apache.spark.sql.hive.execution.HiveNativeCommand import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource} import org.apache.spark.sql.internal.HiveSerDe import org.apache.spark.sql.types._ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala index 1c1bfb610c..0ee34f07fd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala @@ -20,8 +20,8 @@ package org.apache.spark.sql.hive import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand} -import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand} +import 
org.apache.spark.sql.execution.command.{ExecutedCommand, HiveNativeCommand, SetCommand} +import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index bbdcc8c6c2..8720e54ed6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -38,8 +38,9 @@ private[hive] trait HiveStrategies { object Scripts extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case logical.ScriptTransformation(input, script, output, child, schema: HiveScriptIOSchema) => - ScriptTransformation(input, script, output, planLater(child), schema)(hiveconf) :: Nil + case logical.ScriptTransformation(input, script, output, child, ioschema) => + val hiveIoSchema = HiveScriptIOSchema(ioschema) + ScriptTransformation(input, script, output, planLater(child), hiveIoSchema)(hiveconf) :: Nil case _ => Nil } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala index 2d44813f0e..86115d0e9b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala @@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor} import org.apache.spark.sql.catalyst.util.quoteIdentifier import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.hive.execution.HiveScriptIOSchema import org.apache.spark.sql.types.{ByteType, DataType, IntegerType, NullType} /** @@ -210,13 +209,12 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi } private def scriptTransformationToSQL(plan: ScriptTransformation): String 
= { - val ioSchema = plan.ioschema.asInstanceOf[HiveScriptIOSchema] - val inputRowFormatSQL = ioSchema.inputRowFormatSQL.getOrElse( + val inputRowFormatSQL = plan.ioschema.inputRowFormatSQL.getOrElse( throw new UnsupportedOperationException( - s"unsupported row format ${ioSchema.inputRowFormat}")) - val outputRowFormatSQL = ioSchema.outputRowFormatSQL.getOrElse( + s"unsupported row format ${plan.ioschema.inputRowFormat}")) + val outputRowFormatSQL = plan.ioschema.outputRowFormatSQL.getOrElse( throw new UnsupportedOperationException( - s"unsupported row format ${ioSchema.outputRowFormat}")) + s"unsupported row format ${plan.ioschema.outputRowFormat}")) val outputSchema = plan.output.map { attr => s"${attr.sql} ${attr.dataType.simpleString}" diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala deleted file mode 100644 index 8c1f4a8dc5..0000000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.hive.execution - -import org.apache.spark.sql.{Row, SQLContext} -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.execution.command.RunnableCommand -import org.apache.spark.sql.hive.HiveSessionState -import org.apache.spark.sql.types.StringType - -private[hive] -case class HiveNativeCommand(sql: String) extends RunnableCommand { - - override def output: Seq[AttributeReference] = - Seq(AttributeReference("result", StringType, nullable = false)()) - - override def run(sqlContext: SQLContext): Seq[Row] = { - sqlContext.sessionState.asInstanceOf[HiveSessionState].runNativeSql(sql).map(Row(_)) - } - -} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala index 989da92bc7..35530b9814 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala @@ -17,16 +17,11 @@ package org.apache.spark.sql.hive.execution -import scala.util.Try - -import org.antlr.v4.runtime.Token -import org.apache.hadoop.hive.serde.serdeConstants - -import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.parser._ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkSqlAstBuilder +import org.apache.spark.sql.execution.command.HiveNativeCommand import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution} /** @@ -55,14 +50,6 @@ class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) { import ParserUtils._ /** - * Pass a command to Hive using a [[HiveNativeCommand]]. 
- */ - override def visitExecuteNativeCommand( - ctx: ExecuteNativeCommandContext): LogicalPlan = withOrigin(ctx) { - HiveNativeCommand(command(ctx)) - } - - /** * Create an [[AnalyzeTable]] command. This currently only implements the NOSCAN option (other * options are passed on to Hive) e.g.: * {{{ @@ -80,73 +67,4 @@ class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) { AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString) } } - - /** - * Create a [[HiveScriptIOSchema]]. - */ - override protected def withScriptIOSchema( - ctx: QuerySpecificationContext, - inRowFormat: RowFormatContext, - recordWriter: Token, - outRowFormat: RowFormatContext, - recordReader: Token, - schemaLess: Boolean): HiveScriptIOSchema = { - if (recordWriter != null || recordReader != null) { - throw new ParseException( - "Unsupported operation: Used defined record reader/writer classes.", ctx) - } - - // Decode and input/output format. - type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String]) - def format(fmt: RowFormatContext, configKey: String): Format = fmt match { - case c: RowFormatDelimitedContext => - // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema - // expects a seq of pairs in which the old parsers' token names are used as keys. - // Transforming the result of visitRowFormatDelimited would be quite a bit messier than - // retrieving the key value pairs ourselves. 
- def entry(key: String, value: Token): Seq[(String, String)] = { - Option(value).map(t => key -> t.getText).toSeq - } - val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++ - entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++ - entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++ - entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++ - entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs) - - (entries, None, Seq.empty, None) - - case c: RowFormatSerdeContext => - // Use a serde format. - val CatalogStorageFormat(None, None, None, Some(name), props) = visitRowFormatSerde(c) - - // SPARK-10310: Special cases LazySimpleSerDe - val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") { - Try(conf.getConfString(configKey)).toOption - } else { - None - } - (Seq.empty, Option(name), props.toSeq, recordHandler) - - case null => - // Use default (serde) format. - val name = conf.getConfString("hive.script.serde", - "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") - val props = Seq(serdeConstants.FIELD_DELIM -> "\t") - val recordHandler = Try(conf.getConfString(configKey)).toOption - (Nil, Option(name), props, recordHandler) - } - - val (inFormat, inSerdeClass, inSerdeProps, reader) = - format(inRowFormat, "hive.script.recordreader") - - val (outFormat, outSerdeClass, outSerdeProps, writer) = - format(outRowFormat, "hive.script.recordwriter") - - HiveScriptIOSchema( - inFormat, outFormat, - inSerdeClass, outSerdeClass, - inSerdeProps, outSerdeProps, - reader, writer, - schemaLess) - } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index 2f7cec354d..8c8becfb87 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -312,6 
+312,22 @@ private class ScriptTransformationWriterThread( } } +private[hive] +object HiveScriptIOSchema { + def apply(input: ScriptInputOutputSchema): HiveScriptIOSchema = { + HiveScriptIOSchema( + input.inputRowFormat, + input.outputRowFormat, + input.inputSerdeClass, + input.outputSerdeClass, + input.inputSerdeProps, + input.outputSerdeProps, + input.recordReaderClass, + input.recordWriterClass, + input.schemaLess) + } +} + /** * The wrapper class of Hive input and output schema properties */ @@ -325,7 +341,8 @@ case class HiveScriptIOSchema ( outputSerdeProps: Seq[(String, String)], recordReaderClass: Option[String], recordWriterClass: Option[String], - schemaLess: Boolean) extends ScriptInputOutputSchema with HiveInspectors { + schemaLess: Boolean) + extends HiveInspectors { private val defaultFormat = Map( ("TOK_TABLEROWFORMATFIELD", "\t"), @@ -402,52 +419,4 @@ case class HiveScriptIOSchema ( instance } } - - def inputRowFormatSQL: Option[String] = - getRowFormatSQL(inputRowFormat, inputSerdeClass, inputSerdeProps) - - def outputRowFormatSQL: Option[String] = - getRowFormatSQL(outputRowFormat, outputSerdeClass, outputSerdeProps) - - /** - * Get the row format specification - * Note: - * 1. Changes are needed when readerClause and writerClause are supported. - * 2. Changes are needed when "ESCAPED BY" is supported. 
- */ - private def getRowFormatSQL( - rowFormat: Seq[(String, String)], - serdeClass: Option[String], - serdeProps: Seq[(String, String)]): Option[String] = { - if (schemaLess) return Some("") - - val rowFormatDelimited = - rowFormat.map { - case ("TOK_TABLEROWFORMATFIELD", value) => - "FIELDS TERMINATED BY " + value - case ("TOK_TABLEROWFORMATCOLLITEMS", value) => - "COLLECTION ITEMS TERMINATED BY " + value - case ("TOK_TABLEROWFORMATMAPKEYS", value) => - "MAP KEYS TERMINATED BY " + value - case ("TOK_TABLEROWFORMATLINES", value) => - "LINES TERMINATED BY " + value - case ("TOK_TABLEROWFORMATNULL", value) => - "NULL DEFINED AS " + value - case o => return None - } - - val serdeClassSQL = serdeClass.map("'" + _ + "'").getOrElse("") - val serdePropsSQL = - if (serdeClass.nonEmpty) { - val props = serdeProps.map{p => s"'${p._1}' = '${p._2}'"}.mkString(", ") - if (props.nonEmpty) " WITH SERDEPROPERTIES(" + props + ")" else "" - } else { - "" - } - if (rowFormat.nonEmpty) { - Some("ROW FORMAT DELIMITED " + rowFormatDelimited.mkString(" ")) - } else { - Some("ROW FORMAT SERDE " + serdeClassSQL + serdePropsSQL) - } - } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 2bb13996c1..741e3bdd18 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -39,10 +39,9 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.expressions.ExpressionInfo import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.command.CacheTableCommand +import org.apache.spark.sql.execution.command.{CacheTableCommand, HiveNativeCommand} import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.client.HiveClient -import 
org.apache.spark.sql.hive.execution.HiveNativeCommand import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.{ShutdownHookManager, Utils} |