author     Andrew Or <andrew@databricks.com>  2016-05-23 11:55:03 -0700
committer  Andrew Or <andrew@databricks.com>  2016-05-23 11:55:03 -0700
commit     2585d2b322f3b6b85a0a12ddf7dcde957453000d (patch)
tree       3346618e5e970969f4cc5acfa46787f0c46139c0
parent     07c36a2f07fcf5da6fb395f830ebbfc10eb27dcc (diff)
[SPARK-15279][SQL] Catch conflicting SerDe when creating table
## What changes were proposed in this pull request?

The user may do something like:
```
CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS PARQUET
CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS ... SERDE 'myserde'
CREATE TABLE my_tab ROW FORMAT DELIMITED ... STORED AS ORC
CREATE TABLE my_tab ROW FORMAT DELIMITED ... STORED AS ... SERDE 'myserde'
```
None of these should be allowed because the SerDes conflict. As of this patch:
- `ROW FORMAT DELIMITED` is only compatible with `TEXTFILE`
- `ROW FORMAT SERDE` is only compatible with `TEXTFILE`, `RCFILE` and `SEQUENCEFILE`

## How was this patch tested?

New tests in `DDLCommandSuite`.

Author: Andrew Or <andrew@databricks.com>

Closes #13068 from andrewor14/row-format-conflict.
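As a quick illustration of the new behavior, here is a minimal sketch driving the parser directly (assuming the check surfaces as a `ParseException` from `operationNotAllowed`, consistent with the parser changes and tests below; table and serde names are placeholders):

```scala
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.internal.SQLConf

object RowFormatConflictDemo extends App {
  val parser = new SparkSqlParser(new SQLConf)

  // Compatible: TEXTFILE carries no serde of its own, so an explicit one is fine.
  parser.parsePlan("CREATE TABLE ok_tab ROW FORMAT SERDE 'myserde' STORED AS TEXTFILE")

  // Conflicting: PARQUET implies its own serde, so this is now rejected at parse time.
  try {
    parser.parsePlan("CREATE TABLE bad_tab ROW FORMAT SERDE 'myserde' STORED AS PARQUET")
  } catch {
    case e: ParseException => println(s"Rejected as expected: ${e.getMessage}")
  }
}
```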
-rw-r--r--  sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4          |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala           | 60
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala  | 94
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala           |  4
4 files changed, 129 insertions(+), 33 deletions(-)
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 848c59e3b8..8ea8f76629 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -267,8 +267,8 @@ createFileFormat
;
fileFormat
- : INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING (SERDE serdeCls=STRING)? #tableFileFormat
- | identifier #genericFileFormat
+ : INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING #tableFileFormat
+ | identifier #genericFileFormat
;
storageHandler
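In effect, `STORED AS INPUTFORMAT ... OUTPUTFORMAT ...` no longer admits a trailing `SERDE` clause at the grammar level; an explicit serde must now come from `ROW FORMAT SERDE`, which never conflicts with a raw input/output format pair. A hedged sketch of both sides (format strings are placeholders, mirroring the new tests):

```scala
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.internal.SQLConf

object FileFormatGrammarDemo extends App {
  val parser = new SparkSqlParser(new SQLConf)

  // Still grammatical: the serde comes from ROW FORMAT SERDE (cf. query1 in the new tests).
  parser.parsePlan(
    "CREATE TABLE my_tab ROW FORMAT SERDE 'anything' " +
      "STORED AS INPUTFORMAT 'inputfmt' OUTPUTFORMAT 'outputfmt'")

  // No longer grammatical after this patch: a SERDE clause attached to the file format itself.
  // parser.parsePlan(
  //   "CREATE TABLE my_tab STORED AS INPUTFORMAT 'inputfmt' OUTPUTFORMAT 'outputfmt' SERDE 'anything'")
}
```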
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index c517b8b55f..6e4af9500c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -796,14 +796,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
*
* Expected format:
* {{{
- * CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
- * [(col1 data_type [COMMENT col_comment], ...)]
+ * CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
+ * [(col1[:] data_type [COMMENT col_comment], ...)]
* [COMMENT table_comment]
- * [PARTITIONED BY (col3 data_type [COMMENT col_comment], ...)]
- * [CLUSTERED BY (col1, ...) [SORTED BY (col1 [ASC|DESC], ...)] INTO num_buckets BUCKETS]
- * [SKEWED BY (col1, col2, ...) ON ((col_value, col_value, ...), ...) [STORED AS DIRECTORIES]]
+ * [PARTITIONED BY (col2[:] data_type [COMMENT col_comment], ...)]
* [ROW FORMAT row_format]
- * [STORED AS file_format | STORED BY storage_handler_class [WITH SERDEPROPERTIES (...)]]
+ * [STORED AS file_format]
* [LOCATION path]
* [TBLPROPERTIES (property_name=property_value, ...)]
* [AS select_statement];
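For a concrete instance of the documented shape, a sketch exercising most of the optional clauses (identifiers, comments, and the location path are illustrative; `DELIMITED` plus `TEXTFILE` passes the new compatibility check):

```scala
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.internal.SQLConf

object CreateTableShapeDemo extends App {
  val ddl =
    """CREATE EXTERNAL TABLE IF NOT EXISTS mydb.page_view (
      |  viewtime INT COMMENT 'epoch seconds',
      |  userid BIGINT)
      |COMMENT 'page view log'
      |PARTITIONED BY (dt STRING COMMENT 'date')
      |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
      |STORED AS TEXTFILE
      |LOCATION '/warehouse/page_view'
      |TBLPROPERTIES ('owner'='me')""".stripMargin

  // Parses into a CreateTableCommand; no conflicting serde is involved.
  new SparkSqlParser(new SQLConf).parsePlan(ddl)
}
```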
@@ -849,6 +847,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
compressed = false,
serdeProperties = Map())
}
+ validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
.getOrElse(EmptyStorageFormat)
val rowStorage = Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat)
@@ -905,6 +904,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
/**
* Create a [[CatalogStorageFormat]] for creating tables.
+ *
+ * Format: STORED AS ...
*/
override def visitCreateFileFormat(
ctx: CreateFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
@@ -932,9 +933,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
EmptyStorageFormat.copy(
inputFormat = Option(string(ctx.inFmt)),
- outputFormat = Option(string(ctx.outFmt)),
- serde = Option(ctx.serdeCls).map(string)
- )
+ outputFormat = Option(string(ctx.outFmt)))
}
/**
@@ -1019,6 +1018,49 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
/**
+ * Throw a [[ParseException]] if the user specified incompatible SerDes through ROW FORMAT
+ * and STORED AS.
+ *
+ * The following are allowed. Anything else is not:
+ * ROW FORMAT SERDE ... STORED AS [SEQUENCEFILE | RCFILE | TEXTFILE]
+ * ROW FORMAT DELIMITED ... STORED AS TEXTFILE
+ * ROW FORMAT ... STORED AS INPUTFORMAT ... OUTPUTFORMAT ...
+ */
+ private def validateRowFormatFileFormat(
+ rowFormatCtx: RowFormatContext,
+ createFileFormatCtx: CreateFileFormatContext,
+ parentCtx: ParserRuleContext): Unit = {
+ if (rowFormatCtx == null || createFileFormatCtx == null) {
+ return
+ }
+ (rowFormatCtx, createFileFormatCtx.fileFormat) match {
+ case (_, ffTable: TableFileFormatContext) => // OK
+ case (rfSerde: RowFormatSerdeContext, ffGeneric: GenericFileFormatContext) =>
+ ffGeneric.identifier.getText.toLowerCase match {
+ case ("sequencefile" | "textfile" | "rcfile") => // OK
+ case fmt =>
+ throw operationNotAllowed(
+ s"ROW FORMAT SERDE is incompatible with format '$fmt', which also specifies a serde",
+ parentCtx)
+ }
+ case (rfDelimited: RowFormatDelimitedContext, ffGeneric: GenericFileFormatContext) =>
+ ffGeneric.identifier.getText.toLowerCase match {
+ case "textfile" => // OK
+ case fmt => throw operationNotAllowed(
+ s"ROW FORMAT DELIMITED is only compatible with 'textfile', not '$fmt'", parentCtx)
+ }
+ case _ =>
+ // should never happen
+ def str(ctx: ParserRuleContext): String = {
+ (0 until ctx.getChildCount).map { i => ctx.getChild(i).getText }.mkString(" ")
+ }
+ throw operationNotAllowed(
+ s"Unexpected combination of ${str(rowFormatCtx)} and ${str(createFileFormatCtx)}",
+ parentCtx)
+ }
+ }
+
+ /**
* Create or replace a view. This creates a [[CreateViewCommand]] command.
*
* For example:
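Stepping back from the diff: the compatibility matrix that `validateRowFormatFileFormat` enforces can be restated as a standalone predicate (a sketch for reference, not the committed code):

```scala
object SerdeCompatibility {
  // The matrix enforced by validateRowFormatFileFormat, in miniature. A
  // STORED AS INPUTFORMAT ... OUTPUTFORMAT ... pair is always accepted,
  // so only the generic file-format keywords need checking.
  def serdeCompatible(rowFormat: String, genericFormat: String): Boolean =
    rowFormat.toUpperCase match {
      case "SERDE"     => Set("sequencefile", "rcfile", "textfile")(genericFormat.toLowerCase)
      case "DELIMITED" => genericFormat.equalsIgnoreCase("textfile")
      case _           => false
    }
}

// e.g. SerdeCompatibility.serdeCompatible("SERDE", "rcfile")  == true
//      SerdeCompatibility.serdeCompatible("DELIMITED", "orc") == false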
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 54f98a6232..eab1f55712 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.command
+import scala.reflect.{classTag, ClassTag}
+
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, FunctionResource}
import org.apache.spark.sql.catalyst.catalog.FunctionResourceType
@@ -25,9 +27,10 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsing}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
+
// TODO: merge this with DDLSuite (SPARK-14441)
class DDLCommandSuite extends PlanTest {
private val parser = new SparkSqlParser(new SQLConf)
@@ -40,6 +43,15 @@ class DDLCommandSuite extends PlanTest {
containsThesePhrases.foreach { p => assert(e.getMessage.toLowerCase.contains(p.toLowerCase)) }
}
+ private def parseAs[T: ClassTag](query: String): T = {
+ parser.parsePlan(query) match {
+ case t: T => t
+ case other =>
+        fail(s"Expected to parse ${classTag[T].runtimeClass} from query, " +
+          s"got ${other.getClass.getName}: $query")
+ }
+ }
+
test("create database") {
val sql =
"""
@@ -225,19 +237,69 @@ class DDLCommandSuite extends PlanTest {
comparePlans(parsed4, expected4)
}
+ test("create table - row format and table file format") {
+ val createTableStart = "CREATE TABLE my_tab ROW FORMAT"
+ val fileFormat = s"STORED AS INPUTFORMAT 'inputfmt' OUTPUTFORMAT 'outputfmt'"
+ val query1 = s"$createTableStart SERDE 'anything' $fileFormat"
+ val query2 = s"$createTableStart DELIMITED FIELDS TERMINATED BY ' ' $fileFormat"
+
+ // No conflicting serdes here, OK
+ val parsed1 = parseAs[CreateTableCommand](query1)
+ assert(parsed1.table.storage.serde == Some("anything"))
+ assert(parsed1.table.storage.inputFormat == Some("inputfmt"))
+ assert(parsed1.table.storage.outputFormat == Some("outputfmt"))
+ val parsed2 = parseAs[CreateTableCommand](query2)
+ assert(parsed2.table.storage.serde.isEmpty)
+ assert(parsed2.table.storage.inputFormat == Some("inputfmt"))
+ assert(parsed2.table.storage.outputFormat == Some("outputfmt"))
+ }
+
+ test("create table - row format serde and generic file format") {
+ val allSources = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", "textfile")
+ val supportedSources = Set("sequencefile", "rcfile", "textfile")
+
+ allSources.foreach { s =>
+ val query = s"CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS $s"
+ if (supportedSources.contains(s)) {
+ val ct = parseAs[CreateTableCommand](query)
+ val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf)
+ assert(hiveSerde.isDefined)
+ assert(ct.table.storage.serde == Some("anything"))
+ assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat)
+ assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat)
+ } else {
+ assertUnsupported(query, Seq("row format serde", "incompatible", s))
+ }
+ }
+ }
+
+ test("create table - row format delimited and generic file format") {
+ val allSources = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", "textfile")
+ val supportedSources = Set("textfile")
+
+ allSources.foreach { s =>
+ val query = s"CREATE TABLE my_tab ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS $s"
+ if (supportedSources.contains(s)) {
+ val ct = parseAs[CreateTableCommand](query)
+ val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf)
+ assert(hiveSerde.isDefined)
+ assert(ct.table.storage.serde == hiveSerde.get.serde)
+ assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat)
+ assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat)
+ } else {
+ assertUnsupported(query, Seq("row format delimited", "only compatible with 'textfile'", s))
+ }
+ }
+ }
+
test("create external table - location must be specified") {
assertUnsupported(
sql = "CREATE EXTERNAL TABLE my_tab",
containsThesePhrases = Seq("create external table", "location"))
val query = "CREATE EXTERNAL TABLE my_tab LOCATION '/something/anything'"
- parser.parsePlan(query) match {
- case ct: CreateTableCommand =>
- assert(ct.table.tableType == CatalogTableType.EXTERNAL)
- assert(ct.table.storage.locationUri == Some("/something/anything"))
- case other =>
- fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
- s"got ${other.getClass.getName}: $query")
- }
+ val ct = parseAs[CreateTableCommand](query)
+ assert(ct.table.tableType == CatalogTableType.EXTERNAL)
+ assert(ct.table.storage.locationUri == Some("/something/anything"))
}
test("create table - property values must be set") {
@@ -252,14 +314,9 @@ class DDLCommandSuite extends PlanTest {
test("create table - location implies external") {
val query = "CREATE TABLE my_tab LOCATION '/something/anything'"
- parser.parsePlan(query) match {
- case ct: CreateTableCommand =>
- assert(ct.table.tableType == CatalogTableType.EXTERNAL)
- assert(ct.table.storage.locationUri == Some("/something/anything"))
- case other =>
- fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
- s"got ${other.getClass.getName}: $query")
- }
+ val ct = parseAs[CreateTableCommand](query)
+ assert(ct.table.tableType == CatalogTableType.EXTERNAL)
+ assert(ct.table.storage.locationUri == Some("/something/anything"))
}
test("create table using - with partitioned by") {
@@ -551,8 +608,7 @@ class DDLCommandSuite extends PlanTest {
test("alter table: set file format (not allowed)") {
assertUnsupported(
- "ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' " +
- "OUTPUTFORMAT 'test' SERDE 'test'")
+ "ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' OUTPUTFORMAT 'test'")
assertUnsupported(
"ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " +
"SET FILEFORMAT PARQUET")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 30ad392969..96c8fa6b70 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -61,7 +61,7 @@ class HiveDDLCommandSuite extends PlanTest {
|country STRING COMMENT 'country of origination')
|COMMENT 'This is the staging page view table'
|PARTITIONED BY (dt STRING COMMENT 'date type', hour STRING COMMENT 'hour of the day')
- |ROW FORMAT DELIMITED FIELDS TERMINATED BY '\054' STORED AS RCFILE
+ |STORED AS RCFILE
|LOCATION '/user/external/page_view'
|TBLPROPERTIES ('p1'='v1', 'p2'='v2')
|AS SELECT * FROM src""".stripMargin
@@ -88,8 +88,6 @@ class HiveDDLCommandSuite extends PlanTest {
assert(desc.partitionColumns ==
CatalogColumn("dt", "string", comment = Some("date type")) ::
CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
- assert(desc.storage.serdeProperties ==
- Map((serdeConstants.SERIALIZATION_FORMAT, "\u002C"), (serdeConstants.FIELD_DELIM, "\u002C")))
assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
assert(desc.storage.serde ==