path: root/sql/core
author     Reynold Xin <rxin@databricks.com>    2015-05-15 22:00:31 -0700
committer  Reynold Xin <rxin@databricks.com>    2015-05-15 22:00:31 -0700
commit     578bfeeff514228f6fd4b07a536815fbb3510f7e (patch)
tree       97964df2b0b7ada4f019f2cd9617ba6af1d59f52 /sql/core
parent     deb411335a09b91eb1f75421d77e1c3686719621 (diff)
[SPARK-7654][SQL] DataFrameReader and DataFrameWriter for input/output API
This patch introduces DataFrameWriter and DataFrameReader.

DataFrameReader interface, accessible through SQLContext.read, contains methods that create DataFrames. These methods used to reside in SQLContext. Example usage:

```scala
sqlContext.read.json("...")
sqlContext.read.parquet("...")
```

DataFrameWriter interface, accessible through DataFrame.write, implements a builder pattern to avoid the proliferation of options in writing DataFrame out. It currently implements:
- mode
- format (e.g. "parquet", "json")
- options (generic options passed down into data sources)
- partitionBy (partitioning columns)

Example usage:

```scala
df.write.mode("append").format("json").partitionBy("date").saveAsTable("myJsonTable")
```

TODO:
- [ ] Documentation update
- [ ] Move JDBC into reader / writer?
- [ ] Deprecate the old interfaces
- [ ] Move the generic load interface into reader.
- [ ] Update example code and documentation

Author: Reynold Xin <rxin@databricks.com>

Closes #6175 from rxin/reader-writer and squashes the following commits:

b146c95 [Reynold Xin] Deprecation of old APIs.
bd8abdf [Reynold Xin] Fixed merge conflict.
26abea2 [Reynold Xin] Added general load methods.
244fbec [Reynold Xin] Added equivalent to example.
4f15d92 [Reynold Xin] Added documentation for partitionBy.
7e91611 [Reynold Xin] [SPARK-7654][SQL] DataFrameReader and DataFrameWriter for input/output API.
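For orientation, a minimal sketch of the old-to-new mapping this patch sets up; `sqlContext`, the DataFrames, the paths, and the `date` column are placeholders for illustration and are not part of the commit:

```scala
// Reading: the creation methods move from SQLContext onto sqlContext.read.
val people = sqlContext.read.json("/tmp/people.json")        // was sqlContext.jsonFile(...)
val events = sqlContext.read.parquet("/tmp/events.parquet")  // was sqlContext.parquetFile(...)

// Writing: the save methods move from DataFrame onto df.write.
people.write.parquet("/tmp/people.parquet")                  // was df.saveAsParquetFile(...)
events.write.mode("append").format("json").partitionBy("date").saveAsTable("myJsonTable")
```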
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 172
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 218
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 198
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 158
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala | 8
-rw-r--r--  sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java | 8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 17
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala | 50
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala | 6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala | 41
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala | 16
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala | 10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala | 26
16 files changed, 599 insertions, 339 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 2e20c3d3f4..55ef357a99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1290,22 +1290,32 @@ class DataFrame private[sql](
}
/**
+ * :: Experimental ::
+ * Interface for saving the content of the [[DataFrame]] out into external storage.
+ *
+ * @group output
+ * @since 1.4.0
+ */
+ @Experimental
+ def write: DataFrameWriter = new DataFrameWriter(this)
+
+ /**
* Saves the contents of this [[DataFrame]] as a parquet file, preserving the schema.
* Files that are written out using this method can be read back in as a [[DataFrame]]
* using the `parquetFile` function in [[SQLContext]].
* @group output
* @since 1.3.0
*/
+ @deprecated("Use write.parquet(path)", "1.4.0")
def saveAsParquetFile(path: String): Unit = {
if (sqlContext.conf.parquetUseDataSourceApi) {
- save("org.apache.spark.sql.parquet", SaveMode.ErrorIfExists, Map("path" -> path))
+ write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
} else {
sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
}
}
/**
- * :: Experimental ::
* Creates a table from the contents of this DataFrame.
* It will use the default data source configured by spark.sql.sources.default.
* This will fail if the table already exists.
@@ -1320,13 +1330,12 @@ class DataFrame private[sql](
* @group output
* @since 1.3.0
*/
- @Experimental
+ @deprecated("Use write.saveAsTable(tableName)", "1.4.0")
def saveAsTable(tableName: String): Unit = {
- saveAsTable(tableName, SaveMode.ErrorIfExists)
+ write.mode(SaveMode.ErrorIfExists).saveAsTable(tableName)
}
/**
- * :: Experimental ::
* Creates a table from the contents of this DataFrame, using the default data source
* configured by spark.sql.sources.default and [[SaveMode.ErrorIfExists]] as the save mode.
*
@@ -1340,20 +1349,18 @@ class DataFrame private[sql](
* @group output
* @since 1.3.0
*/
- @Experimental
+ @deprecated("Use write.mode(mode).saveAsTable(tableName)", "1.4.0")
def saveAsTable(tableName: String, mode: SaveMode): Unit = {
if (sqlContext.catalog.tableExists(Seq(tableName)) && mode == SaveMode.Append) {
// If table already exists and the save mode is Append,
// we will just call insertInto to append the contents of this DataFrame.
insertInto(tableName, overwrite = false)
} else {
- val dataSourceName = sqlContext.conf.defaultDataSourceName
- saveAsTable(tableName, dataSourceName, mode)
+ write.mode(mode).saveAsTable(tableName)
}
}
/**
- * :: Experimental ::
* Creates a table at the given path from the contents of this DataFrame
* based on a given data source and a set of options,
* using [[SaveMode.ErrorIfExists]] as the save mode.
@@ -1368,9 +1375,9 @@ class DataFrame private[sql](
* @group output
* @since 1.3.0
*/
- @Experimental
+ @deprecated("Use write.format(source).saveAsTable(tableName)", "1.4.0")
def saveAsTable(tableName: String, source: String): Unit = {
- saveAsTable(tableName, source, SaveMode.ErrorIfExists)
+ write.format(source).saveAsTable(tableName)
}
/**
@@ -1388,13 +1395,12 @@ class DataFrame private[sql](
* @group output
* @since 1.3.0
*/
- @Experimental
+ @deprecated("Use write.format(source).mode(mode).saveAsTable(tableName)", "1.4.0")
def saveAsTable(tableName: String, source: String, mode: SaveMode): Unit = {
- saveAsTable(tableName, source, mode, Map.empty[String, String])
+ write.format(source).mode(mode).saveAsTable(tableName)
}
/**
- * :: Experimental ::
* Creates a table at the given path from the the contents of this DataFrame
* based on a given data source, [[SaveMode]] specified by mode, and a set of options.
*
@@ -1408,40 +1414,17 @@ class DataFrame private[sql](
* @group output
* @since 1.3.0
*/
- @Experimental
+ @deprecated("Use write.format(source).mode(mode).options(options).saveAsTable(tableName)",
+ "1.4.0")
def saveAsTable(
tableName: String,
source: String,
mode: SaveMode,
options: java.util.Map[String, String]): Unit = {
- saveAsTable(tableName, source, mode, options.toMap)
- }
-
- /**
- * :: Experimental ::
- * Creates a table at the given path from the the contents of this DataFrame
- * based on a given data source, [[SaveMode]] specified by mode, a set of options, and a list of
- * partition columns.
- *
- * Note that this currently only works with DataFrames that are created from a HiveContext as
- * there is no notion of a persisted catalog in a standard SQL context. Instead you can write
- * an RDD out to a parquet file, and then register that file as a table. This "table" can then
- * be the target of an `insertInto`.
- * @group output
- * @since 1.4.0
- */
- @Experimental
- def saveAsTable(
- tableName: String,
- source: String,
- mode: SaveMode,
- options: java.util.Map[String, String],
- partitionColumns: java.util.List[String]): Unit = {
- saveAsTable(tableName, source, mode, options.toMap, partitionColumns)
+ write.format(source).mode(mode).options(options).saveAsTable(tableName)
}
/**
- * :: Experimental ::
* (Scala-specific)
* Creates a table from the contents of this DataFrame based on a given data source,
* [[SaveMode]] specified by mode, and a set of options.
@@ -1456,167 +1439,88 @@ class DataFrame private[sql](
* @group output
* @since 1.3.0
*/
- @Experimental
+ @deprecated("Use write.format(source).mode(mode).options(options).saveAsTable(tableName)",
+ "1.4.0")
def saveAsTable(
tableName: String,
source: String,
mode: SaveMode,
options: Map[String, String]): Unit = {
- val cmd =
- CreateTableUsingAsSelect(
- tableName,
- source,
- temporary = false,
- Array.empty[String],
- mode,
- options,
- logicalPlan)
-
- sqlContext.executePlan(cmd).toRdd
+ write.format(source).mode(mode).options(options).saveAsTable(tableName)
}
/**
- * :: Experimental ::
- * Creates a table at the given path from the the contents of this DataFrame
- * based on a given data source, [[SaveMode]] specified by mode, a set of options, and a list of
- * partition columns.
- *
- * Note that this currently only works with DataFrames that are created from a HiveContext as
- * there is no notion of a persisted catalog in a standard SQL context. Instead you can write
- * an RDD out to a parquet file, and then register that file as a table. This "table" can then
- * be the target of an `insertInto`.
- * @group output
- * @since 1.4.0
- */
- @Experimental
- def saveAsTable(
- tableName: String,
- source: String,
- mode: SaveMode,
- options: Map[String, String],
- partitionColumns: Seq[String]): Unit = {
- sqlContext.executePlan(
- CreateTableUsingAsSelect(
- tableName,
- source,
- temporary = false,
- partitionColumns.toArray,
- mode,
- options,
- logicalPlan)).toRdd
- }
-
- /**
- * :: Experimental ::
* Saves the contents of this DataFrame to the given path,
* using the default data source configured by spark.sql.sources.default and
* [[SaveMode.ErrorIfExists]] as the save mode.
* @group output
* @since 1.3.0
*/
- @Experimental
+ @deprecated("Use write.save(path)", "1.4.0")
def save(path: String): Unit = {
- save(path, SaveMode.ErrorIfExists)
+ write.save(path)
}
/**
- * :: Experimental ::
* Saves the contents of this DataFrame to the given path and [[SaveMode]] specified by mode,
* using the default data source configured by spark.sql.sources.default.
* @group output
* @since 1.3.0
*/
- @Experimental
+ @deprecated("Use write.mode(mode).save(path)", "1.4.0")
def save(path: String, mode: SaveMode): Unit = {
- val dataSourceName = sqlContext.conf.defaultDataSourceName
- save(path, dataSourceName, mode)
+ write.mode(mode).save(path)
}
/**
- * :: Experimental ::
* Saves the contents of this DataFrame to the given path based on the given data source,
* using [[SaveMode.ErrorIfExists]] as the save mode.
* @group output
* @since 1.3.0
*/
- @Experimental
+ @deprecated("Use write.format(source).save(path)", "1.4.0")
def save(path: String, source: String): Unit = {
- save(source, SaveMode.ErrorIfExists, Map("path" -> path))
+ write.format(source).save(path)
}
/**
- * :: Experimental ::
* Saves the contents of this DataFrame to the given path based on the given data source and
* [[SaveMode]] specified by mode.
* @group output
* @since 1.3.0
*/
- @Experimental
+ @deprecated("Use write.format(source).mode(mode).save(path)", "1.4.0")
def save(path: String, source: String, mode: SaveMode): Unit = {
- save(source, mode, Map("path" -> path))
+ write.format(source).mode(mode).save(path)
}
/**
- * :: Experimental ::
* Saves the contents of this DataFrame based on the given data source,
* [[SaveMode]] specified by mode, and a set of options.
* @group output
* @since 1.3.0
*/
- @Experimental
+ @deprecated("Use write.format(source).mode(mode).options(options).save()", "1.4.0")
def save(
source: String,
mode: SaveMode,
options: java.util.Map[String, String]): Unit = {
- save(source, mode, options.toMap)
+ write.format(source).mode(mode).options(options).save()
}
/**
- * :: Experimental ::
- * Saves the contents of this DataFrame to the given path based on the given data source,
- * [[SaveMode]] specified by mode, and partition columns specified by `partitionColumns`.
- * @group output
- * @since 1.4.0
- */
- @Experimental
- def save(
- source: String,
- mode: SaveMode,
- options: java.util.Map[String, String],
- partitionColumns: java.util.List[String]): Unit = {
- save(source, mode, options.toMap, partitionColumns)
- }
-
- /**
- * :: Experimental ::
* (Scala-specific)
* Saves the contents of this DataFrame based on the given data source,
* [[SaveMode]] specified by mode, and a set of options
* @group output
* @since 1.3.0
*/
- @Experimental
+ @deprecated("Use write.format(source).mode(mode).options(options).save()", "1.4.0")
def save(
source: String,
mode: SaveMode,
options: Map[String, String]): Unit = {
- ResolvedDataSource(sqlContext, source, Array.empty[String], mode, options, this)
- }
-
- /**
- * :: Experimental ::
- * Saves the contents of this DataFrame to the given path based on the given data source,
- * [[SaveMode]] specified by mode, and partition columns specified by `partitionColumns`.
- * @group output
- * @since 1.4.0
- */
- @Experimental
- def save(
- source: String,
- mode: SaveMode,
- options: Map[String, String],
- partitionColumns: Seq[String]): Unit = {
- ResolvedDataSource(sqlContext, source, partitionColumns.toArray, mode, options, this)
+ write.format(source).mode(mode).options(options).save()
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
new file mode 100644
index 0000000000..4d63faad6f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -0,0 +1,218 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.json.{JsonRDD, JSONRelation}
+import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.sources.{LogicalRelation, ResolvedDataSource}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * :: Experimental ::
+ * Interface used to load a [[DataFrame]] from external storage systems (e.g. file systems,
+ * key-value stores, etc).
+ *
+ * @since 1.4.0
+ */
+@Experimental
+class DataFrameReader private[sql](sqlContext: SQLContext) {
+
+ /**
+ * Specifies the input data source format.
+ *
+ * @since 1.4.0
+ */
+ def format(source: String): DataFrameReader = {
+ this.source = source
+ this
+ }
+
+ /**
+ * Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
+ * automatically from data. By specifying the schema here, the underlying data source can
+ * skip the schema inference step, and thus speed up data loading.
+ *
+ * @since 1.4.0
+ */
+ def schema(schema: StructType): DataFrameReader = {
+ this.userSpecifiedSchema = Option(schema)
+ this
+ }
+
+ /**
+ * Adds an input option for the underlying data source.
+ *
+ * @since 1.4.0
+ */
+ def option(key: String, value: String): DataFrameReader = {
+ this.extraOptions += (key -> value)
+ this
+ }
+
+ /**
+ * (Scala-specific) Adds input options for the underlying data source.
+ *
+ * @since 1.4.0
+ */
+ def options(options: scala.collection.Map[String, String]): DataFrameReader = {
+ this.extraOptions ++= options
+ this
+ }
+
+ /**
+ * Adds input options for the underlying data source.
+ *
+ * @since 1.4.0
+ */
+ def options(options: java.util.Map[String, String]): DataFrameReader = {
+ this.options(scala.collection.JavaConversions.mapAsScalaMap(options))
+ this
+ }
+
+ /**
+ * Specifies the input partitioning. If specified, the underlying data source does not need to
+ * discover the data partitioning scheme, and thus can speed up very large inputs.
+ *
+ * @since 1.4.0
+ */
+ @scala.annotation.varargs
+ def partitionBy(colNames: String*): DataFrameReader = {
+ this.partitioningColumns = Option(colNames)
+ this
+ }
+
+ /**
+ * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by
+ * a local or distributed file system).
+ *
+ * @since 1.4.0
+ */
+ def load(path: String): DataFrame = {
+ option("path", path).load()
+ }
+
+ /**
+ * Loads input in as a [[DataFrame]], for data sources that don't require a path (e.g. external
+ * key-value stores).
+ *
+ * @since 1.4.0
+ */
+ def load(): DataFrame = {
+ val resolved = ResolvedDataSource(
+ sqlContext,
+ userSpecifiedSchema = userSpecifiedSchema,
+ partitionColumns = partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
+ provider = source,
+ options = extraOptions.toMap)
+ DataFrame(sqlContext, LogicalRelation(resolved.relation))
+ }
+
+ /**
+ * Loads a JSON file (one object per line) and returns the result as a [[DataFrame]].
+ *
+ * This function goes through the input once to determine the input schema. If you know the
+ * schema in advance, use the version that specifies the schema to avoid the extra scan.
+ *
+ * @param path input path
+ * @since 1.4.0
+ */
+ def json(path: String): DataFrame = format("json").load(path)
+
+ /**
+ * Loads a `JavaRDD[String]` storing JSON objects (one object per record) and
+ * returns the result as a [[DataFrame]].
+ *
+ * Unless the schema is specified using [[schema]] function, this function goes through the
+ * input once to determine the input schema.
+ *
+ * @param jsonRDD input RDD with one JSON object per record
+ * @since 1.4.0
+ */
+ def json(jsonRDD: JavaRDD[String]): DataFrame = json(jsonRDD.rdd)
+
+ /**
+ * Loads an `RDD[String]` storing JSON objects (one object per record) and
+ * returns the result as a [[DataFrame]].
+ *
+ * Unless the schema is specified using [[schema]] function, this function goes through the
+ * input once to determine the input schema.
+ *
+ * @param jsonRDD input RDD with one JSON object per record
+ * @since 1.4.0
+ */
+ def json(jsonRDD: RDD[String]): DataFrame = {
+ val samplingRatio = extraOptions.getOrElse("samplingRatio", "1.0").toDouble
+ if (sqlContext.conf.useJacksonStreamingAPI) {
+ sqlContext.baseRelationToDataFrame(
+ new JSONRelation(() => jsonRDD, None, samplingRatio, userSpecifiedSchema)(sqlContext))
+ } else {
+ val columnNameOfCorruptJsonRecord = sqlContext.conf.columnNameOfCorruptRecord
+ val appliedSchema = userSpecifiedSchema.getOrElse(
+ JsonRDD.nullTypeToStringType(
+ JsonRDD.inferSchema(jsonRDD, 1.0, columnNameOfCorruptJsonRecord)))
+ val rowRDD = JsonRDD.jsonStringToRow(jsonRDD, appliedSchema, columnNameOfCorruptJsonRecord)
+ sqlContext.createDataFrame(rowRDD, appliedSchema, needsConversion = false)
+ }
+ }
+
+ /**
+ * Loads a Parquet file, returning the result as a [[DataFrame]]. This function returns an empty
+ * [[DataFrame]] if no paths are passed in.
+ *
+ * @since 1.4.0
+ */
+ @scala.annotation.varargs
+ def parquet(paths: String*): DataFrame = {
+ if (paths.isEmpty) {
+ sqlContext.emptyDataFrame
+ } else {
+ val globbedPaths = paths.map(new Path(_)).flatMap(SparkHadoopUtil.get.globPath).toArray
+ sqlContext.baseRelationToDataFrame(
+ new ParquetRelation2(
+ globbedPaths.map(_.toString), None, None, Map.empty[String, String])(sqlContext))
+ }
+ }
+
+ /**
+ * Returns the specified table as a [[DataFrame]].
+ *
+ * @since 1.4.0
+ */
+ def table(tableName: String): DataFrame = {
+ DataFrame(sqlContext, sqlContext.catalog.lookupRelation(Seq(tableName)))
+ }
+
+ ///////////////////////////////////////////////////////////////////////////////////////
+ // Builder pattern config options
+ ///////////////////////////////////////////////////////////////////////////////////////
+
+ private var source: String = sqlContext.conf.defaultDataSourceName
+
+ private var userSpecifiedSchema: Option[StructType] = None
+
+ private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+
+ private var partitioningColumns: Option[Seq[String]] = None
+
+}
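As a reading aid, a minimal usage sketch of the reader introduced above; it assumes an existing `sqlContext`, the shown paths, and the column names, none of which come from this patch:

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Infer the schema from the data (costs one extra pass over the input).
val events = sqlContext.read.json("/data/events.json")

// Supply the schema up front so the JSON source can skip inference.
val userSchema = StructType(
  StructField("id", LongType, nullable = false) ::
  StructField("name", StringType, nullable = true) :: Nil)
val users = sqlContext.read.schema(userSchema).json("/data/users.json")

// Generic entry point: choose the format explicitly; options are passed to the source as-is.
val sampled = sqlContext.read
  .format("json")
  .option("samplingRatio", "0.5")
  .load("/data/events.json")
```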
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
new file mode 100644
index 0000000000..b1fc18ac3c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -0,0 +1,198 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect}
+
+
+/**
+ * :: Experimental ::
+ * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems,
+ * key-value stores, etc).
+ *
+ * @since 1.4.0
+ */
+@Experimental
+final class DataFrameWriter private[sql](df: DataFrame) {
+
+ /**
+ * Specifies the behavior when data or table already exists. Options include:
+ * - `SaveMode.Overwrite`: overwrite the existing data.
+ * - `SaveMode.Append`: append the data.
+ * - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
+ * - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
+ *
+ * @since 1.4.0
+ */
+ def mode(saveMode: SaveMode): DataFrameWriter = {
+ this.mode = saveMode
+ this
+ }
+
+ /**
+ * Specifies the behavior when data or table already exists. Options include:
+ * - `overwrite`: overwrite the existing data.
+ * - `append`: append the data.
+ * - `ignore`: ignore the operation (i.e. no-op).
+ * - `error`: default option, throw an exception at runtime.
+ *
+ * @since 1.4.0
+ */
+ def mode(saveMode: String): DataFrameWriter = {
+ this.mode = saveMode.toLowerCase match {
+ case "overwrite" => SaveMode.Overwrite
+ case "append" => SaveMode.Append
+ case "ignore" => SaveMode.Ignore
+ case "error" | "default" => SaveMode.ErrorIfExists
+ case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
+ "Accepted modes are 'overwrite', 'append', 'ignore', 'error'.")
+ }
+ this
+ }
+
+ /**
+ * Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
+ *
+ * @since 1.4.0
+ */
+ def format(source: String): DataFrameWriter = {
+ this.source = source
+ this
+ }
+
+ /**
+ * Adds an output option for the underlying data source.
+ *
+ * @since 1.4.0
+ */
+ def option(key: String, value: String): DataFrameWriter = {
+ this.extraOptions += (key -> value)
+ this
+ }
+
+ /**
+ * (Scala-specific) Adds output options for the underlying data source.
+ *
+ * @since 1.4.0
+ */
+ def options(options: scala.collection.Map[String, String]): DataFrameWriter = {
+ this.extraOptions ++= options
+ this
+ }
+
+ /**
+ * Adds output options for the underlying data source.
+ *
+ * @since 1.4.0
+ */
+ def options(options: java.util.Map[String, String]): DataFrameWriter = {
+ this.options(scala.collection.JavaConversions.mapAsScalaMap(options))
+ this
+ }
+
+ /**
+ * Partitions the output by the given columns on the file system. If specified, the output is
+ * laid out on the file system similar to Hive's partitioning scheme.
+ *
+ * @since 1.4.0
+ */
+ @scala.annotation.varargs
+ def partitionBy(colNames: String*): DataFrameWriter = {
+ this.partitioningColumns = Option(colNames)
+ this
+ }
+
+ /**
+ * Saves the content of the [[DataFrame]] at the specified path.
+ *
+ * @since 1.4.0
+ */
+ def save(path: String): Unit = {
+ this.extraOptions += ("path" -> path)
+ save()
+ }
+
+ /**
+ * Saves the content of the [[DataFrame]] to the underlying data source, using the configured format, mode, and options.
+ *
+ * @since 1.4.0
+ */
+ def save(): Unit = {
+ ResolvedDataSource(
+ df.sqlContext,
+ source,
+ partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
+ mode,
+ extraOptions.toMap,
+ df)
+ }
+
+ /**
+ * Saves the content of the [[DataFrame]] as the specified table.
+ *
+ * @since 1.4.0
+ */
+ def saveAsTable(tableName: String): Unit = {
+ val cmd =
+ CreateTableUsingAsSelect(
+ tableName,
+ source,
+ temporary = false,
+ partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
+ mode,
+ extraOptions.toMap,
+ df.logicalPlan)
+ df.sqlContext.executePlan(cmd).toRdd
+ }
+
+ /**
+ * Saves the content of the [[DataFrame]] in JSON format at the specified path.
+ * This is equivalent to:
+ * {{{
+ * format("json").save(path)
+ * }}}
+ *
+ * @since 1.4.0
+ */
+ def json(path: String): Unit = format("json").save(path)
+
+ /**
+ * Saves the content of the [[DataFrame]] in Parquet format at the specified path.
+ * This is equivalent to:
+ * {{{
+ * format("parquet").save(path)
+ * }}}
+ *
+ * @since 1.4.0
+ */
+ def parquet(path: String): Unit = format("parquet").save(path)
+
+ ///////////////////////////////////////////////////////////////////////////////////////
+ // Builder pattern config options
+ ///////////////////////////////////////////////////////////////////////////////////////
+
+ private var source: String = df.sqlContext.conf.defaultDataSourceName
+
+ private var mode: SaveMode = SaveMode.ErrorIfExists
+
+ private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+
+ private var partitioningColumns: Option[Seq[String]] = None
+
+}
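And the writer side, again as a sketch only; `df`, the output paths, and the partition columns `year`/`month` are assumptions for illustration:

```scala
import org.apache.spark.sql.SaveMode

// Shorthand writers for the built-in formats.
df.write.parquet("/output/events.parquet")
df.write.mode(SaveMode.Overwrite).json("/output/events.json")

// Builder-style configuration: format, save mode, and partitioning columns.
df.write
  .format("parquet")
  .mode("append")
  .partitionBy("year", "month")
  .save("/output/events_partitioned")

// Persist as a table through the catalog; per the docs above this needs a HiveContext-backed catalog.
df.write.mode(SaveMode.ErrorIfExists).saveAsTable("events")
```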
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 9fb355eb81..34a50e522c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -27,11 +27,9 @@ import scala.reflect.runtime.universe.TypeTag
import scala.util.control.NonFatal
import com.google.common.reflect.TypeToken
-import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.analysis._
@@ -43,8 +41,6 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.ParserDialect
import org.apache.spark.sql.execution.{Filter, _}
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.json._
-import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -597,21 +593,33 @@ class SQLContext(@transient val sparkContext: SparkContext)
}
/**
+ * :: Experimental ::
+ * Returns a [[DataFrameReader]] that can be used to read data in as a [[DataFrame]].
+ * {{{
+ * sqlContext.read.parquet("/path/to/file.parquet")
+ * sqlContext.read.schema(schema).json("/path/to/file.json")
+ * }}}
+ *
+ * @group genericdata
+ * @since 1.4.0
+ */
+ @Experimental
+ def read: DataFrameReader = new DataFrameReader(this)
+
+ /**
* Loads a Parquet file, returning the result as a [[DataFrame]]. This function returns an empty
* [[DataFrame]] if no paths are passed in.
*
* @group specificdata
* @since 1.3.0
*/
+ @deprecated("Use read.parquet()", "1.4.0")
@scala.annotation.varargs
def parquetFile(paths: String*): DataFrame = {
if (paths.isEmpty) {
emptyDataFrame
} else if (conf.parquetUseDataSourceApi) {
- val globbedPaths = paths.map(new Path(_)).flatMap(SparkHadoopUtil.get.globPath).toArray
- baseRelationToDataFrame(
- new ParquetRelation2(
- globbedPaths.map(_.toString), None, None, Map.empty[String, String])(this))
+ read.parquet(paths : _*)
} else {
DataFrame(this, parquet.ParquetRelation(
paths.mkString(","), Some(sparkContext.hadoopConfiguration), this))
@@ -625,28 +633,31 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @group specificdata
* @since 1.3.0
*/
- def jsonFile(path: String): DataFrame = jsonFile(path, 1.0)
+ @deprecated("Use read.json()", "1.4.0")
+ def jsonFile(path: String): DataFrame = {
+ read.json(path)
+ }
/**
- * :: Experimental ::
* Loads a JSON file (one object per line) and applies the given schema,
* returning the result as a [[DataFrame]].
*
* @group specificdata
* @since 1.3.0
*/
- @Experimental
- def jsonFile(path: String, schema: StructType): DataFrame =
- load("json", schema, Map("path" -> path))
+ @deprecated("Use read.json()", "1.4.0")
+ def jsonFile(path: String, schema: StructType): DataFrame = {
+ read.schema(schema).json(path)
+ }
/**
- * :: Experimental ::
* @group specificdata
* @since 1.3.0
*/
- @Experimental
- def jsonFile(path: String, samplingRatio: Double): DataFrame =
- load("json", Map("path" -> path, "samplingRatio" -> samplingRatio.toString))
+ @deprecated("Use read.json()", "1.4.0")
+ def jsonFile(path: String, samplingRatio: Double): DataFrame = {
+ read.option("samplingRatio", samplingRatio.toString).json(path)
+ }
/**
* Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
@@ -656,8 +667,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @group specificdata
* @since 1.3.0
*/
- def jsonRDD(json: RDD[String]): DataFrame = jsonRDD(json, 1.0)
-
+ @deprecated("Use read.json()", "1.4.0")
+ def jsonRDD(json: RDD[String]): DataFrame = read.json(json)
/**
* Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
@@ -667,196 +678,131 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @group specificdata
* @since 1.3.0
*/
- def jsonRDD(json: JavaRDD[String]): DataFrame = jsonRDD(json.rdd, 1.0)
+ @deprecated("Use read.json()", "1.4.0")
+ def jsonRDD(json: JavaRDD[String]): DataFrame = read.json(json)
/**
- * :: Experimental ::
* Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
* returning the result as a [[DataFrame]].
*
* @group specificdata
* @since 1.3.0
*/
- @Experimental
+ @deprecated("Use read.json()", "1.4.0")
def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
- if (conf.useJacksonStreamingAPI) {
- baseRelationToDataFrame(new JSONRelation(() => json, None, 1.0, Some(schema))(this))
- } else {
- val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
- val appliedSchema =
- Option(schema).getOrElse(
- JsonRDD.nullTypeToStringType(
- JsonRDD.inferSchema(json, 1.0, columnNameOfCorruptJsonRecord)))
- val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
- createDataFrame(rowRDD, appliedSchema, needsConversion = false)
- }
+ read.schema(schema).json(json)
}
/**
- * :: Experimental ::
* Loads a JavaRDD<String> storing JSON objects (one object per record) and applies the given
* schema, returning the result as a [[DataFrame]].
*
* @group specificdata
* @since 1.3.0
*/
- @Experimental
+ @deprecated("Use read.json()", "1.4.0")
def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame = {
- jsonRDD(json.rdd, schema)
+ read.schema(schema).json(json)
}
/**
- * :: Experimental ::
* Loads an RDD[String] storing JSON objects (one object per record) inferring the
* schema, returning the result as a [[DataFrame]].
*
* @group specificdata
* @since 1.3.0
*/
- @Experimental
+ @deprecated("Use read.json()", "1.4.0")
def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
- if (conf.useJacksonStreamingAPI) {
- baseRelationToDataFrame(new JSONRelation(() => json, None, samplingRatio, None)(this))
- } else {
- val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
- val appliedSchema =
- JsonRDD.nullTypeToStringType(
- JsonRDD.inferSchema(json, samplingRatio, columnNameOfCorruptJsonRecord))
- val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
- createDataFrame(rowRDD, appliedSchema, needsConversion = false)
- }
+ read.option("samplingRatio", samplingRatio.toString).json(json)
}
/**
- * :: Experimental ::
* Loads a JavaRDD[String] storing JSON objects (one object per record) inferring the
* schema, returning the result as a [[DataFrame]].
*
* @group specificdata
* @since 1.3.0
*/
- @Experimental
+ @deprecated("Use read.json()", "1.4.0")
def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame = {
- jsonRDD(json.rdd, samplingRatio);
+ read.option("samplingRatio", samplingRatio.toString).json(json)
}
/**
- * :: Experimental ::
* Returns the dataset stored at path as a DataFrame,
* using the default data source configured by spark.sql.sources.default.
*
* @group genericdata
* @since 1.3.0
*/
- @Experimental
+ @deprecated("Use read.load(path)", "1.4.0")
def load(path: String): DataFrame = {
- val dataSourceName = conf.defaultDataSourceName
- load(path, dataSourceName)
+ read.load(path)
}
/**
- * :: Experimental ::
* Returns the dataset stored at path as a DataFrame, using the given data source.
*
* @group genericdata
* @since 1.3.0
*/
- @Experimental
+ @deprecated("Use read.format(source).load(path)", "1.4.0")
def load(path: String, source: String): DataFrame = {
- load(source, Map("path" -> path))
+ read.format(source).load(path)
}
/**
- * :: Experimental ::
* (Java-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame.
*
* @group genericdata
* @since 1.3.0
*/
- @Experimental
+ @deprecated("Use read.format(source).options(options).load()", "1.4.0")
def load(source: String, options: java.util.Map[String, String]): DataFrame = {
- load(source, options.toMap)
+ read.options(options).format(source).load()
}
/**
- * :: Experimental ::
* (Scala-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame.
*
* @group genericdata
* @since 1.3.0
*/
- @Experimental
+ @deprecated("Use read.format(source).options(options).load()", "1.4.0")
def load(source: String, options: Map[String, String]): DataFrame = {
- val resolved = ResolvedDataSource(this, None, Array.empty[String], source, options)
- DataFrame(this, LogicalRelation(resolved.relation))
- }
-
- /**
- * :: Experimental ::
- * (Java-specific) Returns the dataset specified by the given data source and
- * a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
- *
- * @group genericdata
- * @since 1.3.0
- */
- @Experimental
- def load(
- source: String,
- schema: StructType,
- options: java.util.Map[String, String]): DataFrame = {
- load(source, schema, options.toMap)
+ read.options(options).format(source).load()
}
/**
- * :: Experimental ::
* (Java-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
*
* @group genericdata
* @since 1.3.0
*/
- @Experimental
+ @deprecated("Use read.format(source).schema(schema).options(options).load()", "1.4.0")
def load(
source: String,
schema: StructType,
- partitionColumns: Array[String],
options: java.util.Map[String, String]): DataFrame = {
- load(source, schema, partitionColumns, options.toMap)
- }
-
- /**
- * :: Experimental ::
- * (Scala-specific) Returns the dataset specified by the given data source and
- * a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
- * @group genericdata
- * @since 1.3.0
- */
- @Experimental
- def load(
- source: String,
- schema: StructType,
- options: Map[String, String]): DataFrame = {
- val resolved = ResolvedDataSource(this, Some(schema), Array.empty[String], source, options)
- DataFrame(this, LogicalRelation(resolved.relation))
+ read.format(source).schema(schema).options(options).load()
}
/**
- * :: Experimental ::
* (Scala-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
* @group genericdata
* @since 1.3.0
*/
- @Experimental
+ @deprecated("Use read.format(source).schema(schema).options(options).load()", "1.4.0")
def load(
source: String,
schema: StructType,
- partitionColumns: Array[String],
options: Map[String, String]): DataFrame = {
- val resolved = ResolvedDataSource(this, Some(schema), partitionColumns, source, options)
- DataFrame(this, LogicalRelation(resolved.relation))
+ read.format(source).schema(schema).options(options).load()
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
index 9d17516e0e..7a73b6f1ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
@@ -90,7 +90,7 @@ private[sql] trait ParquetTest {
(data: Seq[T])
(f: String => Unit): Unit = {
withTempPath { file =>
- sparkContext.parallelize(data).toDF().saveAsParquetFile(file.getCanonicalPath)
+ sparkContext.parallelize(data).toDF().write.parquet(file.getCanonicalPath)
f(file.getCanonicalPath)
}
}
@@ -102,7 +102,7 @@ private[sql] trait ParquetTest {
protected def withParquetDataFrame[T <: Product: ClassTag: TypeTag]
(data: Seq[T])
(f: DataFrame => Unit): Unit = {
- withParquetFile(data)(path => f(sqlContext.parquetFile(path)))
+ withParquetFile(data)(path => f(sqlContext.read.parquet(path)))
}
/**
@@ -128,12 +128,12 @@ private[sql] trait ParquetTest {
protected def makeParquetFile[T <: Product: ClassTag: TypeTag](
data: Seq[T], path: File): Unit = {
- data.toDF().save(path.getCanonicalPath, "org.apache.spark.sql.parquet", SaveMode.Overwrite)
+ data.toDF().write.mode(SaveMode.Overwrite).parquet(path.getCanonicalPath)
}
protected def makeParquetFile[T <: Product: ClassTag: TypeTag](
df: DataFrame, path: File): Unit = {
- df.save(path.getCanonicalPath, "org.apache.spark.sql.parquet", SaveMode.Overwrite)
+ df.write.mode(SaveMode.Overwrite).parquet(path.getCanonicalPath)
}
protected def makePartitionDir(
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java
index b76f7d421f..6a0bcefe7a 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java
@@ -75,9 +75,9 @@ public class JavaSaveLoadSuite {
public void saveAndLoad() {
Map<String, String> options = new HashMap<String, String>();
options.put("path", path.toString());
- df.save("org.apache.spark.sql.json", SaveMode.ErrorIfExists, options);
+ df.save("json", SaveMode.ErrorIfExists, options);
- DataFrame loadedDF = sqlContext.load("org.apache.spark.sql.json", options);
+ DataFrame loadedDF = sqlContext.read().format("json").options(options).load();
checkAnswer(loadedDF, df.collectAsList());
}
@@ -86,12 +86,12 @@ public class JavaSaveLoadSuite {
public void saveAndLoadWithSchema() {
Map<String, String> options = new HashMap<String, String>();
options.put("path", path.toString());
- df.save("org.apache.spark.sql.json", SaveMode.ErrorIfExists, options);
+ df.save("json", SaveMode.ErrorIfExists, options);
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("b", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
- DataFrame loadedDF = sqlContext.load("org.apache.spark.sql.json", schema, options);
+ DataFrame loadedDF = sqlContext.load("json", schema, options);
checkAnswer(loadedDF, sqlContext.sql("SELECT b FROM jsonTable").collectAsList());
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 1d5f6b3aad..054b23dba8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -460,14 +460,14 @@ class DataFrameSuite extends QueryTest {
}
test("SPARK-7551: support backticks for DataFrame attribute resolution") {
- val df = TestSQLContext.jsonRDD(TestSQLContext.sparkContext.makeRDD(
+ val df = TestSQLContext.read.json(TestSQLContext.sparkContext.makeRDD(
"""{"a.b": {"c": {"d..e": {"f": 1}}}}""" :: Nil))
checkAnswer(
df.select(df("`a.b`.c.`d..e`.`f`")),
Row(1)
)
- val df2 = TestSQLContext.jsonRDD(TestSQLContext.sparkContext.makeRDD(
+ val df2 = TestSQLContext.read.json(TestSQLContext.sparkContext.makeRDD(
"""{"a b": {"c": {"d e": {"f": 1}}}}""" :: Nil))
checkAnswer(
df2.select(df2("`a b`.c.d e.f")),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 479ad9fe62..c5c4f448a7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -105,7 +105,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
}
test("grouping on nested fields") {
- jsonRDD(sparkContext.parallelize("""{"nested": {"attribute": 1}, "value": 2}""" :: Nil))
+ read.json(sparkContext.parallelize("""{"nested": {"attribute": 1}, "value": 2}""" :: Nil))
.registerTempTable("rows")
checkAnswer(
@@ -122,7 +122,8 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
}
test("SPARK-6201 IN type conversion") {
- jsonRDD(sparkContext.parallelize(Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}")))
+ read.json(
+ sparkContext.parallelize(Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}")))
.registerTempTable("d")
checkAnswer(
@@ -1199,7 +1200,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
test("SPARK-3483 Special chars in column names") {
val data = sparkContext.parallelize(
Seq("""{"key?number1": "value1", "key.number2": "value2"}"""))
- jsonRDD(data).registerTempTable("records")
+ read.json(data).registerTempTable("records")
sql("SELECT `key?number1`, `key.number2` FROM records")
}
@@ -1240,11 +1241,11 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
}
test("SPARK-4322 Grouping field with struct field as sub expression") {
- jsonRDD(sparkContext.makeRDD("""{"a": {"b": [{"c": 1}]}}""" :: Nil)).registerTempTable("data")
+ read.json(sparkContext.makeRDD("""{"a": {"b": [{"c": 1}]}}""" :: Nil)).registerTempTable("data")
checkAnswer(sql("SELECT a.b[0].c FROM data GROUP BY a.b[0].c"), Row(1))
dropTempTable("data")
- jsonRDD(sparkContext.makeRDD("""{"a": {"b": 1}}""" :: Nil)).registerTempTable("data")
+ read.json(sparkContext.makeRDD("""{"a": {"b": 1}}""" :: Nil)).registerTempTable("data")
checkAnswer(sql("SELECT a.b + 1 FROM data GROUP BY a.b + 1"), Row(2))
dropTempTable("data")
}
@@ -1292,7 +1293,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
}
test("SPARK-6145: ORDER BY test for nested fields") {
- jsonRDD(sparkContext.makeRDD("""{"a": {"b": 1, "a": {"a": 1}}, "c": [{"d": 1}]}""" :: Nil))
+ read.json(sparkContext.makeRDD("""{"a": {"b": 1, "a": {"a": 1}}, "c": [{"d": 1}]}""" :: Nil))
.registerTempTable("nestedOrder")
checkAnswer(sql("SELECT 1 FROM nestedOrder ORDER BY a.b"), Row(1))
@@ -1304,14 +1305,14 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
}
test("SPARK-6145: special cases") {
- jsonRDD(sparkContext.makeRDD(
+ read.json(sparkContext.makeRDD(
"""{"a": {"b": [1]}, "b": [{"a": 1}], "c0": {"a": 1}}""" :: Nil)).registerTempTable("t")
checkAnswer(sql("SELECT a.b[0] FROM t ORDER BY c0.a"), Row(1))
checkAnswer(sql("SELECT b[0].a FROM t ORDER BY c0.a"), Row(1))
}
test("SPARK-6898: complete support for special chars in column names") {
- jsonRDD(sparkContext.makeRDD(
+ read.json(sparkContext.makeRDD(
"""{"a": {"c.b": 1}, "b.$q": [{"a@!.q": 1}], "q.w": {"w.i&": [1]}}""" :: Nil))
.registerTempTable("t")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
index 2672e20dea..dc2d43a197 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
@@ -105,13 +105,13 @@ class UserDefinedTypeSuite extends QueryTest {
test("UDTs with Parquet") {
val tempDir = Utils.createTempDir()
tempDir.delete()
- pointsRDD.saveAsParquetFile(tempDir.getCanonicalPath)
+ pointsRDD.write.parquet(tempDir.getCanonicalPath)
}
test("Repartition UDTs with Parquet") {
val tempDir = Utils.createTempDir()
tempDir.delete()
- pointsRDD.repartition(1).saveAsParquetFile(tempDir.getCanonicalPath)
+ pointsRDD.repartition(1).write.parquet(tempDir.getCanonicalPath)
}
// Tests to make sure that all operators correctly convert types on the way out.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index b06e338598..6f747e5846 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -215,7 +215,7 @@ class JsonSuite extends QueryTest {
}
test("Complex field and type inferring with null in sampling") {
- val jsonDF = jsonRDD(jsonNullStruct)
+ val jsonDF = read.json(jsonNullStruct)
val expectedSchema = StructType(
StructField("headers", StructType(
StructField("Charset", StringType, true) ::
@@ -234,7 +234,7 @@ class JsonSuite extends QueryTest {
}
test("Primitive field and type inferring") {
- val jsonDF = jsonRDD(primitiveFieldAndType)
+ val jsonDF = read.json(primitiveFieldAndType)
val expectedSchema = StructType(
StructField("bigInteger", DecimalType.Unlimited, true) ::
@@ -262,7 +262,7 @@ class JsonSuite extends QueryTest {
}
test("Complex field and type inferring") {
- val jsonDF = jsonRDD(complexFieldAndType1)
+ val jsonDF = read.json(complexFieldAndType1)
val expectedSchema = StructType(
StructField("arrayOfArray1", ArrayType(ArrayType(StringType, true), true), true) ::
@@ -361,7 +361,7 @@ class JsonSuite extends QueryTest {
}
test("GetField operation on complex data type") {
- val jsonDF = jsonRDD(complexFieldAndType1)
+ val jsonDF = read.json(complexFieldAndType1)
jsonDF.registerTempTable("jsonTable")
checkAnswer(
@@ -377,7 +377,7 @@ class JsonSuite extends QueryTest {
}
test("Type conflict in primitive field values") {
- val jsonDF = jsonRDD(primitiveFieldValueTypeConflict)
+ val jsonDF = read.json(primitiveFieldValueTypeConflict)
val expectedSchema = StructType(
StructField("num_bool", StringType, true) ::
@@ -451,7 +451,7 @@ class JsonSuite extends QueryTest {
}
ignore("Type conflict in primitive field values (Ignored)") {
- val jsonDF = jsonRDD(primitiveFieldValueTypeConflict)
+ val jsonDF = read.json(primitiveFieldValueTypeConflict)
jsonDF.registerTempTable("jsonTable")
// Right now, the analyzer does not promote strings in a boolean expression.
@@ -504,7 +504,7 @@ class JsonSuite extends QueryTest {
}
test("Type conflict in complex field values") {
- val jsonDF = jsonRDD(complexFieldValueTypeConflict)
+ val jsonDF = read.json(complexFieldValueTypeConflict)
val expectedSchema = StructType(
StructField("array", ArrayType(LongType, true), true) ::
@@ -528,7 +528,7 @@ class JsonSuite extends QueryTest {
}
test("Type conflict in array elements") {
- val jsonDF = jsonRDD(arrayElementTypeConflict)
+ val jsonDF = read.json(arrayElementTypeConflict)
val expectedSchema = StructType(
StructField("array1", ArrayType(StringType, true), true) ::
@@ -556,7 +556,7 @@ class JsonSuite extends QueryTest {
}
test("Handling missing fields") {
- val jsonDF = jsonRDD(missingFields)
+ val jsonDF = read.json(missingFields)
val expectedSchema = StructType(
StructField("a", BooleanType, true) ::
@@ -576,7 +576,7 @@ class JsonSuite extends QueryTest {
dir.delete()
val path = dir.getCanonicalPath
sparkContext.parallelize(1 to 100).map(i => s"""{"a": 1, "b": "str$i"}""").saveAsTextFile(path)
- val jsonDF = jsonFile(path, 0.49)
+ val jsonDF = read.option("samplingRatio", "0.49").json(path)
val analyzed = jsonDF.queryExecution.analyzed
assert(
@@ -591,7 +591,7 @@ class JsonSuite extends QueryTest {
val schema = StructType(StructField("a", LongType, true) :: Nil)
val logicalRelation =
- jsonFile(path, schema).queryExecution.analyzed.asInstanceOf[LogicalRelation]
+ read.schema(schema).json(path).queryExecution.analyzed.asInstanceOf[LogicalRelation]
val relationWithSchema = logicalRelation.relation.asInstanceOf[JSONRelation]
assert(relationWithSchema.path === Some(path))
assert(relationWithSchema.schema === schema)
@@ -603,7 +603,7 @@ class JsonSuite extends QueryTest {
dir.delete()
val path = dir.getCanonicalPath
primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
- val jsonDF = jsonFile(path)
+ val jsonDF = read.json(path)
val expectedSchema = StructType(
StructField("bigInteger", DecimalType.Unlimited, true) ::
@@ -672,7 +672,7 @@ class JsonSuite extends QueryTest {
StructField("null", StringType, true) ::
StructField("string", StringType, true) :: Nil)
- val jsonDF1 = jsonFile(path, schema)
+ val jsonDF1 = read.schema(schema).json(path)
assert(schema === jsonDF1.schema)
@@ -689,7 +689,7 @@ class JsonSuite extends QueryTest {
"this is a simple string.")
)
- val jsonDF2 = jsonRDD(primitiveFieldAndType, schema)
+ val jsonDF2 = read.schema(schema).json(primitiveFieldAndType)
assert(schema === jsonDF2.schema)
@@ -710,7 +710,7 @@ class JsonSuite extends QueryTest {
test("Applying schemas with MapType") {
val schemaWithSimpleMap = StructType(
StructField("map", MapType(StringType, IntegerType, true), false) :: Nil)
- val jsonWithSimpleMap = jsonRDD(mapType1, schemaWithSimpleMap)
+ val jsonWithSimpleMap = read.schema(schemaWithSimpleMap).json(mapType1)
jsonWithSimpleMap.registerTempTable("jsonWithSimpleMap")
@@ -738,7 +738,7 @@ class JsonSuite extends QueryTest {
val schemaWithComplexMap = StructType(
StructField("map", MapType(StringType, innerStruct, true), false) :: Nil)
- val jsonWithComplexMap = jsonRDD(mapType2, schemaWithComplexMap)
+ val jsonWithComplexMap = read.schema(schemaWithComplexMap).json(mapType2)
jsonWithComplexMap.registerTempTable("jsonWithComplexMap")
@@ -764,7 +764,7 @@ class JsonSuite extends QueryTest {
}
test("SPARK-2096 Correctly parse dot notations") {
- val jsonDF = jsonRDD(complexFieldAndType2)
+ val jsonDF = read.json(complexFieldAndType2)
jsonDF.registerTempTable("jsonTable")
checkAnswer(
@@ -782,7 +782,7 @@ class JsonSuite extends QueryTest {
}
test("SPARK-3390 Complex arrays") {
- val jsonDF = jsonRDD(complexFieldAndType2)
+ val jsonDF = read.json(complexFieldAndType2)
jsonDF.registerTempTable("jsonTable")
checkAnswer(
@@ -805,7 +805,7 @@ class JsonSuite extends QueryTest {
}
test("SPARK-3308 Read top level JSON arrays") {
- val jsonDF = jsonRDD(jsonArray)
+ val jsonDF = read.json(jsonArray)
jsonDF.registerTempTable("jsonTable")
checkAnswer(
@@ -826,7 +826,7 @@ class JsonSuite extends QueryTest {
val oldColumnNameOfCorruptRecord = TestSQLContext.conf.columnNameOfCorruptRecord
TestSQLContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed")
- val jsonDF = jsonRDD(corruptRecords)
+ val jsonDF = read.json(corruptRecords)
jsonDF.registerTempTable("jsonTable")
val schema = StructType(
@@ -880,7 +880,7 @@ class JsonSuite extends QueryTest {
}
test("SPARK-4068: nulls in arrays") {
- val jsonDF = jsonRDD(nullsInArrays)
+ val jsonDF = read.json(nullsInArrays)
jsonDF.registerTempTable("jsonTable")
val schema = StructType(
@@ -957,8 +957,8 @@ class JsonSuite extends QueryTest {
assert(result2(1) === "{\"f1\":{\"f11\":2,\"f12\":false},\"f2\":{\"B2\":null}}")
assert(result2(3) === "{\"f1\":{\"f11\":4,\"f12\":true},\"f2\":{\"D4\":2147483644}}")
- val jsonDF = jsonRDD(primitiveFieldAndType)
- val primTable = jsonRDD(jsonDF.toJSON)
+ val jsonDF = read.json(primitiveFieldAndType)
+ val primTable = read.json(jsonDF.toJSON)
primTable.registerTempTable("primativeTable")
checkAnswer(
sql("select * from primativeTable"),
@@ -970,8 +970,8 @@ class JsonSuite extends QueryTest {
"this is a simple string.")
)
- val complexJsonDF = jsonRDD(complexFieldAndType1)
- val compTable = jsonRDD(complexJsonDF.toJSON)
+ val complexJsonDF = read.json(complexFieldAndType1)
+ val compTable = read.json(complexJsonDF.toJSON)
compTable.registerTempTable("complexTable")
// Access elements of a primitive array.
checkAnswer(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index 5ad4395847..bdc2ebabc5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -328,12 +328,12 @@ class ParquetDataSourceOnFilterSuite extends ParquetFilterSuiteBase with BeforeA
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/part=1"
- (1 to 3).map(i => (i, i.toString)).toDF("a", "b").saveAsParquetFile(path)
+ (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
// If the "part = 1" filter gets pushed down, this query will throw an exception since
// "part" is not a valid column in the actual Parquet file
checkAnswer(
- sqlContext.parquetFile(path).filter("part = 1"),
+ sqlContext.read.parquet(path).filter("part = 1"),
(1 to 3).map(i => Row(i, i.toString, 1)))
}
}
@@ -357,7 +357,7 @@ class ParquetDataSourceOffFilterSuite extends ParquetFilterSuiteBase with Before
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/part=1"
- (1 to 3).map(i => (i, i.toString)).toDF("a", "b").saveAsParquetFile(path)
+ (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
// If the "part = 1" filter gets pushed down, this query will throw an exception since
// "part" is not a valid column in the actual Parquet file
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 008443df21..dd48bb350f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -114,24 +114,24 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17))) {
withTempPath { dir =>
val data = makeDecimalRDD(DecimalType(precision, scale))
- data.saveAsParquetFile(dir.getCanonicalPath)
- checkAnswer(parquetFile(dir.getCanonicalPath), data.collect().toSeq)
+ data.write.parquet(dir.getCanonicalPath)
+ checkAnswer(read.parquet(dir.getCanonicalPath), data.collect().toSeq)
}
}
// Decimals with precision above 18 are not yet supported
intercept[Throwable] {
withTempPath { dir =>
- makeDecimalRDD(DecimalType(19, 10)).saveAsParquetFile(dir.getCanonicalPath)
- parquetFile(dir.getCanonicalPath).collect()
+ makeDecimalRDD(DecimalType(19, 10)).write.parquet(dir.getCanonicalPath)
+ read.parquet(dir.getCanonicalPath).collect()
}
}
// Unlimited-length decimals are not yet supported
intercept[Throwable] {
withTempPath { dir =>
- makeDecimalRDD(DecimalType.Unlimited).saveAsParquetFile(dir.getCanonicalPath)
- parquetFile(dir.getCanonicalPath).collect()
+ makeDecimalRDD(DecimalType.Unlimited).write.parquet(dir.getCanonicalPath)
+ read.parquet(dir.getCanonicalPath).collect()
}
}
}
@@ -146,8 +146,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
withTempPath { dir =>
val data = makeDateRDD()
- data.saveAsParquetFile(dir.getCanonicalPath)
- checkAnswer(parquetFile(dir.getCanonicalPath), data.collect().toSeq)
+ data.write.parquet(dir.getCanonicalPath)
+ checkAnswer(read.parquet(dir.getCanonicalPath), data.collect().toSeq)
}
}
@@ -283,7 +283,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "part-r-0.parquet")
makeRawParquetFile(path)
- checkAnswer(parquetFile(path.toString), (0 until 10).map { i =>
+ checkAnswer(read.parquet(path.toString), (0 until 10).map { i =>
Row(i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
})
}
@@ -311,8 +311,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
test("save - overwrite") {
withParquetFile((1 to 10).map(i => (i, i.toString))) { file =>
val newData = (11 to 20).map(i => (i, i.toString))
- newData.toDF().save("org.apache.spark.sql.parquet", SaveMode.Overwrite, Map("path" -> file))
- checkAnswer(parquetFile(file), newData.map(Row.fromTuple))
+ newData.toDF().write.format("parquet").mode(SaveMode.Overwrite).save(file)
+ checkAnswer(read.parquet(file), newData.map(Row.fromTuple))
}
}
@@ -320,8 +320,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
val data = (1 to 10).map(i => (i, i.toString))
withParquetFile(data) { file =>
val newData = (11 to 20).map(i => (i, i.toString))
- newData.toDF().save("org.apache.spark.sql.parquet", SaveMode.Ignore, Map("path" -> file))
- checkAnswer(parquetFile(file), data.map(Row.fromTuple))
+ newData.toDF().write.format("parquet").mode(SaveMode.Ignore).save(file)
+ checkAnswer(read.parquet(file), data.map(Row.fromTuple))
}
}
@@ -330,8 +330,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
withParquetFile(data) { file =>
val newData = (11 to 20).map(i => (i, i.toString))
val errorMessage = intercept[Throwable] {
- newData.toDF().save(
- "org.apache.spark.sql.parquet", SaveMode.ErrorIfExists, Map("path" -> file))
+ newData.toDF().write.format("parquet").mode(SaveMode.ErrorIfExists).save(file)
}.getMessage
assert(errorMessage.contains("already exists"))
}
@@ -341,8 +340,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
val data = (1 to 10).map(i => (i, i.toString))
withParquetFile(data) { file =>
val newData = (11 to 20).map(i => (i, i.toString))
- newData.toDF().save("org.apache.spark.sql.parquet", SaveMode.Append, Map("path" -> file))
- checkAnswer(parquetFile(file), (data ++ newData).map(Row.fromTuple))
+ newData.toDF().write.format("parquet").mode(SaveMode.Append).save(file)
+ checkAnswer(read.parquet(file), (data ++ newData).map(Row.fromTuple))
}
}
@@ -374,7 +373,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
path,
new Footer(path, new ParquetMetadata(fileMetadata, Nil)) :: Nil)
- assertResult(parquetFile(path.toString).schema) {
+ assertResult(read.parquet(path.toString).schema) {
StructType(
StructField("a", BooleanType, nullable = false) ::
StructField("b", IntegerType, nullable = false) ::
@@ -392,7 +391,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
sqlContext.udf.register("div0", (x: Int) => x / 0)
withTempPath { dir =>
intercept[org.apache.spark.SparkException] {
- sqlContext.sql("select div0(1)").saveAsParquetFile(dir.getCanonicalPath)
+ sqlContext.sql("select div0(1)").write.parquet(dir.getCanonicalPath)
}
val path = new Path(dir.getCanonicalPath, "_temporary")
val fs = path.getFileSystem(configuration)
@@ -421,10 +420,10 @@ class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterA
// In 1.3.0, save to fs other than file: without configuring core-site.xml would get:
// IllegalArgumentException: Wrong FS: hdfs://..., expected: file:///
intercept[Throwable] {
- sqlContext.parquetFile("file:///nonexistent")
+ sqlContext.read.parquet("file:///nonexistent")
}
val errorMessage = intercept[Throwable] {
- sqlContext.parquetFile("hdfs://nonexistent")
+ sqlContext.read.parquet("hdfs://nonexistent")
}.toString
assert(errorMessage.contains("UnknownHostException"))
}
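The save-mode hunks in ParquetIOSuite turn `save("org.apache.spark.sql.parquet", SaveMode.X, Map("path" -> file))` into the builder form. A hedged sketch of the four modes against one hypothetical path (the object name, path, and data below are made up for illustration):

```scala
import java.nio.file.Files

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

object SaveModeSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("save-mode-sketch").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val file = Files.createTempDirectory("save-mode-sketch").toFile.getCanonicalPath + "/data"
    val data = (1 to 10).map(i => (i, i.toString)).toDF("a", "b")
    val newData = (11 to 20).map(i => (i, i.toString)).toDF("a", "b")

    data.write.format("parquet").save(file)                              // default mode is ErrorIfExists
    newData.write.format("parquet").mode(SaveMode.Overwrite).save(file)  // replaces the existing data
    newData.write.format("parquet").mode(SaveMode.Ignore).save(file)     // no-op because the path exists
    newData.write.format("parquet").mode(SaveMode.Append).save(file)     // adds rows to the existing data

    sqlContext.read.parquet(file).show()
    sc.stop()
  }
}
```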
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index 138e19766d..8079c46071 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -155,7 +155,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
}
- parquetFile(base.getCanonicalPath).registerTempTable("t")
+ read.parquet(base.getCanonicalPath).registerTempTable("t")
withTempTable("t") {
checkAnswer(
@@ -202,7 +202,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
}
- parquetFile(base.getCanonicalPath).registerTempTable("t")
+ read.parquet(base.getCanonicalPath).registerTempTable("t")
withTempTable("t") {
checkAnswer(
@@ -250,10 +250,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
}
- val parquetRelation = load(
- "org.apache.spark.sql.parquet",
- Map("path" -> base.getCanonicalPath))
-
+ val parquetRelation = read.format("org.apache.spark.sql.parquet").load(base.getCanonicalPath)
parquetRelation.registerTempTable("t")
withTempTable("t") {
@@ -293,10 +290,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
}
- val parquetRelation = load(
- "org.apache.spark.sql.parquet",
- Map("path" -> base.getCanonicalPath))
-
+ val parquetRelation = read.format("org.apache.spark.sql.parquet").load(base.getCanonicalPath)
parquetRelation.registerTempTable("t")
withTempTable("t") {
@@ -328,7 +322,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
(1 to 10).map(i => (i, i.toString)).toDF("intField", "stringField"),
makePartitionDir(base, defaultPartitionName, "pi" -> 2))
- load(base.getCanonicalPath, "org.apache.spark.sql.parquet").registerTempTable("t")
+ read.format("org.apache.spark.sql.parquet").load(base.getCanonicalPath).registerTempTable("t")
withTempTable("t") {
checkAnswer(
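For ParquetPartitionDiscoverySuite, the old two-argument `load(...)` calls become `read.format(...).load(...)`. The sketch below writes a tiny partitioned layout by hand (purely illustrative; the suite builds it with makePartitionDir helpers) and loads it through the fully qualified source name used in the hunks:

```scala
import java.nio.file.Files

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object PartitionDiscoverySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("partition-discovery").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Lay out two partition directories, pi=1 and pi=2, under a common base path.
    val base = Files.createTempDirectory("partition-sketch").toFile.getCanonicalPath
    (1 to 5).map(i => (i, i.toString)).toDF("intField", "stringField")
      .write.parquet(s"$base/pi=1")
    (6 to 10).map(i => (i, i.toString)).toDF("intField", "stringField")
      .write.parquet(s"$base/pi=2")

    // Load the base path; "pi" is discovered as a partition column.
    val parquetRelation = sqlContext.read.format("org.apache.spark.sql.parquet").load(base)
    parquetRelation.registerTempTable("t")
    sqlContext.sql("SELECT intField, stringField, pi FROM t WHERE pi = 2").show()

    sc.stop()
  }
}
```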
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 4e54b2eb8d..d2d1011b8e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -33,7 +33,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
override def beforeAll(): Unit = {
path = Utils.createTempDir()
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
- jsonRDD(rdd).registerTempTable("jt")
+ read.json(rdd).registerTempTable("jt")
}
override def afterAll(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index d1d427e179..6f375ef362 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -33,7 +33,7 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
override def beforeAll: Unit = {
path = Utils.createTempDir()
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
- jsonRDD(rdd).registerTempTable("jt")
+ read.json(rdd).registerTempTable("jt")
sql(
s"""
|CREATE TEMPORARY TABLE jsonTable (a int, b string)
@@ -109,7 +109,7 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
// Writing the table to less part files.
val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""), 5)
- jsonRDD(rdd1).registerTempTable("jt1")
+ read.json(rdd1).registerTempTable("jt1")
sql(
s"""
|INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt1
@@ -121,7 +121,7 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
// Writing the table to more part files.
val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""), 10)
- jsonRDD(rdd2).registerTempTable("jt2")
+ read.json(rdd2).registerTempTable("jt2")
sql(
s"""
|INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt2
@@ -154,13 +154,13 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
}
test("save directly to the path of a JSON table") {
- table("jt").selectExpr("a * 5 as a", "b").save(path.toString, "json", SaveMode.Overwrite)
+ table("jt").selectExpr("a * 5 as a", "b").write.mode(SaveMode.Overwrite).json(path.toString)
checkAnswer(
sql("SELECT a, b FROM jsonTable"),
(1 to 10).map(i => Row(i * 5, s"str$i"))
)
- table("jt").save(path.toString, "json", SaveMode.Overwrite)
+ table("jt").write.mode(SaveMode.Overwrite).json(path.toString)
checkAnswer(
sql("SELECT a, b FROM jsonTable"),
(1 to 10).map(i => Row(i, s"str$i"))
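The InsertSuite changes above end with the JSON writer shortcut combined with an explicit mode, `write.mode(SaveMode.Overwrite).json(path)`. A small self-contained sketch of that chain, with an invented path and the same style of generated JSON records as the suite:

```scala
import java.nio.file.Files

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

object JsonWriterSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("json-writer-sketch").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)

    val path = Files.createTempDirectory("json-writer-sketch").toFile.getCanonicalPath + "/jt"
    val rdd = sc.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
    val jt = sqlContext.read.json(rdd)

    // Derive new values and overwrite the JSON directory, as the test does for its temp table.
    jt.selectExpr("a * 5 as a", "b").write.mode(SaveMode.Overwrite).json(path)

    sqlContext.read.json(path).show()
    sc.stop()
  }
}
```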
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
index 6567d1acd7..7a28e9af36 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
@@ -42,7 +42,7 @@ class SaveLoadSuite extends DataSourceTest with BeforeAndAfterAll {
path.delete()
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
- df = jsonRDD(rdd)
+ df = read.json(rdd)
df.registerTempTable("jsonTable")
}
@@ -57,41 +57,41 @@ class SaveLoadSuite extends DataSourceTest with BeforeAndAfterAll {
def checkLoad(): Unit = {
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
- checkAnswer(load(path.toString), df.collect())
+ checkAnswer(read.load(path.toString), df.collect())
// Test if we can pick up the data source name passed in load.
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
- checkAnswer(load(path.toString, "org.apache.spark.sql.json"), df.collect())
- checkAnswer(load("org.apache.spark.sql.json", Map("path" -> path.toString)), df.collect())
+ checkAnswer(read.format("json").load(path.toString), df.collect())
+ checkAnswer(read.format("json").load(path.toString), df.collect())
val schema = StructType(StructField("b", StringType, true) :: Nil)
checkAnswer(
- load("org.apache.spark.sql.json", schema, Map("path" -> path.toString)),
+ read.format("json").schema(schema).load(path.toString),
sql("SELECT b FROM jsonTable").collect())
}
test("save with path and load") {
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
- df.save(path.toString)
+ df.write.save(path.toString)
checkLoad()
}
test("save with path and datasource, and load") {
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
- df.save(path.toString, "org.apache.spark.sql.json")
+ df.write.json(path.toString)
checkLoad()
}
test("save with data source and options, and load") {
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
- df.save("org.apache.spark.sql.json", SaveMode.ErrorIfExists, Map("path" -> path.toString))
+ df.write.mode(SaveMode.ErrorIfExists).json(path.toString)
checkLoad()
}
test("save and save again") {
- df.save(path.toString, "org.apache.spark.sql.json")
+ df.write.json(path.toString)
var message = intercept[RuntimeException] {
- df.save(path.toString, "org.apache.spark.sql.json")
+ df.write.json(path.toString)
}.getMessage
assert(
@@ -100,14 +100,14 @@ class SaveLoadSuite extends DataSourceTest with BeforeAndAfterAll {
if (path.exists()) Utils.deleteRecursively(path)
- df.save(path.toString, "org.apache.spark.sql.json")
+ df.write.json(path.toString)
checkLoad()
- df.save("org.apache.spark.sql.json", SaveMode.Overwrite, Map("path" -> path.toString))
+ df.write.mode(SaveMode.Overwrite).json(path.toString)
checkLoad()
message = intercept[RuntimeException] {
- df.save("org.apache.spark.sql.json", SaveMode.Append, Map("path" -> path.toString))
+ df.write.mode(SaveMode.Append).json(path.toString)
}.getMessage
assert(