author    Reynold Xin <rxin@databricks.com>  2015-05-16 22:01:53 -0700
committer Reynold Xin <rxin@databricks.com>  2015-05-16 22:01:53 -0700
commit    517eb37a85e0a28820bcfd5d98c50d02df6521c6 (patch)
tree      1c33fdee296ea6d88df6a218b649efe8fa10662a /sql
parent    3b6ef2c5391b528ef989e24400fbb0c496c3b245 (diff)
[SPARK-7654][SQL] Move JDBC into DataFrame's reader/writer interface.
Also moved all the deprecated functions into one place for SQLContext and DataFrame, and updated tests to use the new API.

Author: Reynold Xin <rxin@databricks.com>

Closes #6210 from rxin/df-writer-reader-jdbc and squashes the following commits:

7465c2c [Reynold Xin] Fixed unit test.
118e609 [Reynold Xin] Updated tests.
3441b57 [Reynold Xin] Updated javadoc.
13cdd1c [Reynold Xin] [SPARK-7654][SQL] Move JDBC into DataFrame's reader/writer interface.
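For orientation, here is a minimal before/after sketch of the API change this commit makes. It is illustrative only: `sqlContext`, `df`, and the H2-style `jdbcUrl` below are placeholders, not part of the patch.

    import java.util.Properties

    // Placeholder connection settings; any reachable JDBC database would do.
    val jdbcUrl = "jdbc:h2:mem:testdb"
    val props = new Properties()
    props.setProperty("user", "sa")
    props.setProperty("password", "")

    // Before this patch (now deprecated):
    //   val people = sqlContext.jdbc(jdbcUrl, "PEOPLE")
    //   df.createJDBCTable(jdbcUrl, "PEOPLE_COPY", allowExisting = false)

    // After this patch, JDBC goes through the reader/writer interface:
    val people = sqlContext.read.jdbc(jdbcUrl, "PEOPLE", props)
    df.write.jdbc(jdbcUrl, "PEOPLE_COPY", props)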
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 284
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 89
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 53
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 646
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala | 30
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala | 16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcUtils.scala | 52
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala | 6
-rw-r--r--  sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java | 4
-rw-r--r--  sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java | 10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 31
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala | 54
-rw-r--r--  sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java | 20
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 73
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala | 6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 14
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala | 68
19 files changed, 727 insertions, 741 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 55ef357a99..27e9af49f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql
import java.io.CharArrayWriter
-import java.sql.DriverManager
import java.util.Properties
import scala.collection.JavaConversions._
@@ -40,9 +39,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, _}
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, ScalaReflection, SqlParser}
import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
-import org.apache.spark.sql.jdbc.JDBCWriteDetails
import org.apache.spark.sql.json.JacksonGenerator
-import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, ResolvedDataSource}
+import org.apache.spark.sql.sources.CreateTableUsingAsSelect
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
@@ -227,10 +225,6 @@ class DataFrame private[sql](
}
}
- /** Left here for backward compatibility. */
- @deprecated("1.3.0", "use toDF")
- def toSchemaRDD: DataFrame = this
-
/**
* Returns the object itself.
* @group basic
@@ -1300,11 +1294,118 @@ class DataFrame private[sql](
def write: DataFrameWriter = new DataFrameWriter(this)
/**
+ * :: Experimental ::
+ * Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
+ * @group output
+ * @since 1.3.0
+ */
+ @Experimental
+ def insertInto(tableName: String, overwrite: Boolean): Unit = {
+ sqlContext.executePlan(InsertIntoTable(UnresolvedRelation(Seq(tableName)),
+ Map.empty, logicalPlan, overwrite, ifNotExists = false)).toRdd
+ }
+
+ /**
+ * :: Experimental ::
+ * Adds the rows from this RDD to the specified table.
+ * Throws an exception if the table already exists.
+ * @group output
+ * @since 1.3.0
+ */
+ @Experimental
+ def insertInto(tableName: String): Unit = insertInto(tableName, overwrite = false)
+
+ /**
+ * Returns the content of the [[DataFrame]] as a RDD of JSON strings.
+ * @group rdd
+ * @since 1.3.0
+ */
+ def toJSON: RDD[String] = {
+ val rowSchema = this.schema
+ this.mapPartitions { iter =>
+ val writer = new CharArrayWriter()
+ // create the Generator without separator inserted between 2 records
+ val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
+
+ new Iterator[String] {
+ override def hasNext: Boolean = iter.hasNext
+ override def next(): String = {
+ JacksonGenerator(rowSchema, gen)(iter.next())
+ gen.flush()
+
+ val json = writer.toString
+ if (hasNext) {
+ writer.reset()
+ } else {
+ gen.close()
+ }
+
+ json
+ }
+ }
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////
+ // for Python API
+ ////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Converts a JavaRDD to a PythonRDD.
+ */
+ protected[sql] def javaToPython: JavaRDD[Array[Byte]] = {
+ val fieldTypes = schema.fields.map(_.dataType)
+ val jrdd = rdd.map(EvaluatePython.rowToArray(_, fieldTypes)).toJavaRDD()
+ SerDeUtil.javaToPython(jrdd)
+ }
+
+ ////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////
+ // Deprecated methods
+ ////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////
+
+ /** Left here for backward compatibility. */
+ @deprecated("use toDF", "1.3.0")
+ def toSchemaRDD: DataFrame = this
+
+ /**
+ * Save this [[DataFrame]] to a JDBC database at `url` under the table name `table`.
+ * This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements.
+ * If you pass `true` for `allowExisting`, it will drop any table with the
+ * given name; if you pass `false`, it will throw if the table already
+ * exists.
+ * @group output
+ */
+ @deprecated("Use write.jdbc()", "1.4.0")
+ def createJDBCTable(url: String, table: String, allowExisting: Boolean): Unit = {
+ val w = if (allowExisting) write.mode(SaveMode.Overwrite) else write
+ w.jdbc(url, table, new Properties)
+ }
+
+ /**
+ * Save this [[DataFrame]] to a JDBC database at `url` under the table name `table`.
+ * Assumes the table already exists and has a compatible schema. If you
+ * pass `true` for `overwrite`, it will `TRUNCATE` the table before
+ * performing the `INSERT`s.
+ *
+ * The table must already exist on the database. It must have a schema
+ * that is compatible with the schema of this RDD; inserting the rows of
+ * the RDD in order via the simple statement
+ * `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail.
+ * @group output
+ */
+ @deprecated("Use write.jdbc()", "1.4.0")
+ def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit = {
+ val w = if (overwrite) write.mode(SaveMode.Overwrite) else write
+ w.jdbc(url, table, new Properties)
+ }
+
+ /**
* Saves the contents of this [[DataFrame]] as a parquet file, preserving the schema.
* Files that are written out using this method can be read back in as a [[DataFrame]]
* using the `parquetFile` function in [[SQLContext]].
* @group output
- * @since 1.3.0
*/
@deprecated("Use write.parquet(path)", "1.4.0")
def saveAsParquetFile(path: String): Unit = {
@@ -1328,7 +1429,6 @@ class DataFrame private[sql](
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* @group output
- * @since 1.3.0
*/
@deprecated("Use write.saveAsTable(tableName)", "1.4.0")
def saveAsTable(tableName: String): Unit = {
@@ -1347,7 +1447,6 @@ class DataFrame private[sql](
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* @group output
- * @since 1.3.0
*/
@deprecated("Use write.mode(mode).saveAsTable(tableName)", "1.4.0")
def saveAsTable(tableName: String, mode: SaveMode): Unit = {
@@ -1373,7 +1472,6 @@ class DataFrame private[sql](
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* @group output
- * @since 1.3.0
*/
@deprecated("Use write.format(source).saveAsTable(tableName)", "1.4.0")
def saveAsTable(tableName: String, source: String): Unit = {
@@ -1393,7 +1491,6 @@ class DataFrame private[sql](
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* @group output
- * @since 1.3.0
*/
@deprecated("Use write.format(source).mode(mode).saveAsTable(tableName)", "1.4.0")
def saveAsTable(tableName: String, source: String, mode: SaveMode): Unit = {
@@ -1412,7 +1509,6 @@ class DataFrame private[sql](
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* @group output
- * @since 1.3.0
*/
@deprecated("Use write.format(source).mode(mode).options(options).saveAsTable(tableName)",
"1.4.0")
@@ -1437,7 +1533,6 @@ class DataFrame private[sql](
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* @group output
- * @since 1.3.0
*/
@deprecated("Use write.format(source).mode(mode).options(options).saveAsTable(tableName)",
"1.4.0")
@@ -1454,7 +1549,6 @@ class DataFrame private[sql](
* using the default data source configured by spark.sql.sources.default and
* [[SaveMode.ErrorIfExists]] as the save mode.
* @group output
- * @since 1.3.0
*/
@deprecated("Use write.save(path)", "1.4.0")
def save(path: String): Unit = {
@@ -1465,7 +1559,6 @@ class DataFrame private[sql](
* Saves the contents of this DataFrame to the given path and [[SaveMode]] specified by mode,
* using the default data source configured by spark.sql.sources.default.
* @group output
- * @since 1.3.0
*/
@deprecated("Use write.mode(mode).save(path)", "1.4.0")
def save(path: String, mode: SaveMode): Unit = {
@@ -1476,7 +1569,6 @@ class DataFrame private[sql](
* Saves the contents of this DataFrame to the given path based on the given data source,
* using [[SaveMode.ErrorIfExists]] as the save mode.
* @group output
- * @since 1.3.0
*/
@deprecated("Use write.format(source).save(path)", "1.4.0")
def save(path: String, source: String): Unit = {
@@ -1487,7 +1579,6 @@ class DataFrame private[sql](
* Saves the contents of this DataFrame to the given path based on the given data source and
* [[SaveMode]] specified by mode.
* @group output
- * @since 1.3.0
*/
@deprecated("Use write.format(source).mode(mode).save(path)", "1.4.0")
def save(path: String, source: String, mode: SaveMode): Unit = {
@@ -1498,7 +1589,6 @@ class DataFrame private[sql](
* Saves the contents of this DataFrame based on the given data source,
* [[SaveMode]] specified by mode, and a set of options.
* @group output
- * @since 1.3.0
*/
@deprecated("Use write.format(source).mode(mode).options(options).save()", "1.4.0")
def save(
@@ -1513,7 +1603,6 @@ class DataFrame private[sql](
* Saves the contents of this DataFrame based on the given data source,
* [[SaveMode]] specified by mode, and a set of options
* @group output
- * @since 1.3.0
*/
@deprecated("Use write.format(source).mode(mode).options(options).save()", "1.4.0")
def save(
@@ -1523,163 +1612,10 @@ class DataFrame private[sql](
write.format(source).mode(mode).options(options).save()
}
- /**
- * :: Experimental ::
- * Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
- * @group output
- * @since 1.3.0
- */
- @Experimental
- def insertInto(tableName: String, overwrite: Boolean): Unit = {
- sqlContext.executePlan(InsertIntoTable(UnresolvedRelation(Seq(tableName)),
- Map.empty, logicalPlan, overwrite, ifNotExists = false)).toRdd
- }
-
- /**
- * :: Experimental ::
- * Adds the rows from this RDD to the specified table.
- * Throws an exception if the table already exists.
- * @group output
- * @since 1.3.0
- */
- @Experimental
- def insertInto(tableName: String): Unit = insertInto(tableName, overwrite = false)
-
- /**
- * Returns the content of the [[DataFrame]] as a RDD of JSON strings.
- * @group rdd
- * @since 1.3.0
- */
- def toJSON: RDD[String] = {
- val rowSchema = this.schema
- this.mapPartitions { iter =>
- val writer = new CharArrayWriter()
- // create the Generator without separator inserted between 2 records
- val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
-
- new Iterator[String] {
- override def hasNext: Boolean = iter.hasNext
- override def next(): String = {
- JacksonGenerator(rowSchema, gen)(iter.next())
- gen.flush()
-
- val json = writer.toString
- if (hasNext) {
- writer.reset()
- } else {
- gen.close()
- }
-
- json
- }
- }
- }
- }
-
////////////////////////////////////////////////////////////////////////////
- // JDBC Write Support
////////////////////////////////////////////////////////////////////////////
-
- /**
- * Save this [[DataFrame]] to a JDBC database at `url` under the table name `table`.
- * This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements.
- * If you pass `true` for `allowExisting`, it will drop any table with the
- * given name; if you pass `false`, it will throw if the table already
- * exists.
- * @group output
- * @since 1.3.0
- */
- def createJDBCTable(url: String, table: String, allowExisting: Boolean): Unit = {
- createJDBCTable(url, table, allowExisting, new Properties())
- }
-
- /**
- * Save this [[DataFrame]] to a JDBC database at `url` under the table name `table`
- * using connection properties defined in `properties`.
- * This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements.
- * If you pass `true` for `allowExisting`, it will drop any table with the
- * given name; if you pass `false`, it will throw if the table already
- * exists.
- * @group output
- * @since 1.4.0
- */
- def createJDBCTable(
- url: String,
- table: String,
- allowExisting: Boolean,
- properties: Properties): Unit = {
- val conn = DriverManager.getConnection(url, properties)
- try {
- if (allowExisting) {
- val sql = s"DROP TABLE IF EXISTS $table"
- conn.prepareStatement(sql).executeUpdate()
- }
- val schema = JDBCWriteDetails.schemaString(this, url)
- val sql = s"CREATE TABLE $table ($schema)"
- conn.prepareStatement(sql).executeUpdate()
- } finally {
- conn.close()
- }
- JDBCWriteDetails.saveTable(this, url, table, properties)
- }
-
- /**
- * Save this [[DataFrame]] to a JDBC database at `url` under the table name `table`.
- * Assumes the table already exists and has a compatible schema. If you
- * pass `true` for `overwrite`, it will `TRUNCATE` the table before
- * performing the `INSERT`s.
- *
- * The table must already exist on the database. It must have a schema
- * that is compatible with the schema of this RDD; inserting the rows of
- * the RDD in order via the simple statement
- * `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail.
- * @group output
- * @since 1.3.0
- */
- def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit = {
- insertIntoJDBC(url, table, overwrite, new Properties())
- }
-
- /**
- * Save this [[DataFrame]] to a JDBC database at `url` under the table name `table`
- * using connection properties defined in `properties`.
- * Assumes the table already exists and has a compatible schema. If you
- * pass `true` for `overwrite`, it will `TRUNCATE` the table before
- * performing the `INSERT`s.
- *
- * The table must already exist on the database. It must have a schema
- * that is compatible with the schema of this RDD; inserting the rows of
- * the RDD in order via the simple statement
- * `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail.
- * @group output
- * @since 1.4.0
- */
- def insertIntoJDBC(
- url: String,
- table: String,
- overwrite: Boolean,
- properties: Properties): Unit = {
- if (overwrite) {
- val conn = DriverManager.getConnection(url, properties)
- try {
- val sql = s"TRUNCATE TABLE $table"
- conn.prepareStatement(sql).executeUpdate()
- } finally {
- conn.close()
- }
- }
- JDBCWriteDetails.saveTable(this, url, table, properties)
- }
+ // End of deprecated methods
////////////////////////////////////////////////////////////////////////////
- // for Python API
////////////////////////////////////////////////////////////////////////////
- /**
- * Converts a JavaRDD to a PythonRDD.
- */
- protected[sql] def javaToPython: JavaRDD[Array[Byte]] = {
- val fieldTypes = schema.fields.map(_.dataType)
- val jrdd = rdd.map(EvaluatePython.rowToArray(_, fieldTypes)).toJavaRDD()
- SerDeUtil.javaToPython(jrdd)
- }
}
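The relocated deprecated methods above now just delegate to the new writer. A rough equivalence sketch, assuming a DataFrame `df` and placeholder `url`/`table` values:

    import java.util.Properties
    import org.apache.spark.sql.SaveMode

    // createJDBCTable(url, table, allowExisting = true) behaves like:
    df.write.mode(SaveMode.Overwrite).jdbc(url, table, new Properties)

    // createJDBCTable(url, table, allowExisting = false) and
    // insertIntoJDBC(url, table, overwrite = false) use the writer's default mode
    // (ErrorIfExists, i.e. fail if the table already exists):
    df.write.jdbc(url, table, new Properties)

    // insertIntoJDBC(url, table, overwrite = true) behaves like:
    df.write.mode(SaveMode.Overwrite).jdbc(url, table, new Properties)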
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 4d63faad6f..381c10f48f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -17,12 +17,16 @@
package org.apache.spark.sql
+import java.util.Properties
+
import org.apache.hadoop.fs.Path
+import org.apache.spark.Partition
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.json.{JsonRDD, JSONRelation}
import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.sources.{LogicalRelation, ResolvedDataSource}
@@ -31,7 +35,7 @@ import org.apache.spark.sql.types.StructType
/**
* :: Experimental ::
* Interface used to load a [[DataFrame]] from external storage systems (e.g. file systems,
- * key-value stores, etc).
+ * key-value stores, etc). Use [[SQLContext.read]] to access this.
*
* @since 1.4.0
*/
@@ -94,6 +98,8 @@ class DataFrameReader private[sql](sqlContext: SQLContext) {
* Specifies the input partitioning. If specified, the underlying data source does not need to
* discover the data partitioning scheme, and thus can speed up very large inputs.
*
+ * This is only applicable for Parquet at the moment.
+ *
* @since 1.4.0
*/
@scala.annotation.varargs
@@ -129,6 +135,87 @@ class DataFrameReader private[sql](sqlContext: SQLContext) {
}
/**
+ * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
+ * url named table and connection properties.
+ *
+ * @since 1.4.0
+ */
+ def jdbc(url: String, table: String, properties: Properties): DataFrame = {
+ jdbc(url, table, JDBCRelation.columnPartition(null), properties)
+ }
+
+ /**
+ * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
+ * url named table. Partitions of the table will be retrieved in parallel based on the parameters
+ * passed to this function.
+ *
+ * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
+ * your external database systems.
+ *
+ * @param url JDBC database url of the form `jdbc:subprotocol:subname`
+ * @param table Name of the table in the external database.
+ * @param columnName the name of a column of integral type that will be used for partitioning.
+ * @param lowerBound the minimum value of `columnName` used to decide partition stride
+ * @param upperBound the maximum value of `columnName` used to decide partition stride
+ * @param numPartitions the number of partitions. the range `minValue`-`maxValue` will be split
+ * evenly into this many partitions
+ * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
+ * tag/value. Normally at least a "user" and "password" property
+ * should be included.
+ *
+ * @since 1.4.0
+ */
+ def jdbc(
+ url: String,
+ table: String,
+ columnName: String,
+ lowerBound: Long,
+ upperBound: Long,
+ numPartitions: Int,
+ connectionProperties: Properties): DataFrame = {
+ val partitioning = JDBCPartitioningInfo(columnName, lowerBound, upperBound, numPartitions)
+ val parts = JDBCRelation.columnPartition(partitioning)
+ jdbc(url, table, parts, connectionProperties)
+ }
+
+ /**
+ * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
+ * url named table using connection properties. The `predicates` parameter gives a list
+ * expressions suitable for inclusion in WHERE clauses; each one defines one partition
+ * of the [[DataFrame]].
+ *
+ * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
+ * your external database systems.
+ *
+ * @param url JDBC database url of the form `jdbc:subprotocol:subname`
+ * @param table Name of the table in the external database.
+ * @param predicates Condition in the where clause for each partition.
+ * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
+ * tag/value. Normally at least a "user" and "password" property
+ * should be included.
+ * @since 1.4.0
+ */
+ def jdbc(
+ url: String,
+ table: String,
+ predicates: Array[String],
+ connectionProperties: Properties): DataFrame = {
+ val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i) =>
+ JDBCPartition(part, i) : Partition
+ }
+ jdbc(url, table, parts, connectionProperties)
+ }
+
+ private def jdbc(
+ url: String,
+ table: String,
+ parts: Array[Partition],
+ connectionProperties: Properties): DataFrame = {
+ val relation = JDBCRelation(url, table, parts, connectionProperties)(sqlContext)
+ sqlContext.baseRelationToDataFrame(relation)
+ }
+
+ /**
* Loads a JSON file (one object per line) and returns the result as a [[DataFrame]].
*
* This function goes through the input once to determine the input schema. If you know the
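A hedged usage sketch of the two partitioned read paths added above; `url` and `sqlContext` are placeholders, while the `TEST.PEOPLE`/`THEID` names mirror the H2 fixtures used by JDBCSuite later in this diff:

    import java.util.Properties

    val props = new Properties()

    // Split the scan into 3 partitions by ranges of the integral column THEID in [0, 4].
    val byColumn = sqlContext.read.jdbc(url, "TEST.PEOPLE", "THEID", 0L, 4L, 3, props)

    // Or supply one WHERE-clause predicate per partition.
    val byPredicates = sqlContext.read.jdbc(
      url, "TEST.PEOPLE", Array("THEID < 2", "THEID >= 2"), props)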
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 9f42f0f1f4..f2e721d4db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -17,14 +17,17 @@
package org.apache.spark.sql
+import java.util.Properties
+
import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.jdbc.{JDBCWriteDetails, JdbcUtils}
import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect}
/**
* :: Experimental ::
* Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems,
- * key-value stores, etc).
+ * key-value stores, etc). Use [[DataFrame.write]] to access this.
*
* @since 1.4.0
*/
@@ -110,6 +113,8 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* Partitions the output by the given columns on the file system. If specified, the output is
* laid out on the file system similar to Hive's partitioning scheme.
*
+ * This is only applicable for Parquet at the moment.
+ *
* @since 1.4.0
*/
@scala.annotation.varargs
@@ -162,6 +167,52 @@ final class DataFrameWriter private[sql](df: DataFrame) {
}
/**
+ * Saves the content of the [[DataFrame]] to an external database table via JDBC. In the case the
+ * table already exists in the external database, behavior of this function depends on the
+ * save mode, specified by the `mode` function (default to throwing an exception).
+ *
+ * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
+ * your external database systems.
+ *
+ * @param url JDBC database url of the form `jdbc:subprotocol:subname`
+ * @param table Name of the table in the external database.
+ * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
+ * tag/value. Normally at least a "user" and "password" property
+ * should be included.
+ */
+ def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
+ val conn = JdbcUtils.createConnection(url, connectionProperties)
+
+ try {
+ var tableExists = JdbcUtils.tableExists(conn, table)
+
+ if (mode == SaveMode.Ignore && tableExists) {
+ return
+ }
+
+ if (mode == SaveMode.ErrorIfExists && tableExists) {
+ sys.error(s"Table $table already exists.")
+ }
+
+ if (mode == SaveMode.Overwrite && tableExists) {
+ JdbcUtils.dropTable(conn, table)
+ tableExists = false
+ }
+
+ // Create the table if the table didn't exist.
+ if (!tableExists) {
+ val schema = JDBCWriteDetails.schemaString(df, url)
+ val sql = s"CREATE TABLE $table ($schema)"
+ conn.prepareStatement(sql).executeUpdate()
+ }
+ } finally {
+ conn.close()
+ }
+
+ JDBCWriteDetails.saveTable(df, url, table, connectionProperties)
+ }
+
+ /**
* Saves the content of the [[DataFrame]] in JSON format at the specified path.
* This is equivalent to:
* {{{
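Per the `jdbc` writer body above, the save mode decides what happens when the target table already exists. A small sketch, with `df`, `url`, `table`, and `props` as placeholders:

    import org.apache.spark.sql.SaveMode

    df.write.mode(SaveMode.Ignore).jdbc(url, table, props)        // silently no-op
    df.write.mode(SaveMode.ErrorIfExists).jdbc(url, table, props) // fail (the default)
    df.write.mode(SaveMode.Overwrite).jdbc(url, table, props)     // drop, recreate, then insert
    df.write.mode(SaveMode.Append).jdbc(url, table, props)        // insert into the existing table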
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 34a50e522c..ac1a800219 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -28,6 +28,7 @@ import scala.util.control.NonFatal
import com.google.common.reflect.TypeToken
+import org.apache.spark.SparkContext
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
@@ -40,11 +41,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.ParserDialect
import org.apache.spark.sql.execution.{Filter, _}
-import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
-import org.apache.spark.{Partition, SparkContext}
/**
* The entry point for working with structured data (rows and columns) in Spark. Allows the
@@ -532,67 +531,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
}
/**
- * :: DeveloperApi ::
- * Creates a [[DataFrame]] from an [[RDD]] containing [[Row]]s by applying a schema to this RDD.
- * It is important to make sure that the structure of every [[Row]] of the provided RDD matches
- * the provided schema. Otherwise, there will be runtime exception.
- * Example:
- * {{{
- * import org.apache.spark.sql._
- * import org.apache.spark.sql.types._
- * val sqlContext = new org.apache.spark.sql.SQLContext(sc)
- *
- * val schema =
- * StructType(
- * StructField("name", StringType, false) ::
- * StructField("age", IntegerType, true) :: Nil)
- *
- * val people =
- * sc.textFile("examples/src/main/resources/people.txt").map(
- * _.split(",")).map(p => Row(p(0), p(1).trim.toInt))
- * val dataFrame = sqlContext. applySchema(people, schema)
- * dataFrame.printSchema
- * // root
- * // |-- name: string (nullable = false)
- * // |-- age: integer (nullable = true)
- *
- * dataFrame.registerTempTable("people")
- * sqlContext.sql("select name from people").collect.foreach(println)
- * }}}
- */
- @deprecated("use createDataFrame", "1.3.0")
- def applySchema(rowRDD: RDD[Row], schema: StructType): DataFrame = {
- createDataFrame(rowRDD, schema)
- }
-
- @deprecated("use createDataFrame", "1.3.0")
- def applySchema(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
- createDataFrame(rowRDD, schema)
- }
-
- /**
- * Applies a schema to an RDD of Java Beans.
- *
- * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
- * SELECT * queries will return the columns in an undefined order.
- */
- @deprecated("use createDataFrame", "1.3.0")
- def applySchema(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
- createDataFrame(rdd, beanClass)
- }
-
- /**
- * Applies a schema to an RDD of Java Beans.
- *
- * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
- * SELECT * queries will return the columns in an undefined order.
- */
- @deprecated("use createDataFrame", "1.3.0")
- def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = {
- createDataFrame(rdd, beanClass)
- }
-
- /**
* :: Experimental ::
* Returns a [[DataFrameReader]] that can be used to read data in as a [[DataFrame]].
* {{{
@@ -607,205 +545,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
def read: DataFrameReader = new DataFrameReader(this)
/**
- * Loads a Parquet file, returning the result as a [[DataFrame]]. This function returns an empty
- * [[DataFrame]] if no paths are passed in.
- *
- * @group specificdata
- * @since 1.3.0
- */
- @deprecated("Use read.parquet()", "1.4.0")
- @scala.annotation.varargs
- def parquetFile(paths: String*): DataFrame = {
- if (paths.isEmpty) {
- emptyDataFrame
- } else if (conf.parquetUseDataSourceApi) {
- read.parquet(paths : _*)
- } else {
- DataFrame(this, parquet.ParquetRelation(
- paths.mkString(","), Some(sparkContext.hadoopConfiguration), this))
- }
- }
-
- /**
- * Loads a JSON file (one object per line), returning the result as a [[DataFrame]].
- * It goes through the entire dataset once to determine the schema.
- *
- * @group specificdata
- * @since 1.3.0
- */
- @deprecated("Use read.json()", "1.4.0")
- def jsonFile(path: String): DataFrame = {
- read.json(path)
- }
-
- /**
- * Loads a JSON file (one object per line) and applies the given schema,
- * returning the result as a [[DataFrame]].
- *
- * @group specificdata
- * @since 1.3.0
- */
- @deprecated("Use read.json()", "1.4.0")
- def jsonFile(path: String, schema: StructType): DataFrame = {
- read.schema(schema).json(path)
- }
-
- /**
- * @group specificdata
- * @since 1.3.0
- */
- @deprecated("Use read.json()", "1.4.0")
- def jsonFile(path: String, samplingRatio: Double): DataFrame = {
- read.option("samplingRatio", samplingRatio.toString).json(path)
- }
-
- /**
- * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
- * [[DataFrame]].
- * It goes through the entire dataset once to determine the schema.
- *
- * @group specificdata
- * @since 1.3.0
- */
- @deprecated("Use read.json()", "1.4.0")
- def jsonRDD(json: RDD[String]): DataFrame = read.json(json)
-
- /**
- * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
- * [[DataFrame]].
- * It goes through the entire dataset once to determine the schema.
- *
- * @group specificdata
- * @since 1.3.0
- */
- @deprecated("Use read.json()", "1.4.0")
- def jsonRDD(json: JavaRDD[String]): DataFrame = read.json(json)
-
- /**
- * Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
- * returning the result as a [[DataFrame]].
- *
- * @group specificdata
- * @since 1.3.0
- */
- @deprecated("Use read.json()", "1.4.0")
- def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
- read.schema(schema).json(json)
- }
-
- /**
- * Loads an JavaRDD<String> storing JSON objects (one object per record) and applies the given
- * schema, returning the result as a [[DataFrame]].
- *
- * @group specificdata
- * @since 1.3.0
- */
- @deprecated("Use read.json()", "1.4.0")
- def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame = {
- read.schema(schema).json(json)
- }
-
- /**
- * Loads an RDD[String] storing JSON objects (one object per record) inferring the
- * schema, returning the result as a [[DataFrame]].
- *
- * @group specificdata
- * @since 1.3.0
- */
- @deprecated("Use read.json()", "1.4.0")
- def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
- read.option("samplingRatio", samplingRatio.toString).json(json)
- }
-
- /**
- * Loads a JavaRDD[String] storing JSON objects (one object per record) inferring the
- * schema, returning the result as a [[DataFrame]].
- *
- * @group specificdata
- * @since 1.3.0
- */
- @deprecated("Use read.json()", "1.4.0")
- def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame = {
- read.option("samplingRatio", samplingRatio.toString).json(json)
- }
-
- /**
- * Returns the dataset stored at path as a DataFrame,
- * using the default data source configured by spark.sql.sources.default.
- *
- * @group genericdata
- * @since 1.3.0
- */
- @deprecated("Use read.load(path)", "1.4.0")
- def load(path: String): DataFrame = {
- read.load(path)
- }
-
- /**
- * Returns the dataset stored at path as a DataFrame, using the given data source.
- *
- * @group genericdata
- * @since 1.3.0
- */
- @deprecated("Use read.format(source).load(path)", "1.4.0")
- def load(path: String, source: String): DataFrame = {
- read.format(source).load(path)
- }
-
- /**
- * (Java-specific) Returns the dataset specified by the given data source and
- * a set of options as a DataFrame.
- *
- * @group genericdata
- * @since 1.3.0
- */
- @deprecated("Use read.format(source).options(options).load()", "1.4.0")
- def load(source: String, options: java.util.Map[String, String]): DataFrame = {
- read.options(options).format(source).load()
- }
-
- /**
- * (Scala-specific) Returns the dataset specified by the given data source and
- * a set of options as a DataFrame.
- *
- * @group genericdata
- * @since 1.3.0
- */
- @deprecated("Use read.format(source).options(options).load()", "1.4.0")
- def load(source: String, options: Map[String, String]): DataFrame = {
- read.options(options).format(source).load()
- }
-
- /**
- * (Java-specific) Returns the dataset specified by the given data source and
- * a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
- *
- * @group genericdata
- * @since 1.3.0
- */
- @deprecated("Use read.format(source).schema(schema).options(options).load()", "1.4.0")
- def load(
- source: String,
- schema: StructType,
- options: java.util.Map[String, String]): DataFrame = {
- read.format(source).schema(schema).options(options).load()
- }
-
- /**
- * (Scala-specific) Returns the dataset specified by the given data source and
- * a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
- * @group genericdata
- * @since 1.3.0
- */
- @deprecated("Use read.format(source).schema(schema).options(options).load()", "1.4.0")
- def load(
- source: String,
- schema: StructType,
- options: Map[String, String]): DataFrame = {
- read.format(source).schema(schema).options(options).load()
- }
-
- /**
* :: Experimental ::
* Creates an external table from the given path and returns the corresponding DataFrame.
* It will use the default data source configured by spark.sql.sources.default.
@@ -924,132 +663,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
}
/**
- * :: Experimental ::
- * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
- * url named table.
- *
- * @group specificdata
- * @since 1.3.0
- */
- @Experimental
- def jdbc(url: String, table: String): DataFrame = {
- jdbc(url, table, JDBCRelation.columnPartition(null), new Properties())
- }
-
- /**
- * :: Experimental ::
- * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
- * url named table and connection properties.
- *
- * @group specificdata
- * @since 1.4.0
- */
- @Experimental
- def jdbc(url: String, table: String, properties: Properties): DataFrame = {
- jdbc(url, table, JDBCRelation.columnPartition(null), properties)
- }
-
- /**
- * :: Experimental ::
- * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
- * url named table. Partitions of the table will be retrieved in parallel based on the parameters
- * passed to this function.
- *
- * @param columnName the name of a column of integral type that will be used for partitioning.
- * @param lowerBound the minimum value of `columnName` used to decide partition stride
- * @param upperBound the maximum value of `columnName` used to decide partition stride
- * @param numPartitions the number of partitions. the range `minValue`-`maxValue` will be split
- * evenly into this many partitions
- * @group specificdata
- * @since 1.3.0
- */
- @Experimental
- def jdbc(
- url: String,
- table: String,
- columnName: String,
- lowerBound: Long,
- upperBound: Long,
- numPartitions: Int): DataFrame = {
- jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, new Properties())
- }
-
- /**
- * :: Experimental ::
- * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
- * url named table. Partitions of the table will be retrieved in parallel based on the parameters
- * passed to this function.
- *
- * @param columnName the name of a column of integral type that will be used for partitioning.
- * @param lowerBound the minimum value of `columnName` used to decide partition stride
- * @param upperBound the maximum value of `columnName` used to decide partition stride
- * @param numPartitions the number of partitions. the range `minValue`-`maxValue` will be split
- * evenly into this many partitions
- * @param properties connection properties
- * @group specificdata
- * @since 1.4.0
- */
- @Experimental
- def jdbc(
- url: String,
- table: String,
- columnName: String,
- lowerBound: Long,
- upperBound: Long,
- numPartitions: Int,
- properties: Properties): DataFrame = {
- val partitioning = JDBCPartitioningInfo(columnName, lowerBound, upperBound, numPartitions)
- val parts = JDBCRelation.columnPartition(partitioning)
- jdbc(url, table, parts, properties)
- }
-
- /**
- * :: Experimental ::
- * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
- * url named table. The theParts parameter gives a list expressions
- * suitable for inclusion in WHERE clauses; each one defines one partition
- * of the [[DataFrame]].
- *
- * @group specificdata
- * @since 1.3.0
- */
- @Experimental
- def jdbc(url: String, table: String, theParts: Array[String]): DataFrame = {
- jdbc(url, table, theParts, new Properties())
- }
-
- /**
- * :: Experimental ::
- * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
- * url named table using connection properties. The theParts parameter gives a list expressions
- * suitable for inclusion in WHERE clauses; each one defines one partition
- * of the [[DataFrame]].
- *
- * @group specificdata
- * @since 1.4.0
- */
- @Experimental
- def jdbc(
- url: String,
- table: String,
- theParts: Array[String],
- properties: Properties): DataFrame = {
- val parts: Array[Partition] = theParts.zipWithIndex.map { case (part, i) =>
- JDBCPartition(part, i) : Partition
- }
- jdbc(url, table, parts, properties)
- }
-
- private def jdbc(
- url: String,
- table: String,
- parts: Array[Partition],
- properties: Properties): DataFrame = {
- val relation = JDBCRelation(url, table, parts, properties)(this)
- baseRelationToDataFrame(relation)
- }
-
- /**
* Registers the given [[DataFrame]] as a temporary table in the catalog. Temporary tables exist
* only during the lifetime of this instance of SQLContext.
*/
@@ -1372,6 +985,263 @@ class SQLContext(@transient val sparkContext: SparkContext)
}
}
+ ////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////
+ // Deprecated methods
+ ////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////
+
+ @deprecated("use createDataFrame", "1.3.0")
+ def applySchema(rowRDD: RDD[Row], schema: StructType): DataFrame = {
+ createDataFrame(rowRDD, schema)
+ }
+
+ @deprecated("use createDataFrame", "1.3.0")
+ def applySchema(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
+ createDataFrame(rowRDD, schema)
+ }
+
+ @deprecated("use createDataFrame", "1.3.0")
+ def applySchema(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
+ createDataFrame(rdd, beanClass)
+ }
+
+ @deprecated("use createDataFrame", "1.3.0")
+ def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = {
+ createDataFrame(rdd, beanClass)
+ }
+
+ /**
+ * Loads a Parquet file, returning the result as a [[DataFrame]]. This function returns an empty
+ * [[DataFrame]] if no paths are passed in.
+ *
+ * @group specificdata
+ */
+ @deprecated("Use read.parquet()", "1.4.0")
+ @scala.annotation.varargs
+ def parquetFile(paths: String*): DataFrame = {
+ if (paths.isEmpty) {
+ emptyDataFrame
+ } else if (conf.parquetUseDataSourceApi) {
+ read.parquet(paths : _*)
+ } else {
+ DataFrame(this, parquet.ParquetRelation(
+ paths.mkString(","), Some(sparkContext.hadoopConfiguration), this))
+ }
+ }
+
+ /**
+ * Loads a JSON file (one object per line), returning the result as a [[DataFrame]].
+ * It goes through the entire dataset once to determine the schema.
+ *
+ * @group specificdata
+ */
+ @deprecated("Use read.json()", "1.4.0")
+ def jsonFile(path: String): DataFrame = {
+ read.json(path)
+ }
+
+ /**
+ * Loads a JSON file (one object per line) and applies the given schema,
+ * returning the result as a [[DataFrame]].
+ *
+ * @group specificdata
+ */
+ @deprecated("Use read.json()", "1.4.0")
+ def jsonFile(path: String, schema: StructType): DataFrame = {
+ read.schema(schema).json(path)
+ }
+
+ /**
+ * @group specificdata
+ */
+ @deprecated("Use read.json()", "1.4.0")
+ def jsonFile(path: String, samplingRatio: Double): DataFrame = {
+ read.option("samplingRatio", samplingRatio.toString).json(path)
+ }
+
+ /**
+ * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
+ * [[DataFrame]].
+ * It goes through the entire dataset once to determine the schema.
+ *
+ * @group specificdata
+ */
+ @deprecated("Use read.json()", "1.4.0")
+ def jsonRDD(json: RDD[String]): DataFrame = read.json(json)
+
+ /**
+ * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
+ * [[DataFrame]].
+ * It goes through the entire dataset once to determine the schema.
+ *
+ * @group specificdata
+ */
+ @deprecated("Use read.json()", "1.4.0")
+ def jsonRDD(json: JavaRDD[String]): DataFrame = read.json(json)
+
+ /**
+ * Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
+ * returning the result as a [[DataFrame]].
+ *
+ * @group specificdata
+ */
+ @deprecated("Use read.json()", "1.4.0")
+ def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
+ read.schema(schema).json(json)
+ }
+
+ /**
+ * Loads an JavaRDD<String> storing JSON objects (one object per record) and applies the given
+ * schema, returning the result as a [[DataFrame]].
+ *
+ * @group specificdata
+ */
+ @deprecated("Use read.json()", "1.4.0")
+ def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame = {
+ read.schema(schema).json(json)
+ }
+
+ /**
+ * Loads an RDD[String] storing JSON objects (one object per record) inferring the
+ * schema, returning the result as a [[DataFrame]].
+ *
+ * @group specificdata
+ */
+ @deprecated("Use read.json()", "1.4.0")
+ def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
+ read.option("samplingRatio", samplingRatio.toString).json(json)
+ }
+
+ /**
+ * Loads a JavaRDD[String] storing JSON objects (one object per record) inferring the
+ * schema, returning the result as a [[DataFrame]].
+ *
+ * @group specificdata
+ */
+ @deprecated("Use read.json()", "1.4.0")
+ def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame = {
+ read.option("samplingRatio", samplingRatio.toString).json(json)
+ }
+
+ /**
+ * Returns the dataset stored at path as a DataFrame,
+ * using the default data source configured by spark.sql.sources.default.
+ *
+ * @group genericdata
+ */
+ @deprecated("Use read.load(path)", "1.4.0")
+ def load(path: String): DataFrame = {
+ read.load(path)
+ }
+
+ /**
+ * Returns the dataset stored at path as a DataFrame, using the given data source.
+ *
+ * @group genericdata
+ */
+ @deprecated("Use read.format(source).load(path)", "1.4.0")
+ def load(path: String, source: String): DataFrame = {
+ read.format(source).load(path)
+ }
+
+ /**
+ * (Java-specific) Returns the dataset specified by the given data source and
+ * a set of options as a DataFrame.
+ *
+ * @group genericdata
+ */
+ @deprecated("Use read.format(source).options(options).load()", "1.4.0")
+ def load(source: String, options: java.util.Map[String, String]): DataFrame = {
+ read.options(options).format(source).load()
+ }
+
+ /**
+ * (Scala-specific) Returns the dataset specified by the given data source and
+ * a set of options as a DataFrame.
+ *
+ * @group genericdata
+ */
+ @deprecated("Use read.format(source).options(options).load()", "1.4.0")
+ def load(source: String, options: Map[String, String]): DataFrame = {
+ read.options(options).format(source).load()
+ }
+
+ /**
+ * (Java-specific) Returns the dataset specified by the given data source and
+ * a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
+ *
+ * @group genericdata
+ */
+ @deprecated("Use read.format(source).schema(schema).options(options).load()", "1.4.0")
+ def load(source: String, schema: StructType, options: java.util.Map[String, String]): DataFrame =
+ {
+ read.format(source).schema(schema).options(options).load()
+ }
+
+ /**
+ * (Scala-specific) Returns the dataset specified by the given data source and
+ * a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
+ *
+ * @group genericdata
+ */
+ @deprecated("Use read.format(source).schema(schema).options(options).load()", "1.4.0")
+ def load(source: String, schema: StructType, options: Map[String, String]): DataFrame = {
+ read.format(source).schema(schema).options(options).load()
+ }
+
+ /**
+ * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
+ * url named table.
+ *
+ * @group specificdata
+ */
+ @deprecated("use read.jdbc()", "1.4.0")
+ def jdbc(url: String, table: String): DataFrame = {
+ read.jdbc(url, table, new Properties)
+ }
+
+ /**
+ * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
+ * url named table. Partitions of the table will be retrieved in parallel based on the parameters
+ * passed to this function.
+ *
+ * @param columnName the name of a column of integral type that will be used for partitioning.
+ * @param lowerBound the minimum value of `columnName` used to decide partition stride
+ * @param upperBound the maximum value of `columnName` used to decide partition stride
+ * @param numPartitions the number of partitions. the range `minValue`-`maxValue` will be split
+ * evenly into this many partitions
+ * @group specificdata
+ */
+ @deprecated("use read.jdbc()", "1.4.0")
+ def jdbc(
+ url: String,
+ table: String,
+ columnName: String,
+ lowerBound: Long,
+ upperBound: Long,
+ numPartitions: Int): DataFrame = {
+ read.jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, new Properties)
+ }
+
+ /**
+ * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
+ * url named table. The theParts parameter gives a list expressions
+ * suitable for inclusion in WHERE clauses; each one defines one partition
+ * of the [[DataFrame]].
+ *
+ * @group specificdata
+ */
+ @deprecated("use read.jdbc()", "1.4.0")
+ def jdbc(url: String, table: String, theParts: Array[String]): DataFrame = {
+ read.jdbc(url, table, theParts, new Properties)
+ }
+
+ ////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////
+ // End of deprecated methods
+ ////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////
}
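As with DataFrame, the deprecated SQLContext entry points above are now thin wrappers over `read`. A minimal sketch of the generic load path, assuming `sqlContext` exists; the path is a placeholder and the schema mirrors JavaSaveLoadSuite below:

    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val schema = StructType(StructField("b", StringType, nullable = true) :: Nil)
    val options = Map("path" -> "/tmp/example.json")  // placeholder location

    // Deprecated: sqlContext.load("json", schema, options)
    val loaded = sqlContext.read.format("json").schema(schema).options(options).load()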
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
index 40483d3ec7..95935ba874 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
@@ -29,7 +29,16 @@ import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.types._
import org.apache.spark.sql.sources._
+/**
+ * Data corresponding to one partition of a JDBCRDD.
+ */
+private[sql] case class JDBCPartition(whereClause: String, idx: Int) extends Partition {
+ override def index: Int = idx
+}
+
+
private[sql] object JDBCRDD extends Logging {
+
/**
* Maps a JDBC type to a Catalyst type. This function is called only when
* the DriverQuirks class corresponding to your database driver returns null.
@@ -168,6 +177,7 @@ private[sql] object JDBCRDD extends Logging {
DriverManager.getConnection(url, properties)
}
}
+
/**
* Build and return JDBCRDD from the given information.
*
@@ -193,18 +203,14 @@ private[sql] object JDBCRDD extends Logging {
requiredColumns: Array[String],
filters: Array[Filter],
parts: Array[Partition]): RDD[Row] = {
-
- val prunedSchema = pruneSchema(schema, requiredColumns)
-
- return new
- JDBCRDD(
- sc,
- getConnector(driver, url, properties),
- prunedSchema,
- fqTable,
- requiredColumns,
- filters,
- parts)
+ new JDBCRDD(
+ sc,
+ getConnector(driver, url, properties),
+ pruneSchema(schema, requiredColumns),
+ fqTable,
+ requiredColumns,
+ filters,
+ parts)
}
}
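JDBCPartition now lives next to JDBCRDD. The reader's predicate-based overload shown earlier wraps each WHERE clause in one of these partitions; a tiny illustration, compilable only inside the org.apache.spark.sql package since the class is private[sql]:

    import org.apache.spark.Partition
    import org.apache.spark.sql.jdbc.JDBCPartition

    // One partition per predicate, indexed in order.
    val parts: Array[Partition] = Array("THEID < 2", "THEID >= 2").zipWithIndex.map {
      case (clause, i) => JDBCPartition(clause, i): Partition
    }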
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
index 93e82549f2..09d6865457 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
@@ -17,26 +17,16 @@
package org.apache.spark.sql.jdbc
-import java.sql.DriverManager
import java.util.Properties
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.Partition
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.Utils
-
-/**
- * Data corresponding to one partition of a JDBCRDD.
- */
-private[sql] case class JDBCPartition(whereClause: String, idx: Int) extends Partition {
- override def index: Int = idx
-}
/**
* Instructions on how to partition the table among workers.
@@ -152,6 +142,8 @@ private[sql] case class JDBCRelation(
}
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
- data.insertIntoJDBC(url, table, overwrite, properties)
+ data.write
+ .mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append)
+ .jdbc(url, table, properties)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcUtils.scala
new file mode 100644
index 0000000000..cc918c2371
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcUtils.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.sql.{Connection, DriverManager}
+import java.util.Properties
+
+import scala.util.Try
+
+/**
+ * Util functions for JDBC tables.
+ */
+private[sql] object JdbcUtils {
+
+ /**
+ * Establishes a JDBC connection.
+ */
+ def createConnection(url: String, connectionProperties: Properties): Connection = {
+ DriverManager.getConnection(url, connectionProperties)
+ }
+
+ /**
+ * Returns true if the table already exists in the JDBC database.
+ */
+ def tableExists(conn: Connection, table: String): Boolean = {
+ // Somewhat hacky, but there isn't a good way to identify whether a table exists for all
+ // SQL database systems, considering "table" could also include the database name.
+ Try(conn.prepareStatement(s"SELECT 1 FROM $table LIMIT 1").executeQuery().next()).isSuccess
+ }
+
+ /**
+ * Drops a table from the JDBC database.
+ */
+ def dropTable(conn: Connection, table: String): Unit = {
+ conn.prepareStatement(s"DROP TABLE $table").executeUpdate()
+ }
+}
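A brief, hedged sketch of how these helpers compose (roughly the sequence DataFrameWriter.jdbc performs above); `url`, `props`, and the table name are placeholders, and like JDBCPartition this object is private[sql]:

    // Drop the target table if it already exists, closing the connection afterwards.
    val conn = JdbcUtils.createConnection(url, props)
    try {
      if (JdbcUtils.tableExists(conn, "PEOPLE_COPY")) {
        JdbcUtils.dropTable(conn, "PEOPLE_COPY")
      }
    } finally {
      conn.close()
    }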
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala
index c099881a01..a61790b847 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala
@@ -163,8 +163,8 @@ package object jdbc {
table: String,
properties: Properties = new Properties()) {
val quirks = DriverQuirks.get(url)
- var nullTypes: Array[Int] = df.schema.fields.map(field => {
- var nullType: Option[Int] = quirks.getJDBCType(field.dataType)._2
+ val nullTypes: Array[Int] = df.schema.fields.map { field =>
+ val nullType: Option[Int] = quirks.getJDBCType(field.dataType)._2
if (nullType.isEmpty) {
field.dataType match {
case IntegerType => java.sql.Types.INTEGER
@@ -183,7 +183,7 @@ package object jdbc {
s"Can't translate null value for field $field")
}
} else nullType.get
- }).toArray
+ }
val rddSchema = df.schema
df.foreachPartition { iterator =>
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
index c344a9b095..fcb8f5499c 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
@@ -187,14 +187,14 @@ public class JavaApplySchemaSuite implements Serializable {
null,
"this is another simple string."));
- DataFrame df1 = sqlContext.jsonRDD(jsonRDD);
+ DataFrame df1 = sqlContext.read().json(jsonRDD);
StructType actualSchema1 = df1.schema();
Assert.assertEquals(expectedSchema, actualSchema1);
df1.registerTempTable("jsonTable1");
List<Row> actual1 = sqlContext.sql("select * from jsonTable1").collectAsList();
Assert.assertEquals(expectedResult, actual1);
- DataFrame df2 = sqlContext.jsonRDD(jsonRDD, expectedSchema);
+ DataFrame df2 = sqlContext.read().schema(expectedSchema).json(jsonRDD);
StructType actualSchema2 = df2.schema();
Assert.assertEquals(expectedSchema, actualSchema2);
df2.registerTempTable("jsonTable2");
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java
index 6a0bcefe7a..2706e01bd2 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java
@@ -67,7 +67,7 @@ public class JavaSaveLoadSuite {
jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}");
}
JavaRDD<String> rdd = sc.parallelize(jsonObjects);
- df = sqlContext.jsonRDD(rdd);
+ df = sqlContext.read().json(rdd);
df.registerTempTable("jsonTable");
}
@@ -75,10 +75,8 @@ public class JavaSaveLoadSuite {
public void saveAndLoad() {
Map<String, String> options = new HashMap<String, String>();
options.put("path", path.toString());
- df.save("json", SaveMode.ErrorIfExists, options);
-
+ df.write().mode(SaveMode.ErrorIfExists).format("json").options(options).save();
DataFrame loadedDF = sqlContext.read().format("json").options(options).load();
-
checkAnswer(loadedDF, df.collectAsList());
}
@@ -86,12 +84,12 @@ public class JavaSaveLoadSuite {
public void saveAndLoadWithSchema() {
Map<String, String> options = new HashMap<String, String>();
options.put("path", path.toString());
- df.save("json", SaveMode.ErrorIfExists, options);
+ df.write().format("json").mode(SaveMode.ErrorIfExists).options(options).save();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("b", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
- DataFrame loadedDF = sqlContext.load("json", schema, options);
+ DataFrame loadedDF = sqlContext.read().format("json").schema(schema).options(options).load();
checkAnswer(loadedDF, sqlContext.sql("SELECT b FROM jsonTable").collectAsList());
}
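
The Java suites above exercise the builder chains that replace the deprecated save()/load() overloads. The equivalent Scala round trip looks like the following sketch, where sqlContext, df, path and schema stand in for the test fixtures:

    import org.apache.spark.sql.SaveMode

    // Write the DataFrame out as JSON, failing if the path already exists.
    df.write.format("json").mode(SaveMode.ErrorIfExists).option("path", path).save()

    // Read it back, either inferring the schema or supplying one explicitly.
    val loaded    = sqlContext.read.format("json").option("path", path).load()
    val projected = sqlContext.read.format("json").schema(schema).option("path", path).load()
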
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 2abfe7f167..5a7b6f0aac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -221,22 +221,25 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
}
test("Basic API") {
- assert(TestSQLContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE").collect().size === 3)
+ assert(TestSQLContext.read.jdbc(
+ urlWithUserAndPass, "TEST.PEOPLE", new Properties).collect().length === 3)
}
test("Partitioning via JDBCPartitioningInfo API") {
- assert(TestSQLContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3)
- .collect.size === 3)
+ assert(
+ TestSQLContext.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3, new Properties)
+ .collect().length === 3)
}
test("Partitioning via list-of-where-clauses API") {
val parts = Array[String]("THEID < 2", "THEID >= 2")
- assert(TestSQLContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts).collect().size === 3)
+ assert(TestSQLContext.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties)
+ .collect().length === 3)
}
test("H2 integral types") {
val rows = sql("SELECT * FROM inttypes WHERE A IS NOT NULL").collect()
- assert(rows.size === 1)
+ assert(rows.length === 1)
assert(rows(0).getInt(0) === 1)
assert(rows(0).getBoolean(1) === false)
assert(rows(0).getInt(2) === 3)
@@ -246,7 +249,7 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
test("H2 null entries") {
val rows = sql("SELECT * FROM inttypes WHERE A IS NULL").collect()
- assert(rows.size === 1)
+ assert(rows.length === 1)
assert(rows(0).isNullAt(0))
assert(rows(0).isNullAt(1))
assert(rows(0).isNullAt(2))
@@ -286,24 +289,28 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
}
test("test DATE types") {
- val rows = TestSQLContext.jdbc(urlWithUserAndPass, "TEST.TIMETYPES").collect()
- val cachedRows = TestSQLContext.jdbc(urlWithUserAndPass, "TEST.TIMETYPES").cache().collect()
+ val rows = TestSQLContext.read.jdbc(
+ urlWithUserAndPass, "TEST.TIMETYPES", new Properties).collect()
+ val cachedRows = TestSQLContext.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties)
+ .cache().collect()
assert(rows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01"))
assert(rows(1).getAs[java.sql.Date](1) === null)
assert(cachedRows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01"))
}
test("test DATE types in cache") {
- val rows = TestSQLContext.jdbc(urlWithUserAndPass, "TEST.TIMETYPES").collect()
- TestSQLContext
- .jdbc(urlWithUserAndPass, "TEST.TIMETYPES").cache().registerTempTable("mycached_date")
+ val rows =
+ TestSQLContext.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties).collect()
+ TestSQLContext.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties)
+ .cache().registerTempTable("mycached_date")
val cachedRows = sql("select * from mycached_date").collect()
assert(rows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01"))
assert(cachedRows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01"))
}
test("test types for null value") {
- val rows = TestSQLContext.jdbc(urlWithUserAndPass, "TEST.NULLTYPES").collect()
+ val rows = TestSQLContext.read.jdbc(
+ urlWithUserAndPass, "TEST.NULLTYPES", new Properties).collect()
assert((0 to 14).forall(i => rows(0).isNullAt(i)))
}
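
The three reader overloads used above mirror the old SQLContext.jdbc variants, with connection properties now passed explicitly. A condensed sketch, assuming a SQLContext named sqlContext and an H2-style url as in the suite:

    import java.util.Properties

    val props = new Properties()  // e.g. "user" / "password" entries

    // Whole-table scan.
    val all = sqlContext.read.jdbc(url, "TEST.PEOPLE", props)

    // Partitioned by an integral column: (column, lowerBound, upperBound, numPartitions).
    val byColumn = sqlContext.read.jdbc(url, "TEST.PEOPLE", "THEID", 0, 4, 3, props)

    // Partitioned by explicit predicates, one partition per WHERE clause.
    val byPredicate =
      sqlContext.read.jdbc(url, "TEST.PEOPLE", Array("THEID < 2", "THEID >= 2"), props)
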
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 0800eded44..2e4c12f9da 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -22,7 +22,7 @@ import java.util.Properties
import org.scalatest.{BeforeAndAfter, FunSuite}
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{SaveMode, Row}
import org.apache.spark.sql.test._
import org.apache.spark.sql.types._
@@ -90,64 +90,66 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
test("Basic CREATE") {
val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2)
- df.createJDBCTable(url, "TEST.BASICCREATETEST", false)
- assert(2 == TestSQLContext.jdbc(url, "TEST.BASICCREATETEST").count)
- assert(2 == TestSQLContext.jdbc(url, "TEST.BASICCREATETEST").collect()(0).length)
+ df.write.jdbc(url, "TEST.BASICCREATETEST", new Properties)
+ assert(2 == TestSQLContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).count)
+ assert(2 ==
+ TestSQLContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).collect()(0).length)
}
test("CREATE with overwrite") {
val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x3), schema3)
val df2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2)
- df.createJDBCTable(url1, "TEST.DROPTEST", false, properties)
- assert(2 == TestSQLContext.jdbc(url1, "TEST.DROPTEST", properties).count)
- assert(3 == TestSQLContext.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
+ df.write.jdbc(url1, "TEST.DROPTEST", properties)
+ assert(2 == TestSQLContext.read.jdbc(url1, "TEST.DROPTEST", properties).count)
+ assert(3 == TestSQLContext.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
- df2.createJDBCTable(url1, "TEST.DROPTEST", true, properties)
- assert(1 == TestSQLContext.jdbc(url1, "TEST.DROPTEST", properties).count)
- assert(2 == TestSQLContext.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
+ df2.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.DROPTEST", properties)
+ assert(1 == TestSQLContext.read.jdbc(url1, "TEST.DROPTEST", properties).count)
+ assert(2 == TestSQLContext.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
}
test("CREATE then INSERT to append") {
val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2)
val df2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2)
- df.createJDBCTable(url, "TEST.APPENDTEST", false)
- df2.insertIntoJDBC(url, "TEST.APPENDTEST", false)
- assert(3 == TestSQLContext.jdbc(url, "TEST.APPENDTEST").count)
- assert(2 == TestSQLContext.jdbc(url, "TEST.APPENDTEST").collect()(0).length)
+ df.write.jdbc(url, "TEST.APPENDTEST", new Properties)
+ df2.write.mode(SaveMode.Append).jdbc(url, "TEST.APPENDTEST", new Properties)
+ assert(3 == TestSQLContext.read.jdbc(url, "TEST.APPENDTEST", new Properties).count)
+ assert(2 ==
+ TestSQLContext.read.jdbc(url, "TEST.APPENDTEST", new Properties).collect()(0).length)
}
test("CREATE then INSERT to truncate") {
val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2)
val df2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2)
- df.createJDBCTable(url1, "TEST.TRUNCATETEST", false, properties)
- df2.insertIntoJDBC(url1, "TEST.TRUNCATETEST", true, properties)
- assert(1 == TestSQLContext.jdbc(url1, "TEST.TRUNCATETEST", properties).count)
- assert(2 == TestSQLContext.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length)
+ df.write.jdbc(url1, "TEST.TRUNCATETEST", properties)
+ df2.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.TRUNCATETEST", properties)
+ assert(1 == TestSQLContext.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count)
+ assert(2 == TestSQLContext.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length)
}
test("Incompatible INSERT to append") {
val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2)
val df2 = TestSQLContext.createDataFrame(sc.parallelize(arr2x3), schema3)
- df.createJDBCTable(url, "TEST.INCOMPATIBLETEST", false)
+ df.write.jdbc(url, "TEST.INCOMPATIBLETEST", new Properties)
intercept[org.apache.spark.SparkException] {
- df2.insertIntoJDBC(url, "TEST.INCOMPATIBLETEST", true)
+ df2.write.mode(SaveMode.Append).jdbc(url, "TEST.INCOMPATIBLETEST", new Properties)
}
}
-
+
test("INSERT to JDBC Datasource") {
TestSQLContext.sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")
- assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).count)
- assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
+ assert(2 == TestSQLContext.read.jdbc(url1, "TEST.PEOPLE1", properties).count)
+ assert(2 == TestSQLContext.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
}
-
+
test("INSERT to JDBC Datasource with overwrite") {
TestSQLContext.sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")
TestSQLContext.sql("INSERT OVERWRITE TABLE PEOPLE1 SELECT * FROM PEOPLE")
- assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).count)
- assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
+ assert(2 == TestSQLContext.read.jdbc(url1, "TEST.PEOPLE1", properties).count)
+ assert(2 == TestSQLContext.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
}
}
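
On the write side, createJDBCTable and insertIntoJDBC collapse into df.write.jdbc, with the behaviour selected by SaveMode instead of boolean flags. A condensed sketch, with df and url standing in for the suite's fixtures:

    import java.util.Properties
    import org.apache.spark.sql.SaveMode

    val props = new Properties()

    df.write.jdbc(url, "TEST.BASICCREATETEST", props)                    // create a new table
    df.write.mode(SaveMode.Append).jdbc(url, "TEST.APPENDTEST", props)   // append rows to it
    df.write.mode(SaveMode.Overwrite).jdbc(url, "TEST.DROPTEST", props)  // drop and recreate
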
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
index 53ddecf579..58fe96adab 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
@@ -81,7 +81,7 @@ public class JavaMetastoreDataSourcesSuite {
jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}");
}
JavaRDD<String> rdd = sc.parallelize(jsonObjects);
- df = sqlContext.jsonRDD(rdd);
+ df = sqlContext.read().json(rdd);
df.registerTempTable("jsonTable");
}
@@ -96,7 +96,11 @@ public class JavaMetastoreDataSourcesSuite {
public void saveExternalTableAndQueryIt() {
Map<String, String> options = new HashMap<String, String>();
options.put("path", path.toString());
- df.saveAsTable("javaSavedTable", "org.apache.spark.sql.json", SaveMode.Append, options);
+ df.write()
+ .format("org.apache.spark.sql.json")
+ .mode(SaveMode.Append)
+ .options(options)
+ .saveAsTable("javaSavedTable");
checkAnswer(
sqlContext.sql("SELECT * FROM javaSavedTable"),
@@ -115,7 +119,11 @@ public class JavaMetastoreDataSourcesSuite {
public void saveExternalTableWithSchemaAndQueryIt() {
Map<String, String> options = new HashMap<String, String>();
options.put("path", path.toString());
- df.saveAsTable("javaSavedTable", "org.apache.spark.sql.json", SaveMode.Append, options);
+ df.write()
+ .format("org.apache.spark.sql.json")
+ .mode(SaveMode.Append)
+ .options(options)
+ .saveAsTable("javaSavedTable");
checkAnswer(
sqlContext.sql("SELECT * FROM javaSavedTable"),
@@ -138,7 +146,11 @@ public class JavaMetastoreDataSourcesSuite {
@Test
public void saveTableAndQueryIt() {
Map<String, String> options = new HashMap<String, String>();
- df.saveAsTable("javaSavedTable", "org.apache.spark.sql.json", SaveMode.Append, options);
+ df.write()
+ .format("org.apache.spark.sql.json")
+ .mode(SaveMode.Append)
+ .options(options)
+ .saveAsTable("javaSavedTable");
checkAnswer(
sqlContext.sql("SELECT * FROM javaSavedTable"),
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index fc6c3c3503..945596db80 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -162,7 +162,7 @@ class CachedTableSuite extends QueryTest {
test("REFRESH TABLE also needs to recache the data (data source tables)") {
val tempPath: File = Utils.createTempDir()
tempPath.delete()
- table("src").save(tempPath.toString, "parquet", SaveMode.Overwrite)
+ table("src").write.mode(SaveMode.Overwrite).parquet(tempPath.toString)
sql("DROP TABLE IF EXISTS refreshTable")
createExternalTable("refreshTable", tempPath.toString, "parquet")
checkAnswer(
@@ -172,7 +172,7 @@ class CachedTableSuite extends QueryTest {
sql("CACHE TABLE refreshTable")
assertCached(table("refreshTable"))
// Append new data.
- table("src").save(tempPath.toString, "parquet", SaveMode.Append)
+ table("src").write.mode(SaveMode.Append).parquet(tempPath.toString)
// We are still using the old data.
assertCached(table("refreshTable"))
checkAnswer(
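
For built-in formats the writer also gains shortcuts such as parquet(path), used above in place of save(path, "parquet", mode). A sketch, assuming df, sqlContext and a scratch path:

    import org.apache.spark.sql.SaveMode

    df.write.mode(SaveMode.Overwrite).parquet(path)  // same as .format("parquet").save(path)
    df.write.mode(SaveMode.Append).parquet(path)     // add new files under the same path
    val reloaded = sqlContext.read.parquet(path)
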
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 58b0b80c31..30db976a3a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -409,11 +409,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
val originalDefaultSource = conf.defaultDataSourceName
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
- val df = jsonRDD(rdd)
+ val df = read.json(rdd)
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
// Save the df as a managed table (by not specifying the path).
- df.saveAsTable("savedJsonTable")
+ df.write.saveAsTable("savedJsonTable")
checkAnswer(
sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"),
@@ -443,11 +443,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
val originalDefaultSource = conf.defaultDataSourceName
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
- val df = jsonRDD(rdd)
+ val df = read.json(rdd)
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
// Save the df as a managed table (by not specifying the path).
- df.saveAsTable("savedJsonTable")
+ df.write.saveAsTable("savedJsonTable")
checkAnswer(
sql("SELECT * FROM savedJsonTable"),
@@ -455,17 +455,17 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
// Right now, we cannot append to an existing JSON table.
intercept[RuntimeException] {
- df.saveAsTable("savedJsonTable", SaveMode.Append)
+ df.write.mode(SaveMode.Append).saveAsTable("savedJsonTable")
}
// We can overwrite it.
- df.saveAsTable("savedJsonTable", SaveMode.Overwrite)
+ df.write.mode(SaveMode.Overwrite).saveAsTable("savedJsonTable")
checkAnswer(
sql("SELECT * FROM savedJsonTable"),
df.collect())
// When the save mode is Ignore, we will do nothing when the table already exists.
- df.select("b").saveAsTable("savedJsonTable", SaveMode.Ignore)
+ df.select("b").write.mode(SaveMode.Ignore).saveAsTable("savedJsonTable")
assert(df.schema === table("savedJsonTable").schema)
checkAnswer(
sql("SELECT * FROM savedJsonTable"),
@@ -479,11 +479,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
// Create an external table by specifying the path.
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
- df.saveAsTable(
- "savedJsonTable",
- "org.apache.spark.sql.json",
- SaveMode.Append,
- Map("path" -> tempPath.toString))
+ df.write
+ .format("org.apache.spark.sql.json")
+ .mode(SaveMode.Append)
+ .option("path", tempPath.toString)
+ .saveAsTable("savedJsonTable")
checkAnswer(
sql("SELECT * FROM savedJsonTable"),
df.collect())
@@ -501,14 +501,13 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
val originalDefaultSource = conf.defaultDataSourceName
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
- val df = jsonRDD(rdd)
+ val df = read.json(rdd)
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
- df.saveAsTable(
- "savedJsonTable",
- "org.apache.spark.sql.json",
- SaveMode.Append,
- Map("path" -> tempPath.toString))
+ df.write.format("org.apache.spark.sql.json")
+ .mode(SaveMode.Append)
+ .option("path", tempPath.toString)
+ .saveAsTable("savedJsonTable")
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
createExternalTable("createdJsonTable", tempPath.toString)
@@ -566,7 +565,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
- jsonRDD(rdd).registerTempTable("jt")
+ read.json(rdd).registerTempTable("jt")
sql(
"""
|create table test_parquet_ctas STORED AS parquET
@@ -601,7 +600,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
StructType(
StructField("a", ArrayType(IntegerType, containsNull = true), nullable = true) :: Nil)
assert(df1.schema === expectedSchema1)
- df1.saveAsTable("arrayInParquet", "parquet", SaveMode.Overwrite)
+ df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("arrayInParquet")
val df2 =
createDataFrame(Tuple1(Seq(2, 3)) :: Nil).toDF("a")
@@ -610,10 +609,10 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
StructField("a", ArrayType(IntegerType, containsNull = false), nullable = true) :: Nil)
assert(df2.schema === expectedSchema2)
df2.insertInto("arrayInParquet", overwrite = false)
- createDataFrame(Tuple1(Seq(4, 5)) :: Nil).toDF("a")
- .saveAsTable("arrayInParquet", SaveMode.Append) // This one internally calls df2.insertInto.
- createDataFrame(Tuple1(Seq(Int.box(6), null.asInstanceOf[Integer])) :: Nil).toDF("a")
- .saveAsTable("arrayInParquet", "parquet", SaveMode.Append)
+ createDataFrame(Tuple1(Seq(4, 5)) :: Nil).toDF("a").write.mode(SaveMode.Append)
+ .saveAsTable("arrayInParquet") // This one internally calls df2.insertInto.
+ createDataFrame(Tuple1(Seq(Int.box(6), null.asInstanceOf[Integer])) :: Nil).toDF("a").write
+ .mode(SaveMode.Append).saveAsTable("arrayInParquet")
refreshTable("arrayInParquet")
checkAnswer(
@@ -634,7 +633,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
StructType(
StructField("a", mapType1, nullable = true) :: Nil)
assert(df1.schema === expectedSchema1)
- df1.saveAsTable("mapInParquet", "parquet", SaveMode.Overwrite)
+ df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("mapInParquet")
val df2 =
createDataFrame(Tuple1(Map(2 -> 3)) :: Nil).toDF("a")
@@ -644,10 +643,10 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
StructField("a", mapType2, nullable = true) :: Nil)
assert(df2.schema === expectedSchema2)
df2.insertInto("mapInParquet", overwrite = false)
- createDataFrame(Tuple1(Map(4 -> 5)) :: Nil).toDF("a")
- .saveAsTable("mapInParquet", SaveMode.Append) // This one internally calls df2.insertInto.
- createDataFrame(Tuple1(Map(6 -> null.asInstanceOf[Integer])) :: Nil).toDF("a")
- .saveAsTable("mapInParquet", "parquet", SaveMode.Append)
+ createDataFrame(Tuple1(Map(4 -> 5)) :: Nil).toDF("a").write.mode(SaveMode.Append)
+ .saveAsTable("mapInParquet") // This one internally calls df2.insertInto.
+ createDataFrame(Tuple1(Map(6 -> null.asInstanceOf[Integer])) :: Nil).toDF("a").write
+ .format("parquet").mode(SaveMode.Append).saveAsTable("mapInParquet")
refreshTable("mapInParquet")
checkAnswer(
@@ -711,30 +710,30 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
def createDF(from: Int, to: Int): DataFrame =
createDataFrame((from to to).map(i => Tuple2(i, s"str$i"))).toDF("c1", "c2")
- createDF(0, 9).saveAsTable("insertParquet", "parquet")
+ createDF(0, 9).write.format("parquet").saveAsTable("insertParquet")
checkAnswer(
sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
(6 to 9).map(i => Row(i, s"str$i")))
intercept[AnalysisException] {
- createDF(10, 19).saveAsTable("insertParquet", "parquet")
+ createDF(10, 19).write.format("parquet").saveAsTable("insertParquet")
}
- createDF(10, 19).saveAsTable("insertParquet", "parquet", SaveMode.Append)
+ createDF(10, 19).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet")
checkAnswer(
sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
(6 to 19).map(i => Row(i, s"str$i")))
- createDF(20, 29).saveAsTable("insertParquet", "parquet", SaveMode.Append)
+ createDF(20, 29).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 25"),
(6 to 24).map(i => Row(i, s"str$i")))
intercept[AnalysisException] {
- createDF(30, 39).saveAsTable("insertParquet")
+ createDF(30, 39).write.saveAsTable("insertParquet")
}
- createDF(30, 39).saveAsTable("insertParquet", SaveMode.Append)
+ createDF(30, 39).write.mode(SaveMode.Append).saveAsTable("insertParquet")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 35"),
(6 to 34).map(i => Row(i, s"str$i")))
@@ -744,11 +743,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 45"),
(6 to 44).map(i => Row(i, s"str$i")))
- createDF(50, 59).saveAsTable("insertParquet", SaveMode.Overwrite)
+ createDF(50, 59).write.mode(SaveMode.Overwrite).saveAsTable("insertParquet")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 51 AND p.c1 < 55"),
(52 to 54).map(i => Row(i, s"str$i")))
- createDF(60, 69).saveAsTable("insertParquet", SaveMode.Ignore)
+ createDF(60, 69).write.mode(SaveMode.Ignore).saveAsTable("insertParquet")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p"),
(50 to 59).map(i => Row(i, s"str$i")))
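
All of the saveAsTable overloads rewritten in this suite reduce to the same builder chain; only the mode, format and options differ. A representative sketch, with df and tempPath standing in for the test fixtures:

    import org.apache.spark.sql.SaveMode

    // Managed table using the session's default data source.
    df.write.saveAsTable("savedJsonTable")

    // External table: explicit source, save mode and path.
    df.write
      .format("org.apache.spark.sql.json")
      .mode(SaveMode.Append)
      .option("path", tempPath.toString)
      .saveAsTable("savedJsonTable")
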
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
index 8ad3627504..3dfa6e72e1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.hive.test.TestHive.{sparkContext, jsonRDD, sql}
+import org.apache.spark.sql.hive.test.TestHive.{read, sparkContext, jsonRDD, sql}
import org.apache.spark.sql.hive.test.TestHive.implicits._
case class Nested(a: Int, B: Int)
@@ -31,14 +31,14 @@ case class Data(a: Int, B: Int, n: Nested, nestedArray: Seq[Nested])
class HiveResolutionSuite extends HiveComparisonTest {
test("SPARK-3698: case insensitive test for nested data") {
- jsonRDD(sparkContext.makeRDD(
+ read.json(sparkContext.makeRDD(
"""{"a": [{"a": {"a": 1}}]}""" :: Nil)).registerTempTable("nested")
// This should be successfully analyzed
sql("SELECT a[0].A.A from nested").queryExecution.analyzed
}
test("SPARK-5278: check ambiguous reference to fields") {
- jsonRDD(sparkContext.makeRDD(
+ read.json(sparkContext.makeRDD(
"""{"a": [{"b": 1, "B": 2}]}""" :: Nil)).registerTempTable("nested")
// there are 2 fields matching field name "b"; we should report an ambiguous reference error
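
The jsonRDD calls replaced throughout these Hive suites map directly onto read.json, which accepts the same RDD of JSON strings. A minimal sketch, with sqlContext and sparkContext as assumed handles:

    val rdd = sparkContext.makeRDD("""{"a": [{"b": 1, "B": 2}]}""" :: Nil)
    sqlContext.read.json(rdd).registerTempTable("nested")
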
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index dfe73c62c4..ca2c4b4019 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -535,14 +535,14 @@ class SQLQuerySuite extends QueryTest {
test("SPARK-4296 Grouping field with Hive UDF as sub expression") {
val rdd = sparkContext.makeRDD( """{"a": "str", "b":"1", "c":"1970-01-01 00:00:00"}""" :: Nil)
- jsonRDD(rdd).registerTempTable("data")
+ read.json(rdd).registerTempTable("data")
checkAnswer(
sql("SELECT concat(a, '-', b), year(c) FROM data GROUP BY concat(a, '-', b), year(c)"),
Row("str-1", 1970))
dropTempTable("data")
- jsonRDD(rdd).registerTempTable("data")
+ read.json(rdd).registerTempTable("data")
checkAnswer(sql("SELECT year(c) + 1 FROM data GROUP BY year(c) + 1"), Row(1971))
dropTempTable("data")
@@ -550,7 +550,7 @@ class SQLQuerySuite extends QueryTest {
test("resolve udtf with single alias") {
val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
- jsonRDD(rdd).registerTempTable("data")
+ read.json(rdd).registerTempTable("data")
val df = sql("SELECT explode(a) AS val FROM data")
val col = df("val")
}
@@ -563,7 +563,7 @@ class SQLQuerySuite extends QueryTest {
// PreInsertionCasts will actually start to work before ImplicitGenerate and then
// generates an invalid query plan.
val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
- jsonRDD(rdd).registerTempTable("data")
+ read.json(rdd).registerTempTable("data")
val originalConf = getConf("spark.sql.hive.convertCTAS", "false")
setConf("spark.sql.hive.convertCTAS", "false")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index a0075f1e44..05d99983b6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -150,9 +150,9 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
}
val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
- jsonRDD(rdd1).registerTempTable("jt")
+ read.json(rdd1).registerTempTable("jt")
val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":[$i, null]}"""))
- jsonRDD(rdd2).registerTempTable("jt_array")
+ read.json(rdd2).registerTempTable("jt_array")
setConf("spark.sql.hive.convertMetastoreParquet", "true")
}
@@ -617,16 +617,16 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest {
sql("drop table if exists spark_6016_fix")
// Create a DataFrame with two partitions. So, the created table will have two parquet files.
- val df1 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i}"""), 2))
- df1.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
+ val df1 = read.json(sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i}"""), 2))
+ df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("spark_6016_fix")
checkAnswer(
sql("select * from spark_6016_fix"),
(1 to 10).map(i => Row(i))
)
// Create a DataFrame with four partitions. So, the created table will have four parquet files.
- val df2 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"b":$i}"""), 4))
- df2.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
+ val df2 = read.json(sparkContext.parallelize((1 to 10).map(i => s"""{"b":$i}"""), 4))
+ df2.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("spark_6016_fix")
// For the bug of SPARK-6016, we are caching two outdated footers for df1. Then,
// since the new table has four parquet files, we are trying to read new footers from two files
// and then merge metadata in footers of these four (two outdated ones and two latest ones),
@@ -663,7 +663,7 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
StructField("a", arrayType1, nullable = true) :: Nil)
assert(df.schema === expectedSchema1)
- df.saveAsTable("alwaysNullable", "parquet")
+ df.write.format("parquet").saveAsTable("alwaysNullable")
val mapType2 = MapType(IntegerType, IntegerType, valueContainsNull = true)
val arrayType2 = ArrayType(IntegerType, containsNull = true)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index f44b3c521e..9d9b436cab 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -120,10 +120,7 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
test("save()/load() - non-partitioned table - ErrorIfExists") {
withTempDir { file =>
intercept[RuntimeException] {
- testDF.save(
- path = file.getCanonicalPath,
- source = dataSourceName,
- mode = SaveMode.ErrorIfExists)
+ testDF.write.format(dataSourceName).mode(SaveMode.ErrorIfExists).save(file.getCanonicalPath)
}
}
}
@@ -233,10 +230,8 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
test("save()/load() - partitioned table - Ignore") {
withTempDir { file =>
- partitionedTestDF.save(
- path = file.getCanonicalPath,
- source = dataSourceName,
- mode = SaveMode.Ignore)
+ partitionedTestDF.write
+ .format(dataSourceName).mode(SaveMode.Ignore).save(file.getCanonicalPath)
val path = new Path(file.getCanonicalPath)
val fs = path.getFileSystem(SparkHadoopUtil.get.conf)
@@ -249,11 +244,9 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
}
test("saveAsTable()/load() - non-partitioned table - Overwrite") {
- testDF.saveAsTable(
- tableName = "t",
- source = dataSourceName,
- mode = SaveMode.Overwrite,
- Map("dataSchema" -> dataSchema.json))
+ testDF.write.format(dataSourceName).mode(SaveMode.Overwrite)
+ .option("dataSchema", dataSchema.json)
+ .saveAsTable("t")
withTable("t") {
checkAnswer(table("t"), testDF.collect())
@@ -261,15 +254,8 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
}
test("saveAsTable()/load() - non-partitioned table - Append") {
- testDF.saveAsTable(
- tableName = "t",
- source = dataSourceName,
- mode = SaveMode.Overwrite)
-
- testDF.saveAsTable(
- tableName = "t",
- source = dataSourceName,
- mode = SaveMode.Append)
+ testDF.write.format(dataSourceName).mode(SaveMode.Overwrite).saveAsTable("t")
+ testDF.write.format(dataSourceName).mode(SaveMode.Append).saveAsTable("t")
withTable("t") {
checkAnswer(table("t"), testDF.unionAll(testDF).orderBy("a").collect())
@@ -281,10 +267,7 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
withTempTable("t") {
intercept[AnalysisException] {
- testDF.saveAsTable(
- tableName = "t",
- source = dataSourceName,
- mode = SaveMode.ErrorIfExists)
+ testDF.write.format(dataSourceName).mode(SaveMode.ErrorIfExists).saveAsTable("t")
}
}
}
@@ -293,21 +276,16 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
Seq.empty[(Int, String)].toDF().registerTempTable("t")
withTempTable("t") {
- testDF.saveAsTable(
- tableName = "t",
- source = dataSourceName,
- mode = SaveMode.Ignore)
-
+ testDF.write.format(dataSourceName).mode(SaveMode.Ignore).saveAsTable("t")
assert(table("t").collect().isEmpty)
}
}
test("saveAsTable()/load() - partitioned table - simple queries") {
- partitionedTestDF.saveAsTable(
- tableName = "t",
- source = dataSourceName,
- mode = SaveMode.Overwrite,
- Map("dataSchema" -> dataSchema.json))
+ partitionedTestDF.write.format(dataSourceName)
+ .mode(SaveMode.Overwrite)
+ .option("dataSchema", dataSchema.json)
+ .saveAsTable("t")
withTable("t") {
checkQueries(table("t"))
@@ -492,11 +470,9 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
checkQueries(
- load(
- source = dataSourceName,
- options = Map(
- "path" -> file.getCanonicalPath,
- "dataSchema" -> dataSchemaWithPartition.json)))
+ read.format(dataSourceName)
+ .option("dataSchema", dataSchemaWithPartition.json)
+ .load(file.getCanonicalPath))
}
}
}
@@ -518,18 +494,16 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
sparkContext
.parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1))
.toDF("a", "b", "p1")
- .saveAsParquetFile(partitionDir.toString)
+ .write.parquet(partitionDir.toString)
}
val dataSchemaWithPartition =
StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
checkQueries(
- load(
- source = dataSourceName,
- options = Map(
- "path" -> file.getCanonicalPath,
- "dataSchema" -> dataSchemaWithPartition.json)))
+ read.format(dataSourceName)
+ .option("dataSchema", dataSchemaWithPartition.json)
+ .load(file.getCanonicalPath))
}
}
}
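
Finally, the load(source, options) calls in the HadoopFsRelation tests become reader chains in which the path moves into load() and every other entry becomes an option(). A sketch assuming the suite's dataSourceName, dataSchemaWithPartition and file values, read through an assumed sqlContext:

    val df = sqlContext.read
      .format(dataSourceName)
      .option("dataSchema", dataSchemaWithPartition.json)
      .load(file.getCanonicalPath)
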