author    Reynold Xin <rxin@databricks.com>    2015-05-16 22:01:53 -0700
committer Reynold Xin <rxin@databricks.com>    2015-05-16 22:01:53 -0700
commit    517eb37a85e0a28820bcfd5d98c50d02df6521c6 (patch)
tree      1c33fdee296ea6d88df6a218b649efe8fa10662a /sql/hive
parent    3b6ef2c5391b528ef989e24400fbb0c496c3b245 (diff)
[SPARK-7654][SQL] Move JDBC into DataFrame's reader/writer interface.
Also moved all the deprecated functions into one place for SQLContext and DataFrame, and updated tests to use the new API.

Author: Reynold Xin <rxin@databricks.com>

Closes #6210 from rxin/df-writer-reader-jdbc and squashes the following commits:

7465c2c [Reynold Xin] Fixed unit test.
118e609 [Reynold Xin] Updated tests.
3441b57 [Reynold Xin] Updated javadoc.
13cdd1c [Reynold Xin] [SPARK-7654][SQL] Move JDBC into DataFrame's reader/writer interface.
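As a rough sketch of the migration the updated tests exercise (not code taken from the patch; the table name, path, and JDBC URL below are hypothetical, and a SQLContext named sqlContext is assumed to be in scope), the positional SQLContext/DataFrame overloads give way to the builder-style DataFrameReader/DataFrameWriter, and the JDBC entry points named in the title follow the same pattern:

import java.util.Properties
import org.apache.spark.sql.SaveMode

// Old, now-deprecated style:
//   val df = sqlContext.jsonRDD(rdd)
//   df.saveAsTable("savedTable", "org.apache.spark.sql.json", SaveMode.Append, options)

// New builder style:
val rdd = sqlContext.sparkContext.parallelize("""{"a": 1, "b": "str1"}""" :: Nil)
val df = sqlContext.read.json(rdd)
df.write
  .format("org.apache.spark.sql.json")
  .mode(SaveMode.Append)
  .option("path", "/tmp/savedTable")   // hypothetical path
  .saveAsTable("savedTable")

// JDBC moves onto the same reader/writer interface:
val props = new Properties()
val people = sqlContext.read.jdbc("jdbc:h2:mem:testdb", "PEOPLE", props)   // hypothetical URL/table
people.write.mode(SaveMode.Append).jdbc("jdbc:h2:mem:testdb", "PEOPLE_COPY", props)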
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java        | 20
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala                   |  4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala          | 73
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala      |  6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala            |  8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala                      | 14
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala          | 68
7 files changed, 89 insertions(+), 104 deletions(-)
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
index 53ddecf579..58fe96adab 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
@@ -81,7 +81,7 @@ public class JavaMetastoreDataSourcesSuite {
jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}");
}
JavaRDD<String> rdd = sc.parallelize(jsonObjects);
- df = sqlContext.jsonRDD(rdd);
+ df = sqlContext.read().json(rdd);
df.registerTempTable("jsonTable");
}
@@ -96,7 +96,11 @@ public class JavaMetastoreDataSourcesSuite {
public void saveExternalTableAndQueryIt() {
Map<String, String> options = new HashMap<String, String>();
options.put("path", path.toString());
- df.saveAsTable("javaSavedTable", "org.apache.spark.sql.json", SaveMode.Append, options);
+ df.write()
+ .format("org.apache.spark.sql.json")
+ .mode(SaveMode.Append)
+ .options(options)
+ .saveAsTable("javaSavedTable");
checkAnswer(
sqlContext.sql("SELECT * FROM javaSavedTable"),
@@ -115,7 +119,11 @@ public class JavaMetastoreDataSourcesSuite {
public void saveExternalTableWithSchemaAndQueryIt() {
Map<String, String> options = new HashMap<String, String>();
options.put("path", path.toString());
- df.saveAsTable("javaSavedTable", "org.apache.spark.sql.json", SaveMode.Append, options);
+ df.write()
+ .format("org.apache.spark.sql.json")
+ .mode(SaveMode.Append)
+ .options(options)
+ .saveAsTable("javaSavedTable");
checkAnswer(
sqlContext.sql("SELECT * FROM javaSavedTable"),
@@ -138,7 +146,11 @@ public class JavaMetastoreDataSourcesSuite {
@Test
public void saveTableAndQueryIt() {
Map<String, String> options = new HashMap<String, String>();
- df.saveAsTable("javaSavedTable", "org.apache.spark.sql.json", SaveMode.Append, options);
+ df.write()
+ .format("org.apache.spark.sql.json")
+ .mode(SaveMode.Append)
+ .options(options)
+ .saveAsTable("javaSavedTable");
checkAnswer(
sqlContext.sql("SELECT * FROM javaSavedTable"),
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index fc6c3c3503..945596db80 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -162,7 +162,7 @@ class CachedTableSuite extends QueryTest {
test("REFRESH TABLE also needs to recache the data (data source tables)") {
val tempPath: File = Utils.createTempDir()
tempPath.delete()
- table("src").save(tempPath.toString, "parquet", SaveMode.Overwrite)
+ table("src").write.mode(SaveMode.Overwrite).parquet(tempPath.toString)
sql("DROP TABLE IF EXISTS refreshTable")
createExternalTable("refreshTable", tempPath.toString, "parquet")
checkAnswer(
@@ -172,7 +172,7 @@ class CachedTableSuite extends QueryTest {
sql("CACHE TABLE refreshTable")
assertCached(table("refreshTable"))
// Append new data.
- table("src").save(tempPath.toString, "parquet", SaveMode.Append)
+ table("src").write.mode(SaveMode.Append).parquet(tempPath.toString)
// We are still using the old data.
assertCached(table("refreshTable"))
checkAnswer(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 58b0b80c31..30db976a3a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -409,11 +409,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
val originalDefaultSource = conf.defaultDataSourceName
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
- val df = jsonRDD(rdd)
+ val df = read.json(rdd)
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
// Save the df as a managed table (by not specifiying the path).
- df.saveAsTable("savedJsonTable")
+ df.write.saveAsTable("savedJsonTable")
checkAnswer(
sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"),
@@ -443,11 +443,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
val originalDefaultSource = conf.defaultDataSourceName
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
- val df = jsonRDD(rdd)
+ val df = read.json(rdd)
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
// Save the df as a managed table (by not specifiying the path).
- df.saveAsTable("savedJsonTable")
+ df.write.saveAsTable("savedJsonTable")
checkAnswer(
sql("SELECT * FROM savedJsonTable"),
@@ -455,17 +455,17 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
// Right now, we cannot append to an existing JSON table.
intercept[RuntimeException] {
- df.saveAsTable("savedJsonTable", SaveMode.Append)
+ df.write.mode(SaveMode.Append).saveAsTable("savedJsonTable")
}
// We can overwrite it.
- df.saveAsTable("savedJsonTable", SaveMode.Overwrite)
+ df.write.mode(SaveMode.Overwrite).saveAsTable("savedJsonTable")
checkAnswer(
sql("SELECT * FROM savedJsonTable"),
df.collect())
// When the save mode is Ignore, we will do nothing when the table already exists.
- df.select("b").saveAsTable("savedJsonTable", SaveMode.Ignore)
+ df.select("b").write.mode(SaveMode.Ignore).saveAsTable("savedJsonTable")
assert(df.schema === table("savedJsonTable").schema)
checkAnswer(
sql("SELECT * FROM savedJsonTable"),
@@ -479,11 +479,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
// Create an external table by specifying the path.
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
- df.saveAsTable(
- "savedJsonTable",
- "org.apache.spark.sql.json",
- SaveMode.Append,
- Map("path" -> tempPath.toString))
+ df.write
+ .format("org.apache.spark.sql.json")
+ .mode(SaveMode.Append)
+ .option("path", tempPath.toString)
+ .saveAsTable("savedJsonTable")
checkAnswer(
sql("SELECT * FROM savedJsonTable"),
df.collect())
@@ -501,14 +501,13 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
val originalDefaultSource = conf.defaultDataSourceName
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
- val df = jsonRDD(rdd)
+ val df = read.json(rdd)
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
- df.saveAsTable(
- "savedJsonTable",
- "org.apache.spark.sql.json",
- SaveMode.Append,
- Map("path" -> tempPath.toString))
+ df.write.format("org.apache.spark.sql.json")
+ .mode(SaveMode.Append)
+ .option("path", tempPath.toString)
+ .saveAsTable("savedJsonTable")
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
createExternalTable("createdJsonTable", tempPath.toString)
@@ -566,7 +565,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
- jsonRDD(rdd).registerTempTable("jt")
+ read.json(rdd).registerTempTable("jt")
sql(
"""
|create table test_parquet_ctas STORED AS parquET
@@ -601,7 +600,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
StructType(
StructField("a", ArrayType(IntegerType, containsNull = true), nullable = true) :: Nil)
assert(df1.schema === expectedSchema1)
- df1.saveAsTable("arrayInParquet", "parquet", SaveMode.Overwrite)
+ df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("arrayInParquet")
val df2 =
createDataFrame(Tuple1(Seq(2, 3)) :: Nil).toDF("a")
@@ -610,10 +609,10 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
StructField("a", ArrayType(IntegerType, containsNull = false), nullable = true) :: Nil)
assert(df2.schema === expectedSchema2)
df2.insertInto("arrayInParquet", overwrite = false)
- createDataFrame(Tuple1(Seq(4, 5)) :: Nil).toDF("a")
- .saveAsTable("arrayInParquet", SaveMode.Append) // This one internally calls df2.insertInto.
- createDataFrame(Tuple1(Seq(Int.box(6), null.asInstanceOf[Integer])) :: Nil).toDF("a")
- .saveAsTable("arrayInParquet", "parquet", SaveMode.Append)
+ createDataFrame(Tuple1(Seq(4, 5)) :: Nil).toDF("a").write.mode(SaveMode.Append)
+ .saveAsTable("arrayInParquet") // This one internally calls df2.insertInto.
+ createDataFrame(Tuple1(Seq(Int.box(6), null.asInstanceOf[Integer])) :: Nil).toDF("a").write
+ .mode(SaveMode.Append).saveAsTable("arrayInParquet")
refreshTable("arrayInParquet")
checkAnswer(
@@ -634,7 +633,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
StructType(
StructField("a", mapType1, nullable = true) :: Nil)
assert(df1.schema === expectedSchema1)
- df1.saveAsTable("mapInParquet", "parquet", SaveMode.Overwrite)
+ df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("mapInParquet")
val df2 =
createDataFrame(Tuple1(Map(2 -> 3)) :: Nil).toDF("a")
@@ -644,10 +643,10 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
StructField("a", mapType2, nullable = true) :: Nil)
assert(df2.schema === expectedSchema2)
df2.insertInto("mapInParquet", overwrite = false)
- createDataFrame(Tuple1(Map(4 -> 5)) :: Nil).toDF("a")
- .saveAsTable("mapInParquet", SaveMode.Append) // This one internally calls df2.insertInto.
- createDataFrame(Tuple1(Map(6 -> null.asInstanceOf[Integer])) :: Nil).toDF("a")
- .saveAsTable("mapInParquet", "parquet", SaveMode.Append)
+ createDataFrame(Tuple1(Map(4 -> 5)) :: Nil).toDF("a").write.mode(SaveMode.Append)
+ .saveAsTable("mapInParquet") // This one internally calls df2.insertInto.
+ createDataFrame(Tuple1(Map(6 -> null.asInstanceOf[Integer])) :: Nil).toDF("a").write
+ .format("parquet").mode(SaveMode.Append).saveAsTable("mapInParquet")
refreshTable("mapInParquet")
checkAnswer(
@@ -711,30 +710,30 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
def createDF(from: Int, to: Int): DataFrame =
createDataFrame((from to to).map(i => Tuple2(i, s"str$i"))).toDF("c1", "c2")
- createDF(0, 9).saveAsTable("insertParquet", "parquet")
+ createDF(0, 9).write.format("parquet").saveAsTable("insertParquet")
checkAnswer(
sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
(6 to 9).map(i => Row(i, s"str$i")))
intercept[AnalysisException] {
- createDF(10, 19).saveAsTable("insertParquet", "parquet")
+ createDF(10, 19).write.format("parquet").saveAsTable("insertParquet")
}
- createDF(10, 19).saveAsTable("insertParquet", "parquet", SaveMode.Append)
+ createDF(10, 19).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet")
checkAnswer(
sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
(6 to 19).map(i => Row(i, s"str$i")))
- createDF(20, 29).saveAsTable("insertParquet", "parquet", SaveMode.Append)
+ createDF(20, 29).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 25"),
(6 to 24).map(i => Row(i, s"str$i")))
intercept[AnalysisException] {
- createDF(30, 39).saveAsTable("insertParquet")
+ createDF(30, 39).write.saveAsTable("insertParquet")
}
- createDF(30, 39).saveAsTable("insertParquet", SaveMode.Append)
+ createDF(30, 39).write.mode(SaveMode.Append).saveAsTable("insertParquet")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 35"),
(6 to 34).map(i => Row(i, s"str$i")))
@@ -744,11 +743,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 45"),
(6 to 44).map(i => Row(i, s"str$i")))
- createDF(50, 59).saveAsTable("insertParquet", SaveMode.Overwrite)
+ createDF(50, 59).write.mode(SaveMode.Overwrite).saveAsTable("insertParquet")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 51 AND p.c1 < 55"),
(52 to 54).map(i => Row(i, s"str$i")))
- createDF(60, 69).saveAsTable("insertParquet", SaveMode.Ignore)
+ createDF(60, 69).write.mode(SaveMode.Ignore).saveAsTable("insertParquet")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p"),
(50 to 59).map(i => Row(i, s"str$i")))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
index 8ad3627504..3dfa6e72e1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.hive.test.TestHive.{sparkContext, jsonRDD, sql}
+import org.apache.spark.sql.hive.test.TestHive.{read, sparkContext, jsonRDD, sql}
import org.apache.spark.sql.hive.test.TestHive.implicits._
case class Nested(a: Int, B: Int)
@@ -31,14 +31,14 @@ case class Data(a: Int, B: Int, n: Nested, nestedArray: Seq[Nested])
class HiveResolutionSuite extends HiveComparisonTest {
test("SPARK-3698: case insensitive test for nested data") {
- jsonRDD(sparkContext.makeRDD(
+ read.json(sparkContext.makeRDD(
"""{"a": [{"a": {"a": 1}}]}""" :: Nil)).registerTempTable("nested")
// This should be successfully analyzed
sql("SELECT a[0].A.A from nested").queryExecution.analyzed
}
test("SPARK-5278: check ambiguous reference to fields") {
- jsonRDD(sparkContext.makeRDD(
+ read.json(sparkContext.makeRDD(
"""{"a": [{"b": 1, "B": 2}]}""" :: Nil)).registerTempTable("nested")
// there are 2 filed matching field name "b", we should report Ambiguous reference error
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index dfe73c62c4..ca2c4b4019 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -535,14 +535,14 @@ class SQLQuerySuite extends QueryTest {
test("SPARK-4296 Grouping field with Hive UDF as sub expression") {
val rdd = sparkContext.makeRDD( """{"a": "str", "b":"1", "c":"1970-01-01 00:00:00"}""" :: Nil)
- jsonRDD(rdd).registerTempTable("data")
+ read.json(rdd).registerTempTable("data")
checkAnswer(
sql("SELECT concat(a, '-', b), year(c) FROM data GROUP BY concat(a, '-', b), year(c)"),
Row("str-1", 1970))
dropTempTable("data")
- jsonRDD(rdd).registerTempTable("data")
+ read.json(rdd).registerTempTable("data")
checkAnswer(sql("SELECT year(c) + 1 FROM data GROUP BY year(c) + 1"), Row(1971))
dropTempTable("data")
@@ -550,7 +550,7 @@ class SQLQuerySuite extends QueryTest {
test("resolve udtf with single alias") {
val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
- jsonRDD(rdd).registerTempTable("data")
+ read.json(rdd).registerTempTable("data")
val df = sql("SELECT explode(a) AS val FROM data")
val col = df("val")
}
@@ -563,7 +563,7 @@ class SQLQuerySuite extends QueryTest {
// PreInsertionCasts will actually start to work before ImplicitGenerate and then
// generates an invalid query plan.
val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
- jsonRDD(rdd).registerTempTable("data")
+ read.json(rdd).registerTempTable("data")
val originalConf = getConf("spark.sql.hive.convertCTAS", "false")
setConf("spark.sql.hive.convertCTAS", "false")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index a0075f1e44..05d99983b6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -150,9 +150,9 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
}
val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
- jsonRDD(rdd1).registerTempTable("jt")
+ read.json(rdd1).registerTempTable("jt")
val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":[$i, null]}"""))
- jsonRDD(rdd2).registerTempTable("jt_array")
+ read.json(rdd2).registerTempTable("jt_array")
setConf("spark.sql.hive.convertMetastoreParquet", "true")
}
@@ -617,16 +617,16 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest {
sql("drop table if exists spark_6016_fix")
// Create a DataFrame with two partitions. So, the created table will have two parquet files.
- val df1 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i}"""), 2))
- df1.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
+ val df1 = read.json(sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i}"""), 2))
+ df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("spark_6016_fix")
checkAnswer(
sql("select * from spark_6016_fix"),
(1 to 10).map(i => Row(i))
)
// Create a DataFrame with four partitions. So, the created table will have four parquet files.
- val df2 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"b":$i}"""), 4))
- df2.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
+ val df2 = read.json(sparkContext.parallelize((1 to 10).map(i => s"""{"b":$i}"""), 4))
+ df2.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("spark_6016_fix")
// For the bug of SPARK-6016, we are caching two outdated footers for df1. Then,
// since the new table has four parquet files, we are trying to read new footers from two files
// and then merge metadata in footers of these four (two outdated ones and two latest one),
@@ -663,7 +663,7 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
StructField("a", arrayType1, nullable = true) :: Nil)
assert(df.schema === expectedSchema1)
- df.saveAsTable("alwaysNullable", "parquet")
+ df.write.format("parquet").saveAsTable("alwaysNullable")
val mapType2 = MapType(IntegerType, IntegerType, valueContainsNull = true)
val arrayType2 = ArrayType(IntegerType, containsNull = true)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index f44b3c521e..9d9b436cab 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -120,10 +120,7 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
test("save()/load() - non-partitioned table - ErrorIfExists") {
withTempDir { file =>
intercept[RuntimeException] {
- testDF.save(
- path = file.getCanonicalPath,
- source = dataSourceName,
- mode = SaveMode.ErrorIfExists)
+ testDF.write.format(dataSourceName).mode(SaveMode.ErrorIfExists).save(file.getCanonicalPath)
}
}
}
@@ -233,10 +230,8 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
test("save()/load() - partitioned table - Ignore") {
withTempDir { file =>
- partitionedTestDF.save(
- path = file.getCanonicalPath,
- source = dataSourceName,
- mode = SaveMode.Ignore)
+ partitionedTestDF.write
+ .format(dataSourceName).mode(SaveMode.Ignore).save(file.getCanonicalPath)
val path = new Path(file.getCanonicalPath)
val fs = path.getFileSystem(SparkHadoopUtil.get.conf)
@@ -249,11 +244,9 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
}
test("saveAsTable()/load() - non-partitioned table - Overwrite") {
- testDF.saveAsTable(
- tableName = "t",
- source = dataSourceName,
- mode = SaveMode.Overwrite,
- Map("dataSchema" -> dataSchema.json))
+ testDF.write.format(dataSourceName).mode(SaveMode.Overwrite)
+ .option("dataSchema", dataSchema.json)
+ .saveAsTable("t")
withTable("t") {
checkAnswer(table("t"), testDF.collect())
@@ -261,15 +254,8 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
}
test("saveAsTable()/load() - non-partitioned table - Append") {
- testDF.saveAsTable(
- tableName = "t",
- source = dataSourceName,
- mode = SaveMode.Overwrite)
-
- testDF.saveAsTable(
- tableName = "t",
- source = dataSourceName,
- mode = SaveMode.Append)
+ testDF.write.format(dataSourceName).mode(SaveMode.Overwrite).saveAsTable("t")
+ testDF.write.format(dataSourceName).mode(SaveMode.Append).saveAsTable("t")
withTable("t") {
checkAnswer(table("t"), testDF.unionAll(testDF).orderBy("a").collect())
@@ -281,10 +267,7 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
withTempTable("t") {
intercept[AnalysisException] {
- testDF.saveAsTable(
- tableName = "t",
- source = dataSourceName,
- mode = SaveMode.ErrorIfExists)
+ testDF.write.format(dataSourceName).mode(SaveMode.ErrorIfExists).saveAsTable("t")
}
}
}
@@ -293,21 +276,16 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
Seq.empty[(Int, String)].toDF().registerTempTable("t")
withTempTable("t") {
- testDF.saveAsTable(
- tableName = "t",
- source = dataSourceName,
- mode = SaveMode.Ignore)
-
+ testDF.write.format(dataSourceName).mode(SaveMode.Ignore).saveAsTable("t")
assert(table("t").collect().isEmpty)
}
}
test("saveAsTable()/load() - partitioned table - simple queries") {
- partitionedTestDF.saveAsTable(
- tableName = "t",
- source = dataSourceName,
- mode = SaveMode.Overwrite,
- Map("dataSchema" -> dataSchema.json))
+ partitionedTestDF.write.format(dataSourceName)
+ .mode(SaveMode.Overwrite)
+ .option("dataSchema", dataSchema.json)
+ .saveAsTable("t")
withTable("t") {
checkQueries(table("t"))
@@ -492,11 +470,9 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
checkQueries(
- load(
- source = dataSourceName,
- options = Map(
- "path" -> file.getCanonicalPath,
- "dataSchema" -> dataSchemaWithPartition.json)))
+ read.format(dataSourceName)
+ .option("dataSchema", dataSchemaWithPartition.json)
+ .load(file.getCanonicalPath))
}
}
}
@@ -518,18 +494,16 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
sparkContext
.parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1))
.toDF("a", "b", "p1")
- .saveAsParquetFile(partitionDir.toString)
+ .write.parquet(partitionDir.toString)
}
val dataSchemaWithPartition =
StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
checkQueries(
- load(
- source = dataSourceName,
- options = Map(
- "path" -> file.getCanonicalPath,
- "dataSchema" -> dataSchemaWithPartition.json)))
+ read.format(dataSourceName)
+ .option("dataSchema", dataSchemaWithPartition.json)
+ .load(file.getCanonicalPath))
}
}
}