path: root/sql/core/src/main/scala/org/apache
author     Reynold Xin <rxin@databricks.com>    2016-04-21 15:59:37 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-04-21 15:59:37 -0700
commit     1a95397bb6a4e7e7a06ac450bf556fa3aa47b8cd (patch)
tree       035e8e5dadc2488955b2224cc37a539bd792ab02 /sql/core/src/main/scala/org/apache
parent     ef6be7bedd9918c3398d510bb1eb1584908581c8 (diff)
[SPARK-14798][SQL] Move native command and script transformation parsing into SparkSqlAstBuilder
## What changes were proposed in this pull request?

This patch moves native command and script transformation parsing into SparkSqlAstBuilder. This builds on #12561. See the last commit for diff.

## How was this patch tested?

Updated test cases to reflect this.

Author: Reynold Xin <rxin@databricks.com>

Closes #12564 from rxin/SPARK-14798.
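As a quick, hedged illustration of what the reorganized parser handles, here is a minimal sketch of driving it directly. It assumes only what the diff below shows (a `SparkSqlParser` constructed from a `SQLConf`, with `parsePlan` inherited from `AbstractSqlParser`); the TRANSFORM query, the object name, and the package placement are illustrative and not part of this patch.

```scala
// Sketch only; placed under org.apache.spark.sql because SQLConf is private[sql].
package org.apache.spark.sql.execution

import org.apache.spark.sql.internal.SQLConf

object SparkSqlParserSketch {
  def main(args: Array[String]): Unit = {
    // The parser now runs variable substitution before parsing and builds
    // native commands and script transformations itself (see the diff below).
    val parser = new SparkSqlParser(new SQLConf)

    // A script transformation query: withScriptIOSchema attaches a
    // ScriptInputOutputSchema describing the input/output row formats.
    val plan = parser.parsePlan(
      "SELECT TRANSFORM (key, value) USING 'cat' AS (k, v) FROM src")

    println(plan.treeString)
  }
}
```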
Diffstat (limited to 'sql/core/src/main/scala/org/apache')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala | 95
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala | 35
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala | 2
3 files changed, 128 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index ac12a72fc6..05fb1ef631 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution
import scala.collection.JavaConverters._
+import scala.util.Try
import org.antlr.v4.runtime.{ParserRuleContext, Token}
@@ -26,16 +27,27 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema}
import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand, DescribeCommand => _, _}
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
+
/**
* Concrete parser for Spark SQL statements.
*/
-class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser{
+class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser {
val astBuilder = new SparkSqlAstBuilder(conf)
+
+ private val substitutor = new VariableSubstitution(conf)
+
+ protected override def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
+ super.parse(substitutor.substitute(command))(toResult)
+ }
+
+ protected override def nativeCommand(sqlText: String): LogicalPlan = {
+ HiveNativeCommand(substitutor.substitute(sqlText))
+ }
}
/**
@@ -45,6 +57,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
import org.apache.spark.sql.catalyst.parser.ParserUtils._
/**
+ * Pass a command to Hive using a [[HiveNativeCommand]].
+ */
+ override def visitExecuteNativeCommand(
+ ctx: ExecuteNativeCommandContext): LogicalPlan = withOrigin(ctx) {
+ HiveNativeCommand(command(ctx))
+ }
+
+ /**
* Create a [[SetCommand]] logical plan.
*
* Note that we assume that everything after the SET keyword is assumed to be a part of the
@@ -1127,4 +1147,73 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
Option(col.STRING).map(string))
}
}
+
+ /**
+ * Create a [[ScriptInputOutputSchema]].
+ */
+ override protected def withScriptIOSchema(
+ ctx: QuerySpecificationContext,
+ inRowFormat: RowFormatContext,
+ recordWriter: Token,
+ outRowFormat: RowFormatContext,
+ recordReader: Token,
+ schemaLess: Boolean): ScriptInputOutputSchema = {
+ if (recordWriter != null || recordReader != null) {
+ throw new ParseException(
+ "Unsupported operation: Used defined record reader/writer classes.", ctx)
+ }
+
+ // Decode an input/output format.
+ type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])
+ def format(fmt: RowFormatContext, configKey: String): Format = fmt match {
+ case c: RowFormatDelimitedContext =>
+ // TODO we should use the visitRowFormatDelimited function here. However, HiveScriptIOSchema
+ // expects a seq of pairs in which the old parsers' token names are used as keys.
+ // Transforming the result of visitRowFormatDelimited would be quite a bit messier than
+ // retrieving the key value pairs ourselves.
+ def entry(key: String, value: Token): Seq[(String, String)] = {
+ Option(value).map(t => key -> t.getText).toSeq
+ }
+ val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++
+ entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++
+ entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++
+ entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++
+ entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs)
+
+ (entries, None, Seq.empty, None)
+
+ case c: RowFormatSerdeContext =>
+ // Use a serde format.
+ val CatalogStorageFormat(None, None, None, Some(name), props) = visitRowFormatSerde(c)
+
+ // SPARK-10310: Special cases LazySimpleSerDe
+ val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") {
+ Try(conf.getConfString(configKey)).toOption
+ } else {
+ None
+ }
+ (Seq.empty, Option(name), props.toSeq, recordHandler)
+
+ case null =>
+ // Use default (serde) format.
+ val name = conf.getConfString("hive.script.serde",
+ "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
+ val props = Seq("field.delim" -> "\t")
+ val recordHandler = Try(conf.getConfString(configKey)).toOption
+ (Nil, Option(name), props, recordHandler)
+ }
+
+ val (inFormat, inSerdeClass, inSerdeProps, reader) =
+ format(inRowFormat, "hive.script.recordreader")
+
+ val (outFormat, outSerdeClass, outSerdeProps, writer) =
+ format(outRowFormat, "hive.script.recordwriter")
+
+ ScriptInputOutputSchema(
+ inFormat, outFormat,
+ inSerdeClass, outSerdeClass,
+ inSerdeProps, outSerdeProps,
+ reader, writer,
+ schemaLess)
+ }
}
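To make the delimited branch of `format` above concrete, here is a small self-contained sketch of the same key/value assembly, with the ANTLR tokens replaced by plain `Option[String]` values for illustration; only the `TOK_*` keys are taken from the patch, everything else is made up for the example.

```scala
// Standalone sketch of the RowFormatDelimitedContext branch above, using
// plain Option[String] values in place of ANTLR tokens (illustration only).
object RowFormatEntriesSketch {
  def entry(key: String, value: Option[String]): Seq[(String, String)] =
    value.map(v => key -> v).toSeq

  def main(args: Array[String]): Unit = {
    // e.g. ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
    val entries =
      entry("TOK_TABLEROWFORMATFIELD", Some("\t")) ++
      entry("TOK_TABLEROWFORMATCOLLITEMS", None) ++
      entry("TOK_TABLEROWFORMATMAPKEYS", None) ++
      entry("TOK_TABLEROWFORMATLINES", Some("\n")) ++
      entry("TOK_TABLEROWFORMATNULL", None)

    // Only the clauses that were actually specified survive: here the field
    // and line delimiters, keyed by the old parser's token names.
    println(entries)
  }
}
```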
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala
new file mode 100644
index 0000000000..39e441f1c3
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.types.StringType
+
+/**
+ * A command that we delegate to Hive. Eventually we should remove this.
+ */
+case class HiveNativeCommand(sql: String) extends RunnableCommand {
+
+ override def output: Seq[AttributeReference] =
+ Seq(AttributeReference("result", StringType, nullable = false)())
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ sqlContext.sessionState.runNativeSql(sql).map(Row(_))
+ }
+}
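For reference, a minimal sketch of how this command behaves once constructed; the `SQLContext` is assumed to carry a `SessionState` whose `runNativeSql` is actually implemented (the plain `SessionState` changed below throws), and the `SHOW TRANSACTIONS` text is purely illustrative.

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.command.HiveNativeCommand

object HiveNativeCommandSketch {
  // Sketch only: sqlContext must provide a SessionState that implements
  // runNativeSql; the base implementation below throws an AnalysisException.
  def runNative(sqlContext: SQLContext): Unit = {
    val cmd = HiveNativeCommand("SHOW TRANSACTIONS") // illustrative Hive-only statement

    // The command exposes a single non-nullable string column named "result".
    assert(cmd.output.map(_.name) == Seq("result"))

    // run() forwards the raw SQL text to SessionState.runNativeSql and wraps
    // each returned line in a Row.
    cmd.run(sqlContext).foreach(println)
  }
}
```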
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 8563dc3d5a..e1be4b882f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -167,7 +167,7 @@ private[sql] class SessionState(ctx: SQLContext) {
}
def runNativeSql(sql: String): Seq[String] = {
- throw new UnsupportedOperationException
+ throw new AnalysisException("Unsupported query: " + sql)
}
}