author     Yin Huai <yhuai@databricks.com>      2015-02-02 23:30:44 -0800
committer  Reynold Xin <rxin@databricks.com>    2015-02-02 23:30:44 -0800
commit     13531dd97c08563e53dacdaeaf1102bdd13ef825 (patch)
tree       5ee942b4be335cca10381160241e4fd83dbadb42 /sql/hive/src
parent     50a1a874e1d087a6c79835b1936d0009622a97b1 (diff)
download   spark-13531dd97c08563e53dacdaeaf1102bdd13ef825.tar.gz
           spark-13531dd97c08563e53dacdaeaf1102bdd13ef825.tar.bz2
           spark-13531dd97c08563e53dacdaeaf1102bdd13ef825.zip
[SPARK-5501][SPARK-5420][SQL] Write support for the data source API
This PR aims to support `INSERT INTO/OVERWRITE TABLE tableName` and `CREATE TABLE tableName AS SELECT` for the data source API (partitioned tables are not supported).

In this PR, I am also adding support for `IF NOT EXISTS` to our DDL parser. The current semantics of `IF NOT EXISTS` are as follows.

* For a `CREATE TEMPORARY TABLE` statement, it does not support `IF NOT EXISTS` for now.
* For a `CREATE TABLE` statement (we are creating a metastore table), if there is an existing table having the same name ...
  * when the `IF NOT EXISTS` clause is used, we will do nothing.
  * when the `IF NOT EXISTS` clause is not used, the user will see an exception saying the table already exists.

TODOs:
- [x] CTAS support
- [x] Programmatic APIs
- [ ] Python API (another PR)
- [x] More unit tests
- [ ] Documents (another PR)

marmbrus liancheng rxin

Author: Yin Huai <yhuai@databricks.com>

Closes #4294 from yhuai/writeSupport and squashes the following commits:

3db1539 [Yin Huai] save does not take overwrite.
1c98881 [Yin Huai] Fix test.
142372a [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupport
34e1bfb [Yin Huai] Address comments.
1682ca6 [Yin Huai] Better support for CTAS statements.
e789d64 [Yin Huai] For the Scala API, let users to use tuples to provide options.
0128065 [Yin Huai] Short hand versions of save and load.
66ebd74 [Yin Huai] Formatting.
9203ec2 [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupport
e5d29f2 [Yin Huai] Programmatic APIs.
1a719a5 [Yin Huai] CREATE TEMPORARY TABLE with IF NOT EXISTS is not allowed for now.
909924f [Yin Huai] Add saveAsTable for the data source API to DataFrame.
95a7c71 [Yin Huai] Fix bug when handling IF NOT EXISTS clause in a CREATE TEMPORARY TABLE statement.
d37b19c [Yin Huai] Cheng's comments.
fd6758c [Yin Huai] Use BeforeAndAfterAll.
7880891 [Yin Huai] Support CREATE TABLE AS SELECT STATEMENT and the IF NOT EXISTS clause.
cb85b05 [Yin Huai] Initial write support.
2f91354 [Yin Huai] Make INSERT OVERWRITE/INTO statements consistent between HiveQL and SqlParser.
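For illustration, a minimal sketch of the SQL surface this change adds (not part of the patch): it assumes an existing `HiveContext` named `hiveContext`, the built-in JSON data source, and placeholder paths under `/tmp`.

```scala
// Sketch only: `hiveContext`, the table names, and the /tmp paths are
// assumptions for illustration.

// Register a metastore table backed by the JSON data source. Because an
// explicit `path` is given, it is stored as an EXTERNAL table.
hiveContext.sql(
  """
    |CREATE TABLE people
    |USING org.apache.spark.sql.json.DefaultSource
    |OPTIONS (
    |  path '/tmp/people.json'
    |)
  """.stripMargin)

// CTAS with IF NOT EXISTS: if `adults` already exists this is a no-op;
// without IF NOT EXISTS the same statement fails with
// "Table adults already exists."
hiveContext.sql(
  """
    |CREATE TABLE IF NOT EXISTS adults
    |USING org.apache.spark.sql.json.DefaultSource
    |OPTIONS (
    |  path '/tmp/adults'
    |) AS
    |SELECT * FROM people WHERE age >= 18
  """.stripMargin)
```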
Diffstat (limited to 'sql/hive/src')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala                67
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala       35
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala             18
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala         76
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala  185
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala    2
6 files changed, 365 insertions, 18 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 5efc3b1e30..f6d9027f90 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperat
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, SetCommand, QueryExecutionException}
import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DescribeHiveTableCommand}
-import org.apache.spark.sql.sources.DataSourceStrategy
+import org.apache.spark.sql.sources.{CreateTableUsing, DataSourceStrategy}
import org.apache.spark.sql.types._
/**
@@ -86,6 +86,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
* @param allowExisting When false, an exception will be thrown if the table already exists.
* @tparam A A case class that is used to describe the schema of the table to be created.
*/
+ @Deprecated
def createTable[A <: Product : TypeTag](tableName: String, allowExisting: Boolean = true) {
catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting)
}
@@ -106,6 +107,70 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
catalog.invalidateTable("default", tableName)
}
+ @Experimental
+ def createTable(tableName: String, path: String, allowExisting: Boolean): Unit = {
+ val dataSourceName = conf.defaultDataSourceName
+ createTable(tableName, dataSourceName, allowExisting, ("path", path))
+ }
+
+ @Experimental
+ def createTable(
+ tableName: String,
+ dataSourceName: String,
+ allowExisting: Boolean,
+ option: (String, String),
+ options: (String, String)*): Unit = {
+ val cmd =
+ CreateTableUsing(
+ tableName,
+ userSpecifiedSchema = None,
+ dataSourceName,
+ temporary = false,
+ (option +: options).toMap,
+ allowExisting)
+ executePlan(cmd).toRdd
+ }
+
+ @Experimental
+ def createTable(
+ tableName: String,
+ dataSourceName: String,
+ schema: StructType,
+ allowExisting: Boolean,
+ option: (String, String),
+ options: (String, String)*): Unit = {
+ val cmd =
+ CreateTableUsing(
+ tableName,
+ userSpecifiedSchema = Some(schema),
+ dataSourceName,
+ temporary = false,
+ (option +: options).toMap,
+ allowExisting)
+ executePlan(cmd).toRdd
+ }
+
+ @Experimental
+ def createTable(
+ tableName: String,
+ dataSourceName: String,
+ allowExisting: Boolean,
+ options: java.util.Map[String, String]): Unit = {
+ val opts = options.toSeq
+ createTable(tableName, dataSourceName, allowExisting, opts.head, opts.tail:_*)
+ }
+
+ @Experimental
+ def createTable(
+ tableName: String,
+ dataSourceName: String,
+ schema: StructType,
+ allowExisting: Boolean,
+ options: java.util.Map[String, String]): Unit = {
+ val opts = options.toSeq
+ createTable(tableName, dataSourceName, schema, allowExisting, opts.head, opts.tail:_*)
+ }
+
/**
* Analyzes the given table in the current database to generate statistics, which will be
* used in query optimizations.
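A rough usage sketch of the `createTable` overloads added above; the `hiveContext` value, table names, and paths are placeholders, not part of the patch.

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Shorthand overload: uses the default data source (spark.sql.default.datasource)
// and creates an external table over the given path.
hiveContext.createTable("people", "/tmp/people.json", false) // allowExisting = false

// Explicit provider with options passed as tuples; allowExisting = true makes
// the call a no-op when the table already exists.
hiveContext.createTable(
  "people_json",
  "org.apache.spark.sql.json",
  true, // allowExisting
  ("path", "/tmp/people.json"))

// With a user-specified schema instead of schema inference.
val schema = StructType(
  StructField("name", StringType, nullable = true) ::
  StructField("age", IntegerType, nullable = true) :: Nil)
hiveContext.createTable(
  "typed_people",
  "org.apache.spark.sql.json",
  schema,
  false, // allowExisting
  ("path", "/tmp/people.json"))
```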
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d910ee9509..48bea6c1bd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -23,10 +23,9 @@ import java.util.{List => JList}
import com.google.common.cache.{LoadingCache, CacheLoader, CacheBuilder}
import org.apache.hadoop.util.ReflectionUtils
-import org.apache.hadoop.hive.metastore.TableType
-import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition, FieldSchema}
-import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException}
-import org.apache.hadoop.hive.ql.metadata.InvalidTableException
+import org.apache.hadoop.hive.metastore.{Warehouse, TableType}
+import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition, AlreadyExistsException, FieldSchema}
+import org.apache.hadoop.hive.ql.metadata._
import org.apache.hadoop.hive.ql.plan.CreateTableDesc
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException}
@@ -52,6 +51,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
/** Connection to hive metastore. Usages should lock on `this`. */
protected[hive] val client = Hive.get(hive.hiveconf)
+ protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf)
+
// TODO: Use this everywhere instead of tuples or databaseName, tableName,.
/** A fully qualified identifier for a table (i.e., database.tableName) */
case class QualifiedTableName(database: String, name: String) {
@@ -99,11 +100,22 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
val caseSensitive: Boolean = false
+ /** *
+ * Creates a data source table (a table created with USING clause) in Hive's metastore.
+ * Returns true when the table has been created. Otherwise, false.
+ * @param tableName
+ * @param userSpecifiedSchema
+ * @param provider
+ * @param options
+ * @param isExternal
+ * @return
+ */
def createDataSourceTable(
tableName: String,
userSpecifiedSchema: Option[StructType],
provider: String,
- options: Map[String, String]) = {
+ options: Map[String, String],
+ isExternal: Boolean): Unit = {
val (dbName, tblName) = processDatabaseAndTableName("default", tableName)
val tbl = new Table(dbName, tblName)
@@ -113,8 +125,13 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
options.foreach { case (key, value) => tbl.setSerdeParam(key, value) }
- tbl.setProperty("EXTERNAL", "TRUE")
- tbl.setTableType(TableType.EXTERNAL_TABLE)
+ if (isExternal) {
+ tbl.setProperty("EXTERNAL", "TRUE")
+ tbl.setTableType(TableType.EXTERNAL_TABLE)
+ } else {
+ tbl.setProperty("EXTERNAL", "FALSE")
+ tbl.setTableType(TableType.MANAGED_TABLE)
+ }
// create the table
synchronized {
@@ -122,6 +139,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
}
+ def hiveDefaultTableFilePath(tableName: String): String = {
+ hiveWarehouse.getTablePath(client.getDatabaseCurrent, tableName).toString
+ }
+
def tableExists(tableIdentifier: Seq[String]): Boolean = {
val tableIdent = processTableIdentifier(tableIdentifier)
val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index fa997288a2..d89111094b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeComman
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.parquet.ParquetRelation
-import org.apache.spark.sql.sources.CreateTableUsing
+import org.apache.spark.sql.sources.{CreateTableUsingAsLogicalPlan, CreateTableUsingAsSelect, CreateTableUsing}
import org.apache.spark.sql.types.StringType
@@ -212,9 +212,21 @@ private[hive] trait HiveStrategies {
object HiveDDLStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) =>
+ case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, opts, allowExisting) =>
ExecutedCommand(
- CreateMetastoreDataSource(tableName, userSpecifiedSchema, provider, options)) :: Nil
+ CreateMetastoreDataSource(
+ tableName, userSpecifiedSchema, provider, opts, allowExisting)) :: Nil
+
+ case CreateTableUsingAsSelect(tableName, provider, false, opts, allowExisting, query) =>
+ val logicalPlan = hiveContext.parseSql(query)
+ val cmd =
+ CreateMetastoreDataSourceAsSelect(tableName, provider, opts, allowExisting, logicalPlan)
+ ExecutedCommand(cmd) :: Nil
+
+ case CreateTableUsingAsLogicalPlan(tableName, provider, false, opts, allowExisting, query) =>
+ val cmd =
+ CreateMetastoreDataSourceAsSelect(tableName, provider, opts, allowExisting, query)
+ ExecutedCommand(cmd) :: Nil
case _ => Nil
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 4814cb7ebf..95dcaccefd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -18,8 +18,10 @@
package org.apache.spark.sql.hive.execution
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.sources.ResolvedDataSource
+import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.StructType
@@ -102,11 +104,77 @@ case class CreateMetastoreDataSource(
tableName: String,
userSpecifiedSchema: Option[StructType],
provider: String,
- options: Map[String, String]) extends RunnableCommand {
+ options: Map[String, String],
+ allowExisting: Boolean) extends RunnableCommand {
- override def run(sqlContext: SQLContext) = {
+ override def run(sqlContext: SQLContext): Seq[Row] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
- hiveContext.catalog.createDataSourceTable(tableName, userSpecifiedSchema, provider, options)
+
+ if (hiveContext.catalog.tableExists(tableName :: Nil)) {
+ if (allowExisting) {
+ return Seq.empty[Row]
+ } else {
+ sys.error(s"Table $tableName already exists.")
+ }
+ }
+
+ var isExternal = true
+ val optionsWithPath =
+ if (!options.contains("path")) {
+ isExternal = false
+ options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableName))
+ } else {
+ options
+ }
+
+ hiveContext.catalog.createDataSourceTable(
+ tableName,
+ userSpecifiedSchema,
+ provider,
+ optionsWithPath,
+ isExternal)
+
+ Seq.empty[Row]
+ }
+}
+
+case class CreateMetastoreDataSourceAsSelect(
+ tableName: String,
+ provider: String,
+ options: Map[String, String],
+ allowExisting: Boolean,
+ query: LogicalPlan) extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val hiveContext = sqlContext.asInstanceOf[HiveContext]
+
+ if (hiveContext.catalog.tableExists(tableName :: Nil)) {
+ if (allowExisting) {
+ return Seq.empty[Row]
+ } else {
+ sys.error(s"Table $tableName already exists.")
+ }
+ }
+
+ val df = DataFrame(hiveContext, query)
+ var isExternal = true
+ val optionsWithPath =
+ if (!options.contains("path")) {
+ isExternal = false
+ options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableName))
+ } else {
+ options
+ }
+
+ // Create the relation based on the data of df.
+ ResolvedDataSource(sqlContext, provider, optionsWithPath, df)
+
+ hiveContext.catalog.createDataSourceTable(
+ tableName,
+ None,
+ provider,
+ optionsWithPath,
+ isExternal)
Seq.empty[Row]
}
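To make the managed-versus-external split in the commands above concrete, a small sketch assuming an existing `HiveContext` named `hiveContext` and an already registered `people` table: omitting the `path` option makes the new table managed (its data lives under `hiveDefaultTableFilePath` and is removed on `DROP TABLE`), while an explicit `path` produces an external table whose files survive a drop.

```scala
// Managed: no `path` option, so the data lands under Hive's default warehouse
// location for `managed_people` and is deleted by DROP TABLE.
hiveContext.sql(
  """
    |CREATE TABLE managed_people
    |USING org.apache.spark.sql.json.DefaultSource
    |OPTIONS (
    |
    |) AS
    |SELECT * FROM people
  """.stripMargin)

// External: an explicit `path` is given, so DROP TABLE only removes the
// metastore entry and leaves the files at /tmp/external_people intact.
hiveContext.sql(
  """
    |CREATE TABLE external_people
    |USING org.apache.spark.sql.json.DefaultSource
    |OPTIONS (
    |  path '/tmp/external_people'
    |) AS
    |SELECT * FROM people
  """.stripMargin)
```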
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 7408c7ffd6..85795acb65 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -22,7 +22,9 @@ import java.io.File
import org.scalatest.BeforeAndAfterEach
import org.apache.commons.io.FileUtils
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.catalyst.util
import org.apache.spark.sql._
import org.apache.spark.util.Utils
import org.apache.spark.sql.types._
@@ -36,9 +38,11 @@ import org.apache.spark.sql.hive.test.TestHive._
class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
override def afterEach(): Unit = {
reset()
+ if (ctasPath.exists()) Utils.deleteRecursively(ctasPath)
}
val filePath = Utils.getSparkClassLoader.getResource("sample.json").getFile
+ var ctasPath: File = util.getTempFilePath("jsonCTAS").getCanonicalFile
test ("persistent JSON table") {
sql(
@@ -94,7 +98,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
StructField("<d>", innerStruct, true) ::
StructField("b", StringType, true) :: Nil)
- assert(expectedSchema == table("jsonTable").schema)
+ assert(expectedSchema === table("jsonTable").schema)
jsonFile(filePath).registerTempTable("expectedJsonTable")
@@ -137,6 +141,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
intercept[Exception] {
sql("SELECT * FROM jsonTable").collect()
}
+
+ assert(
+ (new File(filePath)).exists(),
+ "The table with specified path is considered as an external table, " +
+ "its data should not deleted after DROP TABLE.")
}
test("check change without refresh") {
@@ -240,7 +249,144 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
invalidateTable("jsonTable")
val expectedSchema = StructType(StructField("c_!@(3)", IntegerType, true) :: Nil)
- assert(expectedSchema == table("jsonTable").schema)
+ assert(expectedSchema === table("jsonTable").schema)
+ }
+
+ test("CTAS") {
+ sql(
+ s"""
+ |CREATE TABLE jsonTable
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${filePath}'
+ |)
+ """.stripMargin)
+
+ sql(
+ s"""
+ |CREATE TABLE ctasJsonTable
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${ctasPath}'
+ |) AS
+ |SELECT * FROM jsonTable
+ """.stripMargin)
+
+ assert(table("ctasJsonTable").schema === table("jsonTable").schema)
+
+ checkAnswer(
+ sql("SELECT * FROM ctasJsonTable"),
+ sql("SELECT * FROM jsonTable").collect())
+ }
+
+ test("CTAS with IF NOT EXISTS") {
+ sql(
+ s"""
+ |CREATE TABLE jsonTable
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${filePath}'
+ |)
+ """.stripMargin)
+
+ sql(
+ s"""
+ |CREATE TABLE ctasJsonTable
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${ctasPath}'
+ |) AS
+ |SELECT * FROM jsonTable
+ """.stripMargin)
+
+ // Create the table again should trigger a AlreadyExistsException.
+ val message = intercept[RuntimeException] {
+ sql(
+ s"""
+ |CREATE TABLE ctasJsonTable
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${ctasPath}'
+ |) AS
+ |SELECT * FROM jsonTable
+ """.stripMargin)
+ }.getMessage
+ assert(message.contains("Table ctasJsonTable already exists."),
+ "We should complain that ctasJsonTable already exists")
+
+ // The following statement should be fine if it has IF NOT EXISTS.
+ // It tries to create a table ctasJsonTable with a new schema.
+ // The actual table's schema and data should not be changed.
+ sql(
+ s"""
+ |CREATE TABLE IF NOT EXISTS ctasJsonTable
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${ctasPath}'
+ |) AS
+ |SELECT a FROM jsonTable
+ """.stripMargin)
+
+ // Discard the cached relation.
+ invalidateTable("ctasJsonTable")
+
+ // Schema should not be changed.
+ assert(table("ctasJsonTable").schema === table("jsonTable").schema)
+ // Table data should not be changed.
+ checkAnswer(
+ sql("SELECT * FROM ctasJsonTable"),
+ sql("SELECT * FROM jsonTable").collect())
+ }
+
+ test("CTAS a managed table") {
+ sql(
+ s"""
+ |CREATE TABLE jsonTable
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${filePath}'
+ |)
+ """.stripMargin)
+
+ new Path("/Users/yhuai/Desktop/whatever")
+
+
+ val expectedPath = catalog.hiveDefaultTableFilePath("ctasJsonTable")
+ val filesystemPath = new Path(expectedPath)
+ val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration)
+ if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true)
+
+ // It is a managed table when we do not specify the location.
+ sql(
+ s"""
+ |CREATE TABLE ctasJsonTable
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ |
+ |) AS
+ |SELECT * FROM jsonTable
+ """.stripMargin)
+
+ assert(fs.exists(filesystemPath), s"$expectedPath should exist after we create the table.")
+
+ sql(
+ s"""
+ |CREATE TABLE loadedTable
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${expectedPath}'
+ |)
+ """.stripMargin)
+
+ assert(table("ctasJsonTable").schema === table("loadedTable").schema)
+
+ checkAnswer(
+ sql("SELECT * FROM ctasJsonTable"),
+ sql("SELECT * FROM loadedTable").collect()
+ )
+
+ sql("DROP TABLE ctasJsonTable")
+ assert(!fs.exists(filesystemPath), s"$expectedPath should not exist after we drop the table.")
}
test("SPARK-5286 Fail to drop an invalid table when using the data source API") {
@@ -255,4 +401,39 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
sql("DROP TABLE jsonTable").collect().foreach(println)
}
+
+ test("save and load table") {
+ val originalDefaultSource = conf.defaultDataSourceName
+ conf.setConf("spark.sql.default.datasource", "org.apache.spark.sql.json")
+
+ val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+ val df = jsonRDD(rdd)
+
+ df.saveAsTable("savedJsonTable")
+
+ checkAnswer(
+ sql("SELECT * FROM savedJsonTable"),
+ df.collect())
+
+ createTable("createdJsonTable", catalog.hiveDefaultTableFilePath("savedJsonTable"), false)
+ assert(table("createdJsonTable").schema === df.schema)
+ checkAnswer(
+ sql("SELECT * FROM createdJsonTable"),
+ df.collect())
+
+ val message = intercept[RuntimeException] {
+ createTable("createdJsonTable", filePath.toString, false)
+ }.getMessage
+ assert(message.contains("Table createdJsonTable already exists."),
+ "We should complain that ctasJsonTable already exists")
+
+ createTable("createdJsonTable", filePath.toString, true)
+ // createdJsonTable should be not changed.
+ assert(table("createdJsonTable").schema === df.schema)
+ checkAnswer(
+ sql("SELECT * FROM createdJsonTable"),
+ df.collect())
+
+ conf.setConf("spark.sql.default.datasource", originalDefaultSource)
+ }
}
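A condensed sketch of the flow exercised by the "save and load table" test above, assuming an existing `HiveContext` named `hiveContext`; the table name is a placeholder.

```scala
// Build a small DataFrame from in-memory JSON strings, as in the test above.
val rdd = hiveContext.sparkContext.parallelize(
  (1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
val df = hiveContext.jsonRDD(rdd)

// Persist it as a metastore table through the data source write path; with no
// explicit path it is stored as a managed table.
df.saveAsTable("savedJsonTable")

// Read it back through SQL and confirm the schema round-trips.
val reloaded = hiveContext.sql("SELECT * FROM savedJsonTable")
assert(reloaded.schema == df.schema)
```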
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index eb7a7750af..4efe0c5e0c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -170,7 +170,7 @@ class SQLQuerySuite extends QueryTest {
sql("CREATE TABLE test2 (key INT, value STRING)")
testData.insertInto("test2")
testData.insertInto("test2")
- sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").saveAsTable("test")
+ sql("CREATE TABLE test AS SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key")
checkAnswer(
table("test"),
sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").collect().toSeq)