author     Reynold Xin <rxin@databricks.com>    2016-04-21 15:59:37 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-04-21 15:59:37 -0700
commit     1a95397bb6a4e7e7a06ac450bf556fa3aa47b8cd (patch)
tree       035e8e5dadc2488955b2224cc37a539bd792ab02
parent     ef6be7bedd9918c3398d510bb1eb1584908581c8 (diff)
[SPARK-14798][SQL] Move native command and script transformation parsing into SparkSqlAstBuilder
## What changes were proposed in this pull request?

This patch moves native command and script transformation parsing into SparkSqlAstBuilder. It builds on #12561; see the last commit for the diff.

## How was this patch tested?

Updated existing test cases to reflect this change.

Author: Reynold Xin <rxin@databricks.com>

Closes #12564 from rxin/SPARK-14798.
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala  64
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala  95
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala (renamed from sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala)  11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala  20
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala  3
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala  4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala  5
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala  10
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala  84
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala  67
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala  3
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala  3
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala  1
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala  2
15 files changed, 192 insertions, 182 deletions
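
The net effect of the move is that TRANSFORM queries now parse through the core SparkSqlParser rather than only through the Hive parser. A minimal, hedged sketch (not part of the patch) of that path, roughly mirroring how the parser test suites exercise it; constructing SQLConf directly assumes the same package-private access those suites have, and the query string is the one removed from DDLCommandSuite below:

```scala
import org.apache.spark.sql.catalyst.plans.logical.ScriptTransformation
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.internal.SQLConf

val parser = new SparkSqlParser(new SQLConf)

parser.parsePlan(
  "SELECT TRANSFORM (key, value) USING 'cat' AS (tKey, tValue) FROM testData") match {
  case ScriptTransformation(_, script, _, _, ioschema) =>
    // Expected (assuming the grammar unquotes the script string): script == "cat",
    // and ioschema is the new catalyst-side ScriptInputOutputSchema rather than a
    // Hive-specific class. Since the AS clause names the output columns,
    // ioschema.schemaLess should be false.
    println(s"script=$script, schemaLess=${ioschema.schemaLess}")
  case other =>
    println(s"unexpected plan shape: $other")
}
```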
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala
index 578027da77..e176e9b82b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala
@@ -37,7 +37,65 @@ case class ScriptTransformation(
}
/**
- * A placeholder for implementation specific input and output properties when passing data
- * to a script. For example, in Hive this would specify which SerDes to use.
+ * Input and output properties when passing data to a script.
+ * For example, in Hive this would specify which SerDes to use.
*/
-trait ScriptInputOutputSchema
+case class ScriptInputOutputSchema(
+ inputRowFormat: Seq[(String, String)],
+ outputRowFormat: Seq[(String, String)],
+ inputSerdeClass: Option[String],
+ outputSerdeClass: Option[String],
+ inputSerdeProps: Seq[(String, String)],
+ outputSerdeProps: Seq[(String, String)],
+ recordReaderClass: Option[String],
+ recordWriterClass: Option[String],
+ schemaLess: Boolean) {
+
+ def inputRowFormatSQL: Option[String] =
+ getRowFormatSQL(inputRowFormat, inputSerdeClass, inputSerdeProps)
+
+ def outputRowFormatSQL: Option[String] =
+ getRowFormatSQL(outputRowFormat, outputSerdeClass, outputSerdeProps)
+
+ /**
+ * Get the row format specification
+ * Note:
+ * 1. Changes are needed when readerClause and writerClause are supported.
+ * 2. Changes are needed when "ESCAPED BY" is supported.
+ */
+ private def getRowFormatSQL(
+ rowFormat: Seq[(String, String)],
+ serdeClass: Option[String],
+ serdeProps: Seq[(String, String)]): Option[String] = {
+ if (schemaLess) return Some("")
+
+ val rowFormatDelimited =
+ rowFormat.map {
+ case ("TOK_TABLEROWFORMATFIELD", value) =>
+ "FIELDS TERMINATED BY " + value
+ case ("TOK_TABLEROWFORMATCOLLITEMS", value) =>
+ "COLLECTION ITEMS TERMINATED BY " + value
+ case ("TOK_TABLEROWFORMATMAPKEYS", value) =>
+ "MAP KEYS TERMINATED BY " + value
+ case ("TOK_TABLEROWFORMATLINES", value) =>
+ "LINES TERMINATED BY " + value
+ case ("TOK_TABLEROWFORMATNULL", value) =>
+ "NULL DEFINED AS " + value
+ case o => return None
+ }
+
+ val serdeClassSQL = serdeClass.map("'" + _ + "'").getOrElse("")
+ val serdePropsSQL =
+ if (serdeClass.nonEmpty) {
+ val props = serdeProps.map{p => s"'${p._1}' = '${p._2}'"}.mkString(", ")
+ if (props.nonEmpty) " WITH SERDEPROPERTIES(" + props + ")" else ""
+ } else {
+ ""
+ }
+ if (rowFormat.nonEmpty) {
+ Some("ROW FORMAT DELIMITED " + rowFormatDelimited.mkString(" "))
+ } else {
+ Some("ROW FORMAT SERDE " + serdeClassSQL + serdePropsSQL)
+ }
+ }
+}
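
A minimal sketch (not from the patch) of the delimited-format round trip the new case class supports: token-keyed pairs go in, and getRowFormatSQL reconstructs a ROW FORMAT DELIMITED clause. The field values here are illustrative only.

```scala
import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema

val schema = ScriptInputOutputSchema(
  inputRowFormat = Seq("TOK_TABLEROWFORMATFIELD" -> "','"),
  outputRowFormat = Seq("TOK_TABLEROWFORMATFIELD" -> "','"),
  inputSerdeClass = None,
  outputSerdeClass = None,
  inputSerdeProps = Nil,
  outputSerdeProps = Nil,
  recordReaderClass = None,
  recordWriterClass = None,
  schemaLess = false)

// Expected, assuming the logic above:
// Some("ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
println(schema.inputRowFormatSQL)
```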
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index ac12a72fc6..05fb1ef631 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution
import scala.collection.JavaConverters._
+import scala.util.Try
import org.antlr.v4.runtime.{ParserRuleContext, Token}
@@ -26,16 +27,27 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema}
import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand, DescribeCommand => _, _}
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
+
/**
* Concrete parser for Spark SQL statements.
*/
-class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser{
+class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser {
val astBuilder = new SparkSqlAstBuilder(conf)
+
+ private val substitutor = new VariableSubstitution(conf)
+
+ protected override def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
+ super.parse(substitutor.substitute(command))(toResult)
+ }
+
+ protected override def nativeCommand(sqlText: String): LogicalPlan = {
+ HiveNativeCommand(substitutor.substitute(sqlText))
+ }
}
/**
@@ -45,6 +57,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
import org.apache.spark.sql.catalyst.parser.ParserUtils._
/**
+ * Pass a command to Hive using a [[HiveNativeCommand]].
+ */
+ override def visitExecuteNativeCommand(
+ ctx: ExecuteNativeCommandContext): LogicalPlan = withOrigin(ctx) {
+ HiveNativeCommand(command(ctx))
+ }
+
+ /**
* Create a [[SetCommand]] logical plan.
*
* Note that we assume that everything after the SET keyword is assumed to be a part of the
@@ -1127,4 +1147,73 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
Option(col.STRING).map(string))
}
}
+
+ /**
+ * Create a [[ScriptInputOutputSchema]].
+ */
+ override protected def withScriptIOSchema(
+ ctx: QuerySpecificationContext,
+ inRowFormat: RowFormatContext,
+ recordWriter: Token,
+ outRowFormat: RowFormatContext,
+ recordReader: Token,
+ schemaLess: Boolean): ScriptInputOutputSchema = {
+ if (recordWriter != null || recordReader != null) {
+ throw new ParseException(
+ "Unsupported operation: Used defined record reader/writer classes.", ctx)
+ }
+
+ // Decode and input/output format.
+ type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])
+ def format(fmt: RowFormatContext, configKey: String): Format = fmt match {
+ case c: RowFormatDelimitedContext =>
+ // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema
+ // expects a seq of pairs in which the old parsers' token names are used as keys.
+ // Transforming the result of visitRowFormatDelimited would be quite a bit messier than
+ // retrieving the key value pairs ourselves.
+ def entry(key: String, value: Token): Seq[(String, String)] = {
+ Option(value).map(t => key -> t.getText).toSeq
+ }
+ val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++
+ entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++
+ entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++
+ entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++
+ entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs)
+
+ (entries, None, Seq.empty, None)
+
+ case c: RowFormatSerdeContext =>
+ // Use a serde format.
+ val CatalogStorageFormat(None, None, None, Some(name), props) = visitRowFormatSerde(c)
+
+ // SPARK-10310: Special cases LazySimpleSerDe
+ val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") {
+ Try(conf.getConfString(configKey)).toOption
+ } else {
+ None
+ }
+ (Seq.empty, Option(name), props.toSeq, recordHandler)
+
+ case null =>
+ // Use default (serde) format.
+ val name = conf.getConfString("hive.script.serde",
+ "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
+ val props = Seq("field.delim" -> "\t")
+ val recordHandler = Try(conf.getConfString(configKey)).toOption
+ (Nil, Option(name), props, recordHandler)
+ }
+
+ val (inFormat, inSerdeClass, inSerdeProps, reader) =
+ format(inRowFormat, "hive.script.recordreader")
+
+ val (outFormat, outSerdeClass, outSerdeProps, writer) =
+ format(outRowFormat, "hive.script.recordwriter")
+
+ ScriptInputOutputSchema(
+ inFormat, outFormat,
+ inSerdeClass, outSerdeClass,
+ inSerdeProps, outSerdeProps,
+ reader, writer,
+ schemaLess)
+ }
}
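
A hedged sketch (not in the patch) of the default-format branch of withScriptIOSchema above: when no ROW FORMAT clause is given, both sides of the transform fall back to LazySimpleSerDe with a tab field delimiter, and the record reader/writer classes are looked up from the hive.script.* config keys if present. As above, constructing SQLConf directly assumes package-private access.

```scala
import scala.util.Try
import org.apache.spark.sql.internal.SQLConf

val conf = new SQLConf

// Default serde and props, matching the `case null` branch above.
val defaultSerde = conf.getConfString(
  "hive.script.serde", "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
val defaultProps = Seq("field.delim" -> "\t")

// Record handler: None unless the key has been set in the conf.
val reader = Try(conf.getConfString("hive.script.recordreader")).toOption
```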
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala
index 8c1f4a8dc5..39e441f1c3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala
@@ -15,22 +15,21 @@
* limitations under the License.
*/
-package org.apache.spark.sql.hive.execution
+package org.apache.spark.sql.execution.command
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.hive.HiveSessionState
import org.apache.spark.sql.types.StringType
-private[hive]
+/**
+ * A command that we delegate to Hive. Eventually we should remove this.
+ */
case class HiveNativeCommand(sql: String) extends RunnableCommand {
override def output: Seq[AttributeReference] =
Seq(AttributeReference("result", StringType, nullable = false)())
override def run(sqlContext: SQLContext): Seq[Row] = {
- sqlContext.sessionState.asInstanceOf[HiveSessionState].runNativeSql(sql).map(Row(_))
+ sqlContext.sessionState.runNativeSql(sql).map(Row(_))
}
-
}
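
A tiny sketch (hypothetical driver code, not part of the patch) of what the relocated command does: it forwards the raw SQL string to the session's runNativeSql and wraps each result line in a Row. On the plain SessionState (next file in this diff) that call now raises an AnalysisException instead of an UnsupportedOperationException.

```scala
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.execution.command.HiveNativeCommand

// Hypothetical helper: assumes `ctx` is backed by a Hive-capable SessionState.
def runNative(ctx: SQLContext, text: String): Seq[Row] =
  HiveNativeCommand(text).run(ctx)
```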
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 8563dc3d5a..e1be4b882f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -167,7 +167,7 @@ private[sql] class SessionState(ctx: SQLContext) {
}
def runNativeSql(sql: String): Seq[String] = {
- throw new UnsupportedOperationException
+ throw new AnalysisException("Unsupported query: " + sql)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index e99eb02252..a1ffda9656 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -611,26 +611,6 @@ class DDLCommandSuite extends PlanTest {
comparePlans(parsed2, expected2)
}
- test("unsupported operations") {
- intercept[ParseException] {
- parser.parsePlan("DROP TABLE tab PURGE")
- }
- intercept[ParseException] {
- parser.parsePlan("DROP TABLE tab FOR REPLICATION('eventid')")
- }
- intercept[ParseException] {
- parser.parsePlan(
- """
- |CREATE EXTERNAL TABLE oneToTenDef
- |USING org.apache.spark.sql.sources
- |OPTIONS (from '1', to '10')
- """.stripMargin)
- }
- intercept[ParseException] {
- parser.parsePlan("SELECT TRANSFORM (key, value) USING 'cat' AS (tKey, tValue) FROM testData")
- }
- }
-
test("SPARK-14383: DISTRIBUTE and UNSET as non-keywords") {
val sql = "SELECT distribute, unset FROM x"
val parsed = parser.parsePlan(sql)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 8732285dac..ca397910c6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -32,10 +32,9 @@ import org.apache.spark.sql.catalyst.parser.DataTypeParser
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand}
+import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand, HiveNativeCommand}
import org.apache.spark.sql.execution.datasources.{Partition => _, _}
import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation}
-import org.apache.spark.sql.hive.execution.HiveNativeCommand
import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource}
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.types._
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala
index 1c1bfb610c..0ee34f07fd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.hive
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand}
-import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
+import org.apache.spark.sql.execution.command.{ExecutedCommand, HiveNativeCommand, SetCommand}
+import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand
/**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index bbdcc8c6c2..8720e54ed6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -38,8 +38,9 @@ private[hive] trait HiveStrategies {
object Scripts extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case logical.ScriptTransformation(input, script, output, child, schema: HiveScriptIOSchema) =>
- ScriptTransformation(input, script, output, planLater(child), schema)(hiveconf) :: Nil
+ case logical.ScriptTransformation(input, script, output, child, ioschema) =>
+ val hiveIoSchema = HiveScriptIOSchema(ioschema)
+ ScriptTransformation(input, script, output, planLater(child), hiveIoSchema)(hiveconf) :: Nil
case _ => Nil
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
index 2d44813f0e..86115d0e9b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.hive.execution.HiveScriptIOSchema
import org.apache.spark.sql.types.{ByteType, DataType, IntegerType, NullType}
/**
@@ -210,13 +209,12 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
}
private def scriptTransformationToSQL(plan: ScriptTransformation): String = {
- val ioSchema = plan.ioschema.asInstanceOf[HiveScriptIOSchema]
- val inputRowFormatSQL = ioSchema.inputRowFormatSQL.getOrElse(
+ val inputRowFormatSQL = plan.ioschema.inputRowFormatSQL.getOrElse(
throw new UnsupportedOperationException(
- s"unsupported row format ${ioSchema.inputRowFormat}"))
- val outputRowFormatSQL = ioSchema.outputRowFormatSQL.getOrElse(
+ s"unsupported row format ${plan.ioschema.inputRowFormat}"))
+ val outputRowFormatSQL = plan.ioschema.outputRowFormatSQL.getOrElse(
throw new UnsupportedOperationException(
- s"unsupported row format ${ioSchema.outputRowFormat}"))
+ s"unsupported row format ${plan.ioschema.outputRowFormat}"))
val outputSchema = plan.output.map { attr =>
s"${attr.sql} ${attr.dataType.simpleString}"
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
index 989da92bc7..35530b9814 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
@@ -17,16 +17,11 @@
package org.apache.spark.sql.hive.execution
-import scala.util.Try
-
-import org.antlr.v4.runtime.Token
-import org.apache.hadoop.hive.serde.serdeConstants
-
-import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.execution.command.HiveNativeCommand
import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
/**
@@ -55,14 +50,6 @@ class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
import ParserUtils._
/**
- * Pass a command to Hive using a [[HiveNativeCommand]].
- */
- override def visitExecuteNativeCommand(
- ctx: ExecuteNativeCommandContext): LogicalPlan = withOrigin(ctx) {
- HiveNativeCommand(command(ctx))
- }
-
- /**
* Create an [[AnalyzeTable]] command. This currently only implements the NOSCAN option (other
* options are passed on to Hive) e.g.:
* {{{
@@ -80,73 +67,4 @@ class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
}
}
-
- /**
- * Create a [[HiveScriptIOSchema]].
- */
- override protected def withScriptIOSchema(
- ctx: QuerySpecificationContext,
- inRowFormat: RowFormatContext,
- recordWriter: Token,
- outRowFormat: RowFormatContext,
- recordReader: Token,
- schemaLess: Boolean): HiveScriptIOSchema = {
- if (recordWriter != null || recordReader != null) {
- throw new ParseException(
- "Unsupported operation: Used defined record reader/writer classes.", ctx)
- }
-
- // Decode and input/output format.
- type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])
- def format(fmt: RowFormatContext, configKey: String): Format = fmt match {
- case c: RowFormatDelimitedContext =>
- // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema
- // expects a seq of pairs in which the old parsers' token names are used as keys.
- // Transforming the result of visitRowFormatDelimited would be quite a bit messier than
- // retrieving the key value pairs ourselves.
- def entry(key: String, value: Token): Seq[(String, String)] = {
- Option(value).map(t => key -> t.getText).toSeq
- }
- val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++
- entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++
- entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++
- entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++
- entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs)
-
- (entries, None, Seq.empty, None)
-
- case c: RowFormatSerdeContext =>
- // Use a serde format.
- val CatalogStorageFormat(None, None, None, Some(name), props) = visitRowFormatSerde(c)
-
- // SPARK-10310: Special cases LazySimpleSerDe
- val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") {
- Try(conf.getConfString(configKey)).toOption
- } else {
- None
- }
- (Seq.empty, Option(name), props.toSeq, recordHandler)
-
- case null =>
- // Use default (serde) format.
- val name = conf.getConfString("hive.script.serde",
- "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
- val props = Seq(serdeConstants.FIELD_DELIM -> "\t")
- val recordHandler = Try(conf.getConfString(configKey)).toOption
- (Nil, Option(name), props, recordHandler)
- }
-
- val (inFormat, inSerdeClass, inSerdeProps, reader) =
- format(inRowFormat, "hive.script.recordreader")
-
- val (outFormat, outSerdeClass, outSerdeProps, writer) =
- format(outRowFormat, "hive.script.recordwriter")
-
- HiveScriptIOSchema(
- inFormat, outFormat,
- inSerdeClass, outSerdeClass,
- inSerdeProps, outSerdeProps,
- reader, writer,
- schemaLess)
- }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 2f7cec354d..8c8becfb87 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -312,6 +312,22 @@ private class ScriptTransformationWriterThread(
}
}
+private[hive]
+object HiveScriptIOSchema {
+ def apply(input: ScriptInputOutputSchema): HiveScriptIOSchema = {
+ HiveScriptIOSchema(
+ input.inputRowFormat,
+ input.outputRowFormat,
+ input.inputSerdeClass,
+ input.outputSerdeClass,
+ input.inputSerdeProps,
+ input.outputSerdeProps,
+ input.recordReaderClass,
+ input.recordWriterClass,
+ input.schemaLess)
+ }
+}
+
/**
* The wrapper class of Hive input and output schema properties
*/
@@ -325,7 +341,8 @@ case class HiveScriptIOSchema (
outputSerdeProps: Seq[(String, String)],
recordReaderClass: Option[String],
recordWriterClass: Option[String],
- schemaLess: Boolean) extends ScriptInputOutputSchema with HiveInspectors {
+ schemaLess: Boolean)
+ extends HiveInspectors {
private val defaultFormat = Map(
("TOK_TABLEROWFORMATFIELD", "\t"),
@@ -402,52 +419,4 @@ case class HiveScriptIOSchema (
instance
}
}
-
- def inputRowFormatSQL: Option[String] =
- getRowFormatSQL(inputRowFormat, inputSerdeClass, inputSerdeProps)
-
- def outputRowFormatSQL: Option[String] =
- getRowFormatSQL(outputRowFormat, outputSerdeClass, outputSerdeProps)
-
- /**
- * Get the row format specification
- * Note:
- * 1. Changes are needed when readerClause and writerClause are supported.
- * 2. Changes are needed when "ESCAPED BY" is supported.
- */
- private def getRowFormatSQL(
- rowFormat: Seq[(String, String)],
- serdeClass: Option[String],
- serdeProps: Seq[(String, String)]): Option[String] = {
- if (schemaLess) return Some("")
-
- val rowFormatDelimited =
- rowFormat.map {
- case ("TOK_TABLEROWFORMATFIELD", value) =>
- "FIELDS TERMINATED BY " + value
- case ("TOK_TABLEROWFORMATCOLLITEMS", value) =>
- "COLLECTION ITEMS TERMINATED BY " + value
- case ("TOK_TABLEROWFORMATMAPKEYS", value) =>
- "MAP KEYS TERMINATED BY " + value
- case ("TOK_TABLEROWFORMATLINES", value) =>
- "LINES TERMINATED BY " + value
- case ("TOK_TABLEROWFORMATNULL", value) =>
- "NULL DEFINED AS " + value
- case o => return None
- }
-
- val serdeClassSQL = serdeClass.map("'" + _ + "'").getOrElse("")
- val serdePropsSQL =
- if (serdeClass.nonEmpty) {
- val props = serdeProps.map{p => s"'${p._1}' = '${p._2}'"}.mkString(", ")
- if (props.nonEmpty) " WITH SERDEPROPERTIES(" + props + ")" else ""
- } else {
- ""
- }
- if (rowFormat.nonEmpty) {
- Some("ROW FORMAT DELIMITED " + rowFormatDelimited.mkString(" "))
- } else {
- Some("ROW FORMAT SERDE " + serdeClassSQL + serdePropsSQL)
- }
- }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 2bb13996c1..741e3bdd18 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -39,10 +39,9 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.CacheTableCommand
+import org.apache.spark.sql.execution.command.{CacheTableCommand, HiveNativeCommand}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.hive.execution.HiveNativeCommand
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.{ShutdownHookManager, Utils}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 4c90dbeb1b..e3522567b9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -29,8 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.JsonTuple
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
-import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewAsSelectLogicalCommand}
-import org.apache.spark.sql.hive.execution.HiveNativeCommand
+import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewAsSelectLogicalCommand, HiveNativeCommand}
import org.apache.spark.sql.hive.test.TestHive
class HiveDDLCommandSuite extends PlanTest {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index e91870492a..7a6f1ce0d1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.HiveNativeCommand
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.hive.test.TestHiveSingleton
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 994dc4a2d2..77906ef2b0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -28,7 +28,7 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.command.{ExplainCommand, SetCommand}
+import org.apache.spark.sql.execution.command.{ExplainCommand, HiveNativeCommand, SetCommand}
import org.apache.spark.sql.execution.datasources.DescribeCommand
import org.apache.spark.sql.hive.{InsertIntoHiveTable => LogicalInsertIntoHiveTable}
import org.apache.spark.sql.hive.SQLBuilder