author    Yin Huai <yhuai@databricks.com>    2015-02-02 23:30:44 -0800
committer Reynold Xin <rxin@databricks.com>  2015-02-02 23:30:44 -0800
commit    13531dd97c08563e53dacdaeaf1102bdd13ef825 (patch)
tree      5ee942b4be335cca10381160241e4fd83dbadb42 /sql
parent    50a1a874e1d087a6c79835b1936d0009622a97b1 (diff)
[SPARK-5501][SPARK-5420][SQL] Write support for the data source API
This PR aims to support `INSERT INTO/OVERWRITE TABLE tableName` and `CREATE TABLE tableName AS SELECT` for the data source API (partitioned tables are not supported). This PR also adds support for `IF NOT EXISTS` to our DDL parser. The current semantics of `IF NOT EXISTS` are as follows:

* For a `CREATE TEMPORARY TABLE` statement, `IF NOT EXISTS` is not allowed for now.
* For a `CREATE TABLE` statement (i.e. we are creating a metastore table), if there is an existing table with the same name:
  * when the `IF NOT EXISTS` clause is used, we do nothing;
  * when the `IF NOT EXISTS` clause is not used, the user will see an exception saying the table already exists.

TODOs:
- [x] CTAS support
- [x] Programmatic APIs
- [ ] Python API (another PR)
- [x] More unit tests
- [ ] Documents (another PR)

marmbrus liancheng rxin

Author: Yin Huai <yhuai@databricks.com>

Closes #4294 from yhuai/writeSupport and squashes the following commits:

3db1539 [Yin Huai] save does not take overwrite.
1c98881 [Yin Huai] Fix test.
142372a [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupport
34e1bfb [Yin Huai] Address comments.
1682ca6 [Yin Huai] Better support for CTAS statements.
e789d64 [Yin Huai] For the Scala API, let users to use tuples to provide options.
0128065 [Yin Huai] Short hand versions of save and load.
66ebd74 [Yin Huai] Formatting.
9203ec2 [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupport
e5d29f2 [Yin Huai] Programmatic APIs.
1a719a5 [Yin Huai] CREATE TEMPORARY TABLE with IF NOT EXISTS is not allowed for now.
909924f [Yin Huai] Add saveAsTable for the data source API to DataFrame.
95a7c71 [Yin Huai] Fix bug when handling IF NOT EXISTS clause in a CREATE TEMPORARY TABLE statement.
d37b19c [Yin Huai] Cheng's comments.
fd6758c [Yin Huai] Use BeforeAndAfterAll.
7880891 [Yin Huai] Support CREATE TABLE AS SELECT STATEMENT and the IF NOT EXISTS clause.
cb85b05 [Yin Huai] Initial write support.
2f91354 [Yin Huai] Make INSERT OVERWRITE/INTO statements consistent between HiveQL and SqlParser.
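For illustration only, a minimal sketch of the SQL syntax this patch enables (the `sqlContext`, table names, and paths are assumptions for the example, not part of the patch):

    // Assumes a context named sqlContext with a registered source table `jt`.
    import sqlContext.sql

    // CTAS against a data source; TEMPORARY tables work on a plain SQLContext.
    sql("""
      CREATE TEMPORARY TABLE jsonTable
      USING org.apache.spark.sql.json
      OPTIONS (path '/tmp/jsonTable')
      AS SELECT a, b FROM jt
    """)

    // INSERT now uses the same syntax in SqlParser and HiveQL; the TABLE keyword is required.
    sql("INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt")

    // IF NOT EXISTS is only allowed for metastore tables (HiveContext), not for TEMPORARY tables.
    sql("""
      CREATE TABLE IF NOT EXISTS ctasJsonTable
      USING org.apache.spark.sql.json
      OPTIONS (path '/tmp/ctasJsonTable')
      AS SELECT a, b FROM jt
    """)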
Diffstat (limited to 'sql')
-rwxr-xr-x  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 49
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala | 58
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala | 22
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 26
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/api.scala | 27
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 31
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala | 59
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala | 9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala | 35
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala | 126
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala | 23
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala | 12
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala | 147
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala | 96
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala | 88
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 67
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 35
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala | 18
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala | 76
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 185
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 2
23 files changed, 1141 insertions, 60 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 594a423146..25e639d390 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -147,8 +147,8 @@ class SqlParser extends AbstractSparkSQLParser {
}
protected lazy val insert: Parser[LogicalPlan] =
- INSERT ~> OVERWRITE.? ~ (INTO ~> relation) ~ select ^^ {
- case o ~ r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, o.isDefined)
+ INSERT ~> (OVERWRITE ^^^ true | INTO ^^^ false) ~ (TABLE ~> relation) ~ select ^^ {
+ case o ~ r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, o)
}
protected lazy val projection: Parser[Expression] =
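For reference, a paraphrased sketch of what the revised rule yields (illustrative only, not taken from the patch):

    // "INSERT INTO TABLE t SELECT ..."      parses to InsertIntoTable(t, Map.empty, select, overwrite = false)
    // "INSERT OVERWRITE TABLE t SELECT ..." parses to InsertIntoTable(t, Map.empty, select, overwrite = true)
    // The TABLE keyword is now mandatory, matching HiveQL's INSERT syntax.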
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 385e1ec74f..4cbfb6af5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql
-import java.util.{List => JList}
-
import scala.reflect.ClassTag
import org.apache.spark.annotation.{DeveloperApi, Experimental}
@@ -487,6 +485,53 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
/**
* :: Experimental ::
+ * Creates a table from the contents of this DataFrame based on a given data source and
+ * a set of options. This will fail if the table already exists.
+ *
+ * Note that this currently only works with DataFrames that are created from a HiveContext as
+ * there is no notion of a persisted catalog in a standard SQL context. Instead you can write
+ * an RDD out to a parquet file, and then register that file as a table. This "table" can then
+ * be the target of an `insertInto`.
+ */
+ @Experimental
+ override def saveAsTable(
+ tableName: String,
+ dataSourceName: String,
+ option: (String, String),
+ options: (String, String)*): Unit
+
+ /**
+ * :: Experimental ::
+ * Creates a table from the contents of this DataFrame based on a given data source and
+ * a set of options. This will fail if the table already exists.
+ *
+ * Note that this currently only works with DataFrames that are created from a HiveContext as
+ * there is no notion of a persisted catalog in a standard SQL context. Instead you can write
+ * an RDD out to a parquet file, and then register that file as a table. This "table" can then
+ * be the target of an `insertInto`.
+ */
+ @Experimental
+ override def saveAsTable(
+ tableName: String,
+ dataSourceName: String,
+ options: java.util.Map[String, String]): Unit
+
+ @Experimental
+ override def save(path: String): Unit
+
+ @Experimental
+ override def save(
+ dataSourceName: String,
+ option: (String, String),
+ options: (String, String)*): Unit
+
+ @Experimental
+ override def save(
+ dataSourceName: String,
+ options: java.util.Map[String, String]): Unit
+
+ /**
+ * :: Experimental ::
* Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
*/
@Experimental
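As an illustration of the new write methods declared above, a hedged sketch (the DataFrame `df`, paths, and table names are assumptions for the example, not part of the patch):

    // Assumes an existing DataFrame `df`; saveAsTable additionally requires a HiveContext.
    df.save("/tmp/df.parquet")                                      // shorthand, uses spark.sql.default.datasource
    df.save("org.apache.spark.sql.json", ("path", "/tmp/df.json"))  // explicit source plus tuple options
    df.saveAsTable("savedTable")                                    // managed table using the default source
    df.saveAsTable(
      "savedJsonTable",
      "org.apache.spark.sql.json",
      ("path", "/tmp/savedJsonTable"))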
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index f8fcc25569..f84dbf32fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.{LogicalRDD, EvaluatePython}
import org.apache.spark.sql.json.JsonRDD
+import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsLogicalPlan}
import org.apache.spark.sql.types.{NumericType, StructType}
import org.apache.spark.util.Utils
@@ -303,8 +304,61 @@ private[sql] class DataFrameImpl protected[sql](
}
override def saveAsTable(tableName: String): Unit = {
- sqlContext.executePlan(
- CreateTableAsSelect(None, tableName, logicalPlan, allowExisting = false)).toRdd
+ val dataSourceName = sqlContext.conf.defaultDataSourceName
+ val cmd =
+ CreateTableUsingAsLogicalPlan(
+ tableName,
+ dataSourceName,
+ temporary = false,
+ Map.empty,
+ allowExisting = false,
+ logicalPlan)
+
+ sqlContext.executePlan(cmd).toRdd
+ }
+
+ override def saveAsTable(
+ tableName: String,
+ dataSourceName: String,
+ option: (String, String),
+ options: (String, String)*): Unit = {
+ val cmd =
+ CreateTableUsingAsLogicalPlan(
+ tableName,
+ dataSourceName,
+ temporary = false,
+ (option +: options).toMap,
+ allowExisting = false,
+ logicalPlan)
+
+ sqlContext.executePlan(cmd).toRdd
+ }
+
+ override def saveAsTable(
+ tableName: String,
+ dataSourceName: String,
+ options: java.util.Map[String, String]): Unit = {
+ val opts = options.toSeq
+ saveAsTable(tableName, dataSourceName, opts.head, opts.tail:_*)
+ }
+
+ override def save(path: String): Unit = {
+ val dataSourceName = sqlContext.conf.defaultDataSourceName
+ save(dataSourceName, ("path" -> path))
+ }
+
+ override def save(
+ dataSourceName: String,
+ option: (String, String),
+ options: (String, String)*): Unit = {
+ ResolvedDataSource(sqlContext, dataSourceName, (option +: options).toMap, this)
+ }
+
+ override def save(
+ dataSourceName: String,
+ options: java.util.Map[String, String]): Unit = {
+ val opts = options.toSeq
+ save(dataSourceName, opts.head, opts.tail:_*)
}
override def insertInto(tableName: String, overwrite: Boolean): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
index 2f8c695d56..9b051de68f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
@@ -152,6 +152,28 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
override def saveAsTable(tableName: String): Unit = err()
+ override def saveAsTable(
+ tableName: String,
+ dataSourceName: String,
+ option: (String, String),
+ options: (String, String)*): Unit = err()
+
+ override def saveAsTable(
+ tableName: String,
+ dataSourceName: String,
+ options: java.util.Map[String, String]): Unit = err()
+
+ override def save(path: String): Unit = err()
+
+ override def save(
+ dataSourceName: String,
+ option: (String, String),
+ options: (String, String)*): Unit = err()
+
+ override def save(
+ dataSourceName: String,
+ options: java.util.Map[String, String]): Unit = err()
+
override def insertInto(tableName: String, overwrite: Boolean): Unit = err()
override def toJSON: RDD[String] = err()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 243dc99707..561a91d2d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -47,6 +47,9 @@ private[spark] object SQLConf {
// This is only used for the thriftserver
val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
+ // This is used to set the default data source
+ val DEFAULT_DATA_SOURCE_NAME = "spark.sql.default.datasource"
+
object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
@@ -155,6 +158,9 @@ private[sql] class SQLConf extends Serializable {
private[spark] def broadcastTimeout: Int =
getConf(BROADCAST_TIMEOUT, (5 * 60).toString).toInt
+ private[spark] def defaultDataSourceName: String =
+ getConf(DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.parquet")
+
/** ********************** SQLConf functionality methods ************ */
/** Set Spark SQL configuration properties. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index f4692b3ff5..a741d0031d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -21,6 +21,7 @@ import java.beans.Introspector
import java.util.Properties
import scala.collection.immutable
+import scala.collection.JavaConversions._
import scala.language.implicitConversions
import scala.reflect.runtime.universe.TypeTag
@@ -37,7 +38,7 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.execution._
import org.apache.spark.sql.json._
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.sources.{LogicalRelation, BaseRelation, DDLParser, DataSourceStrategy}
+import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -335,6 +336,29 @@ class SQLContext(@transient val sparkContext: SparkContext)
applySchema(rowRDD, appliedSchema)
}
+ @Experimental
+ def load(path: String): DataFrame = {
+ val dataSourceName = conf.defaultDataSourceName
+ load(dataSourceName, ("path", path))
+ }
+
+ @Experimental
+ def load(
+ dataSourceName: String,
+ option: (String, String),
+ options: (String, String)*): DataFrame = {
+ val resolved = ResolvedDataSource(this, None, dataSourceName, (option +: options).toMap)
+ DataFrame(this, LogicalRelation(resolved.relation))
+ }
+
+ @Experimental
+ def load(
+ dataSourceName: String,
+ options: java.util.Map[String, String]): DataFrame = {
+ val opts = options.toSeq
+ load(dataSourceName, opts.head, opts.tail:_*)
+ }
+
/**
* :: Experimental ::
* Construct an RDD representing the database table accessible via JDBC URL
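The read side mirrors the new write methods; a hedged sketch (paths are placeholders):

    // Load with the default data source (spark.sql.default.datasource, parquet unless overridden).
    val parquetDF = sqlContext.load("/tmp/df.parquet")

    // Load by naming the data source explicitly and passing options as tuples.
    val jsonDF = sqlContext.load("org.apache.spark.sql.json", ("path", "/tmp/df.json"))

    // The default source can be switched via the new SQLConf key.
    sqlContext.setConf("spark.sql.default.datasource", "org.apache.spark.sql.json")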
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/sql/api.scala
index eb0eb3f325..c4a00cdb20 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api.scala
@@ -171,6 +171,33 @@ private[sql] trait DataFrameSpecificApi {
def saveAsTable(tableName: String): Unit
@Experimental
+ def saveAsTable(
+ tableName: String,
+ dataSourceName: String,
+ option: (String, String),
+ options: (String, String)*): Unit
+
+ @Experimental
+ def saveAsTable(
+ tableName: String,
+ dataSourceName: String,
+ options: java.util.Map[String, String]): Unit
+
+ @Experimental
+ def save(path: String): Unit
+
+ @Experimental
+ def save(
+ dataSourceName: String,
+ option: (String, String),
+ options: (String, String)*): Unit
+
+ @Experimental
+ def save(
+ dataSourceName: String,
+ options: java.util.Map[String, String]): Unit
+
+ @Experimental
def insertInto(tableName: String, overwrite: Boolean): Unit
@Experimental
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 0cc9d049c9..ff0609d4b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
import org.apache.spark.sql.parquet._
import org.apache.spark.sql.types._
-import org.apache.spark.sql.sources.{CreateTempTableUsing, CreateTableUsing}
+import org.apache.spark.sql.sources._
private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
@@ -314,12 +314,33 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
object DDLStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, options) =>
+ case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, opts, false) =>
ExecutedCommand(
- CreateTempTableUsing(tableName, userSpecifiedSchema, provider, options)) :: Nil
-
- case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) =>
+ CreateTempTableUsing(
+ tableName, userSpecifiedSchema, provider, opts)) :: Nil
+ case c: CreateTableUsing if !c.temporary =>
+ sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
+ case c: CreateTableUsing if c.temporary && c.allowExisting =>
+ sys.error("allowExisting should be set to false when creating a temporary table.")
+
+ case CreateTableUsingAsSelect(tableName, provider, true, opts, false, query) =>
+ val logicalPlan = sqlContext.parseSql(query)
+ val cmd =
+ CreateTempTableUsingAsSelect(tableName, provider, opts, logicalPlan)
+ ExecutedCommand(cmd) :: Nil
+ case c: CreateTableUsingAsSelect if !c.temporary =>
+ sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
+ case c: CreateTableUsingAsSelect if c.temporary && c.allowExisting =>
+ sys.error("allowExisting should be set to false when creating a temporary table.")
+
+ case CreateTableUsingAsLogicalPlan(tableName, provider, true, opts, false, query) =>
+ val cmd =
+ CreateTempTableUsingAsSelect(tableName, provider, opts, query)
+ ExecutedCommand(cmd) :: Nil
+ case c: CreateTableUsingAsLogicalPlan if !c.temporary =>
sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
+ case c: CreateTableUsingAsLogicalPlan if c.temporary && c.allowExisting =>
+ sys.error("allowExisting should be set to false when creating a temporary table.")
case _ => Nil
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index 1af96c28d5..8372decbf8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -17,21 +17,26 @@
package org.apache.spark.sql.json
-import org.apache.spark.sql.SQLContext
+import java.io.IOException
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
-private[sql] class DefaultSource extends RelationProvider with SchemaRelationProvider {
+private[sql] class DefaultSource
+ extends RelationProvider with SchemaRelationProvider with CreateableRelationProvider {
/** Returns a new base relation with the parameters. */
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
- val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
+ val path = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
- JSONRelation(fileName, samplingRatio, None)(sqlContext)
+ JSONRelation(path, samplingRatio, None)(sqlContext)
}
/** Returns a new base relation with the given schema and parameters. */
@@ -39,21 +44,37 @@ private[sql] class DefaultSource extends RelationProvider with SchemaRelationPro
sqlContext: SQLContext,
parameters: Map[String, String],
schema: StructType): BaseRelation = {
- val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
+ val path = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
- JSONRelation(fileName, samplingRatio, Some(schema))(sqlContext)
+ JSONRelation(path, samplingRatio, Some(schema))(sqlContext)
+ }
+
+ override def createRelation(
+ sqlContext: SQLContext,
+ parameters: Map[String, String],
+ data: DataFrame): BaseRelation = {
+ val path = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
+ val filesystemPath = new Path(path)
+ val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ if (fs.exists(filesystemPath)) {
+ sys.error(s"path $path already exists.")
+ }
+ data.toJSON.saveAsTextFile(path)
+
+ createRelation(sqlContext, parameters, data.schema)
}
}
private[sql] case class JSONRelation(
- fileName: String,
+ path: String,
samplingRatio: Double,
userSpecifiedSchema: Option[StructType])(
@transient val sqlContext: SQLContext)
- extends TableScan {
+ extends TableScan with InsertableRelation {
- private def baseRDD = sqlContext.sparkContext.textFile(fileName)
+ // TODO: Support partitioned JSON relation.
+ private def baseRDD = sqlContext.sparkContext.textFile(path)
override val schema = userSpecifiedSchema.getOrElse(
JsonRDD.nullTypeToStringType(
@@ -64,4 +85,24 @@ private[sql] case class JSONRelation(
override def buildScan() =
JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.conf.columnNameOfCorruptRecord)
+
+ override def insert(data: DataFrame, overwrite: Boolean) = {
+ val filesystemPath = new Path(path)
+ val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+
+ if (overwrite) {
+ try {
+ fs.delete(filesystemPath, true)
+ } catch {
+ case e: IOException =>
+ throw new IOException(
+ s"Unable to clear output directory ${filesystemPath.toString} prior"
+ + s" to INSERT OVERWRITE a JSON table:\n${e.toString}")
+ }
+ data.toJSON.saveAsTextFile(path)
+ } else {
+ // TODO: Support INSERT INTO
+ sys.error("JSON table only support INSERT OVERWRITE for now.")
+ }
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index d13f2ce2a5..386ff2452f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.{Row, Strategy}
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, AttributeSet, Expression, NamedExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, InsertIntoTable => LogicalInsertIntoTable}
import org.apache.spark.sql.execution
/**
@@ -54,6 +54,13 @@ private[sql] object DataSourceStrategy extends Strategy {
case l @ LogicalRelation(t: TableScan) =>
execution.PhysicalRDD(l.output, t.buildScan()) :: Nil
+ case i @ LogicalInsertIntoTable(
+ l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite) =>
+ if (partition.nonEmpty) {
+ sys.error(s"Insert into a partition is not allowed because $l is not partitioned.")
+ }
+ execution.ExecutedCommand(InsertIntoRelation(t, query, overwrite)) :: Nil
+
case _ => Nil
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
new file mode 100644
index 0000000000..d7942dc309
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.RunnableCommand
+
+private[sql] case class InsertIntoRelation(
+ relation: InsertableRelation,
+ query: LogicalPlan,
+ overwrite: Boolean)
+ extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext) = {
+ relation.insert(DataFrame(sqlContext, query), overwrite)
+
+ Seq.empty[Row]
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index b1bbe0f89a..ead827728c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -36,6 +36,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
try {
Some(apply(input))
} catch {
+ case ddlException: DDLException => throw ddlException
case _ if !exceptionOnError => None
case x: Throwable => throw x
}
@@ -45,8 +46,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
lexical.initialize(reservedWords)
phrase(dataType)(new lexical.Scanner(input)) match {
case Success(r, x) => r
- case x =>
- sys.error(s"Unsupported dataType: $x")
+ case x => throw new DDLException(s"Unsupported dataType: $x")
}
}
@@ -56,8 +56,12 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
protected val CREATE = Keyword("CREATE")
protected val TEMPORARY = Keyword("TEMPORARY")
protected val TABLE = Keyword("TABLE")
+ protected val IF = Keyword("IF")
+ protected val NOT = Keyword("NOT")
+ protected val EXISTS = Keyword("EXISTS")
protected val USING = Keyword("USING")
protected val OPTIONS = Keyword("OPTIONS")
+ protected val AS = Keyword("AS")
protected val COMMENT = Keyword("COMMENT")
// Data types.
@@ -83,22 +87,51 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
protected def start: Parser[LogicalPlan] = ddl
/**
- * `CREATE [TEMPORARY] TABLE avroTable
+ * `CREATE [TEMPORARY] TABLE [IF NOT EXISTS] avroTable
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
* or
- * `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...)
+ * `CREATE [TEMPORARY] TABLE [IF NOT EXISTS] avroTable(intField int, stringField string...)
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
+ * or
+ * `CREATE [TEMPORARY] TABLE [IF NOT EXISTS] avroTable
+ * USING org.apache.spark.sql.avro
+ * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
+ * AS SELECT ...
*/
protected lazy val createTable: Parser[LogicalPlan] =
(
- (CREATE ~> TEMPORARY.? <~ TABLE) ~ ident
- ~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
- case temp ~ tableName ~ columns ~ provider ~ opts =>
- val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
- CreateTableUsing(tableName, userSpecifiedSchema, provider, temp.isDefined, opts)
- }
+ (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident
+ ~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options) ~ (AS ~> restInput).? ^^ {
+ case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query =>
+ if (temp.isDefined && allowExisting.isDefined) {
+ throw new DDLException(
+ "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
+ }
+
+ if (query.isDefined) {
+ if (columns.isDefined) {
+ throw new DDLException(
+ "a CREATE TABLE AS SELECT statement does not allow column definitions.")
+ }
+ CreateTableUsingAsSelect(tableName,
+ provider,
+ temp.isDefined,
+ opts,
+ allowExisting.isDefined,
+ query.get)
+ } else {
+ val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
+ CreateTableUsing(
+ tableName,
+ userSpecifiedSchema,
+ provider,
+ temp.isDefined,
+ opts,
+ allowExisting.isDefined)
+ }
+ }
)
protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")"
@@ -193,7 +226,7 @@ object ResolvedDataSource {
dataSource
.asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider]
.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
- case _ =>
+ case dataSource: org.apache.spark.sql.sources.RelationProvider =>
sys.error(s"${clazz.getCanonicalName} does not allow user-specified schemas.")
}
}
@@ -203,7 +236,7 @@ object ResolvedDataSource {
dataSource
.asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
.createRelation(sqlContext, new CaseInsensitiveMap(options))
- case _ =>
+ case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
sys.error(s"A schema needs to be specified when using ${clazz.getCanonicalName}.")
}
}
@@ -211,6 +244,32 @@ object ResolvedDataSource {
new ResolvedDataSource(clazz, relation)
}
+
+ def apply(
+ sqlContext: SQLContext,
+ provider: String,
+ options: Map[String, String],
+ data: DataFrame): ResolvedDataSource = {
+ val loader = Utils.getContextOrSparkClassLoader
+ val clazz: Class[_] = try loader.loadClass(provider) catch {
+ case cnf: java.lang.ClassNotFoundException =>
+ try loader.loadClass(provider + ".DefaultSource") catch {
+ case cnf: java.lang.ClassNotFoundException =>
+ sys.error(s"Failed to load class for data source: $provider")
+ }
+ }
+
+ val relation = clazz.newInstance match {
+ case dataSource: org.apache.spark.sql.sources.CreateableRelationProvider =>
+ dataSource
+ .asInstanceOf[org.apache.spark.sql.sources.CreateableRelationProvider]
+ .createRelation(sqlContext, options, data)
+ case _ =>
+ sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
+ }
+
+ new ResolvedDataSource(clazz, relation)
+ }
}
private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)
@@ -220,13 +279,30 @@ private[sql] case class CreateTableUsing(
userSpecifiedSchema: Option[StructType],
provider: String,
temporary: Boolean,
- options: Map[String, String]) extends Command
+ options: Map[String, String],
+ allowExisting: Boolean) extends Command
+
+private[sql] case class CreateTableUsingAsSelect(
+ tableName: String,
+ provider: String,
+ temporary: Boolean,
+ options: Map[String, String],
+ allowExisting: Boolean,
+ query: String) extends Command
+
+private[sql] case class CreateTableUsingAsLogicalPlan(
+ tableName: String,
+ provider: String,
+ temporary: Boolean,
+ options: Map[String, String],
+ allowExisting: Boolean,
+ query: LogicalPlan) extends Command
private [sql] case class CreateTempTableUsing(
tableName: String,
userSpecifiedSchema: Option[StructType],
provider: String,
- options: Map[String, String]) extends RunnableCommand {
+ options: Map[String, String]) extends RunnableCommand {
def run(sqlContext: SQLContext) = {
val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options)
@@ -236,6 +312,22 @@ private [sql] case class CreateTempTableUsing(
}
}
+private [sql] case class CreateTempTableUsingAsSelect(
+ tableName: String,
+ provider: String,
+ options: Map[String, String],
+ query: LogicalPlan) extends RunnableCommand {
+
+ def run(sqlContext: SQLContext) = {
+ val df = DataFrame(sqlContext, query)
+ val resolved = ResolvedDataSource(sqlContext, provider, options, df)
+ sqlContext.registerRDDAsTable(
+ DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
+
+ Seq.empty
+ }
+}
+
/**
* Builds a map in which keys are case insensitive
*/
@@ -253,3 +345,9 @@ protected class CaseInsensitiveMap(map: Map[String, String]) extends Map[String,
override def -(key: String): Map[String, String] = baseMap - key.toLowerCase()
}
+
+/**
+ * The exception thrown from the DDL parser.
+ * @param message
+ */
+protected[sql] class DDLException(message: String) extends Exception(message)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index cd82cc6ecb..ad0a35b91e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.sources
import org.apache.spark.annotation.{Experimental, DeveloperApi}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute}
import org.apache.spark.sql.types.StructType
@@ -77,6 +77,14 @@ trait SchemaRelationProvider {
schema: StructType): BaseRelation
}
+@DeveloperApi
+trait CreateableRelationProvider {
+ def createRelation(
+ sqlContext: SQLContext,
+ parameters: Map[String, String],
+ data: DataFrame): BaseRelation
+}
+
/**
* ::DeveloperApi::
* Represents a collection of tuples with a known schema. Classes that extend BaseRelation must
@@ -108,7 +116,7 @@ abstract class BaseRelation {
* A BaseRelation that can produce all of its tuples as an RDD of Row objects.
*/
@DeveloperApi
-abstract class TableScan extends BaseRelation {
+trait TableScan extends BaseRelation {
def buildScan(): RDD[Row]
}
@@ -118,7 +126,7 @@ abstract class TableScan extends BaseRelation {
* containing all of its tuples as Row objects.
*/
@DeveloperApi
-abstract class PrunedScan extends BaseRelation {
+trait PrunedScan extends BaseRelation {
def buildScan(requiredColumns: Array[String]): RDD[Row]
}
@@ -132,7 +140,7 @@ abstract class PrunedScan extends BaseRelation {
* as filtering partitions based on a bloom filter.
*/
@DeveloperApi
-abstract class PrunedFilteredScan extends BaseRelation {
+trait PrunedFilteredScan extends BaseRelation {
def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}
@@ -145,6 +153,11 @@ abstract class PrunedFilteredScan extends BaseRelation {
* for experimentation.
*/
@Experimental
-abstract class CatalystScan extends BaseRelation {
+trait CatalystScan extends BaseRelation {
def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
}
+
+@DeveloperApi
+trait InsertableRelation extends BaseRelation {
+ def insert(data: DataFrame, overwrite: Boolean): Unit
+}
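To make the new traits concrete, a minimal, hypothetical relation provider that supports both scans and writes might look roughly like this. Everything here (the package `org.example.textsource`, `TextRelation`, and its toy write logic) is an illustrative assumption, not Spark code:

    package org.example.textsource  // hypothetical package, for illustration only

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row, SQLContext}
    import org.apache.spark.sql.sources._
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // Toy single-column text source implementing the read and write provider traits.
    class DefaultSource extends RelationProvider with CreateableRelationProvider {
      override def createRelation(
          sqlContext: SQLContext,
          parameters: Map[String, String]): BaseRelation = {
        val path = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
        TextRelation(path)(sqlContext)
      }

      override def createRelation(
          sqlContext: SQLContext,
          parameters: Map[String, String],
          data: DataFrame): BaseRelation = {
        // Materialize the data first, then return a relation over it.
        val relation = createRelation(sqlContext, parameters).asInstanceOf[TextRelation]
        relation.insert(data, overwrite = true)
        relation
      }
    }

    case class TextRelation(path: String)(@transient val sqlContext: SQLContext)
      extends TableScan with InsertableRelation {

      // One string column named "line".
      override val schema = StructType(StructField("line", StringType, nullable = true) :: Nil)

      override def buildScan(): RDD[Row] =
        sqlContext.sparkContext.textFile(path).map(Row(_))

      override def insert(data: DataFrame, overwrite: Boolean): Unit = {
        if (!overwrite) sys.error("This toy source only supports INSERT OVERWRITE.")
        // Naive overwrite of the first (string) column; a real source must also handle
        // existing data, atomicity, and schema validation.
        data.rdd.map(_.getString(0)).saveAsTextFile(path)
      }
    }

With such a provider on the classpath, `CREATE TEMPORARY TABLE ... USING org.example.textsource OPTIONS (path ...)` would route through `createRelation`, and `INSERT OVERWRITE TABLE` would route through `insert`.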
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 3d82f4bce7..5ec7a156d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -37,11 +37,21 @@ class ParquetQuerySuite extends QueryTest with ParquetTest {
test("appending") {
val data = (0 until 10).map(i => (i, i.toString))
withParquetTable(data, "t") {
- sql("INSERT INTO t SELECT * FROM t")
+ sql("INSERT INTO TABLE t SELECT * FROM t")
checkAnswer(table("t"), (data ++ data).map(Row.fromTuple))
}
}
+ // This test case will trigger the NPE mentioned in
+ // https://issues.apache.org/jira/browse/PARQUET-151.
+ ignore("overwriting") {
+ val data = (0 until 10).map(i => (i, i.toString))
+ withParquetTable(data, "t") {
+ sql("INSERT OVERWRITE TABLE t SELECT * FROM t")
+ checkAnswer(table("t"), data.map(Row.fromTuple))
+ }
+ }
+
test("self-join") {
// 4 rows, cells of column 1 of row 2 and row 4 are null
val data = (1 to 4).map { i =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
new file mode 100644
index 0000000000..b02389978b
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import java.io.File
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.catalyst.util
+import org.apache.spark.util.Utils
+
+class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
+
+ import caseInsensisitiveContext._
+
+ var path: File = null
+
+ override def beforeAll(): Unit = {
+ path = util.getTempFilePath("jsonCTAS").getCanonicalFile
+ val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+ jsonRDD(rdd).registerTempTable("jt")
+ }
+
+ override def afterAll(): Unit = {
+ dropTempTable("jt")
+ }
+
+ after {
+ if (path.exists()) Utils.deleteRecursively(path)
+ }
+
+ test("CREATE TEMPORARY TABLE AS SELECT") {
+ sql(
+ s"""
+ |CREATE TEMPORARY TABLE jsonTable
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${path.toString}'
+ |) AS
+ |SELECT a, b FROM jt
+ """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT a, b FROM jsonTable"),
+ sql("SELECT a, b FROM jt").collect())
+
+ dropTempTable("jsonTable")
+ }
+
+ test("create a table, drop it and create another one with the same name") {
+ sql(
+ s"""
+ |CREATE TEMPORARY TABLE jsonTable
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${path.toString}'
+ |) AS
+ |SELECT a, b FROM jt
+ """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT a, b FROM jsonTable"),
+ sql("SELECT a, b FROM jt").collect())
+
+ dropTempTable("jsonTable")
+
+ val message = intercept[RuntimeException]{
+ sql(
+ s"""
+ |CREATE TEMPORARY TABLE jsonTable
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${path.toString}'
+ |) AS
+ |SELECT a * 4 FROM jt
+ """.stripMargin)
+ }.getMessage
+ assert(
+ message.contains(s"path ${path.toString} already exists."),
+ "CREATE TEMPORARY TABLE IF NOT EXISTS should not be allowed.")
+
+ // Explicitly delete it.
+ if (path.exists()) Utils.deleteRecursively(path)
+
+ sql(
+ s"""
+ |CREATE TEMPORARY TABLE jsonTable
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${path.toString}'
+ |) AS
+ |SELECT a * 4 FROM jt
+ """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT * FROM jsonTable"),
+ sql("SELECT a * 4 FROM jt").collect())
+
+ dropTempTable("jsonTable")
+ }
+
+ test("CREATE TEMPORARY TABLE AS SELECT with IF NOT EXISTS is not allowed") {
+ val message = intercept[DDLException]{
+ sql(
+ s"""
+ |CREATE TEMPORARY TABLE IF NOT EXISTS jsonTable
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${path.toString}'
+ |) AS
+ |SELECT b FROM jt
+ """.stripMargin)
+ }.getMessage
+ assert(
+ message.contains("a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause."),
+ "CREATE TEMPORARY TABLE IF NOT EXISTS should not be allowed.")
+ }
+
+ test("a CTAS statement with column definitions is not allowed") {
+ intercept[DDLException]{
+ sql(
+ s"""
+ |CREATE TEMPORARY TABLE jsonTable (a int, b string)
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${path.toString}'
+ |) AS
+ |SELECT a, b FROM jt
+ """.stripMargin)
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala
new file mode 100644
index 0000000000..f91cea6a37
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import java.io.File
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.util
+import org.apache.spark.util.Utils
+
+class InsertIntoSuite extends DataSourceTest with BeforeAndAfterAll {
+
+ import caseInsensisitiveContext._
+
+ var path: File = null
+
+ override def beforeAll: Unit = {
+ path = util.getTempFilePath("jsonCTAS").getCanonicalFile
+ val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+ jsonRDD(rdd).registerTempTable("jt")
+ sql(
+ s"""
+ |CREATE TEMPORARY TABLE jsonTable (a int, b string)
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${path.toString}'
+ |)
+ """.stripMargin)
+ }
+
+ override def afterAll: Unit = {
+ dropTempTable("jsonTable")
+ dropTempTable("jt")
+ if (path.exists()) Utils.deleteRecursively(path)
+ }
+
+ test("Simple INSERT OVERWRITE a JSONRelation") {
+ sql(
+ s"""
+ |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt
+ """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT a, b FROM jsonTable"),
+ (1 to 10).map(i => Row(i, s"str$i"))
+ )
+ }
+
+ test("INSERT OVERWRITE a JSONRelation multiple times") {
+ sql(
+ s"""
+ |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt
+ """.stripMargin)
+
+ sql(
+ s"""
+ |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt
+ """.stripMargin)
+
+ sql(
+ s"""
+ |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt
+ """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT a, b FROM jsonTable"),
+ (1 to 10).map(i => Row(i, s"str$i"))
+ )
+ }
+
+ test("INSERT INTO not supported for JSONRelation for now") {
+ intercept[RuntimeException]{
+ sql(
+ s"""
+ |INSERT INTO TABLE jsonTable SELECT a, b FROM jt
+ """.stripMargin)
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
new file mode 100644
index 0000000000..fe2f76cc39
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import java.io.File
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.util.Utils
+
+import org.apache.spark.sql.catalyst.util
+
+class SaveLoadSuite extends DataSourceTest with BeforeAndAfterAll {
+
+ import caseInsensisitiveContext._
+
+ var originalDefaultSource: String = null
+
+ var path: File = null
+
+ var df: DataFrame = null
+
+ override def beforeAll(): Unit = {
+ originalDefaultSource = conf.defaultDataSourceName
+ conf.setConf("spark.sql.default.datasource", "org.apache.spark.sql.json")
+
+ path = util.getTempFilePath("datasource").getCanonicalFile
+
+ val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+ df = jsonRDD(rdd)
+ }
+
+ override def afterAll(): Unit = {
+ conf.setConf("spark.sql.default.datasource", originalDefaultSource)
+ }
+
+ after {
+ if (path.exists()) Utils.deleteRecursively(path)
+ }
+
+ def checkLoad(): Unit = {
+ checkAnswer(load(path.toString), df.collect())
+ checkAnswer(load("org.apache.spark.sql.json", ("path", path.toString)), df.collect())
+ }
+
+ test("save with overwrite and load") {
+ df.save(path.toString)
+ checkLoad
+ }
+
+ test("save with data source and options, and load") {
+ df.save("org.apache.spark.sql.json", ("path", path.toString))
+ checkLoad
+ }
+
+ test("save and save again") {
+ df.save(path.toString)
+
+ val message = intercept[RuntimeException] {
+ df.save(path.toString)
+ }.getMessage
+
+ assert(
+ message.contains("already exists"),
+ "We should complain that the path already exists.")
+
+ if (path.exists()) Utils.deleteRecursively(path)
+
+ df.save(path.toString)
+ checkLoad
+ }
+}
\ No newline at end of file
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 5efc3b1e30..f6d9027f90 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperat
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, SetCommand, QueryExecutionException}
import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DescribeHiveTableCommand}
-import org.apache.spark.sql.sources.DataSourceStrategy
+import org.apache.spark.sql.sources.{CreateTableUsing, DataSourceStrategy}
import org.apache.spark.sql.types._
/**
@@ -86,6 +86,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
* @param allowExisting When false, an exception will be thrown if the table already exists.
* @tparam A A case class that is used to describe the schema of the table to be created.
*/
+ @Deprecated
def createTable[A <: Product : TypeTag](tableName: String, allowExisting: Boolean = true) {
catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting)
}
@@ -106,6 +107,70 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
catalog.invalidateTable("default", tableName)
}
+ @Experimental
+ def createTable(tableName: String, path: String, allowExisting: Boolean): Unit = {
+ val dataSourceName = conf.defaultDataSourceName
+ createTable(tableName, dataSourceName, allowExisting, ("path", path))
+ }
+
+ @Experimental
+ def createTable(
+ tableName: String,
+ dataSourceName: String,
+ allowExisting: Boolean,
+ option: (String, String),
+ options: (String, String)*): Unit = {
+ val cmd =
+ CreateTableUsing(
+ tableName,
+ userSpecifiedSchema = None,
+ dataSourceName,
+ temporary = false,
+ (option +: options).toMap,
+ allowExisting)
+ executePlan(cmd).toRdd
+ }
+
+ @Experimental
+ def createTable(
+ tableName: String,
+ dataSourceName: String,
+ schema: StructType,
+ allowExisting: Boolean,
+ option: (String, String),
+ options: (String, String)*): Unit = {
+ val cmd =
+ CreateTableUsing(
+ tableName,
+ userSpecifiedSchema = Some(schema),
+ dataSourceName,
+ temporary = false,
+ (option +: options).toMap,
+ allowExisting)
+ executePlan(cmd).toRdd
+ }
+
+ @Experimental
+ def createTable(
+ tableName: String,
+ dataSourceName: String,
+ allowExisting: Boolean,
+ options: java.util.Map[String, String]): Unit = {
+ val opts = options.toSeq
+ createTable(tableName, dataSourceName, allowExisting, opts.head, opts.tail:_*)
+ }
+
+ @Experimental
+ def createTable(
+ tableName: String,
+ dataSourceName: String,
+ schema: StructType,
+ allowExisting: Boolean,
+ options: java.util.Map[String, String]): Unit = {
+ val opts = options.toSeq
+ createTable(tableName, dataSourceName, schema, allowExisting, opts.head, opts.tail:_*)
+ }
+
/**
* Analyzes the given table in the current database to generate statistics, which will be
* used in query optimizations.
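A hedged sketch of the new HiveContext helpers added above (the `hiveContext`, table names, schema, and paths are placeholders for the example):

    // Register existing data as a metastore table using the default data source.
    hiveContext.createTable("parquetSales", "/data/sales.parquet", false)  // third arg is allowExisting

    // Name the data source explicitly; passing a path makes the table external.
    hiveContext.createTable(
      "jsonSales",
      "org.apache.spark.sql.json",
      false,                                 // allowExisting
      ("path", "/data/sales.json"))

    // Same, with a user-specified schema.
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    val salesSchema = StructType(
      StructField("id", IntegerType, nullable = false) ::
      StructField("name", StringType, nullable = true) :: Nil)
    hiveContext.createTable(
      "typedJsonSales",
      "org.apache.spark.sql.json",
      salesSchema,
      false,                                 // allowExisting
      ("path", "/data/sales.json"))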
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d910ee9509..48bea6c1bd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -23,10 +23,9 @@ import java.util.{List => JList}
import com.google.common.cache.{LoadingCache, CacheLoader, CacheBuilder}
import org.apache.hadoop.util.ReflectionUtils
-import org.apache.hadoop.hive.metastore.TableType
-import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition, FieldSchema}
-import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException}
-import org.apache.hadoop.hive.ql.metadata.InvalidTableException
+import org.apache.hadoop.hive.metastore.{Warehouse, TableType}
+import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition, AlreadyExistsException, FieldSchema}
+import org.apache.hadoop.hive.ql.metadata._
import org.apache.hadoop.hive.ql.plan.CreateTableDesc
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException}
@@ -52,6 +51,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
/** Connection to hive metastore. Usages should lock on `this`. */
protected[hive] val client = Hive.get(hive.hiveconf)
+ protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf)
+
// TODO: Use this everywhere instead of tuples or databaseName, tableName,.
/** A fully qualified identifier for a table (i.e., database.tableName) */
case class QualifiedTableName(database: String, name: String) {
@@ -99,11 +100,22 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
val caseSensitive: Boolean = false
+ /** *
+ * Creates a data source table (a table created with USING clause) in Hive's metastore.
+ * Returns true when the table has been created. Otherwise, false.
+ * @param tableName
+ * @param userSpecifiedSchema
+ * @param provider
+ * @param options
+ * @param isExternal
+ * @return
+ */
def createDataSourceTable(
tableName: String,
userSpecifiedSchema: Option[StructType],
provider: String,
- options: Map[String, String]) = {
+ options: Map[String, String],
+ isExternal: Boolean): Unit = {
val (dbName, tblName) = processDatabaseAndTableName("default", tableName)
val tbl = new Table(dbName, tblName)
@@ -113,8 +125,13 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
options.foreach { case (key, value) => tbl.setSerdeParam(key, value) }
- tbl.setProperty("EXTERNAL", "TRUE")
- tbl.setTableType(TableType.EXTERNAL_TABLE)
+ if (isExternal) {
+ tbl.setProperty("EXTERNAL", "TRUE")
+ tbl.setTableType(TableType.EXTERNAL_TABLE)
+ } else {
+ tbl.setProperty("EXTERNAL", "FALSE")
+ tbl.setTableType(TableType.MANAGED_TABLE)
+ }
// create the table
synchronized {
@@ -122,6 +139,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
}
+ def hiveDefaultTableFilePath(tableName: String): String = {
+ hiveWarehouse.getTablePath(client.getDatabaseCurrent, tableName).toString
+ }
+
def tableExists(tableIdentifier: Seq[String]): Boolean = {
val tableIdent = processTableIdentifier(tableIdentifier)
val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index fa997288a2..d89111094b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeComman
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.parquet.ParquetRelation
-import org.apache.spark.sql.sources.CreateTableUsing
+import org.apache.spark.sql.sources.{CreateTableUsingAsLogicalPlan, CreateTableUsingAsSelect, CreateTableUsing}
import org.apache.spark.sql.types.StringType
@@ -212,9 +212,21 @@ private[hive] trait HiveStrategies {
object HiveDDLStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) =>
+ case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, opts, allowExisting) =>
ExecutedCommand(
- CreateMetastoreDataSource(tableName, userSpecifiedSchema, provider, options)) :: Nil
+ CreateMetastoreDataSource(
+ tableName, userSpecifiedSchema, provider, opts, allowExisting)) :: Nil
+
+ case CreateTableUsingAsSelect(tableName, provider, false, opts, allowExisting, query) =>
+ val logicalPlan = hiveContext.parseSql(query)
+ val cmd =
+ CreateMetastoreDataSourceAsSelect(tableName, provider, opts, allowExisting, logicalPlan)
+ ExecutedCommand(cmd) :: Nil
+
+ case CreateTableUsingAsLogicalPlan(tableName, provider, false, opts, allowExisting, query) =>
+ val cmd =
+ CreateMetastoreDataSourceAsSelect(tableName, provider, opts, allowExisting, query)
+ ExecutedCommand(cmd) :: Nil
case _ => Nil
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 4814cb7ebf..95dcaccefd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -18,8 +18,10 @@
package org.apache.spark.sql.hive.execution
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.sources.ResolvedDataSource
+import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.StructType
@@ -102,11 +104,77 @@ case class CreateMetastoreDataSource(
tableName: String,
userSpecifiedSchema: Option[StructType],
provider: String,
- options: Map[String, String]) extends RunnableCommand {
+ options: Map[String, String],
+ allowExisting: Boolean) extends RunnableCommand {
- override def run(sqlContext: SQLContext) = {
+ override def run(sqlContext: SQLContext): Seq[Row] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
- hiveContext.catalog.createDataSourceTable(tableName, userSpecifiedSchema, provider, options)
+
+ if (hiveContext.catalog.tableExists(tableName :: Nil)) {
+ if (allowExisting) {
+ return Seq.empty[Row]
+ } else {
+ sys.error(s"Table $tableName already exists.")
+ }
+ }
+
+ var isExternal = true
+ val optionsWithPath =
+ if (!options.contains("path")) {
+ isExternal = false
+ options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableName))
+ } else {
+ options
+ }
+
+ hiveContext.catalog.createDataSourceTable(
+ tableName,
+ userSpecifiedSchema,
+ provider,
+ optionsWithPath,
+ isExternal)
+
+ Seq.empty[Row]
+ }
+}
+
+case class CreateMetastoreDataSourceAsSelect(
+ tableName: String,
+ provider: String,
+ options: Map[String, String],
+ allowExisting: Boolean,
+ query: LogicalPlan) extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val hiveContext = sqlContext.asInstanceOf[HiveContext]
+
+ if (hiveContext.catalog.tableExists(tableName :: Nil)) {
+ if (allowExisting) {
+ return Seq.empty[Row]
+ } else {
+ sys.error(s"Table $tableName already exists.")
+ }
+ }
+
+ val df = DataFrame(hiveContext, query)
+ var isExternal = true
+ val optionsWithPath =
+ if (!options.contains("path")) {
+ isExternal = false
+ options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableName))
+ } else {
+ options
+ }
+
+ // Create the relation, writing out the data of df through the data source.
+ ResolvedDataSource(sqlContext, provider, optionsWithPath, df)
+
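+ // Register the table in the metastore; pass None for the schema since it will be inferred from the relation.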
+ hiveContext.catalog.createDataSourceTable(
+ tableName,
+ None,
+ provider,
+ optionsWithPath,
+ isExternal)
Seq.empty[Row]
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 7408c7ffd6..85795acb65 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -22,7 +22,9 @@ import java.io.File
import org.scalatest.BeforeAndAfterEach
import org.apache.commons.io.FileUtils
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.catalyst.util
import org.apache.spark.sql._
import org.apache.spark.util.Utils
import org.apache.spark.sql.types._
@@ -36,9 +38,11 @@ import org.apache.spark.sql.hive.test.TestHive._
class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
override def afterEach(): Unit = {
reset()
+ if (ctasPath.exists()) Utils.deleteRecursively(ctasPath)
}
val filePath = Utils.getSparkClassLoader.getResource("sample.json").getFile
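+ // Target path used by the CTAS tests below; cleaned up in afterEach.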
+ val ctasPath: File = util.getTempFilePath("jsonCTAS").getCanonicalFile
test ("persistent JSON table") {
sql(
@@ -94,7 +98,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
StructField("<d>", innerStruct, true) ::
StructField("b", StringType, true) :: Nil)
- assert(expectedSchema == table("jsonTable").schema)
+ assert(expectedSchema === table("jsonTable").schema)
jsonFile(filePath).registerTempTable("expectedJsonTable")
@@ -137,6 +141,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
intercept[Exception] {
sql("SELECT * FROM jsonTable").collect()
}
+
+ assert(
+ (new File(filePath)).exists(),
+ "The table with specified path is considered as an external table, " +
+ "its data should not deleted after DROP TABLE.")
}
test("check change without refresh") {
@@ -240,7 +249,144 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
invalidateTable("jsonTable")
val expectedSchema = StructType(StructField("c_!@(3)", IntegerType, true) :: Nil)
- assert(expectedSchema == table("jsonTable").schema)
+ assert(expectedSchema === table("jsonTable").schema)
+ }
+
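+ // Basic CTAS: create a JSON-backed table, then CREATE TABLE ... AS SELECT into a new JSON table at ctasPath.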
+ test("CTAS") {
+ sql(
+ s"""
+ |CREATE TABLE jsonTable
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${filePath}'
+ |)
+ """.stripMargin)
+
+ sql(
+ s"""
+ |CREATE TABLE ctasJsonTable
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${ctasPath}'
+ |) AS
+ |SELECT * FROM jsonTable
+ """.stripMargin)
+
+ assert(table("ctasJsonTable").schema === table("jsonTable").schema)
+
+ checkAnswer(
+ sql("SELECT * FROM ctasJsonTable"),
+ sql("SELECT * FROM jsonTable").collect())
+ }
+
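+ // Without IF NOT EXISTS a conflicting CTAS fails; with it, the statement becomes a no-op.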
+ test("CTAS with IF NOT EXISTS") {
+ sql(
+ s"""
+ |CREATE TABLE jsonTable
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${filePath}'
+ |)
+ """.stripMargin)
+
+ sql(
+ s"""
+ |CREATE TABLE ctasJsonTable
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${ctasPath}'
+ |) AS
+ |SELECT * FROM jsonTable
+ """.stripMargin)
+
+ // Creating the table again should trigger an exception saying the table already exists.
+ val message = intercept[RuntimeException] {
+ sql(
+ s"""
+ |CREATE TABLE ctasJsonTable
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${ctasPath}'
+ |) AS
+ |SELECT * FROM jsonTable
+ """.stripMargin)
+ }.getMessage
+ assert(message.contains("Table ctasJsonTable already exists."),
+ "We should complain that ctasJsonTable already exists")
+
+ // The following statement should succeed because it has IF NOT EXISTS.
+ // It tries to recreate ctasJsonTable with a different schema,
+ // but the existing table's schema and data should be left unchanged.
+ sql(
+ s"""
+ |CREATE TABLE IF NOT EXISTS ctasJsonTable
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${ctasPath}'
+ |) AS
+ |SELECT a FROM jsonTable
+ """.stripMargin)
+
+ // Discard the cached relation.
+ invalidateTable("ctasJsonTable")
+
+ // Schema should not be changed.
+ assert(table("ctasJsonTable").schema === table("jsonTable").schema)
+ // Table data should not be changed.
+ checkAnswer(
+ sql("SELECT * FROM ctasJsonTable"),
+ sql("SELECT * FROM jsonTable").collect())
+ }
+
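+ // With no path option, CTAS creates a managed table under Hive's default warehouse location; dropping it also deletes the data.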
+ test("CTAS a managed table") {
+ sql(
+ s"""
+ |CREATE TABLE jsonTable
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${filePath}'
+ |)
+ """.stripMargin)
+
+ new Path("/Users/yhuai/Desktop/whatever")
+
+
+ val expectedPath = catalog.hiveDefaultTableFilePath("ctasJsonTable")
+ val filesystemPath = new Path(expectedPath)
+ val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration)
+ if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true)
+
+ // It is a managed table when we do not specify the location.
+ sql(
+ s"""
+ |CREATE TABLE ctasJsonTable
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ |
+ |) AS
+ |SELECT * FROM jsonTable
+ """.stripMargin)
+
+ assert(fs.exists(filesystemPath), s"$expectedPath should exist after we create the table.")
+
+ sql(
+ s"""
+ |CREATE TABLE loadedTable
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${expectedPath}'
+ |)
+ """.stripMargin)
+
+ assert(table("ctasJsonTable").schema === table("loadedTable").schema)
+
+ checkAnswer(
+ sql("SELECT * FROM ctasJsonTable"),
+ sql("SELECT * FROM loadedTable").collect()
+ )
+
+ sql("DROP TABLE ctasJsonTable")
+ assert(!fs.exists(filesystemPath), s"$expectedPath should not exist after we drop the table.")
}
test("SPARK-5286 Fail to drop an invalid table when using the data source API") {
@@ -255,4 +401,39 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
sql("DROP TABLE jsonTable").collect().foreach(println)
}
+
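+ // Exercises the programmatic saveAsTable and createTable APIs with the JSON data source set as the default.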
+ test("save and load table") {
+ val originalDefaultSource = conf.defaultDataSourceName
+ conf.setConf("spark.sql.default.datasource", "org.apache.spark.sql.json")
+
+ val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+ val df = jsonRDD(rdd)
+
+ df.saveAsTable("savedJsonTable")
+
+ checkAnswer(
+ sql("SELECT * FROM savedJsonTable"),
+ df.collect())
+
+ createTable("createdJsonTable", catalog.hiveDefaultTableFilePath("savedJsonTable"), false)
+ assert(table("createdJsonTable").schema === df.schema)
+ checkAnswer(
+ sql("SELECT * FROM createdJsonTable"),
+ df.collect())
+
+ val message = intercept[RuntimeException] {
+ createTable("createdJsonTable", filePath.toString, false)
+ }.getMessage
+ assert(message.contains("Table createdJsonTable already exists."),
+ "We should complain that ctasJsonTable already exists")
+
+ createTable("createdJsonTable", filePath.toString, true)
+ // createdJsonTable should not be changed.
+ assert(table("createdJsonTable").schema === df.schema)
+ checkAnswer(
+ sql("SELECT * FROM createdJsonTable"),
+ df.collect())
+
+ conf.setConf("spark.sql.default.datasource", originalDefaultSource)
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index eb7a7750af..4efe0c5e0c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -170,7 +170,7 @@ class SQLQuerySuite extends QueryTest {
sql("CREATE TABLE test2 (key INT, value STRING)")
testData.insertInto("test2")
testData.insertInto("test2")
- sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").saveAsTable("test")
+ sql("CREATE TABLE test AS SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key")
checkAnswer(
table("test"),
sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").collect().toSeq)