author:    Yin Huai <yhuai@databricks.com>      2015-02-02 23:30:44 -0800
committer: Reynold Xin <rxin@databricks.com>    2015-02-02 23:30:44 -0800
commit:    13531dd97c08563e53dacdaeaf1102bdd13ef825
tree:      5ee942b4be335cca10381160241e4fd83dbadb42
parent:    50a1a874e1d087a6c79835b1936d0009622a97b1
[SPARK-5501][SPARK-5420][SQL] Write support for the data source API
This PR adds support for `INSERT INTO/OVERWRITE TABLE tableName` and `CREATE TABLE tableName AS SELECT` to the data source API (partitioned tables are not supported).
In this PR, I am also adding support for `IF NOT EXISTS` to our DDL parser. The current semantics of `IF NOT EXISTS` are as follows (a usage sketch follows the list).
* For a `CREATE TEMPORARY TABLE` statement, `IF NOT EXISTS` is not supported for now.
* For a `CREATE TABLE` statement (we are creating a metastore table), if there is an existing table with the same name ...
  * when the `IF NOT EXISTS` clause is used, we will do nothing.
  * when the `IF NOT EXISTS` clause is not used, the user will see an exception saying that the table already exists.
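As a usage sketch of the new statements (adapted from the test suites in this diff; `jt` is an assumed pre-registered table, the paths are placeholders, and `sqlContext`/`hiveContext` are existing contexts):

```scala
// Create a temporary table backed by a data source and populate it from a query.
sqlContext.sql(
  """
    |CREATE TEMPORARY TABLE jsonTable
    |USING org.apache.spark.sql.json.DefaultSource
    |OPTIONS (
    |  path '/tmp/jsonTable'
    |) AS
    |SELECT a, b FROM jt
  """.stripMargin)

// Rewrite the table's contents from a query. (INSERT INTO is not yet
// supported by the JSON relation in this PR, only INSERT OVERWRITE.)
sqlContext.sql("INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt")

// With a HiveContext (persistent metastore tables), IF NOT EXISTS turns a
// CTAS on an already-existing table into a no-op instead of an error.
hiveContext.sql(
  """
    |CREATE TABLE IF NOT EXISTS ctasJsonTable
    |USING org.apache.spark.sql.json.DefaultSource
    |OPTIONS (
    |  path '/tmp/ctasJsonTable'
    |) AS
    |SELECT a, b FROM jt
  """.stripMargin)
```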
TODOs:
- [x] CTAS support
- [x] Programmatic APIs
- [ ] Python API (another PR)
- [x] More unit tests
- [ ] Documents (another PR)
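For the programmatic side, a minimal sketch of how the new `save`/`load`/`saveAsTable` entry points are meant to be called (paths and table names are placeholders; `df` is an existing DataFrame and `sqlContext` is its context):

```scala
// Short-hand save: uses the default data source
// (spark.sql.default.datasource, Parquet unless overridden).
df.save("/tmp/df_data")

// Explicit data source plus options given as (key, value) tuples.
df.save("org.apache.spark.sql.json", ("path", "/tmp/df_json"))

// Load it back through the data source API.
val loaded = sqlContext.load("org.apache.spark.sql.json", ("path", "/tmp/df_json"))

// Persisting as a metastore table only works for HiveContext-backed DataFrames;
// this overload uses the default data source, and overloads taking an explicit
// data source name and option tuples are also added.
df.saveAsTable("savedTable")
```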
marmbrus liancheng rxin
Author: Yin Huai <yhuai@databricks.com>
Closes #4294 from yhuai/writeSupport and squashes the following commits:
3db1539 [Yin Huai] save does not take overwrite.
1c98881 [Yin Huai] Fix test.
142372a [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupport
34e1bfb [Yin Huai] Address comments.
1682ca6 [Yin Huai] Better support for CTAS statements.
e789d64 [Yin Huai] For the Scala API, let users use tuples to provide options.
0128065 [Yin Huai] Short hand versions of save and load.
66ebd74 [Yin Huai] Formatting.
9203ec2 [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupport
e5d29f2 [Yin Huai] Programmatic APIs.
1a719a5 [Yin Huai] CREATE TEMPORARY TABLE with IF NOT EXISTS is not allowed for now.
909924f [Yin Huai] Add saveAsTable for the data source API to DataFrame.
95a7c71 [Yin Huai] Fix bug when handling IF NOT EXISTS clause in a CREATE TEMPORARY TABLE statement.
d37b19c [Yin Huai] Cheng's comments.
fd6758c [Yin Huai] Use BeforeAndAfterAll.
7880891 [Yin Huai] Support CREATE TABLE AS SELECT STATEMENT and the IF NOT EXISTS clause.
cb85b05 [Yin Huai] Initial write support.
2f91354 [Yin Huai] Make INSERT OVERWRITE/INTO statements consistent between HiveQL and SqlParser.
23 files changed, 1141 insertions, 60 deletions
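The diff below also introduces two developer-facing hooks for data source authors: `CreateableRelationProvider` (create a relation from the result of a query, used by CTAS and `save`) and `InsertableRelation` (the target of `INSERT OVERWRITE/INTO`), and relaxes the scan classes into traits. A rough sketch of a write-capable source built on them; the package, class names, and `writeOut` helper are hypothetical and the storage I/O is stubbed out:

```scala
package com.example.sql.mysource  // hypothetical package

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType

class DefaultSource extends RelationProvider with CreateableRelationProvider {

  // Called for CREATE [TEMPORARY] TABLE ... USING com.example.sql.mysource.
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    MyRelation(parameters("path"))(sqlContext)

  // Called for CTAS and DataFrame.save/saveAsTable: write `data` out,
  // then return a relation describing what was written.
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    writeOut(parameters("path"), data)
    MyRelation(parameters("path"))(sqlContext)
  }

  private def writeOut(path: String, data: DataFrame): Unit = ()  // stub
}

case class MyRelation(path: String)(@transient val sqlContext: SQLContext)
  extends TableScan with InsertableRelation {

  // Schema and scan are stubbed; a real source would derive them from `path`.
  override def schema: StructType = StructType(Nil)

  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(Seq.empty[Row])

  // Target of INSERT OVERWRITE/INTO TABLE once the relation is registered.
  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
    if (!overwrite) sys.error("INSERT INTO is not supported by this sketch.")
    // overwrite the contents at `path` with `data` (stub)
  }
}
```

Registered via `CREATE TEMPORARY TABLE t USING com.example.sql.mysource OPTIONS (path '...')`, such a relation can then serve as the target of `INSERT OVERWRITE TABLE t SELECT ...`.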
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 594a423146..25e639d390 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -147,8 +147,8 @@ class SqlParser extends AbstractSparkSQLParser { } protected lazy val insert: Parser[LogicalPlan] = - INSERT ~> OVERWRITE.? ~ (INTO ~> relation) ~ select ^^ { - case o ~ r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, o.isDefined) + INSERT ~> (OVERWRITE ^^^ true | INTO ^^^ false) ~ (TABLE ~> relation) ~ select ^^ { + case o ~ r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, o) } protected lazy val projection: Parser[Expression] = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 385e1ec74f..4cbfb6af5d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql -import java.util.{List => JList} - import scala.reflect.ClassTag import org.apache.spark.annotation.{DeveloperApi, Experimental} @@ -487,6 +485,53 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { /** * :: Experimental :: + * Creates a table from the the contents of this DataFrame based on a given data source and + * a set of options. This will fail if the table already exists. + * + * Note that this currently only works with DataFrames that are created from a HiveContext as + * there is no notion of a persisted catalog in a standard SQL context. Instead you can write + * an RDD out to a parquet file, and then register that file as a table. This "table" can then + * be the target of an `insertInto`. + */ + @Experimental + override def saveAsTable( + tableName: String, + dataSourceName: String, + option: (String, String), + options: (String, String)*): Unit + + /** + * :: Experimental :: + * Creates a table from the the contents of this DataFrame based on a given data source and + * a set of options. This will fail if the table already exists. + * + * Note that this currently only works with DataFrames that are created from a HiveContext as + * there is no notion of a persisted catalog in a standard SQL context. Instead you can write + * an RDD out to a parquet file, and then register that file as a table. This "table" can then + * be the target of an `insertInto`. + */ + @Experimental + override def saveAsTable( + tableName: String, + dataSourceName: String, + options: java.util.Map[String, String]): Unit + + @Experimental + override def save(path: String): Unit + + @Experimental + override def save( + dataSourceName: String, + option: (String, String), + options: (String, String)*): Unit + + @Experimental + override def save( + dataSourceName: String, + options: java.util.Map[String, String]): Unit + + /** + * :: Experimental :: * Adds the rows from this RDD to the specified table, optionally overwriting the existing data. 
*/ @Experimental diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala index f8fcc25569..f84dbf32fa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.plans.{JoinType, Inner} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.{LogicalRDD, EvaluatePython} import org.apache.spark.sql.json.JsonRDD +import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsLogicalPlan} import org.apache.spark.sql.types.{NumericType, StructType} import org.apache.spark.util.Utils @@ -303,8 +304,61 @@ private[sql] class DataFrameImpl protected[sql]( } override def saveAsTable(tableName: String): Unit = { - sqlContext.executePlan( - CreateTableAsSelect(None, tableName, logicalPlan, allowExisting = false)).toRdd + val dataSourceName = sqlContext.conf.defaultDataSourceName + val cmd = + CreateTableUsingAsLogicalPlan( + tableName, + dataSourceName, + temporary = false, + Map.empty, + allowExisting = false, + logicalPlan) + + sqlContext.executePlan(cmd).toRdd + } + + override def saveAsTable( + tableName: String, + dataSourceName: String, + option: (String, String), + options: (String, String)*): Unit = { + val cmd = + CreateTableUsingAsLogicalPlan( + tableName, + dataSourceName, + temporary = false, + (option +: options).toMap, + allowExisting = false, + logicalPlan) + + sqlContext.executePlan(cmd).toRdd + } + + override def saveAsTable( + tableName: String, + dataSourceName: String, + options: java.util.Map[String, String]): Unit = { + val opts = options.toSeq + saveAsTable(tableName, dataSourceName, opts.head, opts.tail:_*) + } + + override def save(path: String): Unit = { + val dataSourceName = sqlContext.conf.defaultDataSourceName + save(dataSourceName, ("path" -> path)) + } + + override def save( + dataSourceName: String, + option: (String, String), + options: (String, String)*): Unit = { + ResolvedDataSource(sqlContext, dataSourceName, (option +: options).toMap, this) + } + + override def save( + dataSourceName: String, + options: java.util.Map[String, String]): Unit = { + val opts = options.toSeq + save(dataSourceName, opts.head, opts.tail:_*) } override def insertInto(tableName: String, overwrite: Boolean): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala index 2f8c695d56..9b051de68f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala @@ -152,6 +152,28 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten override def saveAsTable(tableName: String): Unit = err() + override def saveAsTable( + tableName: String, + dataSourceName: String, + option: (String, String), + options: (String, String)*): Unit = err() + + override def saveAsTable( + tableName: String, + dataSourceName: String, + options: java.util.Map[String, String]): Unit = err() + + override def save(path: String): Unit = err() + + override def save( + dataSourceName: String, + option: (String, String), + options: (String, String)*): Unit = err() + + override def save( + dataSourceName: String, + options: java.util.Map[String, String]): Unit = err() + override def insertInto(tableName: String, overwrite: 
Boolean): Unit = err() override def toJSON: RDD[String] = err() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 243dc99707..561a91d2d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -47,6 +47,9 @@ private[spark] object SQLConf { // This is only used for the thriftserver val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool" + // This is used to set the default data source + val DEFAULT_DATA_SOURCE_NAME = "spark.sql.default.datasource" + object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" } @@ -155,6 +158,9 @@ private[sql] class SQLConf extends Serializable { private[spark] def broadcastTimeout: Int = getConf(BROADCAST_TIMEOUT, (5 * 60).toString).toInt + private[spark] def defaultDataSourceName: String = + getConf(DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.parquet") + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index f4692b3ff5..a741d0031d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -21,6 +21,7 @@ import java.beans.Introspector import java.util.Properties import scala.collection.immutable +import scala.collection.JavaConversions._ import scala.language.implicitConversions import scala.reflect.runtime.universe.TypeTag @@ -37,7 +38,7 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.execution._ import org.apache.spark.sql.json._ import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation} -import org.apache.spark.sql.sources.{LogicalRelation, BaseRelation, DDLParser, DataSourceStrategy} +import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -335,6 +336,29 @@ class SQLContext(@transient val sparkContext: SparkContext) applySchema(rowRDD, appliedSchema) } + @Experimental + def load(path: String): DataFrame = { + val dataSourceName = conf.defaultDataSourceName + load(dataSourceName, ("path", path)) + } + + @Experimental + def load( + dataSourceName: String, + option: (String, String), + options: (String, String)*): DataFrame = { + val resolved = ResolvedDataSource(this, None, dataSourceName, (option +: options).toMap) + DataFrame(this, LogicalRelation(resolved.relation)) + } + + @Experimental + def load( + dataSourceName: String, + options: java.util.Map[String, String]): DataFrame = { + val opts = options.toSeq + load(dataSourceName, opts.head, opts.tail:_*) + } + /** * :: Experimental :: * Construct an RDD representing the database table accessible via JDBC URL diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/sql/api.scala index eb0eb3f325..c4a00cdb20 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api.scala @@ -171,6 +171,33 @@ private[sql] trait DataFrameSpecificApi { def saveAsTable(tableName: String): Unit @Experimental + def saveAsTable( + tableName: String, + dataSourceName: String, + option: (String, String), + options: (String, String)*): Unit + + @Experimental + def saveAsTable( + tableName: String, + dataSourceName: 
String, + options: java.util.Map[String, String]): Unit + + @Experimental + def save(path: String): Unit + + @Experimental + def save( + dataSourceName: String, + option: (String, String), + options: (String, String)*): Unit + + @Experimental + def save( + dataSourceName: String, + options: java.util.Map[String, String]): Unit + + @Experimental def insertInto(tableName: String, overwrite: Boolean): Unit @Experimental diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 0cc9d049c9..ff0609d4b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation} import org.apache.spark.sql.parquet._ import org.apache.spark.sql.types._ -import org.apache.spark.sql.sources.{CreateTempTableUsing, CreateTableUsing} +import org.apache.spark.sql.sources._ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { @@ -314,12 +314,33 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object DDLStrategy extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, options) => + case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, opts, false) => ExecutedCommand( - CreateTempTableUsing(tableName, userSpecifiedSchema, provider, options)) :: Nil - - case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) => + CreateTempTableUsing( + tableName, userSpecifiedSchema, provider, opts)) :: Nil + case c: CreateTableUsing if !c.temporary => + sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.") + case c: CreateTableUsing if c.temporary && c.allowExisting => + sys.error("allowExisting should be set to false when creating a temporary table.") + + case CreateTableUsingAsSelect(tableName, provider, true, opts, false, query) => + val logicalPlan = sqlContext.parseSql(query) + val cmd = + CreateTempTableUsingAsSelect(tableName, provider, opts, logicalPlan) + ExecutedCommand(cmd) :: Nil + case c: CreateTableUsingAsSelect if !c.temporary => + sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.") + case c: CreateTableUsingAsSelect if c.temporary && c.allowExisting => + sys.error("allowExisting should be set to false when creating a temporary table.") + + case CreateTableUsingAsLogicalPlan(tableName, provider, true, opts, false, query) => + val cmd = + CreateTempTableUsingAsSelect(tableName, provider, opts, query) + ExecutedCommand(cmd) :: Nil + case c: CreateTableUsingAsLogicalPlan if !c.temporary => sys.error("Tables created with SQLContext must be TEMPORARY. 
Use a HiveContext instead.") + case c: CreateTableUsingAsLogicalPlan if c.temporary && c.allowExisting => + sys.error("allowExisting should be set to false when creating a temporary table.") case _ => Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index 1af96c28d5..8372decbf8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -17,21 +17,26 @@ package org.apache.spark.sql.json -import org.apache.spark.sql.SQLContext +import java.io.IOException + +import org.apache.hadoop.fs.Path +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Row, SQLContext} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType -private[sql] class DefaultSource extends RelationProvider with SchemaRelationProvider { +private[sql] class DefaultSource + extends RelationProvider with SchemaRelationProvider with CreateableRelationProvider { /** Returns a new base relation with the parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { - val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified")) + val path = parameters.getOrElse("path", sys.error("Option 'path' not specified")) val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0) - JSONRelation(fileName, samplingRatio, None)(sqlContext) + JSONRelation(path, samplingRatio, None)(sqlContext) } /** Returns a new base relation with the given schema and parameters. */ @@ -39,21 +44,37 @@ private[sql] class DefaultSource extends RelationProvider with SchemaRelationPro sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = { - val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified")) + val path = parameters.getOrElse("path", sys.error("Option 'path' not specified")) val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0) - JSONRelation(fileName, samplingRatio, Some(schema))(sqlContext) + JSONRelation(path, samplingRatio, Some(schema))(sqlContext) + } + + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { + val path = parameters.getOrElse("path", sys.error("Option 'path' not specified")) + val filesystemPath = new Path(path) + val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + if (fs.exists(filesystemPath)) { + sys.error(s"path $path already exists.") + } + data.toJSON.saveAsTextFile(path) + + createRelation(sqlContext, parameters, data.schema) } } private[sql] case class JSONRelation( - fileName: String, + path: String, samplingRatio: Double, userSpecifiedSchema: Option[StructType])( @transient val sqlContext: SQLContext) - extends TableScan { + extends TableScan with InsertableRelation { - private def baseRDD = sqlContext.sparkContext.textFile(fileName) + // TODO: Support partitioned JSON relation. 
+ private def baseRDD = sqlContext.sparkContext.textFile(path) override val schema = userSpecifiedSchema.getOrElse( JsonRDD.nullTypeToStringType( @@ -64,4 +85,24 @@ private[sql] case class JSONRelation( override def buildScan() = JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.conf.columnNameOfCorruptRecord) + + override def insert(data: DataFrame, overwrite: Boolean) = { + val filesystemPath = new Path(path) + val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + + if (overwrite) { + try { + fs.delete(filesystemPath, true) + } catch { + case e: IOException => + throw new IOException( + s"Unable to clear output directory ${filesystemPath.toString} prior" + + s" to INSERT OVERWRITE a JSON table:\n${e.toString}") + } + data.toJSON.saveAsTextFile(path) + } else { + // TODO: Support INSERT INTO + sys.error("JSON table only support INSERT OVERWRITE for now.") + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala index d13f2ce2a5..386ff2452f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.{Row, Strategy} import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, AttributeSet, Expression, NamedExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, InsertIntoTable => LogicalInsertIntoTable} import org.apache.spark.sql.execution /** @@ -54,6 +54,13 @@ private[sql] object DataSourceStrategy extends Strategy { case l @ LogicalRelation(t: TableScan) => execution.PhysicalRDD(l.output, t.buildScan()) :: Nil + case i @ LogicalInsertIntoTable( + l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite) => + if (partition.nonEmpty) { + sys.error(s"Insert into a partition is not allowed because $l is not partitioned.") + } + execution.ExecutedCommand(InsertIntoRelation(t, query, overwrite)) :: Nil + case _ => Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala new file mode 100644 index 0000000000..d7942dc309 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.sources + +import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.RunnableCommand + +private[sql] case class InsertIntoRelation( + relation: InsertableRelation, + query: LogicalPlan, + overwrite: Boolean) + extends RunnableCommand { + + override def run(sqlContext: SQLContext) = { + relation.insert(DataFrame(sqlContext, query), overwrite) + + Seq.empty[Row] + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index b1bbe0f89a..ead827728c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -36,6 +36,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { try { Some(apply(input)) } catch { + case ddlException: DDLException => throw ddlException case _ if !exceptionOnError => None case x: Throwable => throw x } @@ -45,8 +46,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { lexical.initialize(reservedWords) phrase(dataType)(new lexical.Scanner(input)) match { case Success(r, x) => r - case x => - sys.error(s"Unsupported dataType: $x") + case x => throw new DDLException(s"Unsupported dataType: $x") } } @@ -56,8 +56,12 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { protected val CREATE = Keyword("CREATE") protected val TEMPORARY = Keyword("TEMPORARY") protected val TABLE = Keyword("TABLE") + protected val IF = Keyword("IF") + protected val NOT = Keyword("NOT") + protected val EXISTS = Keyword("EXISTS") protected val USING = Keyword("USING") protected val OPTIONS = Keyword("OPTIONS") + protected val AS = Keyword("AS") protected val COMMENT = Keyword("COMMENT") // Data types. @@ -83,22 +87,51 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { protected def start: Parser[LogicalPlan] = ddl /** - * `CREATE [TEMPORARY] TABLE avroTable + * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS] * USING org.apache.spark.sql.avro * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")` * or - * `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...) + * `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...) [IF NOT EXISTS] * USING org.apache.spark.sql.avro * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")` + * or + * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS] + * USING org.apache.spark.sql.avro + * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")` + * AS SELECT ... */ protected lazy val createTable: Parser[LogicalPlan] = ( - (CREATE ~> TEMPORARY.? <~ TABLE) ~ ident - ~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ { - case temp ~ tableName ~ columns ~ provider ~ opts => - val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields))) - CreateTableUsing(tableName, userSpecifiedSchema, provider, temp.isDefined, opts) - } + (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident + ~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options) ~ (AS ~> restInput).? 
^^ { + case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query => + if (temp.isDefined && allowExisting.isDefined) { + throw new DDLException( + "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.") + } + + if (query.isDefined) { + if (columns.isDefined) { + throw new DDLException( + "a CREATE TABLE AS SELECT statement does not allow column definitions.") + } + CreateTableUsingAsSelect(tableName, + provider, + temp.isDefined, + opts, + allowExisting.isDefined, + query.get) + } else { + val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields))) + CreateTableUsing( + tableName, + userSpecifiedSchema, + provider, + temp.isDefined, + opts, + allowExisting.isDefined) + } + } ) protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")" @@ -193,7 +226,7 @@ object ResolvedDataSource { dataSource .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider] .createRelation(sqlContext, new CaseInsensitiveMap(options), schema) - case _ => + case dataSource: org.apache.spark.sql.sources.RelationProvider => sys.error(s"${clazz.getCanonicalName} does not allow user-specified schemas.") } } @@ -203,7 +236,7 @@ object ResolvedDataSource { dataSource .asInstanceOf[org.apache.spark.sql.sources.RelationProvider] .createRelation(sqlContext, new CaseInsensitiveMap(options)) - case _ => + case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider => sys.error(s"A schema needs to be specified when using ${clazz.getCanonicalName}.") } } @@ -211,6 +244,32 @@ object ResolvedDataSource { new ResolvedDataSource(clazz, relation) } + + def apply( + sqlContext: SQLContext, + provider: String, + options: Map[String, String], + data: DataFrame): ResolvedDataSource = { + val loader = Utils.getContextOrSparkClassLoader + val clazz: Class[_] = try loader.loadClass(provider) catch { + case cnf: java.lang.ClassNotFoundException => + try loader.loadClass(provider + ".DefaultSource") catch { + case cnf: java.lang.ClassNotFoundException => + sys.error(s"Failed to load class for data source: $provider") + } + } + + val relation = clazz.newInstance match { + case dataSource: org.apache.spark.sql.sources.CreateableRelationProvider => + dataSource + .asInstanceOf[org.apache.spark.sql.sources.CreateableRelationProvider] + .createRelation(sqlContext, options, data) + case _ => + sys.error(s"${clazz.getCanonicalName} does not allow create table as select.") + } + + new ResolvedDataSource(clazz, relation) + } } private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation) @@ -220,13 +279,30 @@ private[sql] case class CreateTableUsing( userSpecifiedSchema: Option[StructType], provider: String, temporary: Boolean, - options: Map[String, String]) extends Command + options: Map[String, String], + allowExisting: Boolean) extends Command + +private[sql] case class CreateTableUsingAsSelect( + tableName: String, + provider: String, + temporary: Boolean, + options: Map[String, String], + allowExisting: Boolean, + query: String) extends Command + +private[sql] case class CreateTableUsingAsLogicalPlan( + tableName: String, + provider: String, + temporary: Boolean, + options: Map[String, String], + allowExisting: Boolean, + query: LogicalPlan) extends Command private [sql] case class CreateTempTableUsing( tableName: String, userSpecifiedSchema: Option[StructType], provider: String, - options: Map[String, String]) extends RunnableCommand { + options: Map[String, String]) extends RunnableCommand { def run(sqlContext: 
SQLContext) = { val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options) @@ -236,6 +312,22 @@ private [sql] case class CreateTempTableUsing( } } +private [sql] case class CreateTempTableUsingAsSelect( + tableName: String, + provider: String, + options: Map[String, String], + query: LogicalPlan) extends RunnableCommand { + + def run(sqlContext: SQLContext) = { + val df = DataFrame(sqlContext, query) + val resolved = ResolvedDataSource(sqlContext, provider, options, df) + sqlContext.registerRDDAsTable( + DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName) + + Seq.empty + } +} + /** * Builds a map in which keys are case insensitive */ @@ -253,3 +345,9 @@ protected class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, override def -(key: String): Map[String, String] = baseMap - key.toLowerCase() } + +/** + * The exception thrown from the DDL parser. + * @param message + */ +protected[sql] class DDLException(message: String) extends Exception(message) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index cd82cc6ecb..ad0a35b91e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.sources import org.apache.spark.annotation.{Experimental, DeveloperApi} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.{DataFrame, Row, SQLContext} import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute} import org.apache.spark.sql.types.StructType @@ -77,6 +77,14 @@ trait SchemaRelationProvider { schema: StructType): BaseRelation } +@DeveloperApi +trait CreateableRelationProvider { + def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + data: DataFrame): BaseRelation +} + /** * ::DeveloperApi:: * Represents a collection of tuples with a known schema. Classes that extend BaseRelation must @@ -108,7 +116,7 @@ abstract class BaseRelation { * A BaseRelation that can produce all of its tuples as an RDD of Row objects. */ @DeveloperApi -abstract class TableScan extends BaseRelation { +trait TableScan extends BaseRelation { def buildScan(): RDD[Row] } @@ -118,7 +126,7 @@ abstract class TableScan extends BaseRelation { * containing all of its tuples as Row objects. */ @DeveloperApi -abstract class PrunedScan extends BaseRelation { +trait PrunedScan extends BaseRelation { def buildScan(requiredColumns: Array[String]): RDD[Row] } @@ -132,7 +140,7 @@ abstract class PrunedScan extends BaseRelation { * as filtering partitions based on a bloom filter. */ @DeveloperApi -abstract class PrunedFilteredScan extends BaseRelation { +trait PrunedFilteredScan extends BaseRelation { def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] } @@ -145,6 +153,11 @@ abstract class PrunedFilteredScan extends BaseRelation { * for experimentation. 
*/ @Experimental -abstract class CatalystScan extends BaseRelation { +trait CatalystScan extends BaseRelation { def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] } + +@DeveloperApi +trait InsertableRelation extends BaseRelation { + def insert(data: DataFrame, overwrite: Boolean): Unit +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index 3d82f4bce7..5ec7a156d9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -37,11 +37,21 @@ class ParquetQuerySuite extends QueryTest with ParquetTest { test("appending") { val data = (0 until 10).map(i => (i, i.toString)) withParquetTable(data, "t") { - sql("INSERT INTO t SELECT * FROM t") + sql("INSERT INTO TABLE t SELECT * FROM t") checkAnswer(table("t"), (data ++ data).map(Row.fromTuple)) } } + // This test case will trigger the NPE mentioned in + // https://issues.apache.org/jira/browse/PARQUET-151. + ignore("overwriting") { + val data = (0 until 10).map(i => (i, i.toString)) + withParquetTable(data, "t") { + sql("INSERT OVERWRITE TABLE t SELECT * FROM t") + checkAnswer(table("t"), data.map(Row.fromTuple)) + } + } + test("self-join") { // 4 rows, cells of column 1 of row 2 and row 4 are null val data = (1 to 4).map { i => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala new file mode 100644 index 0000000000..b02389978b --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.sources + +import java.io.File + +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark.sql.catalyst.util +import org.apache.spark.util.Utils + +class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { + + import caseInsensisitiveContext._ + + var path: File = null + + override def beforeAll(): Unit = { + path = util.getTempFilePath("jsonCTAS").getCanonicalFile + val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) + jsonRDD(rdd).registerTempTable("jt") + } + + override def afterAll(): Unit = { + dropTempTable("jt") + } + + after { + if (path.exists()) Utils.deleteRecursively(path) + } + + test("CREATE TEMPORARY TABLE AS SELECT") { + sql( + s""" + |CREATE TEMPORARY TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${path.toString}' + |) AS + |SELECT a, b FROM jt + """.stripMargin) + + checkAnswer( + sql("SELECT a, b FROM jsonTable"), + sql("SELECT a, b FROM jt").collect()) + + dropTempTable("jsonTable") + } + + test("create a table, drop it and create another one with the same name") { + sql( + s""" + |CREATE TEMPORARY TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${path.toString}' + |) AS + |SELECT a, b FROM jt + """.stripMargin) + + checkAnswer( + sql("SELECT a, b FROM jsonTable"), + sql("SELECT a, b FROM jt").collect()) + + dropTempTable("jsonTable") + + val message = intercept[RuntimeException]{ + sql( + s""" + |CREATE TEMPORARY TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${path.toString}' + |) AS + |SELECT a * 4 FROM jt + """.stripMargin) + }.getMessage + assert( + message.contains(s"path ${path.toString} already exists."), + "CREATE TEMPORARY TABLE IF NOT EXISTS should not be allowed.") + + // Explicitly delete it. + if (path.exists()) Utils.deleteRecursively(path) + + sql( + s""" + |CREATE TEMPORARY TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${path.toString}' + |) AS + |SELECT a * 4 FROM jt + """.stripMargin) + + checkAnswer( + sql("SELECT * FROM jsonTable"), + sql("SELECT a * 4 FROM jt").collect()) + + dropTempTable("jsonTable") + } + + test("CREATE TEMPORARY TABLE AS SELECT with IF NOT EXISTS is not allowed") { + val message = intercept[DDLException]{ + sql( + s""" + |CREATE TEMPORARY TABLE IF NOT EXISTS jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${path.toString}' + |) AS + |SELECT b FROM jt + """.stripMargin) + }.getMessage + assert( + message.contains("a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause."), + "CREATE TEMPORARY TABLE IF NOT EXISTS should not be allowed.") + } + + test("a CTAS statement with column definitions is not allowed") { + intercept[DDLException]{ + sql( + s""" + |CREATE TEMPORARY TABLE jsonTable (a int, b string) + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${path.toString}' + |) AS + |SELECT a, b FROM jt + """.stripMargin) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala new file mode 100644 index 0000000000..f91cea6a37 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources + +import java.io.File + +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.util +import org.apache.spark.util.Utils + +class InsertIntoSuite extends DataSourceTest with BeforeAndAfterAll { + + import caseInsensisitiveContext._ + + var path: File = null + + override def beforeAll: Unit = { + path = util.getTempFilePath("jsonCTAS").getCanonicalFile + val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) + jsonRDD(rdd).registerTempTable("jt") + sql( + s""" + |CREATE TEMPORARY TABLE jsonTable (a int, b string) + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${path.toString}' + |) + """.stripMargin) + } + + override def afterAll: Unit = { + dropTempTable("jsonTable") + dropTempTable("jt") + if (path.exists()) Utils.deleteRecursively(path) + } + + test("Simple INSERT OVERWRITE a JSONRelation") { + sql( + s""" + |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt + """.stripMargin) + + checkAnswer( + sql("SELECT a, b FROM jsonTable"), + (1 to 10).map(i => Row(i, s"str$i")) + ) + } + + test("INSERT OVERWRITE a JSONRelation multiple times") { + sql( + s""" + |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt + """.stripMargin) + + sql( + s""" + |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt + """.stripMargin) + + sql( + s""" + |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt + """.stripMargin) + + checkAnswer( + sql("SELECT a, b FROM jsonTable"), + (1 to 10).map(i => Row(i, s"str$i")) + ) + } + + test("INSERT INTO not supported for JSONRelation for now") { + intercept[RuntimeException]{ + sql( + s""" + |INSERT INTO TABLE jsonTable SELECT a, b FROM jt + """.stripMargin) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala new file mode 100644 index 0000000000..fe2f76cc39 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources + +import java.io.File + +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark.sql.DataFrame +import org.apache.spark.util.Utils + +import org.apache.spark.sql.catalyst.util + +class SaveLoadSuite extends DataSourceTest with BeforeAndAfterAll { + + import caseInsensisitiveContext._ + + var originalDefaultSource: String = null + + var path: File = null + + var df: DataFrame = null + + override def beforeAll(): Unit = { + originalDefaultSource = conf.defaultDataSourceName + conf.setConf("spark.sql.default.datasource", "org.apache.spark.sql.json") + + path = util.getTempFilePath("datasource").getCanonicalFile + + val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) + df = jsonRDD(rdd) + } + + override def afterAll(): Unit = { + conf.setConf("spark.sql.default.datasource", originalDefaultSource) + } + + after { + if (path.exists()) Utils.deleteRecursively(path) + } + + def checkLoad(): Unit = { + checkAnswer(load(path.toString), df.collect()) + checkAnswer(load("org.apache.spark.sql.json", ("path", path.toString)), df.collect()) + } + + test("save with overwrite and load") { + df.save(path.toString) + checkLoad + } + + test("save with data source and options, and load") { + df.save("org.apache.spark.sql.json", ("path", path.toString)) + checkLoad + } + + test("save and save again") { + df.save(path.toString) + + val message = intercept[RuntimeException] { + df.save(path.toString) + }.getMessage + + assert( + message.contains("already exists"), + "We should complain that the path already exists.") + + if (path.exists()) Utils.deleteRecursively(path) + + df.save(path.toString) + checkLoad + } +}
\ No newline at end of file diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 5efc3b1e30..f6d9027f90 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperat import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, SetCommand, QueryExecutionException} import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DescribeHiveTableCommand} -import org.apache.spark.sql.sources.DataSourceStrategy +import org.apache.spark.sql.sources.{CreateTableUsing, DataSourceStrategy} import org.apache.spark.sql.types._ /** @@ -86,6 +86,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { * @param allowExisting When false, an exception will be thrown if the table already exists. * @tparam A A case class that is used to describe the schema of the table to be created. */ + @Deprecated def createTable[A <: Product : TypeTag](tableName: String, allowExisting: Boolean = true) { catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting) } @@ -106,6 +107,70 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { catalog.invalidateTable("default", tableName) } + @Experimental + def createTable(tableName: String, path: String, allowExisting: Boolean): Unit = { + val dataSourceName = conf.defaultDataSourceName + createTable(tableName, dataSourceName, allowExisting, ("path", path)) + } + + @Experimental + def createTable( + tableName: String, + dataSourceName: String, + allowExisting: Boolean, + option: (String, String), + options: (String, String)*): Unit = { + val cmd = + CreateTableUsing( + tableName, + userSpecifiedSchema = None, + dataSourceName, + temporary = false, + (option +: options).toMap, + allowExisting) + executePlan(cmd).toRdd + } + + @Experimental + def createTable( + tableName: String, + dataSourceName: String, + schema: StructType, + allowExisting: Boolean, + option: (String, String), + options: (String, String)*): Unit = { + val cmd = + CreateTableUsing( + tableName, + userSpecifiedSchema = Some(schema), + dataSourceName, + temporary = false, + (option +: options).toMap, + allowExisting) + executePlan(cmd).toRdd + } + + @Experimental + def createTable( + tableName: String, + dataSourceName: String, + allowExisting: Boolean, + options: java.util.Map[String, String]): Unit = { + val opts = options.toSeq + createTable(tableName, dataSourceName, allowExisting, opts.head, opts.tail:_*) + } + + @Experimental + def createTable( + tableName: String, + dataSourceName: String, + schema: StructType, + allowExisting: Boolean, + options: java.util.Map[String, String]): Unit = { + val opts = options.toSeq + createTable(tableName, dataSourceName, schema, allowExisting, opts.head, opts.tail:_*) + } + /** * Analyzes the given table in the current database to generate statistics, which will be * used in query optimizations. 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index d910ee9509..48bea6c1bd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -23,10 +23,9 @@ import java.util.{List => JList} import com.google.common.cache.{LoadingCache, CacheLoader, CacheBuilder} import org.apache.hadoop.util.ReflectionUtils -import org.apache.hadoop.hive.metastore.TableType -import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition, FieldSchema} -import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException} -import org.apache.hadoop.hive.ql.metadata.InvalidTableException +import org.apache.hadoop.hive.metastore.{Warehouse, TableType} +import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition, AlreadyExistsException, FieldSchema} +import org.apache.hadoop.hive.ql.metadata._ import org.apache.hadoop.hive.ql.plan.CreateTableDesc import org.apache.hadoop.hive.serde.serdeConstants import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException} @@ -52,6 +51,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with /** Connection to hive metastore. Usages should lock on `this`. */ protected[hive] val client = Hive.get(hive.hiveconf) + protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf) + // TODO: Use this everywhere instead of tuples or databaseName, tableName,. /** A fully qualified identifier for a table (i.e., database.tableName) */ case class QualifiedTableName(database: String, name: String) { @@ -99,11 +100,22 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with val caseSensitive: Boolean = false + /** * + * Creates a data source table (a table created with USING clause) in Hive's metastore. + * Returns true when the table has been created. Otherwise, false. 
+ * @param tableName + * @param userSpecifiedSchema + * @param provider + * @param options + * @param isExternal + * @return + */ def createDataSourceTable( tableName: String, userSpecifiedSchema: Option[StructType], provider: String, - options: Map[String, String]) = { + options: Map[String, String], + isExternal: Boolean): Unit = { val (dbName, tblName) = processDatabaseAndTableName("default", tableName) val tbl = new Table(dbName, tblName) @@ -113,8 +125,13 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with } options.foreach { case (key, value) => tbl.setSerdeParam(key, value) } - tbl.setProperty("EXTERNAL", "TRUE") - tbl.setTableType(TableType.EXTERNAL_TABLE) + if (isExternal) { + tbl.setProperty("EXTERNAL", "TRUE") + tbl.setTableType(TableType.EXTERNAL_TABLE) + } else { + tbl.setProperty("EXTERNAL", "FALSE") + tbl.setTableType(TableType.MANAGED_TABLE) + } // create the table synchronized { @@ -122,6 +139,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with } } + def hiveDefaultTableFilePath(tableName: String): String = { + hiveWarehouse.getTablePath(client.getDatabaseCurrent, tableName).toString + } + def tableExists(tableIdentifier: Seq[String]): Boolean = { val tableIdent = processTableIdentifier(tableIdentifier) val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index fa997288a2..d89111094b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeComman import org.apache.spark.sql.execution._ import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.parquet.ParquetRelation -import org.apache.spark.sql.sources.CreateTableUsing +import org.apache.spark.sql.sources.{CreateTableUsingAsLogicalPlan, CreateTableUsingAsSelect, CreateTableUsing} import org.apache.spark.sql.types.StringType @@ -212,9 +212,21 @@ private[hive] trait HiveStrategies { object HiveDDLStrategy extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) => + case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, opts, allowExisting) => ExecutedCommand( - CreateMetastoreDataSource(tableName, userSpecifiedSchema, provider, options)) :: Nil + CreateMetastoreDataSource( + tableName, userSpecifiedSchema, provider, opts, allowExisting)) :: Nil + + case CreateTableUsingAsSelect(tableName, provider, false, opts, allowExisting, query) => + val logicalPlan = hiveContext.parseSql(query) + val cmd = + CreateMetastoreDataSourceAsSelect(tableName, provider, opts, allowExisting, logicalPlan) + ExecutedCommand(cmd) :: Nil + + case CreateTableUsingAsLogicalPlan(tableName, provider, false, opts, allowExisting, query) => + val cmd = + CreateMetastoreDataSourceAsSelect(tableName, provider, opts, allowExisting, query) + ExecutedCommand(cmd) :: Nil case _ => Nil } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 4814cb7ebf..95dcaccefd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -18,8 +18,10 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.sources.ResolvedDataSource +import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.catalyst.expressions.Row +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.types.StructType @@ -102,11 +104,77 @@ case class CreateMetastoreDataSource( tableName: String, userSpecifiedSchema: Option[StructType], provider: String, - options: Map[String, String]) extends RunnableCommand { + options: Map[String, String], + allowExisting: Boolean) extends RunnableCommand { - override def run(sqlContext: SQLContext) = { + override def run(sqlContext: SQLContext): Seq[Row] = { val hiveContext = sqlContext.asInstanceOf[HiveContext] - hiveContext.catalog.createDataSourceTable(tableName, userSpecifiedSchema, provider, options) + + if (hiveContext.catalog.tableExists(tableName :: Nil)) { + if (allowExisting) { + return Seq.empty[Row] + } else { + sys.error(s"Table $tableName already exists.") + } + } + + var isExternal = true + val optionsWithPath = + if (!options.contains("path")) { + isExternal = false + options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableName)) + } else { + options + } + + hiveContext.catalog.createDataSourceTable( + tableName, + userSpecifiedSchema, + provider, + optionsWithPath, + isExternal) + + Seq.empty[Row] + } +} + +case class CreateMetastoreDataSourceAsSelect( + tableName: String, + provider: String, + options: Map[String, String], + allowExisting: Boolean, + query: LogicalPlan) extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + val hiveContext = sqlContext.asInstanceOf[HiveContext] + + if (hiveContext.catalog.tableExists(tableName :: Nil)) { + if (allowExisting) { + return Seq.empty[Row] + } else { + sys.error(s"Table $tableName already exists.") + } + } + + val df = DataFrame(hiveContext, query) + var isExternal = true + val optionsWithPath = + if (!options.contains("path")) { + isExternal = false + options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableName)) + } else { + options + } + + // Create the relation based on the data of df. 
+ ResolvedDataSource(sqlContext, provider, optionsWithPath, df) + + hiveContext.catalog.createDataSourceTable( + tableName, + None, + provider, + optionsWithPath, + isExternal) Seq.empty[Row] } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 7408c7ffd6..85795acb65 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -22,7 +22,9 @@ import java.io.File import org.scalatest.BeforeAndAfterEach import org.apache.commons.io.FileUtils +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.catalyst.util import org.apache.spark.sql._ import org.apache.spark.util.Utils import org.apache.spark.sql.types._ @@ -36,9 +38,11 @@ import org.apache.spark.sql.hive.test.TestHive._ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { override def afterEach(): Unit = { reset() + if (ctasPath.exists()) Utils.deleteRecursively(ctasPath) } val filePath = Utils.getSparkClassLoader.getResource("sample.json").getFile + var ctasPath: File = util.getTempFilePath("jsonCTAS").getCanonicalFile test ("persistent JSON table") { sql( @@ -94,7 +98,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { StructField("<d>", innerStruct, true) :: StructField("b", StringType, true) :: Nil) - assert(expectedSchema == table("jsonTable").schema) + assert(expectedSchema === table("jsonTable").schema) jsonFile(filePath).registerTempTable("expectedJsonTable") @@ -137,6 +141,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { intercept[Exception] { sql("SELECT * FROM jsonTable").collect() } + + assert( + (new File(filePath)).exists(), + "The table with specified path is considered as an external table, " + + "its data should not deleted after DROP TABLE.") } test("check change without refresh") { @@ -240,7 +249,144 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { invalidateTable("jsonTable") val expectedSchema = StructType(StructField("c_!@(3)", IntegerType, true) :: Nil) - assert(expectedSchema == table("jsonTable").schema) + assert(expectedSchema === table("jsonTable").schema) + } + + test("CTAS") { + sql( + s""" + |CREATE TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${filePath}' + |) + """.stripMargin) + + sql( + s""" + |CREATE TABLE ctasJsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${ctasPath}' + |) AS + |SELECT * FROM jsonTable + """.stripMargin) + + assert(table("ctasJsonTable").schema === table("jsonTable").schema) + + checkAnswer( + sql("SELECT * FROM ctasJsonTable"), + sql("SELECT * FROM jsonTable").collect()) + } + + test("CTAS with IF NOT EXISTS") { + sql( + s""" + |CREATE TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${filePath}' + |) + """.stripMargin) + + sql( + s""" + |CREATE TABLE ctasJsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${ctasPath}' + |) AS + |SELECT * FROM jsonTable + """.stripMargin) + + // Create the table again should trigger a AlreadyExistsException. 
+ val message = intercept[RuntimeException] { + sql( + s""" + |CREATE TABLE ctasJsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${ctasPath}' + |) AS + |SELECT * FROM jsonTable + """.stripMargin) + }.getMessage + assert(message.contains("Table ctasJsonTable already exists."), + "We should complain that ctasJsonTable already exists") + + // The following statement should be fine if it has IF NOT EXISTS. + // It tries to create a table ctasJsonTable with a new schema. + // The actual table's schema and data should not be changed. + sql( + s""" + |CREATE TABLE IF NOT EXISTS ctasJsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${ctasPath}' + |) AS + |SELECT a FROM jsonTable + """.stripMargin) + + // Discard the cached relation. + invalidateTable("ctasJsonTable") + + // Schema should not be changed. + assert(table("ctasJsonTable").schema === table("jsonTable").schema) + // Table data should not be changed. + checkAnswer( + sql("SELECT * FROM ctasJsonTable"), + sql("SELECT * FROM jsonTable").collect()) + } + + test("CTAS a managed table") { + sql( + s""" + |CREATE TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${filePath}' + |) + """.stripMargin) + + new Path("/Users/yhuai/Desktop/whatever") + + + val expectedPath = catalog.hiveDefaultTableFilePath("ctasJsonTable") + val filesystemPath = new Path(expectedPath) + val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration) + if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true) + + // It is a managed table when we do not specify the location. + sql( + s""" + |CREATE TABLE ctasJsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | + |) AS + |SELECT * FROM jsonTable + """.stripMargin) + + assert(fs.exists(filesystemPath), s"$expectedPath should exist after we create the table.") + + sql( + s""" + |CREATE TABLE loadedTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${expectedPath}' + |) + """.stripMargin) + + assert(table("ctasJsonTable").schema === table("loadedTable").schema) + + checkAnswer( + sql("SELECT * FROM ctasJsonTable"), + sql("SELECT * FROM loadedTable").collect() + ) + + sql("DROP TABLE ctasJsonTable") + assert(!fs.exists(filesystemPath), s"$expectedPath should not exist after we drop the table.") } test("SPARK-5286 Fail to drop an invalid table when using the data source API") { @@ -255,4 +401,39 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { sql("DROP TABLE jsonTable").collect().foreach(println) } + + test("save and load table") { + val originalDefaultSource = conf.defaultDataSourceName + conf.setConf("spark.sql.default.datasource", "org.apache.spark.sql.json") + + val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) + val df = jsonRDD(rdd) + + df.saveAsTable("savedJsonTable") + + checkAnswer( + sql("SELECT * FROM savedJsonTable"), + df.collect()) + + createTable("createdJsonTable", catalog.hiveDefaultTableFilePath("savedJsonTable"), false) + assert(table("createdJsonTable").schema === df.schema) + checkAnswer( + sql("SELECT * FROM createdJsonTable"), + df.collect()) + + val message = intercept[RuntimeException] { + createTable("createdJsonTable", filePath.toString, false) + }.getMessage + assert(message.contains("Table createdJsonTable already exists."), + "We should complain that ctasJsonTable already exists") + + createTable("createdJsonTable", filePath.toString, true) + // createdJsonTable 
should be not changed. + assert(table("createdJsonTable").schema === df.schema) + checkAnswer( + sql("SELECT * FROM createdJsonTable"), + df.collect()) + + conf.setConf("spark.sql.default.datasource", originalDefaultSource) + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index eb7a7750af..4efe0c5e0c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -170,7 +170,7 @@ class SQLQuerySuite extends QueryTest { sql("CREATE TABLE test2 (key INT, value STRING)") testData.insertInto("test2") testData.insertInto("test2") - sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").saveAsTable("test") + sql("CREATE TABLE test AS SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key") checkAnswer( table("test"), sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").collect().toSeq) |