From 4b55482abf899c27da3d55401ad26b4e9247b327 Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Tue, 28 Oct 2014 14:36:06 -0700 Subject: [SPARK-3343] [SQL] Add serde support for CTAS Currently, `CTAS` (Create Table As Select) doesn't support specifying the `SerDe` in HQL. This PR will pass down the `ASTNode` into the physical operator `execution.CreateTableAsSelect`, which will extract the `CreateTableDesc` object via Hive `SemanticAnalyzer`. In the meantime, I also update the `HiveMetastoreCatalog.createTable` to optionally support the `CreateTableDesc` for table creation. Author: Cheng Hao Closes #2570 from chenghao-intel/ctas_serde and squashes the following commits: e011ef5 [Cheng Hao] shim for both 0.12 & 0.13.1 cfb3662 [Cheng Hao] revert to hive 0.12 c8a547d [Cheng Hao] Support SerDe properties within CTAS --- .../catalyst/plans/logical/basicOperators.scala | 8 +- .../scala/org/apache/spark/sql/SchemaRDDLike.scala | 4 +- .../scala/org/apache/spark/sql/QueryTest.scala | 19 ++ .../hive/execution/HiveCompatibilitySuite.scala | 6 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 197 +++++++++++++++++---- .../scala/org/apache/spark/sql/hive/HiveQl.scala | 15 +- .../org/apache/spark/sql/hive/HiveStrategies.scala | 17 +- .../sql/hive/execution/CreateTableAsSelect.scala | 39 ++-- .../scala/org/apache/spark/sql/QueryTest.scala | 19 ++ .../sql/hive/execution/HiveExplainSuite.scala | 37 ++-- .../spark/sql/hive/execution/SQLQuerySuite.scala | 59 ++++++ .../scala/org/apache/spark/sql/hive/Shim12.scala | 5 +- .../scala/org/apache/spark/sql/hive/Shim13.scala | 6 +- 13 files changed, 337 insertions(+), 94 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 14b03c7445..00bdf108a8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -114,11 +114,13 @@ case class InsertIntoTable( } } -case class CreateTableAsSelect( +case class CreateTableAsSelect[T]( databaseName: Option[String], tableName: String, - child: LogicalPlan) extends UnaryNode { - override def output = child.output + child: LogicalPlan, + allowExisting: Boolean, + desc: Option[T] = None) extends UnaryNode { + override def output = Seq.empty[Attribute] override lazy val resolved = (databaseName != None && childrenResolved) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala index 25ba7d88ba..15516afb95 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala @@ -54,7 +54,7 @@ private[sql] trait SchemaRDDLike { @transient protected[spark] val logicalPlan: LogicalPlan = baseLogicalPlan match { // For various commands (like DDL) and queries with side effects, we force query optimization to // happen right away to let these side effects take place eagerly. 
- case _: Command | _: InsertIntoTable | _: CreateTableAsSelect |_: WriteToFile => + case _: Command | _: InsertIntoTable | _: CreateTableAsSelect[_] |_: WriteToFile => LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext) case _ => baseLogicalPlan @@ -123,7 +123,7 @@ private[sql] trait SchemaRDDLike { */ @Experimental def saveAsTable(tableName: String): Unit = - sqlContext.executePlan(CreateTableAsSelect(None, tableName, logicalPlan)).toRdd + sqlContext.executePlan(CreateTableAsSelect(None, tableName, logicalPlan, false)).toRdd /** Returns the schema as a string in the tree format. * diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 1fd8d27b34..042f61f5a4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -21,6 +21,25 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.util._ class QueryTest extends PlanTest { + /** + * Runs the plan and makes sure the answer contains all of the keywords, or the + * none of keywords are listed in the answer + * @param rdd the [[SchemaRDD]] to be executed + * @param exists true for make sure the keywords are listed in the output, otherwise + * to make sure none of the keyword are not listed in the output + * @param keywords keyword in string array + */ + def checkExistence(rdd: SchemaRDD, exists: Boolean, keywords: String*) { + val outputs = rdd.collect().map(_.mkString).mkString + for (key <- keywords) { + if (exists) { + assert(outputs.contains(key), s"Failed for $rdd ($key doens't exist in result)") + } else { + assert(!outputs.contains(key), s"Failed for $rdd ($key existed in the result)") + } + } + } + /** * Runs the plan and makes sure the answer matches the expected result. * @param rdd the [[SchemaRDD]] to be executed diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 4fc26d6f55..26d9ca05c8 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -229,7 +229,11 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { // Needs constant object inspectors "udf_round", - "udf7" + "udf7", + + // Sort with Limit clause causes failure. 
+ "ctas", + "ctas_hadoop20" ) ++ HiveShim.compatibilityBlackList /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 39d87a9d14..2dd2c882a8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -17,17 +17,27 @@ package org.apache.spark.sql.hive +import java.io.IOException +import java.util.{List => JList} + import scala.util.parsing.combinator.RegexParsers -import org.apache.hadoop.hive.metastore.api.{FieldSchema, SerDeInfo, StorageDescriptor, Partition => TPartition, Table => TTable} -import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table} -import org.apache.hadoop.hive.ql.plan.TableDesc -import org.apache.hadoop.hive.serde2.Deserializer +import org.apache.hadoop.util.ReflectionUtils +import org.apache.hadoop.fs.Path + +import org.apache.hadoop.hive.metastore.TableType +import org.apache.hadoop.hive.metastore.api.FieldSchema +import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition} +import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException} +import org.apache.hadoop.hive.ql.plan.{TableDesc, CreateTableDesc} +import org.apache.hadoop.hive.serde.serdeConstants +import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException} +import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.spark.Logging import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.catalyst.analysis.Catalog +import org.apache.spark.sql.catalyst.analysis.{Catalog, OverrideCatalog} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ @@ -66,37 +76,164 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with table.getTTable, partitions.map(part => part.getTPartition))(hive) } + /** + * Create table with specified database, table name, table description and schema + * @param databaseName Database Name + * @param tableName Table Name + * @param schema Schema of the new table, if not specified, will use the schema + * specified in crtTbl + * @param allowExisting if true, ignore AlreadyExistsException + * @param desc CreateTableDesc object which contains the SerDe info. Currently + * we support most of the features except the bucket. + */ def createTable( databaseName: String, tableName: String, schema: Seq[Attribute], - allowExisting: Boolean = false): Unit = { + allowExisting: Boolean = false, + desc: Option[CreateTableDesc] = None) { + val hconf = hive.hiveconf + val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName) - val table = new Table(dbName, tblName) - val hiveSchema = + val tbl = new Table(dbName, tblName) + + val crtTbl: CreateTableDesc = desc.getOrElse(null) + + // We should respect the passed in schema, unless it's not set + val hiveSchema: JList[FieldSchema] = if (schema == null || schema.isEmpty) { + crtTbl.getCols + } else { schema.map(attr => new FieldSchema(attr.name, toMetastoreType(attr.dataType), "")) - table.setFields(hiveSchema) - - val sd = new StorageDescriptor() - table.getTTable.setSd(sd) - sd.setCols(hiveSchema) - - // TODO: THESE ARE ALL DEFAULTS, WE NEED TO PARSE / UNDERSTAND the output specs. 
- sd.setCompressed(false) - sd.setParameters(Map[String, String]()) - sd.setInputFormat("org.apache.hadoop.mapred.TextInputFormat") - sd.setOutputFormat("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat") - val serDeInfo = new SerDeInfo() - serDeInfo.setName(tblName) - serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") - serDeInfo.setParameters(Map[String, String]()) - sd.setSerdeInfo(serDeInfo) + } + tbl.setFields(hiveSchema) + + // Most of code are similar with the DDLTask.createTable() of Hive, + if (crtTbl != null && crtTbl.getTblProps() != null) { + tbl.getTTable().getParameters().putAll(crtTbl.getTblProps()) + } + + if (crtTbl != null && crtTbl.getPartCols() != null) { + tbl.setPartCols(crtTbl.getPartCols()) + } + + if (crtTbl != null && crtTbl.getStorageHandler() != null) { + tbl.setProperty( + org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE, + crtTbl.getStorageHandler()) + } + + /* + * We use LazySimpleSerDe by default. + * + * If the user didn't specify a SerDe, and any of the columns are not simple + * types, we will have to use DynamicSerDe instead. + */ + if (crtTbl == null || crtTbl.getSerName() == null) { + val storageHandler = tbl.getStorageHandler() + if (storageHandler == null) { + logInfo(s"Default to LazySimpleSerDe for table $dbName.$tblName") + tbl.setSerializationLib(classOf[LazySimpleSerDe].getName()) + + import org.apache.hadoop.mapred.TextInputFormat + import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + import org.apache.hadoop.io.Text + + tbl.setInputFormatClass(classOf[TextInputFormat]) + tbl.setOutputFormatClass(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]]) + tbl.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") + } else { + val serDeClassName = storageHandler.getSerDeClass().getName() + logInfo(s"Use StorageHandler-supplied $serDeClassName for table $dbName.$tblName") + tbl.setSerializationLib(serDeClassName) + } + } else { + // let's validate that the serde exists + val serdeName = crtTbl.getSerName() + try { + val d = ReflectionUtils.newInstance(hconf.getClassByName(serdeName), hconf) + if (d != null) { + logDebug("Found class for $serdeName") + } + } catch { + case e: SerDeException => throw new HiveException("Cannot validate serde: " + serdeName, e) + } + tbl.setSerializationLib(serdeName) + } + + if (crtTbl != null && crtTbl.getFieldDelim() != null) { + tbl.setSerdeParam(serdeConstants.FIELD_DELIM, crtTbl.getFieldDelim()) + tbl.setSerdeParam(serdeConstants.SERIALIZATION_FORMAT, crtTbl.getFieldDelim()) + } + if (crtTbl != null && crtTbl.getFieldEscape() != null) { + tbl.setSerdeParam(serdeConstants.ESCAPE_CHAR, crtTbl.getFieldEscape()) + } + + if (crtTbl != null && crtTbl.getCollItemDelim() != null) { + tbl.setSerdeParam(serdeConstants.COLLECTION_DELIM, crtTbl.getCollItemDelim()) + } + if (crtTbl != null && crtTbl.getMapKeyDelim() != null) { + tbl.setSerdeParam(serdeConstants.MAPKEY_DELIM, crtTbl.getMapKeyDelim()) + } + if (crtTbl != null && crtTbl.getLineDelim() != null) { + tbl.setSerdeParam(serdeConstants.LINE_DELIM, crtTbl.getLineDelim()) + } + + if (crtTbl != null && crtTbl.getSerdeProps() != null) { + val iter = crtTbl.getSerdeProps().entrySet().iterator() + while (iter.hasNext()) { + val m = iter.next() + tbl.setSerdeParam(m.getKey(), m.getValue()) + } + } + + if (crtTbl != null && crtTbl.getComment() != null) { + tbl.setProperty("comment", crtTbl.getComment()) + } + + if (crtTbl != null && crtTbl.getLocation() != null) { + 
HiveShim.setLocation(tbl, crtTbl) + } + + if (crtTbl != null && crtTbl.getSkewedColNames() != null) { + tbl.setSkewedColNames(crtTbl.getSkewedColNames()) + } + if (crtTbl != null && crtTbl.getSkewedColValues() != null) { + tbl.setSkewedColValues(crtTbl.getSkewedColValues()) + } + + if (crtTbl != null) { + tbl.setStoredAsSubDirectories(crtTbl.isStoredAsSubDirectories()) + tbl.setInputFormatClass(crtTbl.getInputFormat()) + tbl.setOutputFormatClass(crtTbl.getOutputFormat()) + } + + tbl.getTTable().getSd().setInputFormat(tbl.getInputFormatClass().getName()) + tbl.getTTable().getSd().setOutputFormat(tbl.getOutputFormatClass().getName()) + + if (crtTbl != null && crtTbl.isExternal()) { + tbl.setProperty("EXTERNAL", "TRUE") + tbl.setTableType(TableType.EXTERNAL_TABLE) + } + + // set owner + try { + tbl.setOwner(hive.hiveconf.getUser) + } catch { + case e: IOException => throw new HiveException("Unable to get current user", e) + } + + // set create time + tbl.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int]) + + // TODO add bucket support + // TODO set more info if Hive upgrade + // create the table synchronized { - try client.createTable(table) catch { - case e: org.apache.hadoop.hive.ql.metadata.HiveException - if e.getCause.isInstanceOf[org.apache.hadoop.hive.metastore.api.AlreadyExistsException] && - allowExisting => // Do nothing. + try client.createTable(tbl, allowExisting) catch { + case e: org.apache.hadoop.hive.metastore.api.AlreadyExistsException + if allowExisting => // Do nothing + case e: Throwable => throw e } } } @@ -110,11 +247,11 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with // Wait until children are resolved. case p: LogicalPlan if !p.childrenResolved => p - case CreateTableAsSelect(db, tableName, child) => + case CreateTableAsSelect(db, tableName, child, allowExisting, extra) => val (dbName, tblName) = processDatabaseAndTableName(db, tableName) val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase) - CreateTableAsSelect(Some(databaseName), tableName, child) + CreateTableAsSelect(Some(databaseName), tableName, child, allowExisting, extra) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index ed07a28039..9d9d68affa 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -448,14 +448,14 @@ private[hive] object HiveQl { } case Token("TOK_CREATETABLE", children) - if children.collect { case t@Token("TOK_QUERY", _) => t }.nonEmpty => - // TODO: Parse other clauses. 
+ if children.collect { case t @ Token("TOK_QUERY", _) => t }.nonEmpty => // Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL val ( Some(tableNameParts) :: _ /* likeTable */ :: - Some(query) +: - notImplemented) = + Some(query) :: + allowExisting +: + ignores) = getClauses( Seq( "TOK_TABNAME", @@ -479,14 +479,9 @@ private[hive] object HiveQl { "TOK_TABLELOCATION", "TOK_TABLEPROPERTIES"), children) - if (notImplemented.exists(token => !token.isEmpty)) { - throw new NotImplementedError( - s"Unhandled clauses: ${notImplemented.flatten.map(dumpTree(_)).mkString("\n")}") - } - val (db, tableName) = extractDbNameTableName(tableNameParts) - CreateTableAsSelect(db, tableName, nodeToPlan(query)) + CreateTableAsSelect(db, tableName, nodeToPlan(query), allowExisting != None, Some(node)) // If its not a "CREATE TABLE AS" like above then just pass it back to hive as a native command. case Token("TOK_CREATETABLE", _) => NativePlaceholder diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 5c66322f1e..e59d4d536a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hive +import org.apache.hadoop.hive.ql.parse.ASTNode + import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions._ @@ -160,17 +162,14 @@ private[hive] trait HiveStrategies { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) => InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil - - case logical.CreateTableAsSelect(database, tableName, child) => - val query = planLater(child) + case logical.CreateTableAsSelect( + Some(database), tableName, child, allowExisting, Some(extra: ASTNode)) => CreateTableAsSelect( - database.get, + database, tableName, - query, - InsertIntoHiveTable(_: MetastoreRelation, - Map(), - query, - overwrite = true)(hiveContext)) :: Nil + child, + allowExisting, + extra) :: Nil case _ => Nil } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index 3625708d03..2fce414734 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -17,10 +17,12 @@ package org.apache.spark.sql.hive.execution +import org.apache.hadoop.hive.ql.Context +import org.apache.hadoop.hive.ql.parse.{SemanticAnalyzer, ASTNode} import org.apache.spark.annotation.Experimental import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.Row -import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} import org.apache.spark.sql.execution.{SparkPlan, Command, LeafNode} import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.MetastoreRelation @@ -30,33 +32,46 @@ import org.apache.spark.sql.hive.MetastoreRelation * Create table and insert the query result into it. 
* @param database the database name of the new relation * @param tableName the table name of the new relation - * @param insertIntoRelation function of creating the `InsertIntoHiveTable` - * by specifying the `MetaStoreRelation`, the data will be inserted into that table. - * TODO Add more table creating properties, e.g. SerDe, StorageHandler, in-memory cache etc. + * @param query the query whose result will be insert into the new relation + * @param allowExisting allow continue working if it's already exists, otherwise + * raise exception + * @param extra the extra information for this Operator, it should be the + * ASTNode object for extracting the CreateTableDesc. + */ @Experimental case class CreateTableAsSelect( database: String, tableName: String, - query: SparkPlan, - insertIntoRelation: MetastoreRelation => InsertIntoHiveTable) - extends LeafNode with Command { + query: LogicalPlan, + allowExisting: Boolean, + extra: ASTNode) extends LeafNode with Command { def output = Seq.empty + private[this] def sc = sqlContext.asInstanceOf[HiveContext] + // A lazy computing of the metastoreRelation private[this] lazy val metastoreRelation: MetastoreRelation = { - // Create the table - val sc = sqlContext.asInstanceOf[HiveContext] - sc.catalog.createTable(database, tableName, query.output, false) + // Get the CreateTableDesc from Hive SemanticAnalyzer + val sa = new SemanticAnalyzer(sc.hiveconf) + + sa.analyze(extra, new Context(sc.hiveconf)) + val desc = sa.getQB().getTableDesc + // Create Hive Table + sc.catalog.createTable(database, tableName, query.output, allowExisting, Some(desc)) + // Get the Metastore Relation sc.catalog.lookupRelation(Some(database), tableName, None) match { case r: MetastoreRelation => r } } - override protected lazy val sideEffectResult: Seq[Row] = { - insertIntoRelation(metastoreRelation).execute + override protected[sql] lazy val sideEffectResult: Seq[Row] = { + // TODO ideally, we should get the output data ready first and then + // add the relation into catalog, just in case of failure occurs while data + // processing. + sc.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd Seq.empty[Row] } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala index 6b06410520..f89c49d292 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -32,6 +32,25 @@ import org.apache.spark.sql.catalyst.util._ * So, we duplicate this code here. */ class QueryTest extends PlanTest { + /** + * Runs the plan and makes sure the answer contains all of the keywords, or the + * none of keywords are listed in the answer + * @param rdd the [[SchemaRDD]] to be executed + * @param exists true for make sure the keywords are listed in the output, otherwise + * to make sure none of the keyword are not listed in the output + * @param keywords keyword in string array + */ + def checkExistence(rdd: SchemaRDD, exists: Boolean, keywords: String*) { + val outputs = rdd.collect().map(_.mkString).mkString + for (key <- keywords) { + if (exists) { + assert(outputs.contains(key), s"Failed for $rdd ($key doens't exist in result)") + } else { + assert(!outputs.contains(key), s"Failed for $rdd ($key existed in the result)") + } + } + } + /** * Runs the plan and makes sure the answer matches the expected result. 
* @param rdd the [[SchemaRDD]] to be executed diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala index 4ed58f4be1..a68fc2a803 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala @@ -18,37 +18,24 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.QueryTest -import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ -import org.apache.spark.sql.Row /** * A set of tests that validates support for Hive Explain command. */ class HiveExplainSuite extends QueryTest { - private def check(sqlCmd: String, exists: Boolean, keywords: String*) { - val outputs = sql(sqlCmd).collect().map(_.getString(0)).mkString - for (key <- keywords) { - if (exists) { - assert(outputs.contains(key), s"Failed for $sqlCmd ($key doens't exist in result)") - } else { - assert(!outputs.contains(key), s"Failed for $sqlCmd ($key existed in the result)") - } - } - } - test("explain extended command") { - check(" explain select * from src where key=123 ", true, - "== Physical Plan ==") - check(" explain select * from src where key=123 ", false, - "== Parsed Logical Plan ==", - "== Analyzed Logical Plan ==", - "== Optimized Logical Plan ==") - check(" explain extended select * from src where key=123 ", true, - "== Parsed Logical Plan ==", - "== Analyzed Logical Plan ==", - "== Optimized Logical Plan ==", - "== Physical Plan ==", - "Code Generation", "== RDD ==") + checkExistence(sql(" explain select * from src where key=123 "), true, + "== Physical Plan ==") + checkExistence(sql(" explain select * from src where key=123 "), false, + "== Parsed Logical Plan ==", + "== Analyzed Logical Plan ==", + "== Optimized Logical Plan ==") + checkExistence(sql(" explain extended select * from src where key=123 "), true, + "== Parsed Logical Plan ==", + "== Analyzed Logical Plan ==", + "== Optimized Logical Plan ==", + "== Physical Plan ==", + "Code Generation", "== RDD ==") } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index a4aea31d3f..4f96a327ee 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -32,6 +32,65 @@ case class Nested3(f3: Int) * valid, but Hive currently cannot execute it. 
*/ class SQLQuerySuite extends QueryTest { + test("CTAS with serde") { + sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect + sql( + """CREATE TABLE ctas2 + | ROW FORMAT SERDE "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe" + | WITH SERDEPROPERTIES("serde_p1"="p1","serde_p2"="p2") + | STORED AS RCFile + | TBLPROPERTIES("tbl_p1"="p11", "tbl_p2"="p22") + | AS + | SELECT key, value + | FROM src + | ORDER BY key, value""".stripMargin).collect + sql( + """CREATE TABLE ctas3 + | ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\012' + | STORED AS textfile AS + | SELECT key, value + | FROM src + | ORDER BY key, value""".stripMargin).collect + + // the table schema may like (key: integer, value: string) + sql( + """CREATE TABLE IF NOT EXISTS ctas4 AS + | SELECT 1 AS key, value FROM src LIMIT 1""".stripMargin).collect + // expect the string => integer for field key cause the table ctas4 already existed. + sql( + """CREATE TABLE IF NOT EXISTS ctas4 AS + | SELECT key, value FROM src ORDER BY key, value""".stripMargin).collect + + checkAnswer( + sql("SELECT k, value FROM ctas1 ORDER BY k, value"), + sql("SELECT key, value FROM src ORDER BY key, value").collect().toSeq) + checkAnswer( + sql("SELECT key, value FROM ctas2 ORDER BY key, value"), + sql( + """ + SELECT key, value + FROM src + ORDER BY key, value""").collect().toSeq) + checkAnswer( + sql("SELECT key, value FROM ctas3 ORDER BY key, value"), + sql( + """ + SELECT key, value + FROM src + ORDER BY key, value""").collect().toSeq) + checkAnswer( + sql("SELECT key, value FROM ctas4 ORDER BY key, value"), + sql("SELECT CAST(key AS int) k, value FROM src ORDER BY k, value").collect().toSeq) + + checkExistence(sql("DESC EXTENDED ctas2"), true, + "name:key", "type:string", "name:value", "ctas2", + "org.apache.hadoop.hive.ql.io.RCFileInputFormat", + "org.apache.hadoop.hive.ql.io.RCFileOutputFormat", + "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe", + "serde_p1=p1", "serde_p2=p2", "tbl_p1=p11", "tbl_p2=p22","MANAGED_TABLE" + ) + } + test("ordering not in select") { checkAnswer( sql("SELECT key FROM src ORDER BY value"), diff --git a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala index 2317d2e763..8cb81db8a9 100644 --- a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala +++ b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.hive.common.`type`.HiveDecimal import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.ql.Context import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table} -import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc} +import org.apache.hadoop.hive.ql.plan.{CreateTableDesc, FileSinkDesc, TableDesc} import org.apache.hadoop.hive.ql.processors._ import org.apache.hadoop.hive.ql.stats.StatsSetupConst import org.apache.hadoop.hive.serde2.{Deserializer, ColumnProjectionUtils} @@ -89,6 +89,9 @@ private[hive] object HiveShim { "udf_concat" ) + def setLocation(tbl: Table, crtTbl: CreateTableDesc): Unit = { + tbl.setDataLocation(new Path(crtTbl.getLocation()).toUri()) + } } class ShimFileSinkDesc(var dir: String, var tableInfo: TableDesc, var compressed: Boolean) diff --git a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala index b8d893d8c1..b9a742cc6e 100644 --- 
a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
+++ b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.common.`type`.{HiveDecimal}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.Context
 import org.apache.hadoop.hive.ql.metadata.{Table, Hive, Partition}
-import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
+import org.apache.hadoop.hive.ql.plan.{CreateTableDesc, FileSinkDesc, TableDesc}
 import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
 import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Deserializer}
 import org.apache.hadoop.mapred.InputFormat
@@ -121,6 +121,10 @@ private[hive] object HiveShim {
   def compatibilityBlackList = Seq()
 
+  def setLocation(tbl: Table, crtTbl: CreateTableDesc): Unit = {
+    tbl.setDataLocation(new Path(crtTbl.getLocation()))
+  }
+
   /*
    * Bug introduced in hive-0.13. FileSinkDesc is serializable, but its member path is not.
    * Fix it through wrapper.
--
cgit v1.2.3
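
For readers following the new code path end to end: HiveQl now keeps the original TOK_CREATETABLE ASTNode inside the logical CreateTableAsSelect node, the analyzer only fills in the database name, and the physical operator re-runs Hive's SemanticAnalyzer over that node to recover a CreateTableDesc before creating the table and inserting the query result. The sketch below condenses that flow; the helper name `createTableDescFor` and its parameters are illustrative only, and the real logic lives in execution.CreateTableAsSelect and HiveMetastoreCatalog.createTable.

    import org.apache.hadoop.hive.ql.Context
    import org.apache.hadoop.hive.ql.parse.{ASTNode, SemanticAnalyzer}
    import org.apache.spark.sql.hive.HiveContext

    // Re-run Hive's SemanticAnalyzer over the TOK_CREATETABLE node that HiveQl carried
    // through the logical plan, and pull out the CreateTableDesc it builds.
    def createTableDescFor(node: ASTNode, hiveContext: HiveContext) = {
      val sa = new SemanticAnalyzer(hiveContext.hiveconf)
      sa.analyze(node, new Context(hiveContext.hiveconf))
      sa.getQB().getTableDesc  // SerDe class, SERDEPROPERTIES, storage format, TBLPROPERTIES, ...
    }

    // HiveMetastoreCatalog.createTable(db, table, query.output, allowExisting, Some(desc))
    // then honors the descriptor instead of hard-coding TextInputFormat + LazySimpleSerDe,
    // and the operator finishes with InsertIntoTable(metastoreRelation, Map(), query, true).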
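
The only version-specific piece is setLocation. The two shim bodies differ because the Table metadata API changed between the supported Hive releases; the comments below give my reading of that difference (Hive 0.12's setDataLocation takes a URI, 0.13's takes a Path), which is inferred from the shim code rather than spelled out in the patch. The two defs are shown side by side purely for comparison; they compile against different Hive versions and never coexist in one build.

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.hive.ql.metadata.Table
    import org.apache.hadoop.hive.ql.plan.CreateTableDesc

    // Hive 0.12 shim: Table.setDataLocation expects a java.net.URI, so the
    // CreateTableDesc location string is converted through Path(...).toUri().
    def setLocation12(tbl: Table, crtTbl: CreateTableDesc): Unit =
      tbl.setDataLocation(new Path(crtTbl.getLocation()).toUri())

    // Hive 0.13 shim: the same method now takes an org.apache.hadoop.fs.Path directly.
    def setLocation13(tbl: Table, crtTbl: CreateTableDesc): Unit =
      tbl.setDataLocation(new Path(crtTbl.getLocation()))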