author     Yin Huai <yhuai@databricks.com>            2015-02-10 17:29:52 -0800
committer  Michael Armbrust <michael@databricks.com>  2015-02-10 17:29:52 -0800
commit     aaf50d05c7616e4f8f16654b642500ae06cdd774 (patch)
tree       7f30e0d08e4f2b531ac62c82a4361a2db577932d /sql/core
parent     ed167e70c6d355f39b366ea0d3b92dd26d826a0b (diff)
download   spark-aaf50d05c7616e4f8f16654b642500ae06cdd774.tar.gz
           spark-aaf50d05c7616e4f8f16654b642500ae06cdd774.tar.bz2
           spark-aaf50d05c7616e4f8f16654b642500ae06cdd774.zip
[SPARK-5658][SQL] Finalize DDL and write support APIs
https://issues.apache.org/jira/browse/SPARK-5658

Author: Yin Huai <yhuai@databricks.com>

This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <michael@databricks.com>

Closes #4446 from yhuai/writeSupportFollowup and squashes the following commits:

f3a96f7 [Yin Huai] davies's comments.
225ff71 [Yin Huai] Use Scala TestHiveContext to initialize the Python HiveContext in Python tests.
2306f93 [Yin Huai] Style.
2091fcd [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupportFollowup
537e28f [Yin Huai] Correctly clean up temp data.
ae4649e [Yin Huai] Fix Python test.
609129c [Yin Huai] Doc format.
92b6659 [Yin Huai] Python doc and other minor updates.
cbc717f [Yin Huai] Rename dataSourceName to source.
d1c12d3 [Yin Huai] No need to delete the duplicate rule since it has been removed in master.
22cfa70 [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupportFollowup
d91ecb8 [Yin Huai] Fix test.
4c76d78 [Yin Huai] Simplify APIs.
3abc215 [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupportFollowup
0832ce4 [Yin Huai] Fix test.
98e7cdb [Yin Huai] Python style.
2bf44ef [Yin Huai] Python APIs.
c204967 [Yin Huai] Format
a10223d [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupportFollowup
9ff97d8 [Yin Huai] Add SaveMode to saveAsTable.
9b6e570 [Yin Huai] Update doc.
c2be775 [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupportFollowup
99950a2 [Yin Huai] Use Java enum for SaveMode.
4679665 [Yin Huai] Remove duplicate rule.
77d89dc [Yin Huai] Update doc.
e04d908 [Yin Huai] Move import and add (Scala-specific) to scala APIs.
cf5703d [Yin Huai] Add checkAnswer to Java tests.
7db95ff [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupportFollowup
6dfd386 [Yin Huai] Add java test.
f2f33ef [Yin Huai] Fix test.
e702386 [Yin Huai] Apache header.
b1e9b1b [Yin Huai] Format.
ed4e1b4 [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupportFollowup
af9e9b3 [Yin Huai] DDL and write support API followup.
2a6213a [Yin Huai] Update API names.
e6a0b77 [Yin Huai] Update test.
43bae01 [Yin Huai] Remove createTable from HiveContext.
5ffc372 [Yin Huai] Add more load APIs to SQLContext.
5390743 [Yin Huai] Add more save APIs to DataFrame.
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/sources/SaveMode.java                     45
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala                         160
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala                      61
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala                 27
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala                             2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                        164
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala          14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala                  30
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala                 45
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala                        40
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala                 19
-rw-r--r--  sql/core/src/test/java/org/apache/spark/sql/sources/JavaSaveLoadSuite.java            97
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala                          92
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala   29
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala              59
15 files changed, 700 insertions, 184 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/SaveMode.java b/sql/core/src/main/java/org/apache/spark/sql/sources/SaveMode.java
new file mode 100644
index 0000000000..3109f5716d
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/SaveMode.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources;
+
+/**
+ * SaveMode is used to specify the expected behavior of saving a DataFrame to a data source.
+ */
+public enum SaveMode {
+ /**
+ * Append mode means that when saving a DataFrame to a data source, if data/table already exists,
+ * contents of the DataFrame are expected to be appended to existing data.
+ */
+ Append,
+ /**
+ * Overwrite mode means that when saving a DataFrame to a data source,
+ * if data/table already exists, existing data is expected to be overwritten by the contents of
+ * the DataFrame.
+ */
+ Overwrite,
+ /**
+ * ErrorIfExists mode means that when saving a DataFrame to a data source, if data already exists,
+ * an exception is expected to be thrown.
+ */
+ ErrorIfExists,
+ /**
+ * Ignore mode means that when saving a DataFrame to a data source, if data already exists,
+ * the save operation is expected to not save the contents of the DataFrame and to not
+ * change the existing data.
+ */
+ Ignore
+}
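
Usage note: the enum above is the Java-friendly half of the new write API; the save/saveAsTable overloads added later in this patch take it as an argument. A minimal Scala sketch (the DataFrame value `df` and the output path are illustrative, not part of this change):

  import org.apache.spark.sql.sources.SaveMode

  // Default behavior: fail if the target already exists.
  df.save("/tmp/people.json", "org.apache.spark.sql.json", SaveMode.ErrorIfExists)

  // Replace whatever is already at the path.
  df.save("/tmp/people.json", "org.apache.spark.sql.json", SaveMode.Overwrite)

  // Leave existing data untouched and skip the write.
  df.save("/tmp/people.json", "org.apache.spark.sql.json", SaveMode.Ignore)

Using a Java enum (per the "Use Java enum for SaveMode" commit) keeps the mode usable from both the Java and Scala APIs.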
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 04e0d09947..ca8d552c5f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -17,19 +17,19 @@
package org.apache.spark.sql
+import scala.collection.JavaConversions._
import scala.reflect.ClassTag
+import scala.util.control.NonFatal
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.sources.SaveMode
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
-import scala.util.control.NonFatal
-
-
private[sql] object DataFrame {
def apply(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
new DataFrameImpl(sqlContext, logicalPlan)
@@ -574,8 +574,64 @@ trait DataFrame extends RDDApi[Row] {
/**
* :: Experimental ::
- * Creates a table from the the contents of this DataFrame. This will fail if the table already
- * exists.
+ * Creates a table from the contents of this DataFrame.
+ * It will use the default data source configured by spark.sql.sources.default.
+ * This will fail if the table already exists.
+ *
+ * Note that this currently only works with DataFrames that are created from a HiveContext as
+ * there is no notion of a persisted catalog in a standard SQL context. Instead you can write
+ * an RDD out to a parquet file, and then register that file as a table. This "table" can then
+ * be the target of an `insertInto`.
+ */
+ @Experimental
+ def saveAsTable(tableName: String): Unit = {
+ saveAsTable(tableName, SaveMode.ErrorIfExists)
+ }
+
+ /**
+ * :: Experimental ::
+ * Creates a table from the contents of this DataFrame, using the default data source
+ * configured by spark.sql.sources.default and [[SaveMode.ErrorIfExists]] as the save mode.
+ *
+ * Note that this currently only works with DataFrames that are created from a HiveContext as
+ * there is no notion of a persisted catalog in a standard SQL context. Instead you can write
+ * an RDD out to a parquet file, and then register that file as a table. This "table" can then
+ * be the target of an `insertInto`.
+ */
+ @Experimental
+ def saveAsTable(tableName: String, mode: SaveMode): Unit = {
+ if (sqlContext.catalog.tableExists(Seq(tableName)) && mode == SaveMode.Append) {
+ // If table already exists and the save mode is Append,
+ // we will just call insertInto to append the contents of this DataFrame.
+ insertInto(tableName, overwrite = false)
+ } else {
+ val dataSourceName = sqlContext.conf.defaultDataSourceName
+ saveAsTable(tableName, dataSourceName, mode)
+ }
+ }
+
+ /**
+ * :: Experimental ::
+ * Creates a table from the contents of this DataFrame based on a given data source,
+ * using [[SaveMode.ErrorIfExists]] as the save mode.
+ *
+ * Note that this currently only works with DataFrames that are created from a HiveContext as
+ * there is no notion of a persisted catalog in a standard SQL context. Instead you can write
+ * an RDD out to a parquet file, and then register that file as a table. This "table" can then
+ * be the target of an `insertInto`.
+ */
+ @Experimental
+ def saveAsTable(
+ tableName: String,
+ source: String): Unit = {
+ saveAsTable(tableName, source, SaveMode.ErrorIfExists)
+ }
+
+ /**
+ * :: Experimental ::
+ * Creates a table from the contents of this DataFrame based on a given data source
+ * and the [[SaveMode]] specified by mode.
*
* Note that this currently only works with DataFrames that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
@@ -583,12 +639,17 @@ trait DataFrame extends RDDApi[Row] {
* be the target of an `insertInto`.
*/
@Experimental
- def saveAsTable(tableName: String): Unit
+ def saveAsTable(
+ tableName: String,
+ source: String,
+ mode: SaveMode): Unit = {
+ saveAsTable(tableName, source, mode, Map.empty[String, String])
+ }
/**
* :: Experimental ::
- * Creates a table from the the contents of this DataFrame based on a given data source and
- * a set of options. This will fail if the table already exists.
+ * Creates a table from the contents of this DataFrame based on a given data source,
+ * the [[SaveMode]] specified by mode, and a set of options.
*
* Note that this currently only works with DataFrames that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
@@ -598,14 +659,17 @@ trait DataFrame extends RDDApi[Row] {
@Experimental
def saveAsTable(
tableName: String,
- dataSourceName: String,
- option: (String, String),
- options: (String, String)*): Unit
+ source: String,
+ mode: SaveMode,
+ options: java.util.Map[String, String]): Unit = {
+ saveAsTable(tableName, source, mode, options.toMap)
+ }
/**
* :: Experimental ::
- * Creates a table from the the contents of this DataFrame based on a given data source and
- * a set of options. This will fail if the table already exists.
+ * (Scala-specific)
+ * Creates a table from the contents of this DataFrame based on a given data source,
+ * [[SaveMode]] specified by mode, and a set of options.
*
* Note that this currently only works with DataFrames that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
@@ -615,22 +679,76 @@ trait DataFrame extends RDDApi[Row] {
@Experimental
def saveAsTable(
tableName: String,
- dataSourceName: String,
- options: java.util.Map[String, String]): Unit
+ source: String,
+ mode: SaveMode,
+ options: Map[String, String]): Unit
+
+ /**
+ * :: Experimental ::
+ * Saves the contents of this DataFrame to the given path,
+ * using the default data source configured by spark.sql.sources.default and
+ * [[SaveMode.ErrorIfExists]] as the save mode.
+ */
+ @Experimental
+ def save(path: String): Unit = {
+ save(path, SaveMode.ErrorIfExists)
+ }
+
+ /**
+ * :: Experimental ::
+ * Saves the contents of this DataFrame to the given path and [[SaveMode]] specified by mode,
+ * using the default data source configured by spark.sql.sources.default.
+ */
+ @Experimental
+ def save(path: String, mode: SaveMode): Unit = {
+ val dataSourceName = sqlContext.conf.defaultDataSourceName
+ save(path, dataSourceName, mode)
+ }
+ /**
+ * :: Experimental ::
+ * Saves the contents of this DataFrame to the given path based on the given data source,
+ * using [[SaveMode.ErrorIfExists]] as the save mode.
+ */
+ @Experimental
+ def save(path: String, source: String): Unit = {
+ save(source, SaveMode.ErrorIfExists, Map("path" -> path))
+ }
+
+ /**
+ * :: Experimental ::
+ * Saves the contents of this DataFrame to the given path based on the given data source and
+ * [[SaveMode]] specified by mode.
+ */
@Experimental
- def save(path: String): Unit
+ def save(path: String, source: String, mode: SaveMode): Unit = {
+ save(source, mode, Map("path" -> path))
+ }
+ /**
+ * :: Experimental ::
+ * Saves the contents of this DataFrame based on the given data source,
+ * [[SaveMode]] specified by mode, and a set of options.
+ */
@Experimental
def save(
- dataSourceName: String,
- option: (String, String),
- options: (String, String)*): Unit
+ source: String,
+ mode: SaveMode,
+ options: java.util.Map[String, String]): Unit = {
+ save(source, mode, options.toMap)
+ }
+ /**
+ * :: Experimental ::
+ * (Scala-specific)
+ * Saves the contents of this DataFrame based on the given data source,
+ * [[SaveMode]] specified by mode, and a set of options.
+ */
@Experimental
def save(
- dataSourceName: String,
- options: java.util.Map[String, String]): Unit
+ source: String,
+ mode: SaveMode,
+ options: Map[String, String]): Unit
/**
* :: Experimental ::
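
For reference, the table-writing entry points above all funnel into the Scala-specific saveAsTable(tableName, source, mode, options). A hedged sketch of how they compose (table name, source, and path are illustrative; per the scaladoc this only persists a real table when the DataFrame comes from a HiveContext):

  import org.apache.spark.sql.sources.SaveMode

  df.saveAsTable("people")                                  // default source, ErrorIfExists
  df.saveAsTable("people", SaveMode.Append)                 // insertInto if the table already exists
  df.saveAsTable("people", "org.apache.spark.sql.parquet",  // explicit source, mode, and options
    SaveMode.Overwrite, Map("path" -> "/warehouse/people"))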
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index 1ee16ad516..11f9334556 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -28,13 +28,14 @@ import org.apache.spark.api.python.SerDeUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.catalyst.{SqlParser, ScalaReflection}
-import org.apache.spark.sql.catalyst.analysis.{ResolvedStar, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.{EliminateAnalysisOperators, ResolvedStar, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.sideBySide
import org.apache.spark.sql.execution.{LogicalRDD, EvaluatePython}
import org.apache.spark.sql.json.JsonRDD
-import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsLogicalPlan}
+import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{NumericType, StructType}
@@ -341,68 +342,34 @@ private[sql] class DataFrameImpl protected[sql](
override def saveAsParquetFile(path: String): Unit = {
if (sqlContext.conf.parquetUseDataSourceApi) {
- save("org.apache.spark.sql.parquet", "path" -> path)
+ save("org.apache.spark.sql.parquet", SaveMode.ErrorIfExists, Map("path" -> path))
} else {
sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
}
}
- override def saveAsTable(tableName: String): Unit = {
- val dataSourceName = sqlContext.conf.defaultDataSourceName
- val cmd =
- CreateTableUsingAsLogicalPlan(
- tableName,
- dataSourceName,
- temporary = false,
- Map.empty,
- allowExisting = false,
- logicalPlan)
-
- sqlContext.executePlan(cmd).toRdd
- }
-
override def saveAsTable(
tableName: String,
- dataSourceName: String,
- option: (String, String),
- options: (String, String)*): Unit = {
+ source: String,
+ mode: SaveMode,
+ options: Map[String, String]): Unit = {
val cmd =
CreateTableUsingAsLogicalPlan(
tableName,
- dataSourceName,
+ source,
temporary = false,
- (option +: options).toMap,
- allowExisting = false,
+ mode,
+ options,
logicalPlan)
sqlContext.executePlan(cmd).toRdd
}
- override def saveAsTable(
- tableName: String,
- dataSourceName: String,
- options: java.util.Map[String, String]): Unit = {
- val opts = options.toSeq
- saveAsTable(tableName, dataSourceName, opts.head, opts.tail:_*)
- }
-
- override def save(path: String): Unit = {
- val dataSourceName = sqlContext.conf.defaultDataSourceName
- save(dataSourceName, "path" -> path)
- }
-
- override def save(
- dataSourceName: String,
- option: (String, String),
- options: (String, String)*): Unit = {
- ResolvedDataSource(sqlContext, dataSourceName, (option +: options).toMap, this)
- }
-
override def save(
- dataSourceName: String,
- options: java.util.Map[String, String]): Unit = {
- val opts = options.toSeq
- save(dataSourceName, opts.head, opts.tail:_*)
+ source: String,
+ mode: SaveMode,
+ options: Map[String, String]): Unit = {
+ ResolvedDataSource(sqlContext, source, mode, options, this)
}
override def insertInto(tableName: String, overwrite: Boolean): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
index ce0557b881..494e49c131 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
@@ -25,9 +25,9 @@ import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedSt
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.sql.sources.SaveMode
import org.apache.spark.sql.types.StructType
-
private[sql] class IncomputableColumn(protected[sql] val expr: Expression) extends Column {
def this(name: String) = this(name match {
@@ -156,29 +156,16 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
override def saveAsParquetFile(path: String): Unit = err()
- override def saveAsTable(tableName: String): Unit = err()
-
- override def saveAsTable(
- tableName: String,
- dataSourceName: String,
- option: (String, String),
- options: (String, String)*): Unit = err()
-
override def saveAsTable(
tableName: String,
- dataSourceName: String,
- options: java.util.Map[String, String]): Unit = err()
-
- override def save(path: String): Unit = err()
-
- override def save(
- dataSourceName: String,
- option: (String, String),
- options: (String, String)*): Unit = err()
+ source: String,
+ mode: SaveMode,
+ options: Map[String, String]): Unit = err()
override def save(
- dataSourceName: String,
- options: java.util.Map[String, String]): Unit = err()
+ source: String,
+ mode: SaveMode,
+ options: Map[String, String]): Unit = err()
override def insertInto(tableName: String, overwrite: Boolean): Unit = err()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 180f5e765f..39f6c2f4bc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -50,7 +50,7 @@ private[spark] object SQLConf {
val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
// This is used to set the default data source
- val DEFAULT_DATA_SOURCE_NAME = "spark.sql.default.datasource"
+ val DEFAULT_DATA_SOURCE_NAME = "spark.sql.sources.default"
// Whether to perform eager analysis on a DataFrame.
val DATAFRAME_EAGER_ANALYSIS = "spark.sql.dataframe.eagerAnalysis"
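
Because of the rename, code that set the old "spark.sql.default.datasource" key has to switch to the new one. A one-line sketch (the JSON source name mirrors what the tests below use):

  // Controls which data source df.save(path) and sqlContext.load(path) fall back to.
  sqlContext.setConf("spark.sql.sources.default", "org.apache.spark.sql.json")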
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 97e3777f93..801505bceb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -401,27 +401,173 @@ class SQLContext(@transient val sparkContext: SparkContext)
jsonRDD(json.rdd, samplingRatio);
}
+ /**
+ * :: Experimental ::
+ * Returns the dataset stored at path as a DataFrame,
+ * using the default data source configured by spark.sql.sources.default.
+ */
@Experimental
def load(path: String): DataFrame = {
val dataSourceName = conf.defaultDataSourceName
- load(dataSourceName, ("path", path))
+ load(path, dataSourceName)
}
+ /**
+ * :: Experimental ::
+ * Returns the dataset stored at path as a DataFrame,
+ * using the given data source.
+ */
@Experimental
- def load(
- dataSourceName: String,
- option: (String, String),
- options: (String, String)*): DataFrame = {
- val resolved = ResolvedDataSource(this, None, dataSourceName, (option +: options).toMap)
+ def load(path: String, source: String): DataFrame = {
+ load(source, Map("path" -> path))
+ }
+
+ /**
+ * :: Experimental ::
+ * Returns the dataset specified by the given data source and a set of options as a DataFrame.
+ */
+ @Experimental
+ def load(source: String, options: java.util.Map[String, String]): DataFrame = {
+ load(source, options.toMap)
+ }
+
+ /**
+ * :: Experimental ::
+ * (Scala-specific)
+ * Returns the dataset specified by the given data source and a set of options as a DataFrame.
+ */
+ @Experimental
+ def load(source: String, options: Map[String, String]): DataFrame = {
+ val resolved = ResolvedDataSource(this, None, source, options)
DataFrame(this, LogicalRelation(resolved.relation))
}
+ /**
+ * :: Experimental ::
+ * Returns the dataset specified by the given data source and a set of options as a DataFrame,
+ * using the given schema as the schema of the DataFrame.
+ */
@Experimental
def load(
- dataSourceName: String,
+ source: String,
+ schema: StructType,
options: java.util.Map[String, String]): DataFrame = {
- val opts = options.toSeq
- load(dataSourceName, opts.head, opts.tail:_*)
+ load(source, schema, options.toMap)
+ }
+
+ /**
+ * :: Experimental ::
+ * (Scala-specific)
+ * Returns the dataset specified by the given data source and a set of options as a DataFrame,
+ * using the given schema as the schema of the DataFrame.
+ */
+ @Experimental
+ def load(
+ source: String,
+ schema: StructType,
+ options: Map[String, String]): DataFrame = {
+ val resolved = ResolvedDataSource(this, Some(schema), source, options)
+ DataFrame(this, LogicalRelation(resolved.relation))
+ }
+
+ /**
+ * :: Experimental ::
+ * Creates an external table from the given path and returns the corresponding DataFrame.
+ * It will use the default data source configured by spark.sql.sources.default.
+ */
+ @Experimental
+ def createExternalTable(tableName: String, path: String): DataFrame = {
+ val dataSourceName = conf.defaultDataSourceName
+ createExternalTable(tableName, path, dataSourceName)
+ }
+
+ /**
+ * :: Experimental ::
+ * Creates an external table from the given path based on a data source
+ * and returns the corresponding DataFrame.
+ */
+ @Experimental
+ def createExternalTable(
+ tableName: String,
+ path: String,
+ source: String): DataFrame = {
+ createExternalTable(tableName, source, Map("path" -> path))
+ }
+
+ /**
+ * :: Experimental ::
+ * Creates an external table from the given path based on a data source and a set of options.
+ * Then, returns the corresponding DataFrame.
+ */
+ @Experimental
+ def createExternalTable(
+ tableName: String,
+ source: String,
+ options: java.util.Map[String, String]): DataFrame = {
+ createExternalTable(tableName, source, options.toMap)
+ }
+
+ /**
+ * :: Experimental ::
+ * (Scala-specific)
+ * Creates an external table from the given path based on a data source and a set of options.
+ * Then, returns the corresponding DataFrame.
+ */
+ @Experimental
+ def createExternalTable(
+ tableName: String,
+ source: String,
+ options: Map[String, String]): DataFrame = {
+ val cmd =
+ CreateTableUsing(
+ tableName,
+ userSpecifiedSchema = None,
+ source,
+ temporary = false,
+ options,
+ allowExisting = false,
+ managedIfNoPath = false)
+ executePlan(cmd).toRdd
+ table(tableName)
+ }
+
+ /**
+ * :: Experimental ::
+ * Create an external table from the given path based on a data source, a schema and
+ * a set of options. Then, returns the corresponding DataFrame.
+ */
+ @Experimental
+ def createExternalTable(
+ tableName: String,
+ source: String,
+ schema: StructType,
+ options: java.util.Map[String, String]): DataFrame = {
+ createExternalTable(tableName, source, schema, options.toMap)
+ }
+
+ /**
+ * :: Experimental ::
+ * (Scala-specific)
+ * Create an external table from the given path based on a data source, a schema and
+ * a set of options. Then, returns the corresponding DataFrame.
+ */
+ @Experimental
+ def createExternalTable(
+ tableName: String,
+ source: String,
+ schema: StructType,
+ options: Map[String, String]): DataFrame = {
+ val cmd =
+ CreateTableUsing(
+ tableName,
+ userSpecifiedSchema = Some(schema),
+ source,
+ temporary = false,
+ options,
+ allowExisting = false,
+ managedIfNoPath = false)
+ executePlan(cmd).toRdd
+ table(tableName)
}
/**
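
Taken together, the new load and createExternalTable overloads look roughly like this from user code (paths, table names, and the schema are illustrative; createExternalTable needs a persistent catalog, i.e. a HiveContext, to be useful):

  import org.apache.spark.sql.types.{StringType, StructField, StructType}

  val people  = sqlContext.load("/data/people.json")                              // default source
  val people2 = sqlContext.load("/data/people.json", "org.apache.spark.sql.json") // explicit source

  // Skip schema inference by passing a schema along with the options.
  val schema = StructType(StructField("name", StringType, true) :: Nil)
  val names  = sqlContext.load("org.apache.spark.sql.json", schema,
    Map("path" -> "/data/people.json"))

  // Register the files as an external table and get a DataFrame back.
  val table = sqlContext.createExternalTable("people", "/data/people.json",
    "org.apache.spark.sql.json")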
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index edf8a5be64..e915e0e6a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -309,7 +309,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
object DDLStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, opts, false) =>
+ case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, opts, false, _) =>
ExecutedCommand(
CreateTempTableUsing(
tableName, userSpecifiedSchema, provider, opts)) :: Nil
@@ -318,24 +318,20 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case c: CreateTableUsing if c.temporary && c.allowExisting =>
sys.error("allowExisting should be set to false when creating a temporary table.")
- case CreateTableUsingAsSelect(tableName, provider, true, opts, false, query) =>
+ case CreateTableUsingAsSelect(tableName, provider, true, mode, opts, query) =>
val logicalPlan = sqlContext.parseSql(query)
val cmd =
- CreateTempTableUsingAsSelect(tableName, provider, opts, logicalPlan)
+ CreateTempTableUsingAsSelect(tableName, provider, mode, opts, logicalPlan)
ExecutedCommand(cmd) :: Nil
case c: CreateTableUsingAsSelect if !c.temporary =>
sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
- case c: CreateTableUsingAsSelect if c.temporary && c.allowExisting =>
- sys.error("allowExisting should be set to false when creating a temporary table.")
- case CreateTableUsingAsLogicalPlan(tableName, provider, true, opts, false, query) =>
+ case CreateTableUsingAsLogicalPlan(tableName, provider, true, mode, opts, query) =>
val cmd =
- CreateTempTableUsingAsSelect(tableName, provider, opts, query)
+ CreateTempTableUsingAsSelect(tableName, provider, mode, opts, query)
ExecutedCommand(cmd) :: Nil
case c: CreateTableUsingAsLogicalPlan if !c.temporary =>
sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
- case c: CreateTableUsingAsLogicalPlan if c.temporary && c.allowExisting =>
- sys.error("allowExisting should be set to false when creating a temporary table.")
case LogicalDescribeCommand(table, isExtended) =>
val resultPlan = self.sqlContext.executePlan(table).executedPlan
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index c4e14c6c92..f828bcdd65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -20,8 +20,7 @@ package org.apache.spark.sql.json
import java.io.IOException
import org.apache.hadoop.fs.Path
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
@@ -29,6 +28,10 @@ import org.apache.spark.sql.types.StructType
private[sql] class DefaultSource
extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider {
+ private def checkPath(parameters: Map[String, String]): String = {
+ parameters.getOrElse("path", sys.error("'path' must be specified for json data."))
+ }
+
/** Returns a new base relation with the parameters. */
override def createRelation(
sqlContext: SQLContext,
@@ -52,15 +55,30 @@ private[sql] class DefaultSource
override def createRelation(
sqlContext: SQLContext,
+ mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
- val path = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
+ val path = checkPath(parameters)
val filesystemPath = new Path(path)
val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
- if (fs.exists(filesystemPath)) {
- sys.error(s"path $path already exists.")
+ val doSave = if (fs.exists(filesystemPath)) {
+ mode match {
+ case SaveMode.Append =>
+ sys.error(s"Append mode is not supported by ${this.getClass.getCanonicalName}")
+ case SaveMode.Overwrite =>
+ fs.delete(filesystemPath, true)
+ true
+ case SaveMode.ErrorIfExists =>
+ sys.error(s"path $path already exists.")
+ case SaveMode.Ignore => false
+ }
+ } else {
+ true
+ }
+ if (doSave) {
+ // Only save data when the save mode is not Ignore.
+ data.toJSON.saveAsTextFile(path)
}
- data.toJSON.saveAsTextFile(path)
createRelation(sqlContext, parameters, data.schema)
}
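
From the caller's side, the JSON source now honors the mode as follows (the path is illustrative): Overwrite deletes the existing directory first, Ignore is a no-op when data is already there, and Append and ErrorIfExists surface as runtime errors when the path exists.

  df.save("org.apache.spark.sql.json", SaveMode.Overwrite, Map("path" -> "/tmp/out")) // replaces /tmp/out
  df.save("org.apache.spark.sql.json", SaveMode.Ignore,    Map("path" -> "/tmp/out")) // keeps existing data
  // SaveMode.Append fails for this source when data already exists.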
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 04804f78f5..aef9c10fbc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -80,18 +80,45 @@ class DefaultSource
override def createRelation(
sqlContext: SQLContext,
+ mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
val path = checkPath(parameters)
- ParquetRelation.createEmpty(
- path,
- data.schema.toAttributes,
- false,
- sqlContext.sparkContext.hadoopConfiguration,
- sqlContext)
-
- val relation = createRelation(sqlContext, parameters, data.schema)
- relation.asInstanceOf[ParquetRelation2].insert(data, true)
+ val filesystemPath = new Path(path)
+ val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ val doSave = if (fs.exists(filesystemPath)) {
+ mode match {
+ case SaveMode.Append =>
+ sys.error(s"Append mode is not supported by ${this.getClass.getCanonicalName}")
+ case SaveMode.Overwrite =>
+ fs.delete(filesystemPath, true)
+ true
+ case SaveMode.ErrorIfExists =>
+ sys.error(s"path $path already exists.")
+ case SaveMode.Ignore => false
+ }
+ } else {
+ true
+ }
+
+ val relation = if (doSave) {
+ // Only save data when the save mode is not Ignore.
+ ParquetRelation.createEmpty(
+ path,
+ data.schema.toAttributes,
+ false,
+ sqlContext.sparkContext.hadoopConfiguration,
+ sqlContext)
+
+ val createdRelation = createRelation(sqlContext, parameters, data.schema)
+ createdRelation.asInstanceOf[ParquetRelation2].insert(data, true)
+
+ createdRelation
+ } else {
+ // If the save mode is Ignore, we will just create the relation based on existing data.
+ createRelation(sqlContext, parameters)
+ }
+
relation
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 9f64f76100..6487c14b1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -119,11 +119,20 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
throw new DDLException(
"a CREATE TABLE AS SELECT statement does not allow column definitions.")
}
+ // When the IF NOT EXISTS clause appears in the query, the save mode will be Ignore.
+ val mode = if (allowExisting.isDefined) {
+ SaveMode.Ignore
+ } else if (temp.isDefined) {
+ SaveMode.Overwrite
+ } else {
+ SaveMode.ErrorIfExists
+ }
+
CreateTableUsingAsSelect(tableName,
provider,
temp.isDefined,
+ mode,
options,
- allowExisting.isDefined,
query.get)
} else {
val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
@@ -133,7 +142,8 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
provider,
temp.isDefined,
options,
- allowExisting.isDefined)
+ allowExisting.isDefined,
+ managedIfNoPath = false)
}
}
)
@@ -264,6 +274,7 @@ object ResolvedDataSource {
def apply(
sqlContext: SQLContext,
provider: String,
+ mode: SaveMode,
options: Map[String, String],
data: DataFrame): ResolvedDataSource = {
val loader = Utils.getContextOrSparkClassLoader
@@ -277,7 +288,7 @@ object ResolvedDataSource {
val relation = clazz.newInstance match {
case dataSource: CreatableRelationProvider =>
- dataSource.createRelation(sqlContext, options, data)
+ dataSource.createRelation(sqlContext, mode, options, data)
case _ =>
sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
}
@@ -307,28 +318,40 @@ private[sql] case class DescribeCommand(
new MetadataBuilder().putString("comment", "comment of the column").build())())
}
+/**
+ * Used to represent the operation of creating a table using a data source.
+ * @param tableName
+ * @param userSpecifiedSchema
+ * @param provider
+ * @param temporary
+ * @param options
+ * @param allowExisting If it is true, we will do nothing when the table already exists.
+ * If it is false, an exception will be thrown.
+ * @param managedIfNoPath
+ */
private[sql] case class CreateTableUsing(
tableName: String,
userSpecifiedSchema: Option[StructType],
provider: String,
temporary: Boolean,
options: Map[String, String],
- allowExisting: Boolean) extends Command
+ allowExisting: Boolean,
+ managedIfNoPath: Boolean) extends Command
private[sql] case class CreateTableUsingAsSelect(
tableName: String,
provider: String,
temporary: Boolean,
+ mode: SaveMode,
options: Map[String, String],
- allowExisting: Boolean,
query: String) extends Command
private[sql] case class CreateTableUsingAsLogicalPlan(
tableName: String,
provider: String,
temporary: Boolean,
+ mode: SaveMode,
options: Map[String, String],
- allowExisting: Boolean,
query: LogicalPlan) extends Command
private [sql] case class CreateTempTableUsing(
@@ -348,12 +371,13 @@ private [sql] case class CreateTempTableUsing(
private [sql] case class CreateTempTableUsingAsSelect(
tableName: String,
provider: String,
+ mode: SaveMode,
options: Map[String, String],
query: LogicalPlan) extends RunnableCommand {
def run(sqlContext: SQLContext) = {
val df = DataFrame(sqlContext, query)
- val resolved = ResolvedDataSource(sqlContext, provider, options, df)
+ val resolved = ResolvedDataSource(sqlContext, provider, mode, options, df)
sqlContext.registerRDDAsTable(
DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
@@ -364,7 +388,7 @@ private [sql] case class CreateTempTableUsingAsSelect(
/**
* Builds a map in which keys are case insensitive
*/
-protected class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
+protected[sql] class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
with Serializable {
val baseMap = map.map(kv => kv.copy(_1 = kv._1.toLowerCase))
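
In SQL terms, the parser change above means IF NOT EXISTS maps to SaveMode.Ignore, TEMPORARY maps to SaveMode.Overwrite, and a plain CTAS keeps SaveMode.ErrorIfExists. A sketch of the Ignore case (run from a HiveContext, since non-temporary tables still require one; table, source, and path are illustrative):

  hiveContext.sql(
    """
      |CREATE TABLE IF NOT EXISTS people
      |USING org.apache.spark.sql.json
      |OPTIONS (path '/tmp/people')
      |AS SELECT * FROM src
    """.stripMargin)  // re-running this statement is a no-op once the table exists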
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 5eecc303ef..37fda7ba6e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -79,8 +79,27 @@ trait SchemaRelationProvider {
@DeveloperApi
trait CreatableRelationProvider {
+ /**
+ * Creates a relation with the given parameters based on the contents of the given
+ * DataFrame. The mode specifies the expected behavior of createRelation when
+ * data already exists.
+ * Right now, there are four modes: Append, Overwrite, ErrorIfExists, and Ignore.
+ * Append mode means that when saving a DataFrame to a data source, if data already exists,
+ * contents of the DataFrame are expected to be appended to existing data.
+ * Overwrite mode means that when saving a DataFrame to a data source, if data already exists,
+ * existing data is expected to be overwritten by the contents of the DataFrame.
+ * ErrorIfExists mode means that when saving a DataFrame to a data source,
+ * if data already exists, an exception is expected to be thrown.
+ * Ignore mode means that when saving a DataFrame to a data source, if data already exists,
+ * the save operation is expected to not save the contents of the DataFrame and to not
+ * change the existing data.
+ *
+ * @param sqlContext
+ * @param mode
+ * @param parameters
+ * @param data
+ * @return
+ */
def createRelation(
sqlContext: SQLContext,
+ mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation
}
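
A minimal, hypothetical provider showing how a third-party source might implement the new signature. The class name and the storage helpers are stubs invented for illustration, not a real backend; the exists/mode branching mirrors the JSON and Parquet sources above.

  import org.apache.spark.sql.{DataFrame, SQLContext}
  import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, SaveMode}
  import org.apache.spark.sql.types.StructType

  class ExampleSource extends CreatableRelationProvider {
    // Stubs standing in for real storage operations (assumptions for this sketch).
    private def exists(path: String): Boolean = false
    private def delete(path: String): Unit = ()
    private def write(path: String, data: DataFrame): Unit = ()

    override def createRelation(
        sqlContext: SQLContext,
        mode: SaveMode,
        parameters: Map[String, String],
        data: DataFrame): BaseRelation = {
      val path = parameters.getOrElse("path", sys.error("'path' must be specified."))
      val doSave = if (exists(path)) {
        mode match {
          case SaveMode.Append        => sys.error("Append mode is not supported.")
          case SaveMode.Overwrite     => delete(path); true
          case SaveMode.ErrorIfExists => sys.error(s"path $path already exists.")
          case SaveMode.Ignore        => false
        }
      } else {
        true
      }
      if (doSave) write(path, data)

      // Hand back a relation describing whatever now lives at `path`.
      val ctx = sqlContext
      new BaseRelation {
        override val sqlContext: SQLContext = ctx
        override val schema: StructType = data.schema
      }
    }
  }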
diff --git a/sql/core/src/test/java/org/apache/spark/sql/sources/JavaSaveLoadSuite.java b/sql/core/src/test/java/org/apache/spark/sql/sources/JavaSaveLoadSuite.java
new file mode 100644
index 0000000000..852baf0e09
--- /dev/null
+++ b/sql/core/src/test/java/org/apache/spark/sql/sources/JavaSaveLoadSuite.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.test.TestSQLContext$;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.Utils;
+
+public class JavaSaveLoadSuite {
+
+ private transient JavaSparkContext sc;
+ private transient SQLContext sqlContext;
+
+ String originalDefaultSource;
+ File path;
+ DataFrame df;
+
+ private void checkAnswer(DataFrame actual, List<Row> expected) {
+ String errorMessage = QueryTest$.MODULE$.checkAnswer(actual, expected);
+ if (errorMessage != null) {
+ Assert.fail(errorMessage);
+ }
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ sqlContext = TestSQLContext$.MODULE$;
+ sc = new JavaSparkContext(sqlContext.sparkContext());
+
+ originalDefaultSource = sqlContext.conf().defaultDataSourceName();
+ path =
+ Utils.createTempDir(System.getProperty("java.io.tmpdir"), "datasource").getCanonicalFile();
+ if (path.exists()) {
+ path.delete();
+ }
+
+ List<String> jsonObjects = new ArrayList<String>(10);
+ for (int i = 0; i < 10; i++) {
+ jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}");
+ }
+ JavaRDD<String> rdd = sc.parallelize(jsonObjects);
+ df = sqlContext.jsonRDD(rdd);
+ df.registerTempTable("jsonTable");
+ }
+
+ @Test
+ public void saveAndLoad() {
+ Map<String, String> options = new HashMap<String, String>();
+ options.put("path", path.toString());
+ df.save("org.apache.spark.sql.json", SaveMode.ErrorIfExists, options);
+
+ DataFrame loadedDF = sqlContext.load("org.apache.spark.sql.json", options);
+
+ checkAnswer(loadedDF, df.collectAsList());
+ }
+
+ @Test
+ public void saveAndLoadWithSchema() {
+ Map<String, String> options = new HashMap<String, String>();
+ options.put("path", path.toString());
+ df.save("org.apache.spark.sql.json", SaveMode.ErrorIfExists, options);
+
+ List<StructField> fields = new ArrayList<>();
+ fields.add(DataTypes.createStructField("b", DataTypes.StringType, true));
+ StructType schema = DataTypes.createStructType(fields);
+ DataFrame loadedDF = sqlContext.load("org.apache.spark.sql.json", schema, options);
+
+ checkAnswer(loadedDF, sqlContext.sql("SELECT b FROM jsonTable").collectAsList());
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index f9ddd2ca5c..dfb6858957 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql
import java.util.{Locale, TimeZone}
+import scala.collection.JavaConversions._
+
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.columnar.InMemoryRelation
@@ -52,9 +54,51 @@ class QueryTest extends PlanTest {
/**
* Runs the plan and makes sure the answer matches the expected result.
* @param rdd the [[DataFrame]] to be executed
- * @param expectedAnswer the expected result, can either be an Any, Seq[Product], or Seq[ Seq[Any] ].
+ * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
*/
protected def checkAnswer(rdd: DataFrame, expectedAnswer: Seq[Row]): Unit = {
+ QueryTest.checkAnswer(rdd, expectedAnswer) match {
+ case Some(errorMessage) => fail(errorMessage)
+ case None =>
+ }
+ }
+
+ protected def checkAnswer(rdd: DataFrame, expectedAnswer: Row): Unit = {
+ checkAnswer(rdd, Seq(expectedAnswer))
+ }
+
+ def sqlTest(sqlString: String, expectedAnswer: Seq[Row])(implicit sqlContext: SQLContext): Unit = {
+ test(sqlString) {
+ checkAnswer(sqlContext.sql(sqlString), expectedAnswer)
+ }
+ }
+
+ /**
+ * Asserts that a given [[DataFrame]] will be executed using the given number of cached results.
+ */
+ def assertCached(query: DataFrame, numCachedTables: Int = 1): Unit = {
+ val planWithCaching = query.queryExecution.withCachedData
+ val cachedData = planWithCaching collect {
+ case cached: InMemoryRelation => cached
+ }
+
+ assert(
+ cachedData.size == numCachedTables,
+ s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" +
+ planWithCaching)
+ }
+}
+
+object QueryTest {
+ /**
+ * Runs the plan and makes sure the answer matches the expected result.
+ * If there was exception during the execution or the contents of the DataFrame does not
+ * match the expected result, an error message will be returned. Otherwise, a [[None]] will
+ * be returned.
+ * @param rdd the [[DataFrame]] to be executed
+ * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
+ */
+ def checkAnswer(rdd: DataFrame, expectedAnswer: Seq[Row]): Option[String] = {
val isSorted = rdd.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty
def prepareAnswer(answer: Seq[Row]): Seq[Row] = {
// Converts data to types that we can do equality comparison using Scala collections.
@@ -70,18 +114,20 @@ class QueryTest extends PlanTest {
}
val sparkAnswer = try rdd.collect().toSeq catch {
case e: Exception =>
- fail(
+ val errorMessage =
s"""
|Exception thrown while executing query:
|${rdd.queryExecution}
|== Exception ==
|$e
|${org.apache.spark.sql.catalyst.util.stackTraceToString(e)}
- """.stripMargin)
+ """.stripMargin
+ return Some(errorMessage)
}
if (prepareAnswer(expectedAnswer) != prepareAnswer(sparkAnswer)) {
- fail(s"""
+ val errorMessage =
+ s"""
|Results do not match for query:
|${rdd.logicalPlan}
|== Analyzed Plan ==
@@ -90,37 +136,21 @@ class QueryTest extends PlanTest {
|${rdd.queryExecution.executedPlan}
|== Results ==
|${sideBySide(
- s"== Correct Answer - ${expectedAnswer.size} ==" +:
- prepareAnswer(expectedAnswer).map(_.toString),
- s"== Spark Answer - ${sparkAnswer.size} ==" +:
- prepareAnswer(sparkAnswer).map(_.toString)).mkString("\n")}
- """.stripMargin)
+ s"== Correct Answer - ${expectedAnswer.size} ==" +:
+ prepareAnswer(expectedAnswer).map(_.toString),
+ s"== Spark Answer - ${sparkAnswer.size} ==" +:
+ prepareAnswer(sparkAnswer).map(_.toString)).mkString("\n")}
+ """.stripMargin
+ return Some(errorMessage)
}
- }
- protected def checkAnswer(rdd: DataFrame, expectedAnswer: Row): Unit = {
- checkAnswer(rdd, Seq(expectedAnswer))
- }
-
- def sqlTest(sqlString: String, expectedAnswer: Seq[Row])(implicit sqlContext: SQLContext): Unit = {
- test(sqlString) {
- checkAnswer(sqlContext.sql(sqlString), expectedAnswer)
- }
+ return None
}
- /**
- * Asserts that a given [[DataFrame]] will be executed using the given number of cached results.
- */
- def assertCached(query: DataFrame, numCachedTables: Int = 1): Unit = {
- val planWithCaching = query.queryExecution.withCachedData
- val cachedData = planWithCaching collect {
- case cached: InMemoryRelation => cached
+ def checkAnswer(rdd: DataFrame, expectedAnswer: java.util.List[Row]): String = {
+ checkAnswer(rdd, expectedAnswer.toSeq) match {
+ case Some(errorMessage) => errorMessage
+ case None => null
}
-
- assert(
- cachedData.size == numCachedTables,
- s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" +
- planWithCaching)
}
-
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index b02389978b..29caed9337 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -77,12 +77,10 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
sql("SELECT a, b FROM jsonTable"),
sql("SELECT a, b FROM jt").collect())
- dropTempTable("jsonTable")
-
- val message = intercept[RuntimeException]{
+ val message = intercept[DDLException]{
sql(
s"""
- |CREATE TEMPORARY TABLE jsonTable
+ |CREATE TEMPORARY TABLE IF NOT EXISTS jsonTable
|USING org.apache.spark.sql.json.DefaultSource
|OPTIONS (
| path '${path.toString}'
@@ -91,10 +89,25 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
""".stripMargin)
}.getMessage
assert(
- message.contains(s"path ${path.toString} already exists."),
+ message.contains(s"a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause."),
"CREATE TEMPORARY TABLE IF NOT EXISTS should not be allowed.")
- // Explicitly delete it.
+ // Overwrite the temporary table.
+ sql(
+ s"""
+ |CREATE TEMPORARY TABLE jsonTable
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${path.toString}'
+ |) AS
+ |SELECT a * 4 FROM jt
+ """.stripMargin)
+ checkAnswer(
+ sql("SELECT * FROM jsonTable"),
+ sql("SELECT a * 4 FROM jt").collect())
+
+ dropTempTable("jsonTable")
+ // Explicitly delete the data.
if (path.exists()) Utils.deleteRecursively(path)
sql(
@@ -104,12 +117,12 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
|OPTIONS (
| path '${path.toString}'
|) AS
- |SELECT a * 4 FROM jt
+ |SELECT b FROM jt
""".stripMargin)
checkAnswer(
sql("SELECT * FROM jsonTable"),
- sql("SELECT a * 4 FROM jt").collect())
+ sql("SELECT b FROM jt").collect())
dropTempTable("jsonTable")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
index fe2f76cc39..a510045671 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
@@ -21,10 +21,10 @@ import java.io.File
import org.scalatest.BeforeAndAfterAll
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.util.Utils
-
import org.apache.spark.sql.catalyst.util
+import org.apache.spark.sql.{SQLConf, DataFrame}
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
class SaveLoadSuite extends DataSourceTest with BeforeAndAfterAll {
@@ -38,42 +38,60 @@ class SaveLoadSuite extends DataSourceTest with BeforeAndAfterAll {
override def beforeAll(): Unit = {
originalDefaultSource = conf.defaultDataSourceName
- conf.setConf("spark.sql.default.datasource", "org.apache.spark.sql.json")
path = util.getTempFilePath("datasource").getCanonicalFile
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
df = jsonRDD(rdd)
+ df.registerTempTable("jsonTable")
}
override def afterAll(): Unit = {
- conf.setConf("spark.sql.default.datasource", originalDefaultSource)
+ conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource)
}
after {
+ conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource)
if (path.exists()) Utils.deleteRecursively(path)
}
def checkLoad(): Unit = {
+ conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
checkAnswer(load(path.toString), df.collect())
- checkAnswer(load("org.apache.spark.sql.json", ("path", path.toString)), df.collect())
+
+ // Test if we can pick up the data source name passed in load.
+ conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
+ checkAnswer(load(path.toString, "org.apache.spark.sql.json"), df.collect())
+ checkAnswer(load("org.apache.spark.sql.json", Map("path" -> path.toString)), df.collect())
+ val schema = StructType(StructField("b", StringType, true) :: Nil)
+ checkAnswer(
+ load("org.apache.spark.sql.json", schema, Map("path" -> path.toString)),
+ sql("SELECT b FROM jsonTable").collect())
}
- test("save with overwrite and load") {
+ test("save with path and load") {
+ conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
df.save(path.toString)
- checkLoad
+ checkLoad()
+ }
+
+ test("save with path and datasource, and load") {
+ conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
+ df.save(path.toString, "org.apache.spark.sql.json")
+ checkLoad()
}
test("save with data source and options, and load") {
- df.save("org.apache.spark.sql.json", ("path", path.toString))
- checkLoad
+ conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
+ df.save("org.apache.spark.sql.json", SaveMode.ErrorIfExists, Map("path" -> path.toString))
+ checkLoad()
}
test("save and save again") {
- df.save(path.toString)
+ df.save(path.toString, "org.apache.spark.sql.json")
- val message = intercept[RuntimeException] {
- df.save(path.toString)
+ var message = intercept[RuntimeException] {
+ df.save(path.toString, "org.apache.spark.sql.json")
}.getMessage
assert(
@@ -82,7 +100,18 @@ class SaveLoadSuite extends DataSourceTest with BeforeAndAfterAll {
if (path.exists()) Utils.deleteRecursively(path)
- df.save(path.toString)
- checkLoad
+ df.save(path.toString, "org.apache.spark.sql.json")
+ checkLoad()
+
+ df.save("org.apache.spark.sql.json", SaveMode.Overwrite, Map("path" -> path.toString))
+ checkLoad()
+
+ message = intercept[RuntimeException] {
+ df.save("org.apache.spark.sql.json", SaveMode.Append, Map("path" -> path.toString))
+ }.getMessage
+
+ assert(
+ message.contains("Append mode is not supported"),
+ "We should complain that 'Append mode is not supported' for JSON source.")
}
}
\ No newline at end of file