author     Andrew Or <andrew@databricks.com>        2016-04-13 11:08:34 -0700
committer  Yin Huai <yhuai@databricks.com>          2016-04-13 11:08:34 -0700
commit     7d2ed8cc030f3d84fea47fded072c320c3d87ca7 (patch)
tree       c85aedad5a7fb97ca4b02f1c8f81983f89b90f97 /sql/hive/src/main/scala/org
parent     1018a1c1eb33eefbfb9025fac7a1cdafc5cbf8f8 (diff)
[SPARK-14388][SQL] Implement CREATE TABLE
## What changes were proposed in this pull request?

This patch implements the `CREATE TABLE` command using the `SessionCatalog`. Previously we handled only `CTAS` and `CREATE TABLE ... USING`. This requires us to refactor `CatalogTable` to accept various fields (e.g. bucket and skew columns) and pass them to Hive.

WIP: Note that I haven't verified whether this actually works yet! But I believe it does.

## How was this patch tested?

Tests will come in a future commit.

Author: Andrew Or <andrew@databricks.com>
Author: Yin Huai <yhuai@databricks.com>

Closes #12271 from andrewor14/create-table-ddl.
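For context, here is a hedged sketch of the kind of statement that now takes the new `CreateTable` code path instead of falling back to `HiveNativeCommand`; the table, columns, and the `sqlContext` handle are illustrative, not part of the patch.

```scala
// Illustrative only: a Hive-style CREATE TABLE using clauses this patch supports
// (no bucketing, skewed columns, or STORED BY, which the parser still rejects).
// Assumes a Hive-enabled SQLContext bound to the name `sqlContext`.
sqlContext.sql("""
  CREATE EXTERNAL TABLE IF NOT EXISTS logs (id INT, msg STRING COMMENT 'raw message')
  COMMENT 'access logs'
  PARTITIONED BY (dt STRING)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  STORED AS TEXTFILE
  LOCATION '/tmp/logs'
  TBLPROPERTIES ('owner' = 'etl')
""")
```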
Diffstat (limited to 'sql/hive/src/main/scala/org')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala      16
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala     44
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala  223
3 files changed, 179 insertions, 104 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 14f331961e..ccc8345d73 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -91,7 +91,7 @@ private[hive] object HiveSerDe {
"textfile" ->
HiveSerDe(
inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
- outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat")),
+ outputFormat = Option("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
"avro" ->
HiveSerDe(
@@ -905,8 +905,13 @@ private[hive] case class MetastoreRelation(
val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
tTable.setSd(sd)
- sd.setCols(table.schema.map(toHiveColumn).asJava)
- tTable.setPartitionKeys(table.partitionColumns.map(toHiveColumn).asJava)
+
+ // Note: In Hive the schema and partition columns must be disjoint sets
+ val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
+ table.partitionColumnNames.contains(c.getName)
+ }
+ sd.setCols(schema.asJava)
+ tTable.setPartitionKeys(partCols.asJava)
table.storage.locationUri.foreach(sd.setLocation)
table.storage.inputFormat.foreach(sd.setInputFormat)
@@ -1013,7 +1018,10 @@ private[hive] case class MetastoreRelation(
val partitionKeys = table.partitionColumns.map(_.toAttribute)
/** Non-partitionKey attributes */
- val attributes = table.schema.map(_.toAttribute)
+ // TODO: just make this hold the schema itself, not just non-partition columns
+ val attributes = table.schema
+ .filter { c => !table.partitionColumnNames.contains(c.name) }
+ .map(_.toAttribute)
val output = attributes ++ partitionKeys
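The new code keeps Hive's schema and partition columns disjoint by splitting the unified `CatalogTable` schema. A minimal standalone sketch of that split, with illustrative column names:

```scala
// Minimal sketch of the split performed above: the full schema is partitioned into
// Hive partition columns and the remaining data columns, so the two sets handed to
// the metastore stay disjoint. `Column` and the sample names are illustrative
// stand-ins, not Spark's CatalogColumn.
case class Column(name: String)

val schema = Seq(Column("id"), Column("name"), Column("dt"))
val partitionColumnNames = Set("dt")

val (partCols, dataCols) = schema.partition(c => partitionColumnNames.contains(c.name))
// partCols == Seq(Column("dt"))
// dataCols == Seq(Column("id"), Column("name"))
```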
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 39e26acd7f..2a1fff92b5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -299,6 +299,10 @@ private[hive] class HiveClientImpl(
tableName: String): Option[CatalogTable] = withHiveState {
logDebug(s"Looking up $dbName.$tableName")
Option(client.getTable(dbName, tableName, false)).map { h =>
+ // Note: Hive separates partition columns and the schema, but for us the
+ // partition columns are part of the schema
+ val partCols = h.getPartCols.asScala.map(fromHiveColumn)
+ val schema = h.getCols.asScala.map(fromHiveColumn) ++ partCols
CatalogTable(
identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
tableType = h.getTableType match {
@@ -307,9 +311,10 @@ private[hive] class HiveClientImpl(
case HiveTableType.INDEX_TABLE => CatalogTableType.INDEX_TABLE
case HiveTableType.VIRTUAL_VIEW => CatalogTableType.VIRTUAL_VIEW
},
- schema = h.getCols.asScala.map(fromHiveColumn),
- partitionColumns = h.getPartCols.asScala.map(fromHiveColumn),
- sortColumns = Seq(),
+ schema = schema,
+ partitionColumnNames = partCols.map(_.name),
+ sortColumnNames = Seq(), // TODO: populate this
+ bucketColumnNames = h.getBucketCols.asScala,
numBuckets = h.getNumBuckets,
createTime = h.getTTable.getCreateTime.toLong * 1000,
lastAccessTime = h.getLastAccessTime.toLong * 1000,
@@ -675,24 +680,37 @@ private[hive] class HiveClientImpl(
private def toHiveTable(table: CatalogTable): HiveTable = {
val hiveTable = new HiveTable(table.database, table.identifier.table)
- // For EXTERNAL_TABLE/MANAGED_TABLE, we also need to set EXTERNAL field in
- // the table properties accodringly. Otherwise, if EXTERNAL_TABLE is the table type
- // but EXTERNAL field is not set, Hive metastore will change the type to
- // MANAGED_TABLE (see
- // metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java#L1095-L1105)
+ // For EXTERNAL_TABLE, we also need to set EXTERNAL field in the table properties.
+ // Otherwise, Hive metastore will change the table to a MANAGED_TABLE.
+ // (metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java#L1095-L1105)
hiveTable.setTableType(table.tableType match {
case CatalogTableType.EXTERNAL_TABLE =>
hiveTable.setProperty("EXTERNAL", "TRUE")
HiveTableType.EXTERNAL_TABLE
case CatalogTableType.MANAGED_TABLE =>
- hiveTable.setProperty("EXTERNAL", "FALSE")
HiveTableType.MANAGED_TABLE
case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE
case CatalogTableType.VIRTUAL_VIEW => HiveTableType.VIRTUAL_VIEW
})
- hiveTable.setFields(table.schema.map(toHiveColumn).asJava)
- hiveTable.setPartCols(table.partitionColumns.map(toHiveColumn).asJava)
+ // Note: In Hive the schema and partition columns must be disjoint sets
+ val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
+ table.partitionColumnNames.contains(c.getName)
+ }
+ if (table.schema.isEmpty) {
+ // This is a hack to preserve existing behavior. Before Spark 2.0, we do not
+ // set a default serde here (this was done in Hive), and so if the user provides
+ // an empty schema Hive would automatically populate the schema with a single
+ // field "col". However, after SPARK-14388, we set the default serde to
+ // LazySimpleSerde so this implicit behavior no longer happens. Therefore,
+ // we need to do it in Spark ourselves.
+ hiveTable.setFields(
+ Seq(new FieldSchema("col", "array<string>", "from deserializer")).asJava)
+ } else {
+ hiveTable.setFields(schema.asJava)
+ }
+ hiveTable.setPartCols(partCols.asJava)
// TODO: set sort columns here too
+ hiveTable.setBucketCols(table.bucketColumnNames.asJava)
hiveTable.setOwner(conf.getUser)
hiveTable.setNumBuckets(table.numBuckets)
hiveTable.setCreateTime((table.createTime / 1000).toInt)
@@ -700,9 +718,11 @@ private[hive] class HiveClientImpl(
table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc) }
table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass)
table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass)
- table.storage.serde.foreach(hiveTable.setSerializationLib)
+ hiveTable.setSerializationLib(
+ table.storage.serde.getOrElse("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
table.storage.serdeProperties.foreach { case (k, v) => hiveTable.setSerdeParam(k, v) }
table.properties.foreach { case (k, v) => hiveTable.setProperty(k, v) }
+ table.comment.foreach { c => hiveTable.setProperty("comment", c) }
table.viewOriginalText.foreach { t => hiveTable.setViewOriginalText(t) }
table.viewText.foreach { t => hiveTable.setViewExpandedText(t) }
hiveTable
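One behavioral note from this file: `toHiveTable` now always sets a serialization library, falling back to `LazySimpleSerDe` when the table definition leaves the serde unspecified. A hedged sketch of that fallback, using a hypothetical helper:

```scala
// Sketch only: `resolveSerde` is a hypothetical helper, not part of HiveClientImpl;
// it mirrors the getOrElse fallback shown in the hunk above.
def resolveSerde(tableSerde: Option[String]): String =
  tableSerde.getOrElse("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")

resolveSerde(None)
// => "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"
resolveSerde(Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
// => "org.apache.hadoop.hive.ql.io.orc.OrcSerde"
```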
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
index 7a435117e7..b14db7fe71 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.UnresolvedGenerator
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
@@ -33,8 +34,9 @@ import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.execution.command.CreateTable
import org.apache.spark.sql.hive.{CreateTableAsSelect => CTAS, CreateViewAsSelect => CreateView}
-import org.apache.spark.sql.hive.{HiveGenericUDTF, HiveSerDe}
+import org.apache.spark.sql.hive.{HiveGenericUDTF, HiveMetastoreTypes, HiveSerDe}
import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
/**
@@ -121,84 +123,116 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder {
}
/**
- * Create a [[CatalogStorageFormat]]. This is part of the [[CreateTableAsSelect]] command.
+ * Create a [[CatalogStorageFormat]] for creating tables.
*/
override def visitCreateFileFormat(
ctx: CreateFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
- if (ctx.storageHandler == null) {
- typedVisit[CatalogStorageFormat](ctx.fileFormat)
- } else {
- visitStorageHandler(ctx.storageHandler)
+ (ctx.fileFormat, ctx.storageHandler) match {
+ // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format
+ case (c: TableFileFormatContext, null) =>
+ visitTableFileFormat(c)
+ // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO
+ case (c: GenericFileFormatContext, null) =>
+ visitGenericFileFormat(c)
+ case (null, storageHandler) =>
+ throw new ParseException("Operation not allowed: ... STORED BY storage_handler ...", ctx)
+ case _ =>
+ throw new ParseException("expected either STORED AS or STORED BY, not both", ctx)
}
}
/**
- * Create a [[CreateTableAsSelect]] command.
+ * Create a table, returning either a [[CreateTable]] or a [[CreateTableAsSelect]].
+ *
+ * This is not used to create datasource tables, which is handled through
+ * "CREATE TABLE ... USING ...".
+ *
+ * Note: several features are currently not supported - temporary tables, bucketing,
+ * skewed columns and storage handlers (STORED BY).
+ *
+ * Expected format:
+ * {{{
+ * CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
+ * [(col1 data_type [COMMENT col_comment], ...)]
+ * [COMMENT table_comment]
+ * [PARTITIONED BY (col3 data_type [COMMENT col_comment], ...)]
+ * [CLUSTERED BY (col1, ...) [SORTED BY (col1 [ASC|DESC], ...)] INTO num_buckets BUCKETS]
+ * [SKEWED BY (col1, col2, ...) ON ((col_value, col_value, ...), ...) [STORED AS DIRECTORIES]]
+ * [ROW FORMAT row_format]
+ * [STORED AS file_format | STORED BY storage_handler_class [WITH SERDEPROPERTIES (...)]]
+ * [LOCATION path]
+ * [TBLPROPERTIES (property_name=property_value, ...)]
+ * [AS select_statement];
+ * }}}
*/
- override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
- if (ctx.query == null) {
- HiveNativeCommand(command(ctx))
+ override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
+ val (name, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
+ // TODO: implement temporary tables
+ if (temp) {
+ throw new ParseException(
+ "CREATE TEMPORARY TABLE is not supported yet. " +
+ "Please use registerTempTable as an alternative.", ctx)
+ }
+ if (ctx.skewSpec != null) {
+ throw new ParseException("Operation not allowed: CREATE TABLE ... SKEWED BY ...", ctx)
+ }
+ if (ctx.bucketSpec != null) {
+ throw new ParseException("Operation not allowed: CREATE TABLE ... CLUSTERED BY ...", ctx)
+ }
+ val tableType = if (external) {
+ CatalogTableType.EXTERNAL_TABLE
} else {
- // Get the table header.
- val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
- val tableType = if (external) {
- CatalogTableType.EXTERNAL_TABLE
- } else {
- CatalogTableType.MANAGED_TABLE
- }
-
- // Unsupported clauses.
- if (temp) {
- throw new ParseException(s"Unsupported operation: TEMPORARY clause.", ctx)
- }
- if (ctx.bucketSpec != null) {
- // TODO add this - we need cluster columns in the CatalogTable for this to work.
- throw new ParseException("Unsupported operation: " +
- "CLUSTERED BY ... [ORDERED BY ...] INTO ... BUCKETS clause.", ctx)
- }
- if (ctx.skewSpec != null) {
- throw new ParseException("Operation not allowed: " +
- "SKEWED BY ... ON ... [STORED AS DIRECTORIES] clause.", ctx)
- }
-
- // Create the schema.
- val schema = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns(_, _.toLowerCase))
-
- // Get the column by which the table is partitioned.
- val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns(_))
-
- // Create the storage.
- def format(fmt: ParserRuleContext): CatalogStorageFormat = {
- Option(fmt).map(typedVisit[CatalogStorageFormat]).getOrElse(EmptyStorageFormat)
- }
- // Default storage.
+ CatalogTableType.MANAGED_TABLE
+ }
+ val comment = Option(ctx.STRING).map(string)
+ val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns)
+ val cols = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns)
+ val properties = Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty)
+ val selectQuery = Option(ctx.query).map(plan)
+
+ // Note: Hive requires partition columns to be distinct from the schema, so we need
+ // to include the partition columns here explicitly
+ val schema = cols ++ partitionCols
+
+ // Storage format
+ val defaultStorage: CatalogStorageFormat = {
val defaultStorageType = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT)
- val hiveSerDe = HiveSerDe.sourceToSerDe(defaultStorageType, hiveConf).getOrElse {
- HiveSerDe(
- inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
- outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
- }
- // Defined storage.
- val fileStorage = format(ctx.createFileFormat)
- val rowStorage = format(ctx.rowFormat)
- val storage = CatalogStorageFormat(
- Option(ctx.locationSpec).map(visitLocationSpec),
- fileStorage.inputFormat.orElse(hiveSerDe.inputFormat),
- fileStorage.outputFormat.orElse(hiveSerDe.outputFormat),
- rowStorage.serde.orElse(hiveSerDe.serde).orElse(fileStorage.serde),
- rowStorage.serdeProperties ++ fileStorage.serdeProperties
- )
+ val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, hiveConf)
+ CatalogStorageFormat(
+ locationUri = None,
+ inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
+ .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
+ outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
+ .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
+ // Note: Keep this unspecified because we use the presence of the serde to decide
+ // whether to convert a table created by CTAS to a datasource table.
+ serde = None,
+ serdeProperties = Map())
+ }
+ val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
+ .getOrElse(EmptyStorageFormat)
+ val rowStorage = Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat)
+ val location = Option(ctx.locationSpec).map(visitLocationSpec)
+ val storage = CatalogStorageFormat(
+ locationUri = location,
+ inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
+ outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
+ serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
+ serdeProperties = rowStorage.serdeProperties ++ fileStorage.serdeProperties)
+
+ // TODO support the sql text - have a proper location for this!
+ val tableDesc = CatalogTable(
+ identifier = name,
+ tableType = tableType,
+ storage = storage,
+ schema = schema,
+ partitionColumnNames = partitionCols.map(_.name),
+ properties = properties,
+ comment = comment)
- val tableDesc = CatalogTable(
- identifier = table,
- tableType = tableType,
- schema = schema,
- partitionColumns = partitionCols,
- storage = storage,
- properties = Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty),
- // TODO support the sql text - have a proper location for this!
- viewText = Option(ctx.STRING).map(string))
- CTAS(tableDesc, plan(ctx.query), ifNotExists)
+ selectQuery match {
+ case Some(q) => CTAS(tableDesc, q, ifNotExists)
+ case None => CreateTable(tableDesc, ifNotExists)
}
}
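The storage-format merge in the new `visitCreateTable` relies on `Option.orElse` precedence: `ROW FORMAT` settings win over `STORED AS`, which wins over the session default (whose serde is intentionally left `None`). A standalone sketch with made-up values:

```scala
// Standalone illustration with made-up values; the real code merges whole
// CatalogStorageFormat instances, not bare Options.
val rowFormatSerde: Option[String] = None                 // from ROW FORMAT SERDE ...
val storedAsSerde: Option[String] =
  Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde")       // from STORED AS ORC
val defaultSerde: Option[String] = None                   // default serde deliberately unset

val effectiveSerde = rowFormatSerde.orElse(storedAsSerde).orElse(defaultSerde)
// effectiveSerde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde")
```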
@@ -353,25 +387,19 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder {
private val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, Map.empty)
/**
- * Create a [[CatalogStorageFormat]]. The INPUTDRIVER and OUTPUTDRIVER clauses are currently
- * ignored.
+ * Create a [[CatalogStorageFormat]].
*/
override def visitTableFileFormat(
ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
- import ctx._
- if (inDriver != null || outDriver != null) {
- throw new ParseException(
- s"Operation not allowed: INPUTDRIVER ... OUTPUTDRIVER ... clauses", ctx)
- }
EmptyStorageFormat.copy(
- inputFormat = Option(string(inFmt)),
- outputFormat = Option(string(outFmt)),
- serde = Option(serdeCls).map(string)
+ inputFormat = Option(string(ctx.inFmt)),
+ outputFormat = Option(string(ctx.outFmt)),
+ serde = Option(ctx.serdeCls).map(string)
)
}
/**
- * Resolve a [[HiveSerDe]] based on the format name given.
+ * Resolve a [[HiveSerDe]] based on the name given and return it as a [[CatalogStorageFormat]].
*/
override def visitGenericFileFormat(
ctx: GenericFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
@@ -388,11 +416,28 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder {
}
/**
- * Storage Handlers are currently not supported in the statements we support (CTAS).
+ * Create a [[RowFormat]] used for creating tables.
+ *
+ * Example format:
+ * {{{
+ * SERDE serde_name [WITH SERDEPROPERTIES (k1=v1, k2=v2, ...)]
+ * }}}
+ *
+ * OR
+ *
+ * {{{
+ * DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]]
+ * [COLLECTION ITEMS TERMINATED BY char]
+ * [MAP KEYS TERMINATED BY char]
+ * [LINES TERMINATED BY char]
+ * [NULL DEFINED AS char]
+ * }}}
*/
- override def visitStorageHandler(
- ctx: StorageHandlerContext): CatalogStorageFormat = withOrigin(ctx) {
- throw new ParseException("Storage Handlers are currently unsupported.", ctx)
+ private def visitRowFormat(ctx: RowFormatContext): CatalogStorageFormat = withOrigin(ctx) {
+ ctx match {
+ case serde: RowFormatSerdeContext => visitRowFormatSerde(serde)
+ case delimited: RowFormatDelimitedContext => visitRowFormatDelimited(delimited)
+ }
}
/**
@@ -435,13 +480,15 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder {
/**
* Create a sequence of [[CatalogColumn]]s from a column list
*/
- private def visitCatalogColumns(
- ctx: ColTypeListContext,
- formatter: String => String = identity): Seq[CatalogColumn] = withOrigin(ctx) {
+ private def visitCatalogColumns(ctx: ColTypeListContext): Seq[CatalogColumn] = withOrigin(ctx) {
ctx.colType.asScala.map { col =>
CatalogColumn(
- formatter(col.identifier.getText),
- col.dataType.getText.toLowerCase, // TODO validate this?
+ col.identifier.getText.toLowerCase,
+ // Note: for types like "STRUCT<myFirstName: STRING, myLastName: STRING>" we can't
+ // just convert the whole type string to lower case, otherwise the struct field names
+ // will no longer be case sensitive. Instead, we rely on our parser to get the proper
+ // case before passing it to Hive.
+ CatalystSqlParser.parseDataType(col.dataType.getText).simpleString,
nullable = true,
Option(col.STRING).map(string))
}