aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYin Huai <yhuai@databricks.com>2016-06-01 17:55:37 -0700
committerAndrew Or <andrew@databricks.com>2016-06-01 17:55:37 -0700
commit6dddb70c387ed1f002d2602b2b1f919ef021de91 (patch)
treef05dcdd5356d04e4a7e49038bc12182f2f4d3969
parentc8fb776d4a0134c47f90272c4bd5e4bba902aae5 (diff)
downloadspark-6dddb70c387ed1f002d2602b2b1f919ef021de91.tar.gz
spark-6dddb70c387ed1f002d2602b2b1f919ef021de91.tar.bz2
spark-6dddb70c387ed1f002d2602b2b1f919ef021de91.zip
[SPARK-15646][SQL] When spark.sql.hive.convertCTAS is true, the conversion rule needs to respect TEXTFILE/SEQUENCEFILE format and the user-defined location
## What changes were proposed in this pull request? When `spark.sql.hive.convertCTAS` is true, for a CTAS statement, we will create a data source table using the default source (i.e. parquet) if the CTAS does not specify any Hive storage format. However, there are two issues with this conversion logic. 1. First, we determine if a CTAS statement defines storage format by checking the serde. However, TEXTFILE/SEQUENCEFILE does not have a default serde. When we do the check, we have not set the default serde. So, a query like `CREATE TABLE abc STORED AS TEXTFILE AS SELECT ...` actually creates a data source parquet table. 2. In the conversion logic, we are ignoring the user-specified location. This PR fixes the above two issues. Also, this PR makes the parser throws an exception when a CTAS statement has a PARTITIONED BY clause. This change is made because Hive's syntax does not allow it and our current implementation actually does not work for this case (the insert operation always throws an exception because the insertion does not pick up the partitioning info). ## How was this patch tested? I am adding new tests in SQLQuerySuite and HiveDDLCommandSuite. Author: Yin Huai <yhuai@databricks.com> Closes #13386 from yhuai/SPARK-14507.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala37
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala10
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala57
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala16
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala6
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala (renamed from sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala)9
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala25
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala6
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala135
10 files changed, 176 insertions, 127 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 6c19bf02dc..01409c6a77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -839,7 +839,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
/**
* Create a table, returning either a [[CreateTableCommand]] or a
- * [[CreateTableAsSelectLogicalPlan]].
+ * [[CreateHiveTableAsSelectLogicalPlan]].
*
* This is not used to create datasource tables, which is handled through
* "CREATE TABLE ... USING ...".
@@ -936,7 +936,40 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
comment = comment)
selectQuery match {
- case Some(q) => CreateTableAsSelectLogicalPlan(tableDesc, q, ifNotExists)
+ case Some(q) =>
+ // Hive does not allow to use a CTAS statement to create a partitioned table.
+ if (tableDesc.partitionColumnNames.nonEmpty) {
+ val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " +
+ "create a partitioned table using Hive's file formats. " +
+ "Please use the syntax of \"CREATE TABLE tableName USING dataSource " +
+ "OPTIONS (...) PARTITIONED BY ...\" to create a partitioned table through a " +
+ "CTAS statement."
+ throw operationNotAllowed(errorMessage, ctx)
+ }
+
+ val hasStorageProperties = (ctx.createFileFormat != null) || (ctx.rowFormat != null)
+ if (conf.convertCTAS && !hasStorageProperties) {
+ val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
+ // At here, both rowStorage.serdeProperties and fileStorage.serdeProperties
+ // are empty Maps.
+ val optionsWithPath = if (location.isDefined) {
+ Map("path" -> location.get)
+ } else {
+ Map.empty[String, String]
+ }
+ CreateTableUsingAsSelect(
+ tableIdent = tableDesc.identifier,
+ provider = conf.defaultDataSourceName,
+ temporary = false,
+ partitionColumns = tableDesc.partitionColumnNames.toArray,
+ bucketSpec = None,
+ mode = mode,
+ options = optionsWithPath,
+ q
+ )
+ } else {
+ CreateHiveTableAsSelectLogicalPlan(tableDesc, q, ifNotExists)
+ }
case None => CreateTableCommand(tableDesc, ifNotExists)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 1b89c6b9ce..90db785332 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
-case class CreateTableAsSelectLogicalPlan(
+case class CreateHiveTableAsSelectLogicalPlan(
tableDesc: CatalogTable,
child: LogicalPlan,
allowExisting: Boolean) extends UnaryNode with Command {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d1db0dd800..437e093825 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -310,6 +310,14 @@ object SQLConf {
.stringConf
.createWithDefault("parquet")
+ val CONVERT_CTAS = SQLConfigBuilder("spark.sql.hive.convertCTAS")
+ .internal()
+ .doc("When true, a table created by a Hive CTAS statement (no USING clause) " +
+ "without specifying any storage property will be converted to a data source table, " +
+ "using the data source set by spark.sql.sources.default.")
+ .booleanConf
+ .createWithDefault(false)
+
// This is used to control the when we will split a schema's JSON string to multiple pieces
// in order to fit the JSON string in metastore's table property (by default, the value has
// a length restriction of 4000 characters). We will split the JSON string of a schema
@@ -632,6 +640,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)
+ def convertCTAS: Boolean = getConf(CONVERT_CTAS)
+
def partitionDiscoveryEnabled(): Boolean =
getConf(SQLConf.PARTITION_DISCOVERY_ENABLED)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index ff395f39b7..f10afa75f2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
-import org.apache.spark.sql.execution.command.CreateTableAsSelectLogicalPlan
+import org.apache.spark.sql.execution.command.CreateHiveTableAsSelectLogicalPlan
import org.apache.spark.sql.execution.datasources.{Partition => _, _}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hive.orc.OrcFileFormat
@@ -446,53 +446,21 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
case p: LogicalPlan if !p.childrenResolved => p
case p: LogicalPlan if p.resolved => p
- case p @ CreateTableAsSelectLogicalPlan(table, child, allowExisting) =>
- val schema = if (table.schema.nonEmpty) {
- table.schema
+ case p @ CreateHiveTableAsSelectLogicalPlan(table, child, allowExisting) =>
+ val desc = if (table.storage.serde.isEmpty) {
+ // add default serde
+ table.withNewStorage(
+ serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
} else {
- child.output.map { a =>
- CatalogColumn(a.name, a.dataType.catalogString, a.nullable)
- }
+ table
}
- val desc = table.copy(schema = schema)
-
- if (sessionState.convertCTAS && table.storage.serde.isEmpty) {
- // Do the conversion when spark.sql.hive.convertCTAS is true and the query
- // does not specify any storage format (file format and storage handler).
- if (table.identifier.database.isDefined) {
- throw new AnalysisException(
- "Cannot specify database name in a CTAS statement " +
- "when spark.sql.hive.convertCTAS is set to true.")
- }
+ val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
- val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
- CreateTableUsingAsSelect(
- TableIdentifier(desc.identifier.table),
- sessionState.conf.defaultDataSourceName,
- temporary = false,
- Array.empty[String],
- bucketSpec = None,
- mode,
- options = Map.empty[String, String],
- child
- )
- } else {
- val desc = if (table.storage.serde.isEmpty) {
- // add default serde
- table.withNewStorage(
- serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
- } else {
- table
- }
-
- val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
-
- execution.CreateTableAsSelectCommand(
- desc.copy(identifier = TableIdentifier(tblName, Some(dbName))),
- child,
- allowExisting)
- }
+ execution.CreateHiveTableAsSelectCommand(
+ desc.copy(identifier = TableIdentifier(tblName, Some(dbName))),
+ child,
+ allowExisting)
}
}
@@ -543,6 +511,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
/**
* An override of the standard HDFS listing based catalog, that overrides the partition spec with
* the information from the metastore.
+ *
* @param tableBasePath The default base path of the Hive metastore table
* @param partitionSpec The partition specifications from Hive metastore
*/
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 081d85acb9..ca8e5f8223 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -139,22 +139,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
}
/**
- * When true, a table created by a Hive CTAS statement (no USING clause) will be
- * converted to a data source table, using the data source set by spark.sql.sources.default.
- * The table in CTAS statement will be converted when it meets any of the following conditions:
- * - The CTAS does not specify any of a SerDe (ROW FORMAT SERDE), a File Format (STORED AS), or
- * a Storage Handler (STORED BY), and the value of hive.default.fileformat in hive-site.xml
- * is either TextFile or SequenceFile.
- * - The CTAS statement specifies TextFile (STORED AS TEXTFILE) as the file format and no SerDe
- * is specified (no ROW FORMAT SERDE clause).
- * - The CTAS statement specifies SequenceFile (STORED AS SEQUENCEFILE) as the file format
- * and no SerDe is specified (no ROW FORMAT SERDE clause).
- */
- def convertCTAS: Boolean = {
- conf.getConf(HiveUtils.CONVERT_CTAS)
- }
-
- /**
* When true, Hive Thrift server will execute SQL queries asynchronously using a thread pool."
*/
def hiveThriftServerAsync: Boolean = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 88f4a2d2b2..9ed357c587 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -96,12 +96,6 @@ private[spark] object HiveUtils extends Logging {
.booleanConf
.createWithDefault(false)
- val CONVERT_CTAS = SQLConfigBuilder("spark.sql.hive.convertCTAS")
- .doc("When true, a table created by a Hive CTAS statement (no USING clause) will be " +
- "converted to a data source table, using the data source set by spark.sql.sources.default.")
- .booleanConf
- .createWithDefault(false)
-
val CONVERT_METASTORE_ORC = SQLConfigBuilder("spark.sql.hive.convertMetastoreOrc")
.doc("When set to false, Spark SQL will use the Hive SerDe for ORC tables instead of " +
"the built in support.")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index cfe6149095..b8099385a4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -26,16 +26,17 @@ import org.apache.spark.sql.hive.MetastoreRelation
/**
* Create table and insert the query result into it.
+ *
* @param tableDesc the Table Describe, which may contains serde, storage handler etc.
* @param query the query whose result will be insert into the new relation
- * @param allowExisting allow continue working if it's already exists, otherwise
+ * @param ignoreIfExists allow continue working if it's already exists, otherwise
* raise exception
*/
private[hive]
-case class CreateTableAsSelectCommand(
+case class CreateHiveTableAsSelectCommand(
tableDesc: CatalogTable,
query: LogicalPlan,
- allowExisting: Boolean)
+ ignoreIfExists: Boolean)
extends RunnableCommand {
private val tableIdentifier = tableDesc.identifier
@@ -80,7 +81,7 @@ case class CreateTableAsSelectCommand(
// add the relation into catalog, just in case of failure occurs while data
// processing.
if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
- if (allowExisting) {
+ if (ignoreIfExists) {
// table already exists, will do nothing, to keep consistent with Hive
} else {
throw new AnalysisException(s"$tableIdentifier already exists.")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 3297a6f6c3..ba9fe54db8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -36,7 +36,7 @@ class HiveDDLCommandSuite extends PlanTest {
private def extractTableDesc(sql: String): (CatalogTable, Boolean) = {
parser.parsePlan(sql).collect {
case c: CreateTableCommand => (c.table, c.ifNotExists)
- case c: CreateTableAsSelectLogicalPlan => (c.tableDesc, c.allowExisting)
+ case c: CreateHiveTableAsSelectLogicalPlan => (c.tableDesc, c.allowExisting)
case c: CreateViewCommand => (c.tableDesc, c.allowExisting)
}.head
}
@@ -58,7 +58,6 @@ class HiveDDLCommandSuite extends PlanTest {
|ip STRING COMMENT 'IP Address of the User',
|country STRING COMMENT 'country of origination')
|COMMENT 'This is the staging page view table'
- |PARTITIONED BY (dt STRING COMMENT 'date type', hour STRING COMMENT 'hour of the day')
|STORED AS RCFILE
|LOCATION '/user/external/page_view'
|TBLPROPERTIES ('p1'='v1', 'p2'='v2')
@@ -76,16 +75,12 @@ class HiveDDLCommandSuite extends PlanTest {
CatalogColumn("page_url", "string") ::
CatalogColumn("referrer_url", "string") ::
CatalogColumn("ip", "string", comment = Some("IP Address of the User")) ::
- CatalogColumn("country", "string", comment = Some("country of origination")) ::
- CatalogColumn("dt", "string", comment = Some("date type")) ::
- CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
+ CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil)
assert(desc.comment == Some("This is the staging page view table"))
// TODO will be SQLText
assert(desc.viewText.isEmpty)
assert(desc.viewOriginalText.isEmpty)
- assert(desc.partitionColumns ==
- CatalogColumn("dt", "string", comment = Some("date type")) ::
- CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
+ assert(desc.partitionColumns == Seq.empty[CatalogColumn])
assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
assert(desc.storage.serde ==
@@ -103,7 +98,6 @@ class HiveDDLCommandSuite extends PlanTest {
|ip STRING COMMENT 'IP Address of the User',
|country STRING COMMENT 'country of origination')
|COMMENT 'This is the staging page view table'
- |PARTITIONED BY (dt STRING COMMENT 'date type', hour STRING COMMENT 'hour of the day')
|ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
| STORED AS
| INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
@@ -124,16 +118,12 @@ class HiveDDLCommandSuite extends PlanTest {
CatalogColumn("page_url", "string") ::
CatalogColumn("referrer_url", "string") ::
CatalogColumn("ip", "string", comment = Some("IP Address of the User")) ::
- CatalogColumn("country", "string", comment = Some("country of origination")) ::
- CatalogColumn("dt", "string", comment = Some("date type")) ::
- CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
+ CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil)
// TODO will be SQLText
assert(desc.comment == Some("This is the staging page view table"))
assert(desc.viewText.isEmpty)
assert(desc.viewOriginalText.isEmpty)
- assert(desc.partitionColumns ==
- CatalogColumn("dt", "string", comment = Some("date type")) ::
- CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
+ assert(desc.partitionColumns == Seq.empty[CatalogColumn])
assert(desc.storage.serdeProperties == Map())
assert(desc.storage.inputFormat == Some("parquet.hive.DeprecatedParquetInputFormat"))
assert(desc.storage.outputFormat == Some("parquet.hive.DeprecatedParquetOutputFormat"))
@@ -195,6 +185,11 @@ class HiveDDLCommandSuite extends PlanTest {
assert(desc.properties == Map(("tbl_p1" -> "p11"), ("tbl_p2" -> "p22")))
}
+ test("CTAS statement with a PARTITIONED BY clause is not allowed") {
+ assertUnsupported(s"CREATE TABLE ctas1 PARTITIONED BY (k int)" +
+ " AS SELECT key, value FROM (SELECT 1 as key, 2 as value) tmp")
+ }
+
test("unsupported operations") {
intercept[ParseException] {
parser.parsePlan(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
index 0d08f7edc8..a43eed9a2a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
@@ -53,7 +53,7 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
"== Analyzed Logical Plan ==",
"== Optimized Logical Plan ==",
"== Physical Plan ==",
- "CreateTableAsSelect",
+ "CreateHiveTableAsSelect",
"InsertIntoHiveTable",
"Limit",
"src")
@@ -71,7 +71,7 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
"== Analyzed Logical Plan ==",
"== Optimized Logical Plan ==",
"== Physical Plan ==",
- "CreateTableAsSelect",
+ "CreateHiveTableAsSelect",
"InsertIntoHiveTable",
"Limit",
"src")
@@ -92,7 +92,7 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
val shouldContain =
"== Parsed Logical Plan ==" :: "== Analyzed Logical Plan ==" :: "Subquery" ::
"== Optimized Logical Plan ==" :: "== Physical Plan ==" ::
- "CreateTableAsSelect" :: "InsertIntoHiveTable" :: "jt" :: Nil
+ "CreateHiveTableAsSelect" :: "InsertIntoHiveTable" :: "jt" :: Nil
for (key <- shouldContain) {
assert(outputs.contains(key), s"$key doesn't exist in result")
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 2a9b06b75e..b5691450ca 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -24,11 +24,14 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.{HiveUtils, MetastoreRelation}
import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
@@ -376,78 +379,138 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
)
}
- test("CTAS without serde") {
- def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
- val relation = EliminateSubqueryAliases(
- sessionState.catalog.lookupRelation(TableIdentifier(tableName)))
- relation match {
- case LogicalRelation(r: HadoopFsRelation, _, _) =>
- if (!isDataSourceParquet) {
- fail(
- s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
+ def checkRelation(
+ tableName: String,
+ isDataSourceParquet: Boolean,
+ format: String,
+ userSpecifiedLocation: Option[String] = None): Unit = {
+ val relation = EliminateSubqueryAliases(
+ sessionState.catalog.lookupRelation(TableIdentifier(tableName)))
+ val catalogTable =
+ sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+ relation match {
+ case LogicalRelation(r: HadoopFsRelation, _, _) =>
+ if (!isDataSourceParquet) {
+ fail(
+ s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
s"${HadoopFsRelation.getClass.getCanonicalName}.")
- }
+ }
+ userSpecifiedLocation match {
+ case Some(location) =>
+ assert(r.options("path") === location)
+ case None => // OK.
+ }
+ assert(
+ catalogTable.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER) === format)
- case r: MetastoreRelation =>
- if (isDataSourceParquet) {
- fail(
- s"${HadoopFsRelation.getClass.getCanonicalName} is expected, but found " +
+ case r: MetastoreRelation =>
+ if (isDataSourceParquet) {
+ fail(
+ s"${HadoopFsRelation.getClass.getCanonicalName} is expected, but found " +
s"${classOf[MetastoreRelation].getCanonicalName}.")
- }
- }
+ }
+ userSpecifiedLocation match {
+ case Some(location) =>
+ assert(r.catalogTable.storage.locationUri.get === location)
+ case None => // OK.
+ }
+ // Also make sure that the format is the desired format.
+ assert(catalogTable.storage.inputFormat.get.toLowerCase.contains(format))
}
- val originalConf = sessionState.convertCTAS
+ // When a user-specified location is defined, the table type needs to be EXTERNAL.
+ val actualTableType = catalogTable.tableType
+ userSpecifiedLocation match {
+ case Some(location) =>
+ assert(actualTableType === CatalogTableType.EXTERNAL)
+ case None =>
+ assert(actualTableType === CatalogTableType.MANAGED)
+ }
+ }
- setConf(HiveUtils.CONVERT_CTAS, true)
+ test("CTAS without serde without location") {
+ val originalConf = sessionState.conf.convertCTAS
+ setConf(SQLConf.CONVERT_CTAS, true)
+
+ val defaultDataSource = sessionState.conf.defaultDataSourceName
try {
sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
sql("CREATE TABLE IF NOT EXISTS ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
- var message = intercept[AnalysisException] {
+ val message = intercept[AnalysisException] {
sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
}.getMessage
assert(message.contains("already exists"))
- checkRelation("ctas1", true)
+ checkRelation("ctas1", true, defaultDataSource)
sql("DROP TABLE ctas1")
// Specifying database name for query can be converted to data source write path
// is not allowed right now.
- message = intercept[AnalysisException] {
- sql("CREATE TABLE default.ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
- }.getMessage
- assert(
- message.contains("Cannot specify database name in a CTAS statement"),
- "When spark.sql.hive.convertCTAS is true, we should not allow " +
- "database name specified.")
+ sql("CREATE TABLE default.ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
+ checkRelation("ctas1", true, defaultDataSource)
+ sql("DROP TABLE ctas1")
sql("CREATE TABLE ctas1 stored as textfile" +
" AS SELECT key k, value FROM src ORDER BY k, value")
- checkRelation("ctas1", true)
+ checkRelation("ctas1", false, "text")
sql("DROP TABLE ctas1")
sql("CREATE TABLE ctas1 stored as sequencefile" +
" AS SELECT key k, value FROM src ORDER BY k, value")
- checkRelation("ctas1", true)
+ checkRelation("ctas1", false, "sequence")
sql("DROP TABLE ctas1")
sql("CREATE TABLE ctas1 stored as rcfile AS SELECT key k, value FROM src ORDER BY k, value")
- checkRelation("ctas1", false)
+ checkRelation("ctas1", false, "rcfile")
sql("DROP TABLE ctas1")
sql("CREATE TABLE ctas1 stored as orc AS SELECT key k, value FROM src ORDER BY k, value")
- checkRelation("ctas1", false)
+ checkRelation("ctas1", false, "orc")
sql("DROP TABLE ctas1")
sql("CREATE TABLE ctas1 stored as parquet AS SELECT key k, value FROM src ORDER BY k, value")
- checkRelation("ctas1", false)
+ checkRelation("ctas1", false, "parquet")
sql("DROP TABLE ctas1")
} finally {
- setConf(HiveUtils.CONVERT_CTAS, originalConf)
+ setConf(SQLConf.CONVERT_CTAS, originalConf)
sql("DROP TABLE IF EXISTS ctas1")
}
}
+ test("CTAS without serde with location") {
+ withSQLConf(SQLConf.CONVERT_CTAS.key -> "true") {
+ withTempDir { dir =>
+ val defaultDataSource = sessionState.conf.defaultDataSourceName
+
+ val tempLocation = dir.getCanonicalPath
+ sql(s"CREATE TABLE ctas1 LOCATION 'file:$tempLocation/c1'" +
+ " AS SELECT key k, value FROM src ORDER BY k, value")
+ checkRelation("ctas1", true, defaultDataSource, Some(s"file:$tempLocation/c1"))
+ sql("DROP TABLE ctas1")
+
+ sql(s"CREATE TABLE ctas1 LOCATION 'file:$tempLocation/c2'" +
+ " AS SELECT key k, value FROM src ORDER BY k, value")
+ checkRelation("ctas1", true, defaultDataSource, Some(s"file:$tempLocation/c2"))
+ sql("DROP TABLE ctas1")
+
+ sql(s"CREATE TABLE ctas1 stored as textfile LOCATION 'file:$tempLocation/c3'" +
+ " AS SELECT key k, value FROM src ORDER BY k, value")
+ checkRelation("ctas1", false, "text", Some(s"file:$tempLocation/c3"))
+ sql("DROP TABLE ctas1")
+
+ sql(s"CREATE TABLE ctas1 stored as sequenceFile LOCATION 'file:$tempLocation/c4'" +
+ " AS SELECT key k, value FROM src ORDER BY k, value")
+ checkRelation("ctas1", false, "sequence", Some(s"file:$tempLocation/c4"))
+ sql("DROP TABLE ctas1")
+
+ sql(s"CREATE TABLE ctas1 stored as rcfile LOCATION 'file:$tempLocation/c5'" +
+ " AS SELECT key k, value FROM src ORDER BY k, value")
+ checkRelation("ctas1", false, "rcfile", Some(s"file:$tempLocation/c5"))
+ sql("DROP TABLE ctas1")
+ }
+ }
+ }
+
test("CTAS with serde") {
sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect()
sql(
@@ -785,8 +848,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
// generates an invalid query plan.
val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
read.json(rdd).createOrReplaceTempView("data")
- val originalConf = sessionState.convertCTAS
- setConf(HiveUtils.CONVERT_CTAS, false)
+ val originalConf = sessionState.conf.convertCTAS
+ setConf(SQLConf.CONVERT_CTAS, false)
try {
sql("CREATE TABLE explodeTest (key bigInt)")
@@ -805,7 +868,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
sql("DROP TABLE explodeTest")
dropTempTable("data")
} finally {
- setConf(HiveUtils.CONVERT_CTAS, originalConf)
+ setConf(SQLConf.CONVERT_CTAS, originalConf)
}
}