author    Yin Huai <yhuai@databricks.com>  2015-02-17 18:14:33 -0800
committer Michael Armbrust <michael@databricks.com>  2015-02-17 18:14:33 -0800
commit    e50934f11e1e3ded21a631e5ab69db3c79467137 (patch)
tree      cbd1d07d9cd7b8bbad43ec7da0f112782d5a781f /sql/hive
parent    d5f12bfe8f0a98d6fee114bb24376668ebe2898e (diff)
[SPARK-5723][SQL] Change the default file format to Parquet for CTAS statements.

JIRA: https://issues.apache.org/jira/browse/SPARK-5723

Author: Yin Huai <yhuai@databricks.com>

This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <michael@databricks.com>

Closes #4639 from yhuai/defaultCTASFileFormat and squashes the following commits:

a568137 [Yin Huai] Merge remote-tracking branch 'upstream/master' into defaultCTASFileFormat
ad2b07d [Yin Huai] Update tests and error messages.
8af5b2a [Yin Huai] Update conf key and unit test.
5a67903 [Yin Huai] Use data source write path for Hive's CTAS statements when no storage format/handler is specified.
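A minimal usage sketch of the behavior this patch enables (not part of the patch; assumes a HiveContext named `hiveContext`, the Hive test table `src`, and a hypothetical table name `ctas_demo`):

    // Opt in to the conversion; the flag defaults to false.
    hiveContext.setConf("spark.sql.hive.convertCTAS", "true")
    // No SerDe, file format, or storage handler is specified, so the table is
    // written through the data source path using spark.sql.sources.default
    // (Parquet unless overridden).
    hiveContext.sql("CREATE TABLE ctas_demo AS SELECT key, value FROM src")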
Diffstat (limited to 'sql/hive')
 -rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala               | 15
 -rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala      | 75
 -rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala        | 17
 -rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala |  6
 -rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala   | 70
 5 files changed, 158 insertions(+), 25 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 6c55bc6be1..d3365b1e8f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -61,6 +61,21 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
protected[sql] def convertMetastoreParquet: Boolean =
getConf("spark.sql.hive.convertMetastoreParquet", "true") == "true"
+ /**
+ * When true, a table created by a Hive CTAS statement (no USING clause) will be
+ * converted to a data source table, using the data source set by spark.sql.sources.default.
+ * The table in a CTAS statement will be converted when it meets any of the following conditions:
+ * - The CTAS does not specify any of a SerDe (ROW FORMAT SERDE), a File Format (STORED AS), or
+ * a Storage Handler (STORED BY), and the value of hive.default.fileformat in hive-site.xml
+ * is either TextFile or SequenceFile.
+ * - The CTAS statement specifies TextFile (STORED AS TEXTFILE) as the file format and no SerDe
+ * is specified (no ROW FORMAT SERDE clause).
+ * - The CTAS statement specifies SequenceFile (STORED AS SEQUENCEFILE) as the file format
+ * and no SerDe is specified (no ROW FORMAT SERDE clause).
+ */
+ protected[sql] def convertCTAS: Boolean =
+ getConf("spark.sql.hive.convertCTAS", "false").toBoolean
+
override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution(plan)
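To illustrate the conditions documented above, a hedged sketch of which CTAS forms are converted when spark.sql.hive.convertCTAS is true (table names are hypothetical; assumes hive.default.fileformat is TextFile, matching the test suite further below):

    // Converted to the default data source (spark.sql.sources.default):
    hiveContext.sql("CREATE TABLE t1 AS SELECT key, value FROM src")
    hiveContext.sql("CREATE TABLE t2 STORED AS TEXTFILE AS SELECT key, value FROM src")
    hiveContext.sql("CREATE TABLE t3 STORED AS SEQUENCEFILE AS SELECT key, value FROM src")
    // Kept as a Hive MetastoreRelation (file format other than TextFile/SequenceFile):
    hiveContext.sql("CREATE TABLE t4 STORED AS RCFILE AS SELECT key, value FROM src")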
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index cfd6f27371..f7ad2efc95 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException}
import org.apache.hadoop.util.ReflectionUtils
import org.apache.spark.Logging
-import org.apache.spark.sql.{AnalysisException, SQLContext}
+import org.apache.spark.sql.{SaveMode, AnalysisException, SQLContext}
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Catalog, OverrideCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.parquet.{ParquetRelation2, Partition => ParquetPartition, PartitionSpec}
-import org.apache.spark.sql.sources.{DDLParser, LogicalRelation, ResolvedDataSource}
+import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, DDLParser, LogicalRelation, ResolvedDataSource}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -502,24 +502,69 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
Some(sa.getQB().getTableDesc)
}
- execution.CreateTableAsSelect(
- databaseName,
- tableName,
- child,
- allowExisting,
- desc)
+ // Check if the query specifies file format or storage handler.
+ val hasStorageSpec = desc match {
+ case Some(crtTbl) =>
+ crtTbl != null && (crtTbl.getSerName != null || crtTbl.getStorageHandler != null)
+ case None => false
+ }
+
+ if (hive.convertCTAS && !hasStorageSpec) {
+ // Do the conversion when spark.sql.hive.convertCTAS is true and the query
+ // does not specify any storage format (no SerDe and no storage handler).
+ if (dbName.isDefined) {
+ throw new AnalysisException(
+ "Cannot specify database name in a CTAS statement " +
+ "when spark.sql.hive.convertCTAS is set to true.")
+ }
+
+ val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
+ CreateTableUsingAsSelect(
+ tblName,
+ hive.conf.defaultDataSourceName,
+ temporary = false,
+ mode,
+ options = Map.empty[String, String],
+ child
+ )
+ } else {
+ execution.CreateTableAsSelect(
+ databaseName,
+ tableName,
+ child,
+ allowExisting,
+ desc)
+ }
case p: LogicalPlan if p.resolved => p
case p @ CreateTableAsSelect(db, tableName, child, allowExisting, None) =>
val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
- val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
- execution.CreateTableAsSelect(
- databaseName,
- tableName,
- child,
- allowExisting,
- None)
+ if (hive.convertCTAS) {
+ if (dbName.isDefined) {
+ throw new AnalysisException(
+ "Cannot specify database name in a CTAS statement " +
+ "when spark.sql.hive.convertCTAS is set to true.")
+ }
+
+ val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
+ CreateTableUsingAsSelect(
+ tblName,
+ hive.conf.defaultDataSourceName,
+ temporary = false,
+ mode,
+ options = Map.empty[String, String],
+ child
+ )
+ } else {
+ val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
+ execution.CreateTableAsSelect(
+ databaseName,
+ tableName,
+ child,
+ allowExisting,
+ None)
+ }
}
}
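The IF NOT EXISTS handling above maps directly onto SaveMode; a standalone sketch of that mapping (the helper name is illustrative, not part of the patch):

    import org.apache.spark.sql.SaveMode

    // CREATE TABLE IF NOT EXISTS ... AS SELECT silently no-ops when the table
    // already exists; a plain CREATE TABLE ... AS SELECT fails instead.
    def ctasSaveMode(allowExisting: Boolean): SaveMode =
      if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists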
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 6afd8eea05..c88d0e6b79 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive.execution
import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.sources._
@@ -121,7 +122,7 @@ case class CreateMetastoreDataSource(
if (allowExisting) {
return Seq.empty[Row]
} else {
- sys.error(s"Table $tableName already exists.")
+ throw new AnalysisException(s"Table $tableName already exists.")
}
}
@@ -172,9 +173,11 @@ case class CreateMetastoreDataSourceAsSelect(
// Check if we need to throw an exception or just return.
mode match {
case SaveMode.ErrorIfExists =>
- sys.error(s"Table $tableName already exists. " +
- s"If you want to append into it, please set mode to SaveMode.Append. " +
- s"Or, if you want to overwrite it, please set mode to SaveMode.Overwrite.")
+ throw new AnalysisException(s"Table $tableName already exists. " +
+ s"If you are using saveAsTable, you can set SaveMode to SaveMode.Append to " +
+ s"insert data into the table or set SaveMode to SaveMode.Overwrite to overwrite " +
+ s"the existing data. " +
+ s"Or, if you are using SQL CREATE TABLE, you need to drop $tableName first.")
case SaveMode.Ignore =>
// Since the table already exists and the save mode is Ignore, we will just return.
return Seq.empty[Row]
@@ -199,7 +202,7 @@ case class CreateMetastoreDataSourceAsSelect(
s"== Actual Schema ==" +:
createdRelation.schema.treeString.split("\\\n")).mkString("\n")}
""".stripMargin
- sys.error(errorMessage)
+ throw new AnalysisException(errorMessage)
} else if (i != createdRelation.relation) {
val errorDescription =
s"Cannot append to table $tableName because the resolved relation does not " +
@@ -216,10 +219,10 @@ case class CreateMetastoreDataSourceAsSelect(
s"== Actual Relation ==" ::
createdRelation.toString :: Nil).mkString("\n")}
""".stripMargin
- sys.error(errorMessage)
+ throw new AnalysisException(errorMessage)
}
case o =>
- sys.error(s"Saving data in ${o.toString} is not supported.")
+ throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
}
case SaveMode.Overwrite =>
hiveContext.sql(s"DROP TABLE IF EXISTS $tableName")
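With sys.error replaced by AnalysisException, callers can catch duplicate-table failures specifically instead of matching on a bare RuntimeException; a hedged sketch (table name hypothetical, assumes `hiveContext` with spark.sql.hive.convertCTAS enabled as in the earlier sketch, and that `ctas_demo` already exists):

    import org.apache.spark.sql.AnalysisException

    try {
      hiveContext.sql("CREATE TABLE ctas_demo AS SELECT key, value FROM src")
    } catch {
      // Raised when ctas_demo already exists and the save mode is ErrorIfExists.
      case e: AnalysisException => println(e.getMessage)
    }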
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index c30090fabb..e5156ae821 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -306,8 +306,8 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
|SELECT * FROM jsonTable
""".stripMargin)
- // Create the table again should trigger a AlreadyExistsException.
- val message = intercept[RuntimeException] {
+ // Creating the table again should trigger an AnalysisException.
+ val message = intercept[AnalysisException] {
sql(
s"""
|CREATE TABLE ctasJsonTable
@@ -516,7 +516,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
sql("SELECT * FROM createdJsonTable"),
df.collect())
- var message = intercept[RuntimeException] {
+ var message = intercept[AnalysisException] {
createExternalTable("createdJsonTable", filePath.toString)
}.getMessage
assert(message.contains("Table createdJsonTable already exists."),
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index ae03bc5e99..f2bc73bf3b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -17,10 +17,13 @@
package org.apache.spark.sql.hive.execution
+import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.hive.{MetastoreRelation, HiveShim}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
+import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.sources.LogicalRelation
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf}
@@ -42,6 +45,73 @@ class SQLQuerySuite extends QueryTest {
)
}
+ test("CTAS without serde") {
+ def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
+ val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
+ relation match {
+ case LogicalRelation(r: ParquetRelation2) =>
+ if (!isDataSourceParquet) {
+ fail(
+ s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
+ s"${ParquetRelation2.getClass.getCanonicalName}.")
+ }
+
+ case r: MetastoreRelation =>
+ if (isDataSourceParquet) {
+ fail(
+ s"${ParquetRelation2.getClass.getCanonicalName} is expected, but found " +
+ s"${classOf[MetastoreRelation].getCanonicalName}.")
+ }
+ }
+ }
+
+ val originalConf = getConf("spark.sql.hive.convertCTAS", "false")
+
+ setConf("spark.sql.hive.convertCTAS", "true")
+
+ sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
+ sql("CREATE TABLE IF NOT EXISTS ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
+ var message = intercept[AnalysisException] {
+ sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
+ }.getMessage
+ assert(message.contains("Table ctas1 already exists"))
+ checkRelation("ctas1", true)
+ sql("DROP TABLE ctas1")
+
+ // Specifying a database name in a CTAS statement that can be converted to the
+ // data source write path is not allowed right now.
+ message = intercept[AnalysisException] {
+ sql("CREATE TABLE default.ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
+ }.getMessage
+ assert(
+ message.contains("Cannot specify database name in a CTAS statement"),
+ "When spark.sql.hive.convertCTAS is true, we should not allow " +
+ "database name specified.")
+
+ sql("CREATE TABLE ctas1 stored as textfile AS SELECT key k, value FROM src ORDER BY k, value")
+ checkRelation("ctas1", true)
+ sql("DROP TABLE ctas1")
+
+ sql(
+ "CREATE TABLE ctas1 stored as sequencefile AS SELECT key k, value FROM src ORDER BY k, value")
+ checkRelation("ctas1", true)
+ sql("DROP TABLE ctas1")
+
+ sql("CREATE TABLE ctas1 stored as rcfile AS SELECT key k, value FROM src ORDER BY k, value")
+ checkRelation("ctas1", false)
+ sql("DROP TABLE ctas1")
+
+ sql("CREATE TABLE ctas1 stored as orc AS SELECT key k, value FROM src ORDER BY k, value")
+ checkRelation("ctas1", false)
+ sql("DROP TABLE ctas1")
+
+ sql("CREATE TABLE ctas1 stored as parquet AS SELECT key k, value FROM src ORDER BY k, value")
+ checkRelation("ctas1", false)
+ sql("DROP TABLE ctas1")
+
+ setConf("spark.sql.hive.convertCTAS", originalConf)
+ }
+
test("CTAS with serde") {
sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect()
sql(