From e50934f11e1e3ded21a631e5ab69db3c79467137 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Tue, 17 Feb 2015 18:14:33 -0800 Subject: [SPARK-5723][SQL]Change the default file format to Parquet for CTAS statements. JIRA: https://issues.apache.org/jira/browse/SPARK-5723 Author: Yin Huai This patch had conflicts when merged, resolved by Committer: Michael Armbrust Closes #4639 from yhuai/defaultCTASFileFormat and squashes the following commits: a568137 [Yin Huai] Merge remote-tracking branch 'upstream/master' into defaultCTASFileFormat ad2b07d [Yin Huai] Update tests and error messages. 8af5b2a [Yin Huai] Update conf key and unit test. 5a67903 [Yin Huai] Use data source write path for Hive's CTAS statements when no storage format/handler is specified. --- .../org/apache/spark/sql/hive/HiveContext.scala | 15 +++++ .../spark/sql/hive/HiveMetastoreCatalog.scala | 75 +++++++++++++++++----- .../apache/spark/sql/hive/execution/commands.scala | 17 +++-- .../spark/sql/hive/MetastoreDataSourcesSuite.scala | 6 +- .../spark/sql/hive/execution/SQLQuerySuite.scala | 70 ++++++++++++++++++++ 5 files changed, 158 insertions(+), 25 deletions(-) (limited to 'sql/hive') diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 6c55bc6be1..d3365b1e8f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -61,6 +61,21 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { protected[sql] def convertMetastoreParquet: Boolean = getConf("spark.sql.hive.convertMetastoreParquet", "true") == "true" + /** + * When true, a table created by a Hive CTAS statement (no USING clause) will be + * converted to a data source table, using the data source set by spark.sql.sources.default. + * The table in CTAS statement will be converted when it meets any of the following conditions: + * - The CTAS does not specify any of a SerDe (ROW FORMAT SERDE), a File Format (STORED AS), or + * a Storage Hanlder (STORED BY), and the value of hive.default.fileformat in hive-site.xml + * is either TextFile or SequenceFile. + * - The CTAS statement specifies TextFile (STORED AS TEXTFILE) as the file format and no SerDe + * is specified (no ROW FORMAT SERDE clause). + * - The CTAS statement specifies SequenceFile (STORED AS SEQUENCEFILE) as the file format + * and no SerDe is specified (no ROW FORMAT SERDE clause). + */ + protected[sql] def convertCTAS: Boolean = + getConf("spark.sql.hive.convertCTAS", "false").toBoolean + override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution = new this.QueryExecution(plan) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index cfd6f27371..f7ad2efc95 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -31,7 +31,7 @@ import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException} import org.apache.hadoop.util.ReflectionUtils import org.apache.spark.Logging -import org.apache.spark.sql.{AnalysisException, SQLContext} +import org.apache.spark.sql.{SaveMode, AnalysisException, SQLContext} import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Catalog, OverrideCatalog} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation @@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.parquet.{ParquetRelation2, Partition => ParquetPartition, PartitionSpec} -import org.apache.spark.sql.sources.{DDLParser, LogicalRelation, ResolvedDataSource} +import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, DDLParser, LogicalRelation, ResolvedDataSource} import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -502,24 +502,69 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Some(sa.getQB().getTableDesc) } - execution.CreateTableAsSelect( - databaseName, - tableName, - child, - allowExisting, - desc) + // Check if the query specifies file format or storage handler. + val hasStorageSpec = desc match { + case Some(crtTbl) => + crtTbl != null && (crtTbl.getSerName != null || crtTbl.getStorageHandler != null) + case None => false + } + + if (hive.convertCTAS && !hasStorageSpec) { + // Do the conversion when spark.sql.hive.convertCTAS is true and the query + // does not specify any storage format (file format and storage handler). + if (dbName.isDefined) { + throw new AnalysisException( + "Cannot specify database name in a CTAS statement " + + "when spark.sql.hive.convertCTAS is set to true.") + } + + val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists + CreateTableUsingAsSelect( + tblName, + hive.conf.defaultDataSourceName, + temporary = false, + mode, + options = Map.empty[String, String], + child + ) + } else { + execution.CreateTableAsSelect( + databaseName, + tableName, + child, + allowExisting, + desc) + } case p: LogicalPlan if p.resolved => p case p @ CreateTableAsSelect(db, tableName, child, allowExisting, None) => val (dbName, tblName) = processDatabaseAndTableName(db, tableName) - val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase) - execution.CreateTableAsSelect( - databaseName, - tableName, - child, - allowExisting, - None) + if (hive.convertCTAS) { + if (dbName.isDefined) { + throw new AnalysisException( + "Cannot specify database name in a CTAS statement " + + "when spark.sql.hive.convertCTAS is set to true.") + } + + val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists + CreateTableUsingAsSelect( + tblName, + hive.conf.defaultDataSourceName, + temporary = false, + mode, + options = Map.empty[String, String], + child + ) + } else { + val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase) + execution.CreateTableAsSelect( + databaseName, + tableName, + child, + allowExisting, + None) + } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 6afd8eea05..c88d0e6b79 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.sources._ @@ -121,7 +122,7 @@ case class CreateMetastoreDataSource( if (allowExisting) { return Seq.empty[Row] } else { - sys.error(s"Table $tableName already exists.") + throw new AnalysisException(s"Table $tableName already exists.") } } @@ -172,9 +173,11 @@ case class CreateMetastoreDataSourceAsSelect( // Check if we need to throw an exception or just return. mode match { case SaveMode.ErrorIfExists => - sys.error(s"Table $tableName already exists. " + - s"If you want to append into it, please set mode to SaveMode.Append. " + - s"Or, if you want to overwrite it, please set mode to SaveMode.Overwrite.") + throw new AnalysisException(s"Table $tableName already exists. " + + s"If you are using saveAsTable, you can set SaveMode to SaveMode.Append to " + + s"insert data into the table or set SaveMode to SaveMode.Overwrite to overwrite" + + s"the existing data. " + + s"Or, if you are using SQL CREATE TABLE, you need to drop $tableName first.") case SaveMode.Ignore => // Since the table already exists and the save mode is Ignore, we will just return. return Seq.empty[Row] @@ -199,7 +202,7 @@ case class CreateMetastoreDataSourceAsSelect( s"== Actual Schema ==" +: createdRelation.schema.treeString.split("\\\n")).mkString("\n")} """.stripMargin - sys.error(errorMessage) + throw new AnalysisException(errorMessage) } else if (i != createdRelation.relation) { val errorDescription = s"Cannot append to table $tableName because the resolved relation does not " + @@ -216,10 +219,10 @@ case class CreateMetastoreDataSourceAsSelect( s"== Actual Relation ==" :: createdRelation.toString :: Nil).mkString("\n")} """.stripMargin - sys.error(errorMessage) + throw new AnalysisException(errorMessage) } case o => - sys.error(s"Saving data in ${o.toString} is not supported.") + throw new AnalysisException(s"Saving data in ${o.toString} is not supported.") } case SaveMode.Overwrite => hiveContext.sql(s"DROP TABLE IF EXISTS $tableName") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index c30090fabb..e5156ae821 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -306,8 +306,8 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { |SELECT * FROM jsonTable """.stripMargin) - // Create the table again should trigger a AlreadyExistsException. - val message = intercept[RuntimeException] { + // Create the table again should trigger a AnalysisException. + val message = intercept[AnalysisException] { sql( s""" |CREATE TABLE ctasJsonTable @@ -516,7 +516,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { sql("SELECT * FROM createdJsonTable"), df.collect()) - var message = intercept[RuntimeException] { + var message = intercept[AnalysisException] { createExternalTable("createdJsonTable", filePath.toString) }.getMessage assert(message.contains("Table createdJsonTable already exists."), diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index ae03bc5e99..f2bc73bf3b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -17,10 +17,13 @@ package org.apache.spark.sql.hive.execution +import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries import org.apache.spark.sql.hive.{MetastoreRelation, HiveShim} import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ +import org.apache.spark.sql.parquet.ParquetRelation2 +import org.apache.spark.sql.sources.LogicalRelation import org.apache.spark.sql.types._ import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf} @@ -42,6 +45,73 @@ class SQLQuerySuite extends QueryTest { ) } + test("CTAS without serde") { + def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = { + val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName))) + relation match { + case LogicalRelation(r: ParquetRelation2) => + if (!isDataSourceParquet) { + fail( + s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " + + s"${ParquetRelation2.getClass.getCanonicalName}.") + } + + case r: MetastoreRelation => + if (isDataSourceParquet) { + fail( + s"${ParquetRelation2.getClass.getCanonicalName} is expected, but found " + + s"${classOf[MetastoreRelation].getCanonicalName}.") + } + } + } + + val originalConf = getConf("spark.sql.hive.convertCTAS", "false") + + setConf("spark.sql.hive.convertCTAS", "true") + + sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value") + sql("CREATE TABLE IF NOT EXISTS ctas1 AS SELECT key k, value FROM src ORDER BY k, value") + var message = intercept[AnalysisException] { + sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value") + }.getMessage + assert(message.contains("Table ctas1 already exists")) + checkRelation("ctas1", true) + sql("DROP TABLE ctas1") + + // Specifying database name for query can be converted to data source write path + // is not allowed right now. + message = intercept[AnalysisException] { + sql("CREATE TABLE default.ctas1 AS SELECT key k, value FROM src ORDER BY k, value") + }.getMessage + assert( + message.contains("Cannot specify database name in a CTAS statement"), + "When spark.sql.hive.convertCTAS is true, we should not allow " + + "database name specified.") + + sql("CREATE TABLE ctas1 stored as textfile AS SELECT key k, value FROM src ORDER BY k, value") + checkRelation("ctas1", true) + sql("DROP TABLE ctas1") + + sql( + "CREATE TABLE ctas1 stored as sequencefile AS SELECT key k, value FROM src ORDER BY k, value") + checkRelation("ctas1", true) + sql("DROP TABLE ctas1") + + sql("CREATE TABLE ctas1 stored as rcfile AS SELECT key k, value FROM src ORDER BY k, value") + checkRelation("ctas1", false) + sql("DROP TABLE ctas1") + + sql("CREATE TABLE ctas1 stored as orc AS SELECT key k, value FROM src ORDER BY k, value") + checkRelation("ctas1", false) + sql("DROP TABLE ctas1") + + sql("CREATE TABLE ctas1 stored as parquet AS SELECT key k, value FROM src ORDER BY k, value") + checkRelation("ctas1", false) + sql("DROP TABLE ctas1") + + setConf("spark.sql.hive.convertCTAS", originalConf) + } + test("CTAS with serde") { sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect() sql( -- cgit v1.2.3