author    Cheng Hao <hao.cheng@intel.com>    2015-05-11 19:21:16 -0700
committer Michael Armbrust <michael@databricks.com>    2015-05-11 19:21:16 -0700
commit    e35d878be3b2976333618a356b88440f5e8ba408 (patch)
tree      720ad7fa8d2e8e9349f652ec01cbdd02d17128d5 /sql
parent    b6bf4f76c78abfaafa99b3c3c08b498aa9644346 (diff)
[SPARK-7411] [SQL] Support SerDe for HiveQl in CTAS
This is a follow-up of #5876 and should be merged after #5876. Let's wait for the unit test results from Jenkins.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #5963 from chenghao-intel/useIsolatedClient and squashes the following commits:

f87ace6 [Cheng Hao] remove the TODO and add `resolved condition` for HiveTable
a8260e8 [Cheng Hao] Update code as feedback
f4e243f [Cheng Hao] remove the serde setting for SequenceFile
d166afa [Cheng Hao] style issue
d25a4aa [Cheng Hao] Add SerDe support for CTAS
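With this patch, the ROW FORMAT / STORED AS clauses of a CTAS statement are parsed by HiveQl and carried into the Hive table definition instead of being dropped. A minimal sketch of the user-facing behavior, assuming an already-constructed HiveContext named `hiveContext` and an existing `src` table (the DDL mirrors the "Test CTAS #5" case added in HiveQlSuite below):

// Sketch only: a CTAS with an explicit SerDe; the serde, its properties, and the
// RCFile storage now end up on the created Hive table.
hiveContext.sql(
  """CREATE TABLE ctas2
    |  ROW FORMAT SERDE "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"
    |  WITH SERDEPROPERTIES ("serde_p1"="p1", "serde_p2"="p2")
    |  STORED AS RCFile
    |  TBLPROPERTIES ("tbl_p1"="p11", "tbl_p2"="p22")
    |AS SELECT key, value FROM src ORDER BY key, value""".stripMargin)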
Diffstat (limited to 'sql')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala           66
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala                        207
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala            6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala   12
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala                    177
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala          4
6 files changed, 390 insertions, 82 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index f5398605bc..bbf48efb24 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -407,64 +407,58 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
* For example, because of a CREATE TABLE X AS statement.
*/
object CreateTables extends Rule[LogicalPlan] {
- import org.apache.hadoop.hive.ql.Context
- import org.apache.hadoop.hive.ql.parse.{ASTNode, QB, SemanticAnalyzer}
-
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// Wait until children are resolved.
case p: LogicalPlan if !p.childrenResolved => p
+ case p: LogicalPlan if p.resolved => p
+ case p @ CreateTableAsSelect(table, child, allowExisting) =>
+ val schema = if (table.schema.size > 0) {
+ table.schema
+ } else {
+ child.output.map {
+ attr => new HiveColumn(
+ attr.name,
+ HiveMetastoreTypes.toMetastoreType(attr.dataType), null)
+ }
+ }
+
+ val desc = table.copy(schema = schema)
- case CreateTableAsSelect(desc, child, allowExisting) =>
- if (hive.convertCTAS && !desc.serde.isDefined) {
+ if (hive.convertCTAS && table.serde.isEmpty) {
// Do the conversion when spark.sql.hive.convertCTAS is true and the query
// does not specify any storage format (file format and storage handler).
- if (desc.specifiedDatabase.isDefined) {
+ if (table.specifiedDatabase.isDefined) {
throw new AnalysisException(
"Cannot specify database name in a CTAS statement " +
- "when spark.sql.hive.convertCTAS is set to true.")
+ "when spark.sql.hive.convertCTAS is set to true.")
}
val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTableUsingAsSelect(
desc.name,
- conf.defaultDataSourceName,
+ hive.conf.defaultDataSourceName,
temporary = false,
mode,
options = Map.empty[String, String],
child
)
} else {
- execution.CreateTableAsSelect(
- desc.copy(
- specifiedDatabase = Option(desc.specifiedDatabase.getOrElse(client.currentDatabase))),
- child,
- allowExisting)
- }
-
- case p: LogicalPlan if p.resolved => p
-
- case p @ CreateTableAsSelect(desc, child, allowExisting) =>
- val (dbName, tblName) = processDatabaseAndTableName(desc.database, desc.name)
-
- if (hive.convertCTAS) {
- if (desc.specifiedDatabase.isDefined) {
- throw new AnalysisException(
- "Cannot specify database name in a CTAS statement " +
- "when spark.sql.hive.convertCTAS is set to true.")
+ val desc = if (table.serde.isEmpty) {
+ // add default serde
+ table.copy(
+ serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+ } else {
+ table
}
- val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
- CreateTableUsingAsSelect(
- tblName,
- conf.defaultDataSourceName,
- temporary = false,
- mode,
- options = Map.empty[String, String],
- child
- )
- } else {
+ val (dbName, tblName) =
+ processDatabaseAndTableName(
+ desc.specifiedDatabase.getOrElse(client.currentDatabase), desc.name)
+
execution.CreateTableAsSelect(
- desc,
+ desc.copy(
+ specifiedDatabase = Some(dbName),
+ name = tblName),
child,
allowExisting)
}
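The rewritten CreateTables rule above branches on spark.sql.hive.convertCTAS: when it is true and the statement specifies no storage format, the CTAS is planned as a data source CreateTableUsingAsSelect; otherwise it stays a Hive CreateTableAsSelect, with LazySimpleSerDe filled in as the default serde when none was given. A minimal sketch of exercising both paths, assuming a HiveContext named `hiveContext` and an existing `src` table:

// Sketch only: with convertCTAS enabled and no ROW FORMAT/STORED AS clause, the
// first CTAS takes the data source path; with it disabled, the second takes the
// Hive path and the table is created with the default LazySimpleSerDe.
hiveContext.setConf("spark.sql.hive.convertCTAS", "true")
hiveContext.sql("CREATE TABLE t1 AS SELECT key, value FROM src")

hiveContext.setConf("spark.sql.hive.convertCTAS", "false")
hiveContext.sql("CREATE TABLE t2 AS SELECT key, value FROM src")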
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 04d40bbb2b..2cbb5ca4d2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -22,14 +22,15 @@ import java.sql.Date
import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.Context
+import org.apache.hadoop.hive.serde.serdeConstants
+import org.apache.hadoop.hive.ql.{ErrorMsg, Context}
import org.apache.hadoop.hive.ql.exec.{FunctionRegistry, FunctionInfo}
import org.apache.hadoop.hive.ql.lib.Node
-import org.apache.hadoop.hive.ql.metadata.Table
import org.apache.hadoop.hive.ql.parse._
import org.apache.hadoop.hive.ql.plan.PlanUtils
-import org.apache.spark.sql.AnalysisException
+import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
@@ -62,7 +63,13 @@ case class CreateTableAsSelect(
allowExisting: Boolean) extends UnaryNode with Command {
override def output: Seq[Attribute] = Seq.empty[Attribute]
- override lazy val resolved: Boolean = tableDesc.specifiedDatabase.isDefined && childrenResolved
+ override lazy val resolved: Boolean =
+ tableDesc.specifiedDatabase.isDefined &&
+ tableDesc.schema.size > 0 &&
+ tableDesc.serde.isDefined &&
+ tableDesc.inputFormat.isDefined &&
+ tableDesc.outputFormat.isDefined &&
+ childrenResolved
}
/** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
@@ -240,12 +247,23 @@ private[hive] object HiveQl {
* Otherwise, there will be a NullPointerException
* when retrieving properties from HiveConf.
*/
- val hContext = new Context(new HiveConf())
+ val hContext = new Context(hiveConf)
val node = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql, hContext))
hContext.clear()
node
}
+ /**
+ * Returns the HiveConf
+ */
+ private[this] def hiveConf(): HiveConf = {
+ val ss = SessionState.get() // SessionState is lazily initialized, so it can be null here
+ if (ss == null) {
+ new HiveConf()
+ } else {
+ ss.getConf
+ }
+ }
/** Returns a LogicalPlan for a given HiveQL string. */
def parseSql(sql: String): LogicalPlan = hqlParser.parse(sql)
@@ -476,8 +494,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
DropTable(tableName, ifExists.nonEmpty)
// Support "ANALYZE TABLE tableNmae COMPUTE STATISTICS noscan"
case Token("TOK_ANALYZE",
- Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) ::
- isNoscan) =>
+ Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) ::
+ isNoscan) =>
// Reference:
// https://cwiki.apache.org/confluence/display/Hive/StatsDev#StatsDev-ExistingTables
if (partitionSpec.nonEmpty) {
@@ -547,6 +565,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
val (
Some(tableNameParts) ::
_ /* likeTable */ ::
+ externalTable ::
Some(query) ::
allowExisting +:
ignores) =
@@ -554,6 +573,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
Seq(
"TOK_TABNAME",
"TOK_LIKETABLE",
+ "EXTERNAL",
"TOK_QUERY",
"TOK_IFNOTEXISTS",
"TOK_TABLECOMMENT",
@@ -576,43 +596,153 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
children)
val (db, tableName) = extractDbNameTableName(tableNameParts)
- var tableDesc =
- HiveTable(
- specifiedDatabase = db,
- name = tableName,
- schema = Seq.empty,
- partitionColumns = Seq.empty,
- properties = Map.empty,
- serdeProperties = Map.empty,
- tableType = ManagedTable,
- location = None,
- inputFormat = None,
- outputFormat = None,
- serde = None)
-
- // TODO: Handle all the cases here...
- children.foreach {
- case Token("TOK_TBLRCFILE", Nil) =>
- import org.apache.hadoop.hive.ql.io.{RCFileInputFormat, RCFileOutputFormat}
+ // TODO add bucket support
+ var tableDesc: HiveTable = HiveTable(
+ specifiedDatabase = db,
+ name = tableName,
+ schema = Seq.empty[HiveColumn],
+ partitionColumns = Seq.empty[HiveColumn],
+ properties = Map[String, String](),
+ serdeProperties = Map[String, String](),
+ tableType = if (externalTable.isDefined) ExternalTable else ManagedTable,
+ location = None,
+ inputFormat = None,
+ outputFormat = None,
+ serde = None,
+ viewText = None)
+
+ // default storage type abbreviation (e.g. RCFile, ORC, PARQUET, etc.)
+ val defaultStorageType = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT)
+ // handle the default format for the storage type abbreviation
+ tableDesc = if ("SequenceFile".equalsIgnoreCase(defaultStorageType)) {
+ tableDesc.copy(
+ inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
+ outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat"))
+ } else if ("RCFile".equalsIgnoreCase(defaultStorageType)) {
+ tableDesc.copy(
+ inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
+ outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
+ serde = Option(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE)))
+ } else if ("ORC".equalsIgnoreCase(defaultStorageType)) {
+ tableDesc.copy(
+ inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
+ outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
+ serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+ } else if ("PARQUET".equalsIgnoreCase(defaultStorageType)) {
+ tableDesc.copy(
+ inputFormat =
+ Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
+ outputFormat =
+ Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
+ serde =
+ Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+ } else {
+ tableDesc.copy(
+ inputFormat =
+ Option("org.apache.hadoop.mapred.TextInputFormat"),
+ outputFormat =
+ Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
+ }
+
+ children.collect {
+ case list @ Token("TOK_TABCOLLIST", _) =>
+ val cols = BaseSemanticAnalyzer.getColumns(list, true)
+ if (cols != null) {
+ tableDesc = tableDesc.copy(
+ schema = cols.map { field =>
+ HiveColumn(field.getName, field.getType, field.getComment)
+ })
+ }
+ case Token("TOK_TABLECOMMENT", child :: Nil) =>
+ val comment = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
+ // TODO support the sql text
+ tableDesc = tableDesc.copy(viewText = Option(comment))
+ case Token("TOK_TABLEPARTCOLS", list @ Token("TOK_TABCOLLIST", _) :: Nil) =>
+ val cols = BaseSemanticAnalyzer.getColumns(list(0), false)
+ if (cols != null) {
+ tableDesc = tableDesc.copy(
+ partitionColumns = cols.map { field =>
+ HiveColumn(field.getName, field.getType, field.getComment)
+ })
+ }
+ case Token("TOK_TABLEROWFORMAT", Token("TOK_SERDEPROPS", child :: Nil) :: Nil)=>
+ val serdeParams = new java.util.HashMap[String, String]()
+ child match {
+ case Token("TOK_TABLEROWFORMATFIELD", rowChild1 :: rowChild2) =>
+ val fieldDelim = BaseSemanticAnalyzer.unescapeSQLString (rowChild1.getText())
+ serdeParams.put(serdeConstants.FIELD_DELIM, fieldDelim)
+ serdeParams.put(serdeConstants.SERIALIZATION_FORMAT, fieldDelim)
+ if (rowChild2.length > 1) {
+ val fieldEscape = BaseSemanticAnalyzer.unescapeSQLString (rowChild2(0).getText)
+ serdeParams.put(serdeConstants.ESCAPE_CHAR, fieldEscape)
+ }
+ case Token("TOK_TABLEROWFORMATCOLLITEMS", rowChild :: Nil) =>
+ val collItemDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+ serdeParams.put(serdeConstants.COLLECTION_DELIM, collItemDelim)
+ case Token("TOK_TABLEROWFORMATMAPKEYS", rowChild :: Nil) =>
+ val mapKeyDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+ serdeParams.put(serdeConstants.MAPKEY_DELIM, mapKeyDelim)
+ case Token("TOK_TABLEROWFORMATLINES", rowChild :: Nil) =>
+ val lineDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+ if (lineDelim != "\n" && lineDelim != "10") {
+ throw new AnalysisException(
+ SemanticAnalyzer.generateErrorMessage(
+ rowChild,
+ ErrorMsg.LINES_TERMINATED_BY_NON_NEWLINE.getMsg))
+ }
+ serdeParams.put(serdeConstants.LINE_DELIM, lineDelim)
+ case Token("TOK_TABLEROWFORMATNULL", rowChild :: Nil) =>
+ val nullFormat = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+ // TODO support the nullFormat
+ case _ => assert(false)
+ }
+ tableDesc = tableDesc.copy(
+ serdeProperties = tableDesc.serdeProperties ++ serdeParams)
+ case Token("TOK_TABLELOCATION", child :: Nil) =>
+ var location = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
+ location = EximUtil.relativeToAbsolutePath(hiveConf, location)
+ tableDesc = tableDesc.copy(location = Option(location))
+ case Token("TOK_TABLESERIALIZER", child :: Nil) =>
tableDesc = tableDesc.copy(
- outputFormat = Option(classOf[RCFileOutputFormat].getName),
- inputFormat = Option(classOf[RCFileInputFormat[_, _]].getName))
+ serde = Option(BaseSemanticAnalyzer.unescapeSQLString(child.getChild(0).getText)))
+ if (child.getChildCount == 2) {
+ val serdeParams = new java.util.HashMap[String, String]()
+ BaseSemanticAnalyzer.readProps(
+ (child.getChild(1).getChild(0)).asInstanceOf[ASTNode], serdeParams)
+ tableDesc = tableDesc.copy(serdeProperties = tableDesc.serdeProperties ++ serdeParams)
+ }
+ case Token("TOK_FILEFORMAT_GENERIC", child :: Nil) =>
+ throw new SemanticException(
+ "Unrecognized file format in STORED AS clause:${child.getText}")
+ case Token("TOK_TBLRCFILE", Nil) =>
+ tableDesc = tableDesc.copy(
+ inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
+ outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
if (tableDesc.serde.isEmpty) {
tableDesc = tableDesc.copy(
serde = Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
}
+
case Token("TOK_TBLORCFILE", Nil) =>
tableDesc = tableDesc.copy(
inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
- outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
- serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+ outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
+ if (tableDesc.serde.isEmpty) {
+ tableDesc = tableDesc.copy(
+ serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+ }
case Token("TOK_TBLPARQUETFILE", Nil) =>
tableDesc = tableDesc.copy(
- inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
- outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
- serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+ inputFormat =
+ Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
+ outputFormat =
+ Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+ if (tableDesc.serde.isEmpty) {
+ tableDesc = tableDesc.copy(
+ serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+ }
case Token("TOK_TABLESERIALIZER",
Token("TOK_SERDENAME", Token(serdeName, Nil) :: otherProps) :: Nil) =>
@@ -627,13 +757,20 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
tableDesc = tableDesc.copy(properties = tableDesc.properties ++ getProperties(list))
-
- case _ =>
+ case list @ Token("TOK_TABLEFILEFORMAT", _) =>
+ tableDesc = tableDesc.copy(
+ inputFormat =
+ Option(BaseSemanticAnalyzer.unescapeSQLString(list.getChild(0).getText)),
+ outputFormat =
+ Option(BaseSemanticAnalyzer.unescapeSQLString(list.getChild(1).getText)))
+ case Token("TOK_STORAGEHANDLER", _) =>
+ throw new AnalysisException(ErrorMsg.CREATE_NON_NATIVE_AS.getMsg())
+ case _ => // Unsupported features
}
CreateTableAsSelect(tableDesc, nodeToPlan(query), allowExisting != None)
- // If its not a "CREATE TABLE AS" like above then just pass it back to hive as a native command.
+ // If it's not a "CTAS" like above, treat it as a native command
case Token("TOK_CREATETABLE", _) => NativePlaceholder
// Support "TRUNCATE TABLE table_name [PARTITION partition_spec]"
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 6bca9d0179..99aa0f1ded 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -225,6 +225,12 @@ private[hive] class ClientWrapper(
table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)))
table.properties.foreach { case (k, v) => qlTable.setProperty(k, v) }
table.serdeProperties.foreach { case (k, v) => qlTable.setSerdeParam(k, v) }
+
+ // set owner
+ qlTable.setOwner(conf.getUser)
+ // set create time
+ qlTable.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])
+
version match {
case hive.v12 =>
table.location.map(new URI(_)).foreach(u => qlTable.call[URI, Unit]("setDataLocation", u))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 91e6ac4032..7d3ec12c4e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -17,10 +17,8 @@
package org.apache.spark.sql.hive.execution
-import org.apache.hadoop.hive.ql.plan.CreateTableDesc
-
import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{AnalysisException, SQLContext}
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.execution.RunnableCommand
@@ -29,13 +27,10 @@ import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation, HiveMetastoreT
/**
* Create table and insert the query result into it.
- * @param database the database name of the new relation
- * @param tableName the table name of the new relation
+ * @param tableDesc the table description, which may contain serde, storage handler, etc.
* @param query the query whose result will be inserted into the new relation
* @param allowExisting allow continuing if the table already exists; otherwise
* raise an exception
- * @param desc the CreateTableDesc, which may contains serde, storage handler etc.
-
*/
private[hive]
case class CreateTableAsSelect(
@@ -80,8 +75,7 @@ case class CreateTableAsSelect(
if (allowExisting) {
// table already exists, will do nothing, to keep consistent with Hive
} else {
- throw
- new org.apache.hadoop.hive.metastore.api.AlreadyExistsException(s"$database.$tableName")
+ throw new AnalysisException(s"$database.$tableName already exists.")
}
} else {
hiveContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true, false)).toRdd
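With the change above, a CTAS whose target table already exists (without IF NOT EXISTS) now fails with Spark's AnalysisException rather than Hive's metastore AlreadyExistsException, which is exactly what the SQLQuerySuite update at the end of this diff asserts. A minimal sketch of the caller-visible behavior, assuming a HiveContext named `hiveContext` and an existing `src` table:

import org.apache.spark.sql.AnalysisException

// Sketch only: the second CTAS hits the already-exists branch and now throws
// AnalysisException with a message of the form "<db>.<table> already exists."
hiveContext.sql("CREATE TABLE ctas4 AS SELECT key, value FROM src")
try {
  hiveContext.sql("CREATE TABLE ctas4 AS SELECT key, value FROM src")
} catch {
  case e: AnalysisException => println(e.getMessage)
}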
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
new file mode 100644
index 0000000000..941a294164
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.hive.serde.serdeConstants
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.hive.client.{ManagedTable, HiveColumn, ExternalTable, HiveTable}
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+
+class HiveQlSuite extends FunSuite with BeforeAndAfterAll {
+ override def beforeAll() {
+ if (SessionState.get() == null) {
+ SessionState.start(new HiveConf())
+ }
+ }
+
+ private def extractTableDesc(sql: String): (HiveTable, Boolean) = {
+ HiveQl.createPlan(sql).collect {
+ case CreateTableAsSelect(desc, child, allowExisting) => (desc, allowExisting)
+ }.head
+ }
+
+ test("Test CTAS #1") {
+ val s1 =
+ """CREATE EXTERNAL TABLE IF NOT EXISTS mydb.page_view
+ |(viewTime INT,
+ |userid BIGINT,
+ |page_url STRING,
+ |referrer_url STRING,
+ |ip STRING COMMENT 'IP Address of the User',
+ |country STRING COMMENT 'country of origination')
+ |COMMENT 'This is the staging page view table'
+ |PARTITIONED BY (dt STRING COMMENT 'date type', hour STRING COMMENT 'hour of the day')
+ |ROW FORMAT DELIMITED FIELDS TERMINATED BY '\054' STORED AS RCFILE
+ |LOCATION '/user/external/page_view'
+ |TBLPROPERTIES ('p1'='v1', 'p2'='v2')
+ |AS SELECT * FROM src""".stripMargin
+
+ val (desc, exists) = extractTableDesc(s1)
+ assert(exists == true)
+ assert(desc.specifiedDatabase == Some("mydb"))
+ assert(desc.name == "page_view")
+ assert(desc.tableType == ExternalTable)
+ assert(desc.location == Some("/user/external/page_view"))
+ assert(desc.schema ==
+ HiveColumn("viewtime", "int", null) ::
+ HiveColumn("userid", "bigint", null) ::
+ HiveColumn("page_url", "string", null) ::
+ HiveColumn("referrer_url", "string", null) ::
+ HiveColumn("ip", "string", "IP Address of the User") ::
+ HiveColumn("country", "string", "country of origination") :: Nil)
+ // TODO will be SQLText
+ assert(desc.viewText == Option("This is the staging page view table"))
+ assert(desc.partitionColumns ==
+ HiveColumn("dt", "string", "date type") ::
+ HiveColumn("hour", "string", "hour of the day") :: Nil)
+ assert(desc.serdeProperties ==
+ Map((serdeConstants.SERIALIZATION_FORMAT, "\054"), (serdeConstants.FIELD_DELIM, "\054")))
+ assert(desc.inputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
+ assert(desc.outputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
+ assert(desc.serde == Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
+ assert(desc.properties == Map(("p1", "v1"), ("p2", "v2")))
+ }
+
+ test("Test CTAS #2") {
+ val s2 =
+ """CREATE EXTERNAL TABLE IF NOT EXISTS mydb.page_view
+ |(viewTime INT,
+ |userid BIGINT,
+ |page_url STRING,
+ |referrer_url STRING,
+ |ip STRING COMMENT 'IP Address of the User',
+ |country STRING COMMENT 'country of origination')
+ |COMMENT 'This is the staging page view table'
+ |PARTITIONED BY (dt STRING COMMENT 'date type', hour STRING COMMENT 'hour of the day')
+ |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
+ | STORED AS
+ | INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
+ | OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
+ |LOCATION '/user/external/page_view'
+ |TBLPROPERTIES ('p1'='v1', 'p2'='v2')
+ |AS SELECT * FROM src""".stripMargin
+
+ val (desc, exists) = extractTableDesc(s2)
+ assert(exists == true)
+ assert(desc.specifiedDatabase == Some("mydb"))
+ assert(desc.name == "page_view")
+ assert(desc.tableType == ExternalTable)
+ assert(desc.location == Some("/user/external/page_view"))
+ assert(desc.schema ==
+ HiveColumn("viewtime", "int", null) ::
+ HiveColumn("userid", "bigint", null) ::
+ HiveColumn("page_url", "string", null) ::
+ HiveColumn("referrer_url", "string", null) ::
+ HiveColumn("ip", "string", "IP Address of the User") ::
+ HiveColumn("country", "string", "country of origination") :: Nil)
+ // TODO will be SQLText
+ assert(desc.viewText == Option("This is the staging page view table"))
+ assert(desc.partitionColumns ==
+ HiveColumn("dt", "string", "date type") ::
+ HiveColumn("hour", "string", "hour of the day") :: Nil)
+ assert(desc.serdeProperties == Map())
+ assert(desc.inputFormat == Option("parquet.hive.DeprecatedParquetInputFormat"))
+ assert(desc.outputFormat == Option("parquet.hive.DeprecatedParquetOutputFormat"))
+ assert(desc.serde == Option("parquet.hive.serde.ParquetHiveSerDe"))
+ assert(desc.properties == Map(("p1", "v1"), ("p2", "v2")))
+ }
+
+ test("Test CTAS #3") {
+ val s3 = """CREATE TABLE page_view AS SELECT * FROM src"""
+ val (desc, exists) = extractTableDesc(s3)
+ assert(exists == false)
+ assert(desc.specifiedDatabase == None)
+ assert(desc.name == "page_view")
+ assert(desc.tableType == ManagedTable)
+ assert(desc.location == None)
+ assert(desc.schema == Seq.empty[HiveColumn])
+ assert(desc.viewText == None) // TODO will be SQLText
+ assert(desc.serdeProperties == Map())
+ assert(desc.inputFormat == Option("org.apache.hadoop.mapred.TextInputFormat"))
+ assert(desc.outputFormat == Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
+ assert(desc.serde.isEmpty)
+ assert(desc.properties == Map())
+ }
+
+ test("Test CTAS #4") {
+ val s4 =
+ """CREATE TABLE page_view
+ |STORED BY 'storage.handler.class.name' AS SELECT * FROM src""".stripMargin
+ intercept[AnalysisException] {
+ extractTableDesc(s4)
+ }
+ }
+
+ test("Test CTAS #5") {
+ val s5 = """CREATE TABLE ctas2
+ | ROW FORMAT SERDE "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"
+ | WITH SERDEPROPERTIES("serde_p1"="p1","serde_p2"="p2")
+ | STORED AS RCFile
+ | TBLPROPERTIES("tbl_p1"="p11", "tbl_p2"="p22")
+ | AS
+ | SELECT key, value
+ | FROM src
+ | ORDER BY key, value""".stripMargin
+ val (desc, exists) = extractTableDesc(s5)
+ assert(exists == false)
+ assert(desc.specifiedDatabase == None)
+ assert(desc.name == "ctas2")
+ assert(desc.tableType == ManagedTable)
+ assert(desc.location == None)
+ assert(desc.schema == Seq.empty[HiveColumn])
+ assert(desc.viewText == None) // TODO will be SQLText
+ assert(desc.serdeProperties == Map(("serde_p1" -> "p1"), ("serde_p2" -> "p2")))
+ assert(desc.inputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
+ assert(desc.outputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
+ assert(desc.serde == Option("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"))
+ assert(desc.properties == Map(("tbl_p1" -> "p11"), ("tbl_p2" -> "p22")))
+ }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 1353802604..0d739dead4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -201,7 +201,7 @@ class SQLQuerySuite extends QueryTest {
var message = intercept[AnalysisException] {
sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
}.getMessage
- assert(message.contains("Table ctas1 already exists"))
+ assert(message.contains("ctas1 already exists"))
checkRelation("ctas1", true)
sql("DROP TABLE ctas1")
@@ -314,7 +314,7 @@ class SQLQuerySuite extends QueryTest {
SELECT key, value
FROM src
ORDER BY key, value""").collect().toSeq)
- intercept[org.apache.hadoop.hive.metastore.api.AlreadyExistsException] {
+ intercept[AnalysisException] {
sql(
"""CREATE TABLE ctas4 AS
| SELECT key, value FROM src ORDER BY key, value""".stripMargin).collect()