author    Yin Huai <yhuai@databricks.com>      2015-02-02 23:30:44 -0800
committer Reynold Xin <rxin@databricks.com>   2015-02-02 23:30:44 -0800
commit    13531dd97c08563e53dacdaeaf1102bdd13ef825
tree      5ee942b4be335cca10381160241e4fd83dbadb42 /sql/hive/src
parent    50a1a874e1d087a6c79835b1936d0009622a97b1
[SPARK-5501][SPARK-5420][SQL] Write support for the data source API
This PR adds write support for the data source API: `INSERT INTO/OVERWRITE TABLE tableName` and `CREATE TABLE tableName AS SELECT` are now supported (partitioned tables are not supported).
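In SQL terms, this enables statements like the following sketch (table names and the path are illustrative, not from the patch; the JSON source mirrors the tests in this diff, and the inserts assume the underlying source supports insertion):

```scala
// Assuming an existing HiveContext named `hiveContext`.
import hiveContext.sql

// CTAS through the data source API: create the table and populate it
// with the result of the query in one statement.
sql(
  """CREATE TABLE ctasTable
    |USING org.apache.spark.sql.json.DefaultSource
    |OPTIONS (path '/tmp/ctasTable')
    |AS SELECT * FROM sourceTable
  """.stripMargin)

// Replace the table's contents, then append to them.
sql("INSERT OVERWRITE TABLE ctasTable SELECT * FROM sourceTable")
sql("INSERT INTO TABLE ctasTable SELECT * FROM sourceTable")
```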
In this PR, I am also adding support for `IF NOT EXISTS` to our DDL parser. The current semantics of `IF NOT EXISTS` are as follows (see the sketch after this list).
* For a `CREATE TEMPORARY TABLE` statement, `IF NOT EXISTS` is not supported for now.
* For a `CREATE TABLE` statement (i.e., we are creating a metastore table), if a table with the same name already exists, then:
  * when the `IF NOT EXISTS` clause is used, we do nothing;
  * when the `IF NOT EXISTS` clause is not used, the user gets an exception saying the table already exists.
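A minimal sketch of these semantics (the table name and path are illustrative):

```scala
// Assuming an existing HiveContext named `hiveContext`.
import hiveContext.sql

val ddl =
  """CREATE TABLE users
    |USING org.apache.spark.sql.json.DefaultSource
    |OPTIONS (path '/data/users.json')
  """.stripMargin

sql(ddl)  // creates the metastore table

// Without IF NOT EXISTS, a second attempt fails with
// "Table users already exists.":
// sql(ddl)

// With IF NOT EXISTS, the statement is a no-op and the existing
// table (schema and data) is left untouched.
sql(ddl.replaceFirst("CREATE TABLE", "CREATE TABLE IF NOT EXISTS"))
```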
TODOs:
- [x] CTAS support
- [x] Programmatic APIs
- [ ] Python API (another PR)
- [x] More unit tests
- [ ] Documentation (another PR)
marmbrus liancheng rxin
Author: Yin Huai <yhuai@databricks.com>
Closes #4294 from yhuai/writeSupport and squashes the following commits:
3db1539 [Yin Huai] save does not take overwrite.
1c98881 [Yin Huai] Fix test.
142372a [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupport
34e1bfb [Yin Huai] Address comments.
1682ca6 [Yin Huai] Better support for CTAS statements.
e789d64 [Yin Huai] For the Scala API, let users to use tuples to provide options.
0128065 [Yin Huai] Short hand versions of save and load.
66ebd74 [Yin Huai] Formatting.
9203ec2 [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupport
e5d29f2 [Yin Huai] Programmatic APIs.
1a719a5 [Yin Huai] CREATE TEMPORARY TABLE with IF NOT EXISTS is not allowed for now.
909924f [Yin Huai] Add saveAsTable for the data source API to DataFrame.
95a7c71 [Yin Huai] Fix bug when handling IF NOT EXISTS clause in a CREATE TEMPORARY TABLE statement.
d37b19c [Yin Huai] Cheng's comments.
fd6758c [Yin Huai] Use BeforeAndAfterAll.
7880891 [Yin Huai] Support CREATE TABLE AS SELECT STATEMENT and the IF NOT EXISTS clause.
cb85b05 [Yin Huai] Initial write support.
2f91354 [Yin Huai] Make INSERT OVERWRITE/INTO statements consistent between HiveQL and SqlParser.
Diffstat (limited to 'sql/hive/src')
6 files changed, 365 insertions, 18 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 5efc3b1e30..f6d9027f90 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperat
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, SetCommand, QueryExecutionException}
 import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DescribeHiveTableCommand}
-import org.apache.spark.sql.sources.DataSourceStrategy
+import org.apache.spark.sql.sources.{CreateTableUsing, DataSourceStrategy}
 import org.apache.spark.sql.types._
 
 /**
@@ -86,6 +86,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    * @param allowExisting When false, an exception will be thrown if the table already exists.
    * @tparam A A case class that is used to describe the schema of the table to be created.
    */
+  @Deprecated
  def createTable[A <: Product : TypeTag](tableName: String, allowExisting: Boolean = true) {
    catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting)
  }
@@ -106,6 +107,70 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     catalog.invalidateTable("default", tableName)
   }
 
+  @Experimental
+  def createTable(tableName: String, path: String, allowExisting: Boolean): Unit = {
+    val dataSourceName = conf.defaultDataSourceName
+    createTable(tableName, dataSourceName, allowExisting, ("path", path))
+  }
+
+  @Experimental
+  def createTable(
+      tableName: String,
+      dataSourceName: String,
+      allowExisting: Boolean,
+      option: (String, String),
+      options: (String, String)*): Unit = {
+    val cmd =
+      CreateTableUsing(
+        tableName,
+        userSpecifiedSchema = None,
+        dataSourceName,
+        temporary = false,
+        (option +: options).toMap,
+        allowExisting)
+    executePlan(cmd).toRdd
+  }
+
+  @Experimental
+  def createTable(
+      tableName: String,
+      dataSourceName: String,
+      schema: StructType,
+      allowExisting: Boolean,
+      option: (String, String),
+      options: (String, String)*): Unit = {
+    val cmd =
+      CreateTableUsing(
+        tableName,
+        userSpecifiedSchema = Some(schema),
+        dataSourceName,
+        temporary = false,
+        (option +: options).toMap,
+        allowExisting)
+    executePlan(cmd).toRdd
+  }
+
+  @Experimental
+  def createTable(
+      tableName: String,
+      dataSourceName: String,
+      allowExisting: Boolean,
+      options: java.util.Map[String, String]): Unit = {
+    val opts = options.toSeq
+    createTable(tableName, dataSourceName, allowExisting, opts.head, opts.tail:_*)
+  }
+
+  @Experimental
+  def createTable(
+      tableName: String,
+      dataSourceName: String,
+      schema: StructType,
+      allowExisting: Boolean,
+      options: java.util.Map[String, String]): Unit = {
+    val opts = options.toSeq
+    createTable(tableName, dataSourceName, schema, allowExisting, opts.head, opts.tail:_*)
+  }
+
   /**
    * Analyzes the given table in the current database to generate statistics, which will be
    * used in query optimizations.
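The five `createTable` overloads above are the new programmatic surface. A minimal usage sketch based on those signatures (the table names, source name, and paths are illustrative):

```scala
// Assuming an existing HiveContext named `hiveContext`.
// Options are passed as (key, value) tuples; a mandatory first `option`
// parameter guarantees at least one option is always supplied.
hiveContext.createTable(
  "users",                                    // table name
  "org.apache.spark.sql.json.DefaultSource",  // data source provider
  false,                                      // allowExisting
  ("path", "/data/users.json"))

// Shorthand: just a path; the provider comes from conf.defaultDataSourceName.
hiveContext.createTable("users2", "/data/users2.json", false)

// Java-friendly overload taking a java.util.Map of options.
val opts = new java.util.HashMap[String, String]()
opts.put("path", "/data/users3.json")
hiveContext.createTable("users3", "org.apache.spark.sql.json.DefaultSource", false, opts)
```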
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d910ee9509..48bea6c1bd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -23,10 +23,9 @@ import java.util.{List => JList}
 import com.google.common.cache.{LoadingCache, CacheLoader, CacheBuilder}
 import org.apache.hadoop.util.ReflectionUtils
-import org.apache.hadoop.hive.metastore.TableType
-import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition, FieldSchema}
-import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException}
-import org.apache.hadoop.hive.ql.metadata.InvalidTableException
+import org.apache.hadoop.hive.metastore.{Warehouse, TableType}
+import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition, AlreadyExistsException, FieldSchema}
+import org.apache.hadoop.hive.ql.metadata._
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException}
@@ -52,6 +51,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   /** Connection to hive metastore. Usages should lock on `this`. */
   protected[hive] val client = Hive.get(hive.hiveconf)
 
+  protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf)
+
   // TODO: Use this everywhere instead of tuples or databaseName, tableName,.
   /** A fully qualified identifier for a table (i.e., database.tableName) */
   case class QualifiedTableName(database: String, name: String) {
@@ -99,11 +100,22 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   val caseSensitive: Boolean = false
 
+  /**
+   * Creates a data source table (a table created with the USING clause) in Hive's metastore.
+   * @param tableName name of the table to create
+   * @param userSpecifiedSchema the schema provided by the user, if any
+   * @param provider the fully qualified name of the data source
+   * @param options options passed to the data source
+   * @param isExternal whether to create the table as EXTERNAL (true) or MANAGED (false)
+   */
   def createDataSourceTable(
       tableName: String,
       userSpecifiedSchema: Option[StructType],
       provider: String,
-      options: Map[String, String]) = {
+      options: Map[String, String],
+      isExternal: Boolean): Unit = {
     val (dbName, tblName) = processDatabaseAndTableName("default", tableName)
     val tbl = new Table(dbName, tblName)
@@ -113,8 +125,13 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     }
     options.foreach { case (key, value) => tbl.setSerdeParam(key, value) }
 
-    tbl.setProperty("EXTERNAL", "TRUE")
-    tbl.setTableType(TableType.EXTERNAL_TABLE)
+    if (isExternal) {
+      tbl.setProperty("EXTERNAL", "TRUE")
+      tbl.setTableType(TableType.EXTERNAL_TABLE)
+    } else {
+      tbl.setProperty("EXTERNAL", "FALSE")
+      tbl.setTableType(TableType.MANAGED_TABLE)
+    }
 
     // create the table
     synchronized {
@@ -122,6 +139,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     }
   }
 
+  def hiveDefaultTableFilePath(tableName: String): String = {
+    hiveWarehouse.getTablePath(client.getDatabaseCurrent, tableName).toString
+  }
+
   def tableExists(tableIdentifier: Seq[String]): Boolean = {
     val tableIdent = processTableIdentifier(tableIdentifier)
     val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index fa997288a2..d89111094b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeComman
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.parquet.ParquetRelation
-import org.apache.spark.sql.sources.CreateTableUsing
+import org.apache.spark.sql.sources.{CreateTableUsingAsLogicalPlan, CreateTableUsingAsSelect, CreateTableUsing}
 import org.apache.spark.sql.types.StringType
@@ -212,9 +212,21 @@ private[hive] trait HiveStrategies {
   object HiveDDLStrategy extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) =>
+      case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, opts, allowExisting) =>
         ExecutedCommand(
-          CreateMetastoreDataSource(tableName, userSpecifiedSchema, provider, options)) :: Nil
+          CreateMetastoreDataSource(
+            tableName, userSpecifiedSchema, provider, opts, allowExisting)) :: Nil
+
+      case CreateTableUsingAsSelect(tableName, provider, false, opts, allowExisting, query) =>
+        val logicalPlan = hiveContext.parseSql(query)
+        val cmd =
+          CreateMetastoreDataSourceAsSelect(tableName, provider, opts, allowExisting, logicalPlan)
+        ExecutedCommand(cmd) :: Nil
+
+      case CreateTableUsingAsLogicalPlan(tableName, provider, false, opts, allowExisting, query) =>
+        val cmd =
+          CreateMetastoreDataSourceAsSelect(tableName, provider, opts, allowExisting, query)
+        ExecutedCommand(cmd) :: Nil
 
       case _ => Nil
     }
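The catalog thus gains the two helpers the new commands rely on: `createDataSourceTable` (now with an `isExternal` flag) and `hiveDefaultTableFilePath`. A hedged sketch of how a caller combines them, assuming `catalog` refers to the `HiveMetastoreCatalog` of an active `HiveContext` (the schema and names are illustrative):

```scala
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// No user-supplied path, so place the data under Hive's warehouse directory;
// the table is then MANAGED and dropping it deletes the data as well.
val schema = StructType(StructField("a", IntegerType, true) :: Nil)
val defaultPath = catalog.hiveDefaultTableFilePath("managedJson")

catalog.createDataSourceTable(
  "managedJson",
  Some(schema),
  "org.apache.spark.sql.json.DefaultSource",
  Map("path" -> defaultPath),
  isExternal = false)
```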
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 4814cb7ebf..95dcaccefd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -18,8 +18,10 @@ package org.apache.spark.sql.hive.execution
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.sources.ResolvedDataSource
+import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.types.StructType
@@ -102,11 +104,77 @@
 case class CreateMetastoreDataSource(
     tableName: String,
     userSpecifiedSchema: Option[StructType],
     provider: String,
-    options: Map[String, String]) extends RunnableCommand {
+    options: Map[String, String],
+    allowExisting: Boolean) extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext) = {
+  override def run(sqlContext: SQLContext): Seq[Row] = {
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
-    hiveContext.catalog.createDataSourceTable(tableName, userSpecifiedSchema, provider, options)
+
+    if (hiveContext.catalog.tableExists(tableName :: Nil)) {
+      if (allowExisting) {
+        return Seq.empty[Row]
+      } else {
+        sys.error(s"Table $tableName already exists.")
+      }
+    }
+
+    var isExternal = true
+    val optionsWithPath =
+      if (!options.contains("path")) {
+        isExternal = false
+        options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableName))
+      } else {
+        options
+      }
+
+    hiveContext.catalog.createDataSourceTable(
+      tableName,
+      userSpecifiedSchema,
+      provider,
+      optionsWithPath,
+      isExternal)
+
+    Seq.empty[Row]
+  }
+}
+
+case class CreateMetastoreDataSourceAsSelect(
+    tableName: String,
+    provider: String,
+    options: Map[String, String],
+    allowExisting: Boolean,
+    query: LogicalPlan) extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val hiveContext = sqlContext.asInstanceOf[HiveContext]
+
+    if (hiveContext.catalog.tableExists(tableName :: Nil)) {
+      if (allowExisting) {
+        return Seq.empty[Row]
+      } else {
+        sys.error(s"Table $tableName already exists.")
+      }
+    }
+
+    val df = DataFrame(hiveContext, query)
+    var isExternal = true
+    val optionsWithPath =
+      if (!options.contains("path")) {
+        isExternal = false
+        options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableName))
+      } else {
+        options
+      }
+
+    // Create the relation based on the data of df.
+    ResolvedDataSource(sqlContext, provider, optionsWithPath, df)
+
+    hiveContext.catalog.createDataSourceTable(
+      tableName,
+      None,
+      provider,
+      optionsWithPath,
+      isExternal)
 
     Seq.empty[Row]
   }
 }
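Both commands apply the same rule when choosing between a managed and an external table. Distilled into a standalone helper for clarity (the helper name is mine, not part of the patch, which inlines this logic in each command):

```scala
// A user-supplied "path" makes the table EXTERNAL (DROP TABLE keeps the data);
// otherwise we fall back to the Hive warehouse location and the table is
// MANAGED (DROP TABLE deletes the data too). `defaultPath` is by-name so the
// warehouse lookup only happens when it is actually needed.
def resolvePathAndExternal(
    options: Map[String, String],
    defaultPath: => String): (Map[String, String], Boolean) = {
  if (options.contains("path")) {
    (options, true)
  } else {
    (options + ("path" -> defaultPath), false)
  }
}
```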
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 7408c7ffd6..85795acb65 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -22,7 +22,9 @@ import java.io.File
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.fs.Path
 
+import org.apache.spark.sql.catalyst.util
 import org.apache.spark.sql._
 import org.apache.spark.util.Utils
 import org.apache.spark.sql.types._
@@ -36,9 +38,11 @@ import org.apache.spark.sql.hive.test.TestHive._
 class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
   override def afterEach(): Unit = {
     reset()
+    if (ctasPath.exists()) Utils.deleteRecursively(ctasPath)
   }
 
   val filePath = Utils.getSparkClassLoader.getResource("sample.json").getFile
+  var ctasPath: File = util.getTempFilePath("jsonCTAS").getCanonicalFile
 
   test ("persistent JSON table") {
     sql(
@@ -94,7 +98,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
         StructField("<d>", innerStruct, true) ::
         StructField("b", StringType, true) :: Nil)
 
-    assert(expectedSchema == table("jsonTable").schema)
+    assert(expectedSchema === table("jsonTable").schema)
 
     jsonFile(filePath).registerTempTable("expectedJsonTable")
@@ -137,6 +141,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
     intercept[Exception] {
       sql("SELECT * FROM jsonTable").collect()
     }
+
+    assert(
+      (new File(filePath)).exists(),
+      "The table with specified path is considered as an external table, " +
+        "its data should not be deleted after DROP TABLE.")
   }
 
   test("check change without refresh") {
@@ -240,7 +249,144 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
     invalidateTable("jsonTable")
     val expectedSchema = StructType(StructField("c_!@(3)", IntegerType, true) :: Nil)
-    assert(expectedSchema == table("jsonTable").schema)
+    assert(expectedSchema === table("jsonTable").schema)
+  }
+
+  test("CTAS") {
+    sql(
+      s"""
+        |CREATE TABLE jsonTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${filePath}'
+        |)
+      """.stripMargin)
+
+    sql(
+      s"""
+        |CREATE TABLE ctasJsonTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${ctasPath}'
+        |) AS
+        |SELECT * FROM jsonTable
+      """.stripMargin)
+
+    assert(table("ctasJsonTable").schema === table("jsonTable").schema)
+
+    checkAnswer(
+      sql("SELECT * FROM ctasJsonTable"),
+      sql("SELECT * FROM jsonTable").collect())
+  }
+
+  test("CTAS with IF NOT EXISTS") {
+    sql(
+      s"""
+        |CREATE TABLE jsonTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${filePath}'
+        |)
+      """.stripMargin)
+
+    sql(
+      s"""
+        |CREATE TABLE ctasJsonTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${ctasPath}'
+        |) AS
+        |SELECT * FROM jsonTable
+      """.stripMargin)
+
+    // Creating the table again should trigger an AlreadyExistsException.
+    val message = intercept[RuntimeException] {
+      sql(
+        s"""
+          |CREATE TABLE ctasJsonTable
+          |USING org.apache.spark.sql.json.DefaultSource
+          |OPTIONS (
+          |  path '${ctasPath}'
+          |) AS
+          |SELECT * FROM jsonTable
+        """.stripMargin)
+    }.getMessage
+    assert(message.contains("Table ctasJsonTable already exists."),
+      "We should complain that ctasJsonTable already exists")
+
+    // The following statement should be fine since it has IF NOT EXISTS.
+    // It tries to create a table ctasJsonTable with a new schema.
+    // The actual table's schema and data should not be changed.
+    sql(
+      s"""
+        |CREATE TABLE IF NOT EXISTS ctasJsonTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${ctasPath}'
+        |) AS
+        |SELECT a FROM jsonTable
+      """.stripMargin)
+
+    // Discard the cached relation.
+    invalidateTable("ctasJsonTable")
+
+    // Schema should not be changed.
+    assert(table("ctasJsonTable").schema === table("jsonTable").schema)
+    // Table data should not be changed.
+    checkAnswer(
+      sql("SELECT * FROM ctasJsonTable"),
+      sql("SELECT * FROM jsonTable").collect())
+  }
+
+  test("CTAS a managed table") {
+    sql(
+      s"""
+        |CREATE TABLE jsonTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${filePath}'
+        |)
+      """.stripMargin)
+
+    val expectedPath = catalog.hiveDefaultTableFilePath("ctasJsonTable")
+    val filesystemPath = new Path(expectedPath)
+    val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration)
+    if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true)
+
+    // It is a managed table when we do not specify the location.
+    sql(
+      s"""
+        |CREATE TABLE ctasJsonTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |
+        |) AS
+        |SELECT * FROM jsonTable
+      """.stripMargin)
+
+    assert(fs.exists(filesystemPath), s"$expectedPath should exist after we create the table.")
+
+    sql(
+      s"""
+        |CREATE TABLE loadedTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${expectedPath}'
+        |)
+      """.stripMargin)
+
+    assert(table("ctasJsonTable").schema === table("loadedTable").schema)
+
+    checkAnswer(
+      sql("SELECT * FROM ctasJsonTable"),
+      sql("SELECT * FROM loadedTable").collect()
+    )
+
+    sql("DROP TABLE ctasJsonTable")
+    assert(!fs.exists(filesystemPath), s"$expectedPath should not exist after we drop the table.")
   }
 
   test("SPARK-5286 Fail to drop an invalid table when using the data source API") {
@@ -255,4 +401,39 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
     sql("DROP TABLE jsonTable").collect().foreach(println)
   }
+
+  test("save and load table") {
+    val originalDefaultSource = conf.defaultDataSourceName
+    conf.setConf("spark.sql.default.datasource", "org.apache.spark.sql.json")
+
+    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    val df = jsonRDD(rdd)
+
+    df.saveAsTable("savedJsonTable")
+
+    checkAnswer(
+      sql("SELECT * FROM savedJsonTable"),
+      df.collect())
+
+    createTable("createdJsonTable", catalog.hiveDefaultTableFilePath("savedJsonTable"), false)
+    assert(table("createdJsonTable").schema === df.schema)
+    checkAnswer(
+      sql("SELECT * FROM createdJsonTable"),
+      df.collect())
+
+    val message = intercept[RuntimeException] {
+      createTable("createdJsonTable", filePath.toString, false)
+    }.getMessage
+    assert(message.contains("Table createdJsonTable already exists."),
+      "We should complain that createdJsonTable already exists")
+
+    createTable("createdJsonTable", filePath.toString, true)
+    // createdJsonTable should not be changed.
+    assert(table("createdJsonTable").schema === df.schema)
+    checkAnswer(
+      sql("SELECT * FROM createdJsonTable"),
+      df.collect())
+
+    conf.setConf("spark.sql.default.datasource", originalDefaultSource)
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index eb7a7750af..4efe0c5e0c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -170,7 +170,7 @@ class SQLQuerySuite extends QueryTest {
     sql("CREATE TABLE test2 (key INT, value STRING)")
     testData.insertInto("test2")
     testData.insertInto("test2")
-    sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").saveAsTable("test")
+    sql("CREATE TABLE test AS SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key")
     checkAnswer(
       table("test"),
       sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").collect().toSeq)
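For completeness, the DataFrame-level entry point exercised by the `save and load table` test above; a minimal sketch assuming the `TestHive` implicits used by the suite are in scope (the configuration key and source name are the ones the test uses, the second table name is illustrative):

```scala
// Write a DataFrame out as a managed metastore table via the default source,
// then re-attach the written data as a second table by pointing at its path.
conf.setConf("spark.sql.default.datasource", "org.apache.spark.sql.json")

val df = jsonRDD(sparkContext.parallelize(
  (1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""")))
df.saveAsTable("savedJsonTable")

createTable("reloadedJsonTable", catalog.hiveDefaultTableFilePath("savedJsonTable"), false)
```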