author     Reynold Xin <rxin@databricks.com>    2016-04-21 15:59:37 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-04-21 15:59:37 -0700
commit     1a95397bb6a4e7e7a06ac450bf556fa3aa47b8cd (patch)
tree       035e8e5dadc2488955b2224cc37a539bd792ab02 /sql/hive/src/main/scala/org
parent     ef6be7bedd9918c3398d510bb1eb1584908581c8 (diff)
[SPARK-14798][SQL] Move native command and script transformation parsing into SparkSqlAstBuilder
## What changes were proposed in this pull request?

This patch moves native command and script transformation parsing into SparkSqlAstBuilder. This builds on #12561. See the last commit for the diff.

## How was this patch tested?

Updated test cases to reflect this change.

Author: Reynold Xin <rxin@databricks.com>

Closes #12564 from rxin/SPARK-14798.
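For orientation, below is a minimal, self-contained sketch of the conversion pattern this patch introduces: the parser side now emits a generic schema, and the Hive planner converts it into the Hive-specific one through a companion `apply()` (the real companion object is added in ScriptTransformation.scala further down). The case classes here are trimmed stand-ins, not the actual Spark classes.

```scala
// Sketch only: simplified stand-ins for the Spark classes touched by this patch.
// The parser-side schema is engine-agnostic; the Hive planner converts it into
// the Hive-specific schema via a companion apply(), mirroring the diff below.

// Generic schema produced on the parser side (fields trimmed for brevity).
case class ScriptInputOutputSchema(
    inputRowFormat: Seq[(String, String)],
    outputRowFormat: Seq[(String, String)],
    inputSerdeClass: Option[String],
    outputSerdeClass: Option[String],
    schemaLess: Boolean)

// Hive-specific schema used by the ScriptTransformation physical operator.
case class HiveScriptIOSchema(
    inputRowFormat: Seq[(String, String)],
    outputRowFormat: Seq[(String, String)],
    inputSerdeClass: Option[String],
    outputSerdeClass: Option[String],
    schemaLess: Boolean)

object HiveScriptIOSchema {
  // Planner-side conversion, in the spirit of the new companion object below.
  def apply(input: ScriptInputOutputSchema): HiveScriptIOSchema =
    HiveScriptIOSchema(
      input.inputRowFormat,
      input.outputRowFormat,
      input.inputSerdeClass,
      input.outputSerdeClass,
      input.schemaLess)
}

object ConversionSketch extends App {
  val generic = ScriptInputOutputSchema(
    inputRowFormat = Seq("TOK_TABLEROWFORMATFIELD" -> "\t"),
    outputRowFormat = Seq.empty,
    inputSerdeClass = None,
    outputSerdeClass = None,
    schemaLess = false)
  // The planner would call this when turning the logical plan into a physical one.
  println(HiveScriptIOSchema(generic))
}
```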
Diffstat (limited to 'sql/hive/src/main/scala/org')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala           |  3
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala             |  4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala                 |  5
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala                     | 10
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala    | 36
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala        | 84
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala | 67
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala                  |  3
8 files changed, 30 insertions(+), 182 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 8732285dac..ca397910c6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -32,10 +32,9 @@ import org.apache.spark.sql.catalyst.parser.DataTypeParser
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand}
+import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand, HiveNativeCommand}
import org.apache.spark.sql.execution.datasources.{Partition => _, _}
import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation}
-import org.apache.spark.sql.hive.execution.HiveNativeCommand
import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource}
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.types._
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala
index 1c1bfb610c..0ee34f07fd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.hive
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand}
-import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
+import org.apache.spark.sql.execution.command.{ExecutedCommand, HiveNativeCommand, SetCommand}
+import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand
/**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index bbdcc8c6c2..8720e54ed6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -38,8 +38,9 @@ private[hive] trait HiveStrategies {
object Scripts extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case logical.ScriptTransformation(input, script, output, child, schema: HiveScriptIOSchema) =>
- ScriptTransformation(input, script, output, planLater(child), schema)(hiveconf) :: Nil
+ case logical.ScriptTransformation(input, script, output, child, ioschema) =>
+ val hiveIoSchema = HiveScriptIOSchema(ioschema)
+ ScriptTransformation(input, script, output, planLater(child), hiveIoSchema)(hiveconf) :: Nil
case _ => Nil
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
index 2d44813f0e..86115d0e9b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.hive.execution.HiveScriptIOSchema
import org.apache.spark.sql.types.{ByteType, DataType, IntegerType, NullType}
/**
@@ -210,13 +209,12 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
}
private def scriptTransformationToSQL(plan: ScriptTransformation): String = {
- val ioSchema = plan.ioschema.asInstanceOf[HiveScriptIOSchema]
- val inputRowFormatSQL = ioSchema.inputRowFormatSQL.getOrElse(
+ val inputRowFormatSQL = plan.ioschema.inputRowFormatSQL.getOrElse(
throw new UnsupportedOperationException(
- s"unsupported row format ${ioSchema.inputRowFormat}"))
- val outputRowFormatSQL = ioSchema.outputRowFormatSQL.getOrElse(
+ s"unsupported row format ${plan.ioschema.inputRowFormat}"))
+ val outputRowFormatSQL = plan.ioschema.outputRowFormatSQL.getOrElse(
throw new UnsupportedOperationException(
- s"unsupported row format ${ioSchema.outputRowFormat}"))
+ s"unsupported row format ${plan.ioschema.outputRowFormat}"))
val outputSchema = plan.output.map { attr =>
s"${attr.sql} ${attr.dataType.simpleString}"
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
deleted file mode 100644
index 8c1f4a8dc5..0000000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import org.apache.spark.sql.{Row, SQLContext}
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.hive.HiveSessionState
-import org.apache.spark.sql.types.StringType
-
-private[hive]
-case class HiveNativeCommand(sql: String) extends RunnableCommand {
-
- override def output: Seq[AttributeReference] =
- Seq(AttributeReference("result", StringType, nullable = false)())
-
- override def run(sqlContext: SQLContext): Seq[Row] = {
- sqlContext.sessionState.asInstanceOf[HiveSessionState].runNativeSql(sql).map(Row(_))
- }
-
-}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
index 989da92bc7..35530b9814 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
@@ -17,16 +17,11 @@
package org.apache.spark.sql.hive.execution
-import scala.util.Try
-
-import org.antlr.v4.runtime.Token
-import org.apache.hadoop.hive.serde.serdeConstants
-
-import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.execution.command.HiveNativeCommand
import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
/**
@@ -55,14 +50,6 @@ class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
import ParserUtils._
/**
- * Pass a command to Hive using a [[HiveNativeCommand]].
- */
- override def visitExecuteNativeCommand(
- ctx: ExecuteNativeCommandContext): LogicalPlan = withOrigin(ctx) {
- HiveNativeCommand(command(ctx))
- }
-
- /**
* Create an [[AnalyzeTable]] command. This currently only implements the NOSCAN option (other
* options are passed on to Hive) e.g.:
* {{{
@@ -80,73 +67,4 @@ class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
}
}
-
- /**
- * Create a [[HiveScriptIOSchema]].
- */
- override protected def withScriptIOSchema(
- ctx: QuerySpecificationContext,
- inRowFormat: RowFormatContext,
- recordWriter: Token,
- outRowFormat: RowFormatContext,
- recordReader: Token,
- schemaLess: Boolean): HiveScriptIOSchema = {
- if (recordWriter != null || recordReader != null) {
- throw new ParseException(
- "Unsupported operation: Used defined record reader/writer classes.", ctx)
- }
-
- // Decode and input/output format.
- type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])
- def format(fmt: RowFormatContext, configKey: String): Format = fmt match {
- case c: RowFormatDelimitedContext =>
- // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema
- // expects a seq of pairs in which the old parsers' token names are used as keys.
- // Transforming the result of visitRowFormatDelimited would be quite a bit messier than
- // retrieving the key value pairs ourselves.
- def entry(key: String, value: Token): Seq[(String, String)] = {
- Option(value).map(t => key -> t.getText).toSeq
- }
- val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++
- entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++
- entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++
- entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++
- entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs)
-
- (entries, None, Seq.empty, None)
-
- case c: RowFormatSerdeContext =>
- // Use a serde format.
- val CatalogStorageFormat(None, None, None, Some(name), props) = visitRowFormatSerde(c)
-
- // SPARK-10310: Special cases LazySimpleSerDe
- val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") {
- Try(conf.getConfString(configKey)).toOption
- } else {
- None
- }
- (Seq.empty, Option(name), props.toSeq, recordHandler)
-
- case null =>
- // Use default (serde) format.
- val name = conf.getConfString("hive.script.serde",
- "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
- val props = Seq(serdeConstants.FIELD_DELIM -> "\t")
- val recordHandler = Try(conf.getConfString(configKey)).toOption
- (Nil, Option(name), props, recordHandler)
- }
-
- val (inFormat, inSerdeClass, inSerdeProps, reader) =
- format(inRowFormat, "hive.script.recordreader")
-
- val (outFormat, outSerdeClass, outSerdeProps, writer) =
- format(outRowFormat, "hive.script.recordwriter")
-
- HiveScriptIOSchema(
- inFormat, outFormat,
- inSerdeClass, outSerdeClass,
- inSerdeProps, outSerdeProps,
- reader, writer,
- schemaLess)
- }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 2f7cec354d..8c8becfb87 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -312,6 +312,22 @@ private class ScriptTransformationWriterThread(
}
}
+private[hive]
+object HiveScriptIOSchema {
+ def apply(input: ScriptInputOutputSchema): HiveScriptIOSchema = {
+ HiveScriptIOSchema(
+ input.inputRowFormat,
+ input.outputRowFormat,
+ input.inputSerdeClass,
+ input.outputSerdeClass,
+ input.inputSerdeProps,
+ input.outputSerdeProps,
+ input.recordReaderClass,
+ input.recordWriterClass,
+ input.schemaLess)
+ }
+}
+
/**
* The wrapper class of Hive input and output schema properties
*/
@@ -325,7 +341,8 @@ case class HiveScriptIOSchema (
outputSerdeProps: Seq[(String, String)],
recordReaderClass: Option[String],
recordWriterClass: Option[String],
- schemaLess: Boolean) extends ScriptInputOutputSchema with HiveInspectors {
+ schemaLess: Boolean)
+ extends HiveInspectors {
private val defaultFormat = Map(
("TOK_TABLEROWFORMATFIELD", "\t"),
@@ -402,52 +419,4 @@ case class HiveScriptIOSchema (
instance
}
}
-
- def inputRowFormatSQL: Option[String] =
- getRowFormatSQL(inputRowFormat, inputSerdeClass, inputSerdeProps)
-
- def outputRowFormatSQL: Option[String] =
- getRowFormatSQL(outputRowFormat, outputSerdeClass, outputSerdeProps)
-
- /**
- * Get the row format specification
- * Note:
- * 1. Changes are needed when readerClause and writerClause are supported.
- * 2. Changes are needed when "ESCAPED BY" is supported.
- */
- private def getRowFormatSQL(
- rowFormat: Seq[(String, String)],
- serdeClass: Option[String],
- serdeProps: Seq[(String, String)]): Option[String] = {
- if (schemaLess) return Some("")
-
- val rowFormatDelimited =
- rowFormat.map {
- case ("TOK_TABLEROWFORMATFIELD", value) =>
- "FIELDS TERMINATED BY " + value
- case ("TOK_TABLEROWFORMATCOLLITEMS", value) =>
- "COLLECTION ITEMS TERMINATED BY " + value
- case ("TOK_TABLEROWFORMATMAPKEYS", value) =>
- "MAP KEYS TERMINATED BY " + value
- case ("TOK_TABLEROWFORMATLINES", value) =>
- "LINES TERMINATED BY " + value
- case ("TOK_TABLEROWFORMATNULL", value) =>
- "NULL DEFINED AS " + value
- case o => return None
- }
-
- val serdeClassSQL = serdeClass.map("'" + _ + "'").getOrElse("")
- val serdePropsSQL =
- if (serdeClass.nonEmpty) {
- val props = serdeProps.map{p => s"'${p._1}' = '${p._2}'"}.mkString(", ")
- if (props.nonEmpty) " WITH SERDEPROPERTIES(" + props + ")" else ""
- } else {
- ""
- }
- if (rowFormat.nonEmpty) {
- Some("ROW FORMAT DELIMITED " + rowFormatDelimited.mkString(" "))
- } else {
- Some("ROW FORMAT SERDE " + serdeClassSQL + serdePropsSQL)
- }
- }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 2bb13996c1..741e3bdd18 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -39,10 +39,9 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.CacheTableCommand
+import org.apache.spark.sql.execution.command.{CacheTableCommand, HiveNativeCommand}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.hive.execution.HiveNativeCommand
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.{ShutdownHookManager, Utils}