 15 files changed, 192 insertions(+), 182 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala
index 578027da77..e176e9b82b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala
@@ -37,7 +37,65 @@ case class ScriptTransformation(
 }
 
 /**
- * A placeholder for implementation specific input and output properties when passing data
- * to a script. For example, in Hive this would specify which SerDes to use.
+ * Input and output properties when passing data to a script.
+ * For example, in Hive this would specify which SerDes to use.
  */
-trait ScriptInputOutputSchema
+case class ScriptInputOutputSchema(
+    inputRowFormat: Seq[(String, String)],
+    outputRowFormat: Seq[(String, String)],
+    inputSerdeClass: Option[String],
+    outputSerdeClass: Option[String],
+    inputSerdeProps: Seq[(String, String)],
+    outputSerdeProps: Seq[(String, String)],
+    recordReaderClass: Option[String],
+    recordWriterClass: Option[String],
+    schemaLess: Boolean) {
+
+  def inputRowFormatSQL: Option[String] =
+    getRowFormatSQL(inputRowFormat, inputSerdeClass, inputSerdeProps)
+
+  def outputRowFormatSQL: Option[String] =
+    getRowFormatSQL(outputRowFormat, outputSerdeClass, outputSerdeProps)
+
+  /**
+   * Get the row format specification
+   * Note:
+   * 1. Changes are needed when readerClause and writerClause are supported.
+   * 2. Changes are needed when "ESCAPED BY" is supported.
+   */
+  private def getRowFormatSQL(
+      rowFormat: Seq[(String, String)],
+      serdeClass: Option[String],
+      serdeProps: Seq[(String, String)]): Option[String] = {
+    if (schemaLess) return Some("")
+
+    val rowFormatDelimited =
+      rowFormat.map {
+        case ("TOK_TABLEROWFORMATFIELD", value) =>
+          "FIELDS TERMINATED BY " + value
+        case ("TOK_TABLEROWFORMATCOLLITEMS", value) =>
+          "COLLECTION ITEMS TERMINATED BY " + value
+        case ("TOK_TABLEROWFORMATMAPKEYS", value) =>
+          "MAP KEYS TERMINATED BY " + value
+        case ("TOK_TABLEROWFORMATLINES", value) =>
+          "LINES TERMINATED BY " + value
+        case ("TOK_TABLEROWFORMATNULL", value) =>
+          "NULL DEFINED AS " + value
+        case o => return None
+      }
+
+    val serdeClassSQL = serdeClass.map("'" + _ + "'").getOrElse("")
+    val serdePropsSQL =
+      if (serdeClass.nonEmpty) {
+        val props = serdeProps.map{p => s"'${p._1}' = '${p._2}'"}.mkString(", ")
+        if (props.nonEmpty) " WITH SERDEPROPERTIES(" + props + ")" else ""
+      } else {
+        ""
+      }
+    if (rowFormat.nonEmpty) {
+      Some("ROW FORMAT DELIMITED " + rowFormatDelimited.mkString(" "))
+    } else {
+      Some("ROW FORMAT SERDE " + serdeClassSQL + serdePropsSQL)
+    }
+  }
+}
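
Now that `ScriptInputOutputSchema` is a concrete case class rather than a marker trait, its SQL generation can be exercised directly. A minimal sketch (hypothetical REPL usage; note the values are stored as raw token text, quotes included):

```scala
import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema

// Delimited row format; the keys are the legacy parser token names
// matched by getRowFormatSQL above.
val delimited = ScriptInputOutputSchema(
  inputRowFormat = Seq("TOK_TABLEROWFORMATFIELD" -> "','"),
  outputRowFormat = Seq("TOK_TABLEROWFORMATFIELD" -> "','"),
  inputSerdeClass = None,
  outputSerdeClass = None,
  inputSerdeProps = Nil,
  outputSerdeProps = Nil,
  recordReaderClass = None,
  recordWriterClass = None,
  schemaLess = false)

// => ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
println(delimited.inputRowFormatSQL.get)
```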
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index ac12a72fc6..05fb1ef631 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution
 
 import scala.collection.JavaConverters._
+import scala.util.Try
 
 import org.antlr.v4.runtime.{ParserRuleContext, Token}
 
@@ -26,16 +27,27 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema}
 import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand, DescribeCommand => _, _}
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
+
 /**
  * Concrete parser for Spark SQL statements.
  */
-class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser{
+class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser {
   val astBuilder = new SparkSqlAstBuilder(conf)
+
+  private val substitutor = new VariableSubstitution(conf)
+
+  protected override def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
+    super.parse(substitutor.substitute(command))(toResult)
+  }
+
+  protected override def nativeCommand(sqlText: String): LogicalPlan = {
+    HiveNativeCommand(substitutor.substitute(sqlText))
+  }
 }
 
 /**
@@ -45,6 +57,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   import org.apache.spark.sql.catalyst.parser.ParserUtils._
 
   /**
+   * Pass a command to Hive using a [[HiveNativeCommand]].
+   */
+  override def visitExecuteNativeCommand(
+      ctx: ExecuteNativeCommandContext): LogicalPlan = withOrigin(ctx) {
+    HiveNativeCommand(command(ctx))
+  }
+
+  /**
    * Create a [[SetCommand]] logical plan.
    *
    * Note that we assume that everything after the SET keyword is assumed to be a part of the
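
`SparkSqlParser` now pipes every statement through `VariableSubstitution` before it reaches ANTLR, and statements the grammar cannot handle fall back to `HiveNativeCommand` via `nativeCommand`. A toy sketch of what the substitution step does (not the real implementation, which also understands `hiveconf:`/`hivevar:` style prefixes and session config lookups):

```scala
import scala.util.matching.Regex

// Replace ${name} references with values from a map, leaving unknown
// references untouched.
def substitute(sql: String, vars: Map[String, String]): String =
  """\$\{([^}]+)\}""".r.replaceAllIn(sql, m =>
    Regex.quoteReplacement(vars.getOrElse(m.group(1), m.matched)))

substitute("SELECT * FROM logs LIMIT ${limit}", Map("limit" -> "10"))
// => SELECT * FROM logs LIMIT 10
```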
@@ -1127,4 +1147,73 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       Option(col.STRING).map(string))
     }
   }
+
+  /**
+   * Create a [[ScriptInputOutputSchema]].
+   */
+  override protected def withScriptIOSchema(
+      ctx: QuerySpecificationContext,
+      inRowFormat: RowFormatContext,
+      recordWriter: Token,
+      outRowFormat: RowFormatContext,
+      recordReader: Token,
+      schemaLess: Boolean): ScriptInputOutputSchema = {
+    if (recordWriter != null || recordReader != null) {
+      throw new ParseException(
+        "Unsupported operation: User defined record reader/writer classes.", ctx)
+    }
+
+    // Decode an input/output format.
+    type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])
+    def format(fmt: RowFormatContext, configKey: String): Format = fmt match {
+      case c: RowFormatDelimitedContext =>
+        // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema
+        // expects a seq of pairs in which the old parsers' token names are used as keys.
+        // Transforming the result of visitRowFormatDelimited would be quite a bit messier than
+        // retrieving the key value pairs ourselves.
+        def entry(key: String, value: Token): Seq[(String, String)] = {
+          Option(value).map(t => key -> t.getText).toSeq
+        }
+        val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++
+          entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++
+          entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++
+          entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++
+          entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs)
+
+        (entries, None, Seq.empty, None)
+
+      case c: RowFormatSerdeContext =>
+        // Use a serde format.
+        val CatalogStorageFormat(None, None, None, Some(name), props) = visitRowFormatSerde(c)
+
+        // SPARK-10310: Special cases LazySimpleSerDe
+        val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") {
+          Try(conf.getConfString(configKey)).toOption
+        } else {
+          None
+        }
+        (Seq.empty, Option(name), props.toSeq, recordHandler)
+
+      case null =>
+        // Use default (serde) format.
+        val name = conf.getConfString("hive.script.serde",
+          "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
+        val props = Seq("field.delim" -> "\t")
+        val recordHandler = Try(conf.getConfString(configKey)).toOption
+        (Nil, Option(name), props, recordHandler)
+    }
+
+    val (inFormat, inSerdeClass, inSerdeProps, reader) =
+      format(inRowFormat, "hive.script.recordreader")
+
+    val (outFormat, outSerdeClass, outSerdeProps, writer) =
+      format(outRowFormat, "hive.script.recordwriter")
+
+    ScriptInputOutputSchema(
+      inFormat, outFormat,
+      inSerdeClass, outSerdeClass,
+      inSerdeProps, outSerdeProps,
+      reader, writer,
+      schemaLess)
+  }
 }
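
Since `withScriptIOSchema` now lives in the core `SparkSqlAstBuilder`, `SELECT TRANSFORM` parses without the Hive parser, which is why the corresponding `ParseException` expectation is deleted from `DDLCommandSuite` below. A hypothetical smoke test (`sqlContext` and a registered `testData` table assumed; planning the physical transform still goes through the Hive `Scripts` strategy):

```scala
// Exercises the RowFormatDelimitedContext branch of withScriptIOSchema.
sqlContext.sql(
  """SELECT TRANSFORM (key, value)
    |ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    |USING 'cat' AS (tKey, tValue)
    |FROM testData
  """.stripMargin)
```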
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala
index 8c1f4a8dc5..39e441f1c3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala
@@ -15,22 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.hive.execution
+package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.hive.HiveSessionState
 import org.apache.spark.sql.types.StringType
 
-private[hive]
+/**
+ * A command that we delegate to Hive. Eventually we should remove this.
+ */
 case class HiveNativeCommand(sql: String) extends RunnableCommand {
 
   override def output: Seq[AttributeReference] =
     Seq(AttributeReference("result", StringType, nullable = false)())
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    sqlContext.sessionState.asInstanceOf[HiveSessionState].runNativeSql(sql).map(Row(_))
+    sqlContext.sessionState.runNativeSql(sql).map(Row(_))
   }
-
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 8563dc3d5a..e1be4b882f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -167,7 +167,7 @@ private[sql] class SessionState(ctx: SQLContext) {
   }
 
   def runNativeSql(sql: String): Seq[String] = {
-    throw new UnsupportedOperationException
+    throw new AnalysisException("Unsupported query: " + sql)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index e99eb02252..a1ffda9656 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -611,26 +611,6 @@ class DDLCommandSuite extends PlanTest {
     comparePlans(parsed2, expected2)
   }
 
-  test("unsupported operations") {
-    intercept[ParseException] {
-      parser.parsePlan("DROP TABLE tab PURGE")
-    }
-    intercept[ParseException] {
-      parser.parsePlan("DROP TABLE tab FOR REPLICATION('eventid')")
-    }
-    intercept[ParseException] {
-      parser.parsePlan(
-        """
-          |CREATE EXTERNAL TABLE oneToTenDef
-          |USING org.apache.spark.sql.sources
-          |OPTIONS (from '1', to '10')
-        """.stripMargin)
-    }
-    intercept[ParseException] {
-      parser.parsePlan("SELECT TRANSFORM (key, value) USING 'cat' AS (tKey, tValue) FROM testData")
-    }
-  }
-
   test("SPARK-14383: DISTRIBUTE and UNSET as non-keywords") {
     val sql = "SELECT distribute, unset FROM x"
     val parsed = parser.parsePlan(sql)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 8732285dac..ca397910c6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -32,10 +32,9 @@ import org.apache.spark.sql.catalyst.parser.DataTypeParser
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand}
+import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand, HiveNativeCommand}
 import org.apache.spark.sql.execution.datasources.{Partition => _, _}
 import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation}
-import org.apache.spark.sql.hive.execution.HiveNativeCommand
 import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource}
 import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.types._
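
`runNativeSql` in core now fails with an `AnalysisException` naming the offending query, which lets the relocated `HiveNativeCommand` drop its `HiveSessionState` downcast: the Hive session state simply overrides the hook. A simplified sketch of that pattern (toy class names; the real classes carry far more state):

```scala
// Core: native SQL is rejected with a descriptive error.
class ToySessionState {
  def runNativeSql(sql: String): Seq[String] =
    sys.error("Unsupported query: " + sql) // stand-in for AnalysisException
}

// Hive: the override would hand the text to the Hive client.
class ToyHiveSessionState extends ToySessionState {
  override def runNativeSql(sql: String): Seq[String] =
    Seq(s"(would be executed by Hive: $sql)")
}
```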
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala
index 1c1bfb610c..0ee34f07fd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.hive
 import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand}
-import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
+import org.apache.spark.sql.execution.command.{ExecutedCommand, HiveNativeCommand, SetCommand}
+import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand
 
 /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index bbdcc8c6c2..8720e54ed6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -38,8 +38,9 @@ private[hive] trait HiveStrategies {
 
   object Scripts extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case logical.ScriptTransformation(input, script, output, child, schema: HiveScriptIOSchema) =>
-        ScriptTransformation(input, script, output, planLater(child), schema)(hiveconf) :: Nil
+      case logical.ScriptTransformation(input, script, output, child, ioschema) =>
+        val hiveIoSchema = HiveScriptIOSchema(ioschema)
+        ScriptTransformation(input, script, output, planLater(child), hiveIoSchema)(hiveconf) :: Nil
       case _ => Nil
     }
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
index 2d44813f0e..86115d0e9b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.hive.execution.HiveScriptIOSchema
 import org.apache.spark.sql.types.{ByteType, DataType, IntegerType, NullType}
 
 /**
@@ -210,13 +209,12 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
   }
 
   private def scriptTransformationToSQL(plan: ScriptTransformation): String = {
-    val ioSchema = plan.ioschema.asInstanceOf[HiveScriptIOSchema]
-    val inputRowFormatSQL = ioSchema.inputRowFormatSQL.getOrElse(
+    val inputRowFormatSQL = plan.ioschema.inputRowFormatSQL.getOrElse(
       throw new UnsupportedOperationException(
-        s"unsupported row format ${ioSchema.inputRowFormat}"))
-    val outputRowFormatSQL = ioSchema.outputRowFormatSQL.getOrElse(
+        s"unsupported row format ${plan.ioschema.inputRowFormat}"))
+    val outputRowFormatSQL = plan.ioschema.outputRowFormatSQL.getOrElse(
       throw new UnsupportedOperationException(
-        s"unsupported row format ${ioSchema.outputRowFormat}"))
+        s"unsupported row format ${plan.ioschema.outputRowFormat}"))
 
     val outputSchema = plan.output.map { attr =>
       s"${attr.sql} ${attr.dataType.simpleString}"
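
With the downcast gone, `scriptTransformationToSQL` reads the row-format SQL straight off the logical node. Complementing the delimited example earlier, a sketch of the serde branch of `getRowFormatSQL` (same hypothetical REPL setup):

```scala
import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema

val lazySerde = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"
val viaSerde = ScriptInputOutputSchema(
  Nil, Nil,              // no delimited row format: takes the SERDE branch
  Some(lazySerde), Some(lazySerde),
  Seq("field.delim" -> "\t"), Seq("field.delim" -> "\t"),
  None, None, schemaLess = false)

// => ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
//      WITH SERDEPROPERTIES('field.delim' = '...')   (value is a literal tab)
println(viaSerde.inputRowFormatSQL.get)
```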
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
index 989da92bc7..35530b9814 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
@@ -17,16 +17,11 @@
 
 package org.apache.spark.sql.hive.execution
 
-import scala.util.Try
-
-import org.antlr.v4.runtime.Token
-import org.apache.hadoop.hive.serde.serdeConstants
-
-import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.execution.command.HiveNativeCommand
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
 
 /**
@@ -55,14 +50,6 @@ class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
   import ParserUtils._
 
   /**
-   * Pass a command to Hive using a [[HiveNativeCommand]].
-   */
-  override def visitExecuteNativeCommand(
-      ctx: ExecuteNativeCommandContext): LogicalPlan = withOrigin(ctx) {
-    HiveNativeCommand(command(ctx))
-  }
-
-  /**
    * Create an [[AnalyzeTable]] command. This currently only implements the NOSCAN option (other
    * options are passed on to Hive) e.g.:
    * {{{
@@ -80,73 +67,4 @@ class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
       AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
     }
   }
-
-  /**
-   * Create a [[HiveScriptIOSchema]].
-   */
-  override protected def withScriptIOSchema(
-      ctx: QuerySpecificationContext,
-      inRowFormat: RowFormatContext,
-      recordWriter: Token,
-      outRowFormat: RowFormatContext,
-      recordReader: Token,
-      schemaLess: Boolean): HiveScriptIOSchema = {
-    if (recordWriter != null || recordReader != null) {
-      throw new ParseException(
-        "Unsupported operation: Used defined record reader/writer classes.", ctx)
-    }
-
-    // Decode and input/output format.
-    type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])
-    def format(fmt: RowFormatContext, configKey: String): Format = fmt match {
-      case c: RowFormatDelimitedContext =>
-        // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema
-        // expects a seq of pairs in which the old parsers' token names are used as keys.
-        // Transforming the result of visitRowFormatDelimited would be quite a bit messier than
-        // retrieving the key value pairs ourselves.
-        def entry(key: String, value: Token): Seq[(String, String)] = {
-          Option(value).map(t => key -> t.getText).toSeq
-        }
-        val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++
-          entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++
-          entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++
-          entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++
-          entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs)
-
-        (entries, None, Seq.empty, None)
-
-      case c: RowFormatSerdeContext =>
-        // Use a serde format.
-        val CatalogStorageFormat(None, None, None, Some(name), props) = visitRowFormatSerde(c)
-
-        // SPARK-10310: Special cases LazySimpleSerDe
-        val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") {
-          Try(conf.getConfString(configKey)).toOption
-        } else {
-          None
-        }
-        (Seq.empty, Option(name), props.toSeq, recordHandler)
-
-      case null =>
-        // Use default (serde) format.
-        val name = conf.getConfString("hive.script.serde",
-          "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
-        val props = Seq(serdeConstants.FIELD_DELIM -> "\t")
-        val recordHandler = Try(conf.getConfString(configKey)).toOption
-        (Nil, Option(name), props, recordHandler)
-    }
-
-    val (inFormat, inSerdeClass, inSerdeProps, reader) =
-      format(inRowFormat, "hive.script.recordreader")
-
-    val (outFormat, outSerdeClass, outSerdeProps, writer) =
-      format(outRowFormat, "hive.script.recordwriter")
-
-    HiveScriptIOSchema(
-      inFormat, outFormat,
-      inSerdeClass, outSerdeClass,
-      inSerdeProps, outSerdeProps,
-      reader, writer,
-      schemaLess)
-  }
 }
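
One subtle difference between the copy deleted above and its replacement in `SparkSqlParser`: the default serde property key is now the string literal `"field.delim"` rather than `serdeConstants.FIELD_DELIM`, which removes the parser's compile-time dependency on `hive-serde`. The two are interchangeable; a quick check (needs Hive's serde jar on the classpath):

```scala
import org.apache.hadoop.hive.serde.serdeConstants

assert(serdeConstants.FIELD_DELIM == "field.delim")
```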
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 2f7cec354d..8c8becfb87 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -312,6 +312,22 @@ private class ScriptTransformationWriterThread(
   }
 }
 
+private[hive]
+object HiveScriptIOSchema {
+  def apply(input: ScriptInputOutputSchema): HiveScriptIOSchema = {
+    HiveScriptIOSchema(
+      input.inputRowFormat,
+      input.outputRowFormat,
+      input.inputSerdeClass,
+      input.outputSerdeClass,
+      input.inputSerdeProps,
+      input.outputSerdeProps,
+      input.recordReaderClass,
+      input.recordWriterClass,
+      input.schemaLess)
+  }
+}
+
 /**
  * The wrapper class of Hive input and output schema properties
  */
@@ -325,7 +341,8 @@ case class HiveScriptIOSchema (
     outputSerdeProps: Seq[(String, String)],
     recordReaderClass: Option[String],
     recordWriterClass: Option[String],
-    schemaLess: Boolean) extends ScriptInputOutputSchema with HiveInspectors {
+    schemaLess: Boolean)
+  extends HiveInspectors {
 
   private val defaultFormat = Map(
     ("TOK_TABLEROWFORMATFIELD", "\t"),
@@ -402,52 +419,4 @@ case class HiveScriptIOSchema (
       instance
     }
   }
-
-  def inputRowFormatSQL: Option[String] =
-    getRowFormatSQL(inputRowFormat, inputSerdeClass, inputSerdeProps)
-
-  def outputRowFormatSQL: Option[String] =
-    getRowFormatSQL(outputRowFormat, outputSerdeClass, outputSerdeProps)
-
-  /**
-   * Get the row format specification
-   * Note:
-   * 1. Changes are needed when readerClause and writerClause are supported.
-   * 2. Changes are needed when "ESCAPED BY" is supported.
-   */
-  private def getRowFormatSQL(
-      rowFormat: Seq[(String, String)],
-      serdeClass: Option[String],
-      serdeProps: Seq[(String, String)]): Option[String] = {
-    if (schemaLess) return Some("")
-
-    val rowFormatDelimited =
-      rowFormat.map {
-        case ("TOK_TABLEROWFORMATFIELD", value) =>
-          "FIELDS TERMINATED BY " + value
-        case ("TOK_TABLEROWFORMATCOLLITEMS", value) =>
-          "COLLECTION ITEMS TERMINATED BY " + value
-        case ("TOK_TABLEROWFORMATMAPKEYS", value) =>
-          "MAP KEYS TERMINATED BY " + value
-        case ("TOK_TABLEROWFORMATLINES", value) =>
-          "LINES TERMINATED BY " + value
-        case ("TOK_TABLEROWFORMATNULL", value) =>
-          "NULL DEFINED AS " + value
-        case o => return None
-      }
-
-    val serdeClassSQL = serdeClass.map("'" + _ + "'").getOrElse("")
-    val serdePropsSQL =
-      if (serdeClass.nonEmpty) {
-        val props = serdeProps.map{p => s"'${p._1}' = '${p._2}'"}.mkString(", ")
-        if (props.nonEmpty) " WITH SERDEPROPERTIES(" + props + ")" else ""
-      } else {
-        ""
-      }
-    if (rowFormat.nonEmpty) {
-      Some("ROW FORMAT DELIMITED " + rowFormatDelimited.mkString(" "))
-    } else {
-      Some("ROW FORMAT SERDE " + serdeClassSQL + serdePropsSQL)
-    }
-  }
 }
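
With its SQL generation moved to catalyst, `HiveScriptIOSchema` is reduced to an execution-time wrapper, and the new companion `apply` is a plain field-by-field copy. A hedged sketch of the lift the `Scripts` strategy performs (both types are `private[hive]`, so this only compiles from inside the hive package; shown for illustration):

```scala
import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema
import org.apache.spark.sql.hive.execution.HiveScriptIOSchema

val generic = ScriptInputOutputSchema(
  Nil, Nil, None, None, Nil, Nil, None, None, schemaLess = true)

// Planner-side conversion; every field is copied across unchanged.
val forHive: HiveScriptIOSchema = HiveScriptIOSchema(generic)
assert(forHive.schemaLess)
```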
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 2bb13996c1..741e3bdd18 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -39,10 +39,9 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.CacheTableCommand
+import org.apache.spark.sql.execution.command.{CacheTableCommand, HiveNativeCommand}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.hive.execution.HiveNativeCommand
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.{ShutdownHookManager, Utils}
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 4c90dbeb1b..e3522567b9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -29,8 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.JsonTuple
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
-import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewAsSelectLogicalCommand}
-import org.apache.spark.sql.hive.execution.HiveNativeCommand
+import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewAsSelectLogicalCommand, HiveNativeCommand}
 import org.apache.spark.sql.hive.test.TestHive
 
 class HiveDDLCommandSuite extends PlanTest {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index e91870492a..7a6f1ce0d1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.HiveNativeCommand
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 994dc4a2d2..77906ef2b0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -28,7 +28,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.command.{ExplainCommand, SetCommand}
+import org.apache.spark.sql.execution.command.{ExplainCommand, HiveNativeCommand, SetCommand}
 import org.apache.spark.sql.execution.datasources.DescribeCommand
 import org.apache.spark.sql.hive.{InsertIntoHiveTable => LogicalInsertIntoHiveTable}
 import org.apache.spark.sql.hive.SQLBuilder