author    Cheng Hao <hao.cheng@intel.com>             2014-10-28 14:36:06 -0700
committer Michael Armbrust <michael@databricks.com>   2014-10-28 14:36:06 -0700
commit    4b55482abf899c27da3d55401ad26b4e9247b327 (patch)
tree      53d9bff976ad43beff49a7b20e010f0de0cee793 /sql/hive
parent    abcafcfba38d7c8dba68a5510475c5c49ae54d92 (diff)
[SPARK-3343] [SQL] Add serde support for CTAS
Currently, `CTAS` (Create Table As Select) doesn't support specifying the `SerDe` in HQL. This PR passes the `ASTNode` down to the physical operator `execution.CreateTableAsSelect`, which extracts the `CreateTableDesc` object via the Hive `SemanticAnalyzer`. `HiveMetastoreCatalog.createTable` is also updated to optionally accept a `CreateTableDesc` for table creation.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #2570 from chenghao-intel/ctas_serde and squashes the following commits:

e011ef5 [Cheng Hao] shim for both 0.12 & 0.13.1
cfb3662 [Cheng Hao] revert to hive 0.12
c8a547d [Cheng Hao] Support SerDe properties within CTAS
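For illustration, a minimal sketch of the kind of statement this enables, modeled on the ctas2 case added to SQLQuerySuite further down; the table name ctas_serde_example and the property values are hypothetical, and an initialized HiveContext with a populated src table is assumed:

    // Hypothetical sketch: issue a CTAS that names a SerDe, a storage format and
    // table properties through an existing HiveContext (here `hiveContext`).
    hiveContext.sql(
      """CREATE TABLE ctas_serde_example
        |  ROW FORMAT SERDE "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"
        |  WITH SERDEPROPERTIES("serde_p1"="p1")
        |  STORED AS RCFile
        |  TBLPROPERTIES("tbl_p1"="p11")
        |  AS SELECT key, value FROM src""".stripMargin).collect()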
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala |   6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala                          | 197
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala                                        |  15
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala                                |  17
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala                 |  39
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala                                          |  19
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala                    |  37
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala                       |  59
-rw-r--r--  sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala                                |   5
-rw-r--r--  sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala                                |   6
10 files changed, 311 insertions, 89 deletions
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 4fc26d6f55..26d9ca05c8 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -229,7 +229,11 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
// Needs constant object inspectors
"udf_round",
- "udf7"
+ "udf7",
+
+ // Sort with Limit clause causes failure.
+ "ctas",
+ "ctas_hadoop20"
) ++ HiveShim.compatibilityBlackList
/**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 39d87a9d14..2dd2c882a8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -17,17 +17,27 @@
package org.apache.spark.sql.hive
+import java.io.IOException
+import java.util.{List => JList}
+
import scala.util.parsing.combinator.RegexParsers
-import org.apache.hadoop.hive.metastore.api.{FieldSchema, SerDeInfo, StorageDescriptor, Partition => TPartition, Table => TTable}
-import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
-import org.apache.hadoop.hive.ql.plan.TableDesc
-import org.apache.hadoop.hive.serde2.Deserializer
+import org.apache.hadoop.util.ReflectionUtils
+import org.apache.hadoop.fs.Path
+
+import org.apache.hadoop.hive.metastore.TableType
+import org.apache.hadoop.hive.metastore.api.FieldSchema
+import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
+import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException}
+import org.apache.hadoop.hive.ql.plan.{TableDesc, CreateTableDesc}
+import org.apache.hadoop.hive.serde.serdeConstants
+import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException}
+import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.analysis.Catalog
+import org.apache.spark.sql.catalyst.analysis.{Catalog, OverrideCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
@@ -66,37 +76,164 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
table.getTTable, partitions.map(part => part.getTPartition))(hive)
}
+ /**
+ * Create table with specified database, table name, table description and schema
+ * @param databaseName Database Name
+ * @param tableName Table Name
+ * @param schema Schema of the new table, if not specified, will use the schema
+ * specified in crtTbl
+ * @param allowExisting if true, ignore AlreadyExistsException
+ * @param desc CreateTableDesc object which contains the SerDe info. Currently
+ * we support most of the features except the bucket.
+ */
def createTable(
databaseName: String,
tableName: String,
schema: Seq[Attribute],
- allowExisting: Boolean = false): Unit = {
+ allowExisting: Boolean = false,
+ desc: Option[CreateTableDesc] = None) {
+ val hconf = hive.hiveconf
+
val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
- val table = new Table(dbName, tblName)
- val hiveSchema =
+ val tbl = new Table(dbName, tblName)
+
+ val crtTbl: CreateTableDesc = desc.getOrElse(null)
+
+ // We should respect the passed in schema, unless it's not set
+ val hiveSchema: JList[FieldSchema] = if (schema == null || schema.isEmpty) {
+ crtTbl.getCols
+ } else {
schema.map(attr => new FieldSchema(attr.name, toMetastoreType(attr.dataType), ""))
- table.setFields(hiveSchema)
-
- val sd = new StorageDescriptor()
- table.getTTable.setSd(sd)
- sd.setCols(hiveSchema)
-
- // TODO: THESE ARE ALL DEFAULTS, WE NEED TO PARSE / UNDERSTAND the output specs.
- sd.setCompressed(false)
- sd.setParameters(Map[String, String]())
- sd.setInputFormat("org.apache.hadoop.mapred.TextInputFormat")
- sd.setOutputFormat("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")
- val serDeInfo = new SerDeInfo()
- serDeInfo.setName(tblName)
- serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
- serDeInfo.setParameters(Map[String, String]())
- sd.setSerdeInfo(serDeInfo)
+ }
+ tbl.setFields(hiveSchema)
+
+ // Most of code are similar with the DDLTask.createTable() of Hive,
+ if (crtTbl != null && crtTbl.getTblProps() != null) {
+ tbl.getTTable().getParameters().putAll(crtTbl.getTblProps())
+ }
+
+ if (crtTbl != null && crtTbl.getPartCols() != null) {
+ tbl.setPartCols(crtTbl.getPartCols())
+ }
+
+ if (crtTbl != null && crtTbl.getStorageHandler() != null) {
+ tbl.setProperty(
+ org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE,
+ crtTbl.getStorageHandler())
+ }
+
+ /*
+ * We use LazySimpleSerDe by default.
+ *
+ * If the user didn't specify a SerDe, and any of the columns are not simple
+ * types, we will have to use DynamicSerDe instead.
+ */
+ if (crtTbl == null || crtTbl.getSerName() == null) {
+ val storageHandler = tbl.getStorageHandler()
+ if (storageHandler == null) {
+ logInfo(s"Default to LazySimpleSerDe for table $dbName.$tblName")
+ tbl.setSerializationLib(classOf[LazySimpleSerDe].getName())
+
+ import org.apache.hadoop.mapred.TextInputFormat
+ import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ import org.apache.hadoop.io.Text
+
+ tbl.setInputFormatClass(classOf[TextInputFormat])
+ tbl.setOutputFormatClass(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]])
+ tbl.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
+ } else {
+ val serDeClassName = storageHandler.getSerDeClass().getName()
+ logInfo(s"Use StorageHandler-supplied $serDeClassName for table $dbName.$tblName")
+ tbl.setSerializationLib(serDeClassName)
+ }
+ } else {
+ // let's validate that the serde exists
+ val serdeName = crtTbl.getSerName()
+ try {
+ val d = ReflectionUtils.newInstance(hconf.getClassByName(serdeName), hconf)
+ if (d != null) {
+ logDebug("Found class for $serdeName")
+ }
+ } catch {
+ case e: SerDeException => throw new HiveException("Cannot validate serde: " + serdeName, e)
+ }
+ tbl.setSerializationLib(serdeName)
+ }
+
+ if (crtTbl != null && crtTbl.getFieldDelim() != null) {
+ tbl.setSerdeParam(serdeConstants.FIELD_DELIM, crtTbl.getFieldDelim())
+ tbl.setSerdeParam(serdeConstants.SERIALIZATION_FORMAT, crtTbl.getFieldDelim())
+ }
+ if (crtTbl != null && crtTbl.getFieldEscape() != null) {
+ tbl.setSerdeParam(serdeConstants.ESCAPE_CHAR, crtTbl.getFieldEscape())
+ }
+
+ if (crtTbl != null && crtTbl.getCollItemDelim() != null) {
+ tbl.setSerdeParam(serdeConstants.COLLECTION_DELIM, crtTbl.getCollItemDelim())
+ }
+ if (crtTbl != null && crtTbl.getMapKeyDelim() != null) {
+ tbl.setSerdeParam(serdeConstants.MAPKEY_DELIM, crtTbl.getMapKeyDelim())
+ }
+ if (crtTbl != null && crtTbl.getLineDelim() != null) {
+ tbl.setSerdeParam(serdeConstants.LINE_DELIM, crtTbl.getLineDelim())
+ }
+
+ if (crtTbl != null && crtTbl.getSerdeProps() != null) {
+ val iter = crtTbl.getSerdeProps().entrySet().iterator()
+ while (iter.hasNext()) {
+ val m = iter.next()
+ tbl.setSerdeParam(m.getKey(), m.getValue())
+ }
+ }
+
+ if (crtTbl != null && crtTbl.getComment() != null) {
+ tbl.setProperty("comment", crtTbl.getComment())
+ }
+
+ if (crtTbl != null && crtTbl.getLocation() != null) {
+ HiveShim.setLocation(tbl, crtTbl)
+ }
+
+ if (crtTbl != null && crtTbl.getSkewedColNames() != null) {
+ tbl.setSkewedColNames(crtTbl.getSkewedColNames())
+ }
+ if (crtTbl != null && crtTbl.getSkewedColValues() != null) {
+ tbl.setSkewedColValues(crtTbl.getSkewedColValues())
+ }
+
+ if (crtTbl != null) {
+ tbl.setStoredAsSubDirectories(crtTbl.isStoredAsSubDirectories())
+ tbl.setInputFormatClass(crtTbl.getInputFormat())
+ tbl.setOutputFormatClass(crtTbl.getOutputFormat())
+ }
+
+ tbl.getTTable().getSd().setInputFormat(tbl.getInputFormatClass().getName())
+ tbl.getTTable().getSd().setOutputFormat(tbl.getOutputFormatClass().getName())
+
+ if (crtTbl != null && crtTbl.isExternal()) {
+ tbl.setProperty("EXTERNAL", "TRUE")
+ tbl.setTableType(TableType.EXTERNAL_TABLE)
+ }
+
+ // set owner
+ try {
+ tbl.setOwner(hive.hiveconf.getUser)
+ } catch {
+ case e: IOException => throw new HiveException("Unable to get current user", e)
+ }
+
+ // set create time
+ tbl.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])
+
+ // TODO add bucket support
+ // TODO set more info if Hive upgrade
+ // create the table
synchronized {
- try client.createTable(table) catch {
- case e: org.apache.hadoop.hive.ql.metadata.HiveException
- if e.getCause.isInstanceOf[org.apache.hadoop.hive.metastore.api.AlreadyExistsException] &&
- allowExisting => // Do nothing.
+ try client.createTable(tbl, allowExisting) catch {
+ case e: org.apache.hadoop.hive.metastore.api.AlreadyExistsException
+ if allowExisting => // Do nothing
+ case e: Throwable => throw e
}
}
}
@@ -110,11 +247,11 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
// Wait until children are resolved.
case p: LogicalPlan if !p.childrenResolved => p
- case CreateTableAsSelect(db, tableName, child) =>
+ case CreateTableAsSelect(db, tableName, child, allowExisting, extra) =>
val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
- CreateTableAsSelect(Some(databaseName), tableName, child)
+ CreateTableAsSelect(Some(databaseName), tableName, child, allowExisting, extra)
}
}
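As a usage sketch of the extended createTable signature introduced above (hypothetical call site; it assumes code running inside the org.apache.spark.sql packages so the catalog is visible, with `sc` a HiveContext and `desc` a CreateTableDesc produced by Hive's SemanticAnalyzer, as execution.CreateTableAsSelect does further down):

    // Create the table using the SerDe/format information carried by `desc`;
    // the passed-in schema takes precedence unless it is null or empty, in
    // which case the columns from desc.getCols are used instead.
    sc.catalog.createTable(
      databaseName = "default",
      tableName = "ctas_serde_example",
      schema = Seq.empty,        // empty schema => fall back to desc.getCols
      allowExisting = true,
      desc = Some(desc))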
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index ed07a28039..9d9d68affa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -448,14 +448,14 @@ private[hive] object HiveQl {
}
case Token("TOK_CREATETABLE", children)
- if children.collect { case t@Token("TOK_QUERY", _) => t }.nonEmpty =>
- // TODO: Parse other clauses.
+ if children.collect { case t @ Token("TOK_QUERY", _) => t }.nonEmpty =>
// Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
val (
Some(tableNameParts) ::
_ /* likeTable */ ::
- Some(query) +:
- notImplemented) =
+ Some(query) ::
+ allowExisting +:
+ ignores) =
getClauses(
Seq(
"TOK_TABNAME",
@@ -479,14 +479,9 @@ private[hive] object HiveQl {
"TOK_TABLELOCATION",
"TOK_TABLEPROPERTIES"),
children)
- if (notImplemented.exists(token => !token.isEmpty)) {
- throw new NotImplementedError(
- s"Unhandled clauses: ${notImplemented.flatten.map(dumpTree(_)).mkString("\n")}")
- }
-
val (db, tableName) = extractDbNameTableName(tableNameParts)
- CreateTableAsSelect(db, tableName, nodeToPlan(query))
+ CreateTableAsSelect(db, tableName, nodeToPlan(query), allowExisting != None, Some(node))
// If its not a "CREATE TABLE AS" like above then just pass it back to hive as a native command.
case Token("TOK_CREATETABLE", _) => NativePlaceholder
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 5c66322f1e..e59d4d536a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.hive
+import org.apache.hadoop.hive.ql.parse.ASTNode
+
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
@@ -160,17 +162,14 @@ private[hive] trait HiveStrategies {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
-
- case logical.CreateTableAsSelect(database, tableName, child) =>
- val query = planLater(child)
+ case logical.CreateTableAsSelect(
+ Some(database), tableName, child, allowExisting, Some(extra: ASTNode)) =>
CreateTableAsSelect(
- database.get,
+ database,
tableName,
- query,
- InsertIntoHiveTable(_: MetastoreRelation,
- Map(),
- query,
- overwrite = true)(hiveContext)) :: Nil
+ child,
+ allowExisting,
+ extra) :: Nil
case _ => Nil
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 3625708d03..2fce414734 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -17,10 +17,12 @@
package org.apache.spark.sql.hive.execution
+import org.apache.hadoop.hive.ql.Context
+import org.apache.hadoop.hive.ql.parse.{SemanticAnalyzer, ASTNode}
import org.apache.spark.annotation.Experimental
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.execution.{SparkPlan, Command, LeafNode}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.MetastoreRelation
@@ -30,33 +32,46 @@ import org.apache.spark.sql.hive.MetastoreRelation
* Create table and insert the query result into it.
* @param database the database name of the new relation
* @param tableName the table name of the new relation
- * @param insertIntoRelation function of creating the `InsertIntoHiveTable`
- * by specifying the `MetaStoreRelation`, the data will be inserted into that table.
- * TODO Add more table creating properties, e.g. SerDe, StorageHandler, in-memory cache etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param allowExisting allow continue working if it's already exists, otherwise
+ * raise exception
+ * @param extra the extra information for this Operator, it should be the
+ * ASTNode object for extracting the CreateTableDesc.
+
*/
@Experimental
case class CreateTableAsSelect(
database: String,
tableName: String,
- query: SparkPlan,
- insertIntoRelation: MetastoreRelation => InsertIntoHiveTable)
- extends LeafNode with Command {
+ query: LogicalPlan,
+ allowExisting: Boolean,
+ extra: ASTNode) extends LeafNode with Command {
def output = Seq.empty
+ private[this] def sc = sqlContext.asInstanceOf[HiveContext]
+
// A lazy computing of the metastoreRelation
private[this] lazy val metastoreRelation: MetastoreRelation = {
- // Create the table
- val sc = sqlContext.asInstanceOf[HiveContext]
- sc.catalog.createTable(database, tableName, query.output, false)
+ // Get the CreateTableDesc from Hive SemanticAnalyzer
+ val sa = new SemanticAnalyzer(sc.hiveconf)
+
+ sa.analyze(extra, new Context(sc.hiveconf))
+ val desc = sa.getQB().getTableDesc
+ // Create Hive Table
+ sc.catalog.createTable(database, tableName, query.output, allowExisting, Some(desc))
+
// Get the Metastore Relation
sc.catalog.lookupRelation(Some(database), tableName, None) match {
case r: MetastoreRelation => r
}
}
- override protected lazy val sideEffectResult: Seq[Row] = {
- insertIntoRelation(metastoreRelation).execute
+ override protected[sql] lazy val sideEffectResult: Seq[Row] = {
+ // TODO ideally, we should get the output data ready first and then
+ // add the relation into catalog, just in case of failure occurs while data
+ // processing.
+ sc.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd
Seq.empty[Row]
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 6b06410520..f89c49d292 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -33,6 +33,25 @@ import org.apache.spark.sql.catalyst.util._
*/
class QueryTest extends PlanTest {
/**
+ * Runs the plan and makes sure the answer contains all of the keywords, or the
+ * none of keywords are listed in the answer
+ * @param rdd the [[SchemaRDD]] to be executed
+ * @param exists true for make sure the keywords are listed in the output, otherwise
+ * to make sure none of the keyword are not listed in the output
+ * @param keywords keyword in string array
+ */
+ def checkExistence(rdd: SchemaRDD, exists: Boolean, keywords: String*) {
+ val outputs = rdd.collect().map(_.mkString).mkString
+ for (key <- keywords) {
+ if (exists) {
+ assert(outputs.contains(key), s"Failed for $rdd ($key doens't exist in result)")
+ } else {
+ assert(!outputs.contains(key), s"Failed for $rdd ($key existed in the result)")
+ }
+ }
+ }
+
+ /**
* Runs the plan and makes sure the answer matches the expected result.
* @param rdd the [[SchemaRDD]] to be executed
* @param expectedAnswer the expected result, can either be an Any, Seq[Product], or Seq[ Seq[Any] ].
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
index 4ed58f4be1..a68fc2a803 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
@@ -18,37 +18,24 @@
package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
-import org.apache.spark.sql.Row
/**
* A set of tests that validates support for Hive Explain command.
*/
class HiveExplainSuite extends QueryTest {
- private def check(sqlCmd: String, exists: Boolean, keywords: String*) {
- val outputs = sql(sqlCmd).collect().map(_.getString(0)).mkString
- for (key <- keywords) {
- if (exists) {
- assert(outputs.contains(key), s"Failed for $sqlCmd ($key doens't exist in result)")
- } else {
- assert(!outputs.contains(key), s"Failed for $sqlCmd ($key existed in the result)")
- }
- }
- }
-
test("explain extended command") {
- check(" explain select * from src where key=123 ", true,
- "== Physical Plan ==")
- check(" explain select * from src where key=123 ", false,
- "== Parsed Logical Plan ==",
- "== Analyzed Logical Plan ==",
- "== Optimized Logical Plan ==")
- check(" explain extended select * from src where key=123 ", true,
- "== Parsed Logical Plan ==",
- "== Analyzed Logical Plan ==",
- "== Optimized Logical Plan ==",
- "== Physical Plan ==",
- "Code Generation", "== RDD ==")
+ checkExistence(sql(" explain select * from src where key=123 "), true,
+ "== Physical Plan ==")
+ checkExistence(sql(" explain select * from src where key=123 "), false,
+ "== Parsed Logical Plan ==",
+ "== Analyzed Logical Plan ==",
+ "== Optimized Logical Plan ==")
+ checkExistence(sql(" explain extended select * from src where key=123 "), true,
+ "== Parsed Logical Plan ==",
+ "== Analyzed Logical Plan ==",
+ "== Optimized Logical Plan ==",
+ "== Physical Plan ==",
+ "Code Generation", "== RDD ==")
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index a4aea31d3f..4f96a327ee 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -32,6 +32,65 @@ case class Nested3(f3: Int)
* valid, but Hive currently cannot execute it.
*/
class SQLQuerySuite extends QueryTest {
+ test("CTAS with serde") {
+ sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect
+ sql(
+ """CREATE TABLE ctas2
+ | ROW FORMAT SERDE "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"
+ | WITH SERDEPROPERTIES("serde_p1"="p1","serde_p2"="p2")
+ | STORED AS RCFile
+ | TBLPROPERTIES("tbl_p1"="p11", "tbl_p2"="p22")
+ | AS
+ | SELECT key, value
+ | FROM src
+ | ORDER BY key, value""".stripMargin).collect
+ sql(
+ """CREATE TABLE ctas3
+ | ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\012'
+ | STORED AS textfile AS
+ | SELECT key, value
+ | FROM src
+ | ORDER BY key, value""".stripMargin).collect
+
+ // the table schema may like (key: integer, value: string)
+ sql(
+ """CREATE TABLE IF NOT EXISTS ctas4 AS
+ | SELECT 1 AS key, value FROM src LIMIT 1""".stripMargin).collect
+ // expect the string => integer for field key cause the table ctas4 already existed.
+ sql(
+ """CREATE TABLE IF NOT EXISTS ctas4 AS
+ | SELECT key, value FROM src ORDER BY key, value""".stripMargin).collect
+
+ checkAnswer(
+ sql("SELECT k, value FROM ctas1 ORDER BY k, value"),
+ sql("SELECT key, value FROM src ORDER BY key, value").collect().toSeq)
+ checkAnswer(
+ sql("SELECT key, value FROM ctas2 ORDER BY key, value"),
+ sql(
+ """
+ SELECT key, value
+ FROM src
+ ORDER BY key, value""").collect().toSeq)
+ checkAnswer(
+ sql("SELECT key, value FROM ctas3 ORDER BY key, value"),
+ sql(
+ """
+ SELECT key, value
+ FROM src
+ ORDER BY key, value""").collect().toSeq)
+ checkAnswer(
+ sql("SELECT key, value FROM ctas4 ORDER BY key, value"),
+ sql("SELECT CAST(key AS int) k, value FROM src ORDER BY k, value").collect().toSeq)
+
+ checkExistence(sql("DESC EXTENDED ctas2"), true,
+ "name:key", "type:string", "name:value", "ctas2",
+ "org.apache.hadoop.hive.ql.io.RCFileInputFormat",
+ "org.apache.hadoop.hive.ql.io.RCFileOutputFormat",
+ "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe",
+ "serde_p1=p1", "serde_p2=p2", "tbl_p1=p11", "tbl_p2=p22","MANAGED_TABLE"
+ )
+ }
+
test("ordering not in select") {
checkAnswer(
sql("SELECT key FROM src ORDER BY value"),
diff --git a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
index 2317d2e763..8cb81db8a9 100644
--- a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
+++ b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
-import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
+import org.apache.hadoop.hive.ql.plan.{CreateTableDesc, FileSinkDesc, TableDesc}
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.stats.StatsSetupConst
import org.apache.hadoop.hive.serde2.{Deserializer, ColumnProjectionUtils}
@@ -89,6 +89,9 @@ private[hive] object HiveShim {
"udf_concat"
)
+ def setLocation(tbl: Table, crtTbl: CreateTableDesc): Unit = {
+ tbl.setDataLocation(new Path(crtTbl.getLocation()).toUri())
+ }
}
class ShimFileSinkDesc(var dir: String, var tableInfo: TableDesc, var compressed: Boolean)
diff --git a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
index b8d893d8c1..b9a742cc6e 100644
--- a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
+++ b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.common.`type`.{HiveDecimal}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.metadata.{Table, Hive, Partition}
-import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
+import org.apache.hadoop.hive.ql.plan.{CreateTableDesc, FileSinkDesc, TableDesc}
import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Deserializer}
import org.apache.hadoop.mapred.InputFormat
@@ -121,6 +121,10 @@ private[hive] object HiveShim {
def compatibilityBlackList = Seq()
+ def setLocation(tbl: Table, crtTbl: CreateTableDesc): Unit = {
+ tbl.setDataLocation(new Path(crtTbl.getLocation()))
+ }
+
/*
* Bug introdiced in hive-0.13. FileSinkDesc is serializable, but its member path is not.
* Fix it through wrapper.