aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java9
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala10
-rw-r--r--sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java9
-rw-r--r--sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java9
-rw-r--r--sql/core/src/test/java/test/org/apache/spark/sql/JavaSaveLoadSuite.java8
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala12
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala41
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala6
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala8
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala37
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala38
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala126
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala5
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala10
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala6
-rw-r--r--sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java6
-rw-r--r--sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java6
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala4
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala5
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala8
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala27
21 files changed, 189 insertions, 201 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
index adb96dd8bf..82bb284ea3 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
@@ -25,8 +25,6 @@ import java.util.List;
import java.util.Properties;
// $example on:basic_parquet_example$
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
// $example on:schema_merging$
@@ -217,12 +215,11 @@ public class JavaSQLDataSourceExample {
// +------+
// Alternatively, a DataFrame can be created for a JSON dataset represented by
- // an RDD[String] storing one JSON object per string.
+ // an Dataset[String] storing one JSON object per string.
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
- JavaRDD<String> anotherPeopleRDD =
- new JavaSparkContext(spark.sparkContext()).parallelize(jsonData);
- Dataset<Row> anotherPeople = spark.read().json(anotherPeopleRDD);
+ Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
+ Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
anotherPeople.show();
// +---------------+----+
// | address|name|
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
index 66f7cb1b53..381e69cda8 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
@@ -111,6 +111,10 @@ object SQLDataSourceExample {
private def runJsonDatasetExample(spark: SparkSession): Unit = {
// $example on:json_dataset$
+ // Primitive types (Int, String, etc) and Product types (case classes) encoders are
+ // supported by importing this when creating a Dataset.
+ import spark.implicits._
+
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
@@ -135,10 +139,10 @@ object SQLDataSourceExample {
// +------+
// Alternatively, a DataFrame can be created for a JSON dataset represented by
- // an RDD[String] storing one JSON object per string
- val otherPeopleRDD = spark.sparkContext.makeRDD(
+ // an Dataset[String] storing one JSON object per string
+ val otherPeopleDataset = spark.createDataset(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
- val otherPeople = spark.read.json(otherPeopleRDD)
+ val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// | address|name|
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
index bf8ff61eae..eb4d76c6ab 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
@@ -31,6 +31,7 @@ import org.junit.Test;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
@@ -146,13 +147,13 @@ public class JavaApplySchemaSuite implements Serializable {
@Test
public void applySchemaToJSON() {
- JavaRDD<String> jsonRDD = jsc.parallelize(Arrays.asList(
+ Dataset<String> jsonDS = spark.createDataset(Arrays.asList(
"{\"string\":\"this is a simple string.\", \"integer\":10, \"long\":21474836470, " +
"\"bigInteger\":92233720368547758070, \"double\":1.7976931348623157E308, " +
"\"boolean\":true, \"null\":null}",
"{\"string\":\"this is another simple string.\", \"integer\":11, \"long\":21474836469, " +
"\"bigInteger\":92233720368547758069, \"double\":1.7976931348623157E305, " +
- "\"boolean\":false, \"null\":null}"));
+ "\"boolean\":false, \"null\":null}"), Encoders.STRING());
List<StructField> fields = new ArrayList<>(7);
fields.add(DataTypes.createStructField("bigInteger", DataTypes.createDecimalType(20, 0),
true));
@@ -183,14 +184,14 @@ public class JavaApplySchemaSuite implements Serializable {
null,
"this is another simple string."));
- Dataset<Row> df1 = spark.read().json(jsonRDD);
+ Dataset<Row> df1 = spark.read().json(jsonDS);
StructType actualSchema1 = df1.schema();
Assert.assertEquals(expectedSchema, actualSchema1);
df1.createOrReplaceTempView("jsonTable1");
List<Row> actual1 = spark.sql("select * from jsonTable1").collectAsList();
Assert.assertEquals(expectedResult, actual1);
- Dataset<Row> df2 = spark.read().schema(expectedSchema).json(jsonRDD);
+ Dataset<Row> df2 = spark.read().schema(expectedSchema).json(jsonDS);
StructType actualSchema2 = df2.schema();
Assert.assertEquals(expectedSchema, actualSchema2);
df2.createOrReplaceTempView("jsonTable2");
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index a8f814bfae..be8d95d0d9 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -414,4 +414,13 @@ public class JavaDataFrameSuite {
Assert.assertEquals(df.schema().length(), 0);
Assert.assertEquals(df.collectAsList().size(), 1);
}
+
+ @Test
+ public void testJsonRDDToDataFrame() {
+ // This is a test for the deprecated API in SPARK-15615.
+ JavaRDD<String> rdd = jsc.parallelize(Arrays.asList("{\"a\": 2}"));
+ Dataset<Row> df = spark.read().json(rdd);
+ Assert.assertEquals(1L, df.count());
+ Assert.assertEquals(2L, df.collectAsList().get(0).getLong(0));
+ }
}
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaSaveLoadSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaSaveLoadSuite.java
index 6941c86dfc..127d272579 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaSaveLoadSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaSaveLoadSuite.java
@@ -29,8 +29,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
@@ -40,7 +38,6 @@ import org.apache.spark.util.Utils;
public class JavaSaveLoadSuite {
private transient SparkSession spark;
- private transient JavaSparkContext jsc;
File path;
Dataset<Row> df;
@@ -58,7 +55,6 @@ public class JavaSaveLoadSuite {
.master("local[*]")
.appName("testing")
.getOrCreate();
- jsc = new JavaSparkContext(spark.sparkContext());
path =
Utils.createTempDir(System.getProperty("java.io.tmpdir"), "datasource").getCanonicalFile();
@@ -70,8 +66,8 @@ public class JavaSaveLoadSuite {
for (int i = 0; i < 10; i++) {
jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}");
}
- JavaRDD<String> rdd = jsc.parallelize(jsonObjects);
- df = spark.read().json(rdd);
+ Dataset<String> ds = spark.createDataset(jsonObjects, Encoders.STRING());
+ df = spark.read().json(ds);
df.createOrReplaceTempView("jsonTable");
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 5e65436079..19c2d5532d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -914,15 +914,13 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}
test("SPARK-7551: support backticks for DataFrame attribute resolution") {
- val df = spark.read.json(sparkContext.makeRDD(
- """{"a.b": {"c": {"d..e": {"f": 1}}}}""" :: Nil))
+ val df = spark.read.json(Seq("""{"a.b": {"c": {"d..e": {"f": 1}}}}""").toDS())
checkAnswer(
df.select(df("`a.b`.c.`d..e`.`f`")),
Row(1)
)
- val df2 = spark.read.json(sparkContext.makeRDD(
- """{"a b": {"c": {"d e": {"f": 1}}}}""" :: Nil))
+ val df2 = spark.read.json(Seq("""{"a b": {"c": {"d e": {"f": 1}}}}""").toDS())
checkAnswer(
df2.select(df2("`a b`.c.d e.f")),
Row(1)
@@ -1110,8 +1108,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}
test("SPARK-9323: DataFrame.orderBy should support nested column name") {
- val df = spark.read.json(sparkContext.makeRDD(
- """{"a": {"b": 1}}""" :: Nil))
+ val df = spark.read.json(Seq("""{"a": {"b": 1}}""").toDS())
checkAnswer(df.orderBy("a.b"), Row(Row(1)))
}
@@ -1164,8 +1161,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}
test("SPARK-10316: respect non-deterministic expressions in PhysicalOperation") {
- val input = spark.read.json(spark.sparkContext.makeRDD(
- (1 to 10).map(i => s"""{"id": $i}""")))
+ val input = spark.read.json((1 to 10).map(i => s"""{"id": $i}""").toDS())
val df = input.select($"id", rand(0).as('r))
df.as("a").join(df.filter($"r" < 0.5).as("b"), $"a.id" === $"b.id").collect().foreach { row =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 03cdfccdda..468ea05512 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -211,8 +211,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("grouping on nested fields") {
- spark.read.json(sparkContext.parallelize(
- """{"nested": {"attribute": 1}, "value": 2}""" :: Nil))
+ spark.read
+ .json(Seq("""{"nested": {"attribute": 1}, "value": 2}""").toDS())
.createOrReplaceTempView("rows")
checkAnswer(
@@ -229,9 +229,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("SPARK-6201 IN type conversion") {
- spark.read.json(
- sparkContext.parallelize(
- Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}")))
+ spark.read
+ .json(Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}").toDS())
.createOrReplaceTempView("d")
checkAnswer(
@@ -240,9 +239,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("SPARK-11226 Skip empty line in json file") {
- spark.read.json(
- sparkContext.parallelize(
- Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}", "")))
+ spark.read
+ .json(Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}", "").toDS())
.createOrReplaceTempView("d")
checkAnswer(
@@ -1214,8 +1212,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("SPARK-3483 Special chars in column names") {
- val data = sparkContext.parallelize(
- Seq("""{"key?number1": "value1", "key.number2": "value2"}"""))
+ val data = Seq("""{"key?number1": "value1", "key.number2": "value2"}""").toDS()
spark.read.json(data).createOrReplaceTempView("records")
sql("SELECT `key?number1`, `key.number2` FROM records")
}
@@ -1257,13 +1254,13 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("SPARK-4322 Grouping field with struct field as sub expression") {
- spark.read.json(sparkContext.makeRDD("""{"a": {"b": [{"c": 1}]}}""" :: Nil))
+ spark.read.json(Seq("""{"a": {"b": [{"c": 1}]}}""").toDS())
.createOrReplaceTempView("data")
checkAnswer(sql("SELECT a.b[0].c FROM data GROUP BY a.b[0].c"), Row(1))
spark.catalog.dropTempView("data")
- spark.read.json(
- sparkContext.makeRDD("""{"a": {"b": 1}}""" :: Nil)).createOrReplaceTempView("data")
+ spark.read.json(Seq("""{"a": {"b": 1}}""").toDS())
+ .createOrReplaceTempView("data")
checkAnswer(sql("SELECT a.b + 1 FROM data GROUP BY a.b + 1"), Row(2))
spark.catalog.dropTempView("data")
}
@@ -1311,8 +1308,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("SPARK-6145: ORDER BY test for nested fields") {
- spark.read.json(sparkContext.makeRDD(
- """{"a": {"b": 1, "a": {"a": 1}}, "c": [{"d": 1}]}""" :: Nil))
+ spark.read
+ .json(Seq("""{"a": {"b": 1, "a": {"a": 1}}, "c": [{"d": 1}]}""").toDS())
.createOrReplaceTempView("nestedOrder")
checkAnswer(sql("SELECT 1 FROM nestedOrder ORDER BY a.b"), Row(1))
@@ -1325,7 +1322,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
test("SPARK-6145: special cases") {
spark.read
- .json(sparkContext.makeRDD("""{"a": {"b": [1]}, "b": [{"a": 1}], "_c0": {"a": 1}}""" :: Nil))
+ .json(Seq("""{"a": {"b": [1]}, "b": [{"a": 1}], "_c0": {"a": 1}}""").toDS())
.createOrReplaceTempView("t")
checkAnswer(sql("SELECT a.b[0] FROM t ORDER BY _c0.a"), Row(1))
@@ -1333,8 +1330,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("SPARK-6898: complete support for special chars in column names") {
- spark.read.json(sparkContext.makeRDD(
- """{"a": {"c.b": 1}, "b.$q": [{"a@!.q": 1}], "q.w": {"w.i&": [1]}}""" :: Nil))
+ spark.read
+ .json(Seq("""{"a": {"c.b": 1}, "b.$q": [{"a@!.q": 1}], "q.w": {"w.i&": [1]}}""").toDS())
.createOrReplaceTempView("t")
checkAnswer(sql("SELECT a.`c.b`, `b.$q`[0].`a@!.q`, `q.w`.`w.i&`[0] FROM t"), Row(1, 1, 1))
@@ -1437,8 +1434,9 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
test("SPARK-7067: order by queries for complex ExtractValue chain") {
withTempView("t") {
- spark.read.json(sparkContext.makeRDD(
- """{"a": {"b": [{"c": 1}]}, "b": [{"d": 1}]}""" :: Nil)).createOrReplaceTempView("t")
+ spark.read
+ .json(Seq("""{"a": {"b": [{"c": 1}]}, "b": [{"d": 1}]}""").toDS())
+ .createOrReplaceTempView("t")
checkAnswer(sql("SELECT a.b FROM t ORDER BY b[0].d"), Row(Seq(Row(1))))
}
}
@@ -2109,8 +2107,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
|"a": [{"count": 3}], "b": [{"e": "test", "count": 1}]}}}'
|
""".stripMargin
- val rdd = sparkContext.parallelize(Array(json))
- spark.read.json(rdd).write.mode("overwrite").parquet(dir.toString)
+ spark.read.json(Seq(json).toDS()).write.mode("overwrite").parquet(dir.toString)
spark.read.parquet(dir.toString).collect()
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
index c7a77daaca..b096a6db85 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
@@ -221,8 +221,7 @@ class UserDefinedTypeSuite extends QueryTest with SharedSQLContext with ParquetT
StructField("vec", new UDT.MyDenseVectorUDT, false)
))
- val stringRDD = sparkContext.parallelize(data)
- val jsonRDD = spark.read.schema(schema).json(stringRDD)
+ val jsonRDD = spark.read.schema(schema).json(data.toDS())
checkAnswer(
jsonRDD,
Row(1, new UDT.MyDenseVector(Array(1.1, 2.2, 3.3, 4.4))) ::
@@ -242,8 +241,7 @@ class UserDefinedTypeSuite extends QueryTest with SharedSQLContext with ParquetT
StructField("vec", new UDT.MyDenseVectorUDT, false)
))
- val stringRDD = sparkContext.parallelize(data)
- val jsonDataset = spark.read.schema(schema).json(stringRDD)
+ val jsonDataset = spark.read.schema(schema).json(data.toDS())
.as[(Int, UDT.MyDenseVector)]
checkDataset(
jsonDataset,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala
index d2704b3d3f..a42891e55a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala
@@ -140,7 +140,7 @@ class WideSchemaBenchmark extends SparkFunSuite with BeforeAndAfterEach {
}
datum += "}"
datum = s"""{"a": {"b": {"c": $datum, "d": $datum}, "e": $datum}}"""
- val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum).rdd).cache()
+ val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum)).cache()
df.count() // force caching
addCases(benchmark, df, s"$width wide x $numRows rows", "a.b.c.value_1")
}
@@ -157,7 +157,7 @@ class WideSchemaBenchmark extends SparkFunSuite with BeforeAndAfterEach {
datum = "{\"value\": " + datum + "}"
selector = selector + ".value"
}
- val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum).rdd).cache()
+ val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum)).cache()
df.count() // force caching
addCases(benchmark, df, s"$depth deep x $numRows rows", selector)
}
@@ -180,7 +180,7 @@ class WideSchemaBenchmark extends SparkFunSuite with BeforeAndAfterEach {
}
// TODO(ekl) seems like the json parsing is actually the majority of the time, perhaps
// we should benchmark that too separately.
- val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum).rdd).cache()
+ val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum)).cache()
df.count() // force caching
addCases(benchmark, df, s"$numNodes x $depth deep x $numRows rows", selector)
}
@@ -200,7 +200,7 @@ class WideSchemaBenchmark extends SparkFunSuite with BeforeAndAfterEach {
}
}
datum += "]}"
- val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum).rdd).cache()
+ val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum)).cache()
df.count() // force caching
addCases(benchmark, df, s"$width wide x $numRows rows", "value[0]")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
index 0b72da5f37..6e2b4f0df5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
@@ -25,19 +25,18 @@ import org.apache.spark.sql.test.SharedSQLContext
* Test cases for various [[JSONOptions]].
*/
class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
+ import testImplicits._
test("allowComments off") {
val str = """{'name': /* hello */ 'Reynold Xin'}"""
- val rdd = spark.sparkContext.parallelize(Seq(str))
- val df = spark.read.json(rdd)
+ val df = spark.read.json(Seq(str).toDS())
assert(df.schema.head.name == "_corrupt_record")
}
test("allowComments on") {
val str = """{'name': /* hello */ 'Reynold Xin'}"""
- val rdd = spark.sparkContext.parallelize(Seq(str))
- val df = spark.read.option("allowComments", "true").json(rdd)
+ val df = spark.read.option("allowComments", "true").json(Seq(str).toDS())
assert(df.schema.head.name == "name")
assert(df.first().getString(0) == "Reynold Xin")
@@ -45,16 +44,14 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
test("allowSingleQuotes off") {
val str = """{'name': 'Reynold Xin'}"""
- val rdd = spark.sparkContext.parallelize(Seq(str))
- val df = spark.read.option("allowSingleQuotes", "false").json(rdd)
+ val df = spark.read.option("allowSingleQuotes", "false").json(Seq(str).toDS())
assert(df.schema.head.name == "_corrupt_record")
}
test("allowSingleQuotes on") {
val str = """{'name': 'Reynold Xin'}"""
- val rdd = spark.sparkContext.parallelize(Seq(str))
- val df = spark.read.json(rdd)
+ val df = spark.read.json(Seq(str).toDS())
assert(df.schema.head.name == "name")
assert(df.first().getString(0) == "Reynold Xin")
@@ -62,16 +59,14 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
test("allowUnquotedFieldNames off") {
val str = """{name: 'Reynold Xin'}"""
- val rdd = spark.sparkContext.parallelize(Seq(str))
- val df = spark.read.json(rdd)
+ val df = spark.read.json(Seq(str).toDS())
assert(df.schema.head.name == "_corrupt_record")
}
test("allowUnquotedFieldNames on") {
val str = """{name: 'Reynold Xin'}"""
- val rdd = spark.sparkContext.parallelize(Seq(str))
- val df = spark.read.option("allowUnquotedFieldNames", "true").json(rdd)
+ val df = spark.read.option("allowUnquotedFieldNames", "true").json(Seq(str).toDS())
assert(df.schema.head.name == "name")
assert(df.first().getString(0) == "Reynold Xin")
@@ -79,16 +74,14 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
test("allowNumericLeadingZeros off") {
val str = """{"age": 0018}"""
- val rdd = spark.sparkContext.parallelize(Seq(str))
- val df = spark.read.json(rdd)
+ val df = spark.read.json(Seq(str).toDS())
assert(df.schema.head.name == "_corrupt_record")
}
test("allowNumericLeadingZeros on") {
val str = """{"age": 0018}"""
- val rdd = spark.sparkContext.parallelize(Seq(str))
- val df = spark.read.option("allowNumericLeadingZeros", "true").json(rdd)
+ val df = spark.read.option("allowNumericLeadingZeros", "true").json(Seq(str).toDS())
assert(df.schema.head.name == "age")
assert(df.first().getLong(0) == 18)
@@ -98,16 +91,14 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
// JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS.
ignore("allowNonNumericNumbers off") {
val str = """{"age": NaN}"""
- val rdd = spark.sparkContext.parallelize(Seq(str))
- val df = spark.read.json(rdd)
+ val df = spark.read.json(Seq(str).toDS())
assert(df.schema.head.name == "_corrupt_record")
}
ignore("allowNonNumericNumbers on") {
val str = """{"age": NaN}"""
- val rdd = spark.sparkContext.parallelize(Seq(str))
- val df = spark.read.option("allowNonNumericNumbers", "true").json(rdd)
+ val df = spark.read.option("allowNonNumericNumbers", "true").json(Seq(str).toDS())
assert(df.schema.head.name == "age")
assert(df.first().getDouble(0).isNaN)
@@ -115,16 +106,14 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
test("allowBackslashEscapingAnyCharacter off") {
val str = """{"name": "Cazen Lee", "price": "\$10"}"""
- val rdd = spark.sparkContext.parallelize(Seq(str))
- val df = spark.read.option("allowBackslashEscapingAnyCharacter", "false").json(rdd)
+ val df = spark.read.option("allowBackslashEscapingAnyCharacter", "false").json(Seq(str).toDS())
assert(df.schema.head.name == "_corrupt_record")
}
test("allowBackslashEscapingAnyCharacter on") {
val str = """{"name": "Cazen Lee", "price": "\$10"}"""
- val rdd = spark.sparkContext.parallelize(Seq(str))
- val df = spark.read.option("allowBackslashEscapingAnyCharacter", "true").json(rdd)
+ val df = spark.read.option("allowBackslashEscapingAnyCharacter", "true").json(Seq(str).toDS())
assert(df.schema.head.name == "name")
assert(df.schema.last.name == "price")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 0e01be2410..0aaf148dac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -590,7 +590,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val dir = Utils.createTempDir()
dir.delete()
val path = dir.getCanonicalPath
- primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+ primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
val jsonDF = spark.read.json(path)
val expectedSchema = StructType(
@@ -622,7 +622,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val dir = Utils.createTempDir()
dir.delete()
val path = dir.getCanonicalPath
- primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+ primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
val jsonDF = spark.read.option("primitivesAsString", "true").json(path)
val expectedSchema = StructType(
@@ -777,9 +777,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
test("Find compatible types even if inferred DecimalType is not capable of other IntegralType") {
- val mixedIntegerAndDoubleRecords = sparkContext.parallelize(
- """{"a": 3, "b": 1.1}""" ::
- s"""{"a": 3.1, "b": 0.${"0" * 38}1}""" :: Nil)
+ val mixedIntegerAndDoubleRecords = Seq(
+ """{"a": 3, "b": 1.1}""",
+ s"""{"a": 3.1, "b": 0.${"0" * 38}1}""").toDS()
val jsonDF = spark.read
.option("prefersDecimal", "true")
.json(mixedIntegerAndDoubleRecords)
@@ -828,7 +828,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val mergedJsonDF = spark.read
.option("prefersDecimal", "true")
- .json(floatingValueRecords ++ bigIntegerRecords)
+ .json(floatingValueRecords.union(bigIntegerRecords))
val expectedMergedSchema = StructType(
StructField("a", DoubleType, true) ::
@@ -846,7 +846,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val dir = Utils.createTempDir()
dir.delete()
val path = dir.toURI.toString
- primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+ primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
sql(
s"""
@@ -873,7 +873,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val dir = Utils.createTempDir()
dir.delete()
val path = dir.getCanonicalPath
- primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+ primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
val schema = StructType(
StructField("bigInteger", DecimalType.SYSTEM_DEFAULT, true) ::
@@ -1263,7 +1263,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
assert(result2(3) === "{\"f1\":{\"f11\":4,\"f12\":true},\"f2\":{\"D4\":2147483644}}")
val jsonDF = spark.read.json(primitiveFieldAndType)
- val primTable = spark.read.json(jsonDF.toJSON.rdd)
+ val primTable = spark.read.json(jsonDF.toJSON)
primTable.createOrReplaceTempView("primitiveTable")
checkAnswer(
sql("select * from primitiveTable"),
@@ -1276,7 +1276,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
)
val complexJsonDF = spark.read.json(complexFieldAndType1)
- val compTable = spark.read.json(complexJsonDF.toJSON.rdd)
+ val compTable = spark.read.json(complexJsonDF.toJSON)
compTable.createOrReplaceTempView("complexTable")
// Access elements of a primitive array.
checkAnswer(
@@ -1364,10 +1364,10 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
})
}
- test("SPARK-6245 JsonRDD.inferSchema on empty RDD") {
+ test("SPARK-6245 JsonInferSchema.infer on empty RDD") {
// This is really a test that it doesn't throw an exception
val emptySchema = JsonInferSchema.infer(
- empty,
+ empty.rdd,
new JSONOptions(Map.empty[String, String], "GMT"),
CreateJacksonParser.string)
assert(StructType(Seq()) === emptySchema)
@@ -1394,7 +1394,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
test("SPARK-8093 Erase empty structs") {
val emptySchema = JsonInferSchema.infer(
- emptyRecords,
+ emptyRecords.rdd,
new JSONOptions(Map.empty[String, String], "GMT"),
CreateJacksonParser.string)
assert(StructType(Seq()) === emptySchema)
@@ -1592,7 +1592,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val dir = Utils.createTempDir()
dir.delete()
val path = dir.getCanonicalPath
- arrayAndStructRecords.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+ arrayAndStructRecords.map(record => record.replaceAll("\n", " ")).write.text(path)
val schema =
StructType(
@@ -1609,7 +1609,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val dir = Utils.createTempDir()
dir.delete()
val path = dir.getCanonicalPath
- primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+ primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
val jsonDF = spark.read.json(path)
val jsonDir = new File(dir, "json").getCanonicalPath
@@ -1645,7 +1645,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
dir.delete()
val path = dir.getCanonicalPath
- primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+ primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
val jsonDF = spark.read.json(path)
val jsonDir = new File(dir, "json").getCanonicalPath
@@ -1693,8 +1693,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val json = s"""
|{"a": [{$nested}], "b": [{$nested}]}
""".stripMargin
- val rdd = spark.sparkContext.makeRDD(Seq(json))
- val df = spark.read.json(rdd)
+ val df = spark.read.json(Seq(json).toDS())
assert(df.schema.size === 2)
df.collect()
}
@@ -1794,8 +1793,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
- val records = sparkContext
- .parallelize("""{"a": 3, "b": 1.1}""" :: """{"a": 3.1, "b": 0.000001}""" :: Nil)
+ val records = Seq("""{"a": 3, "b": 1.1}""", """{"a": 3.1, "b": 0.000001}""").toDS()
val schema = StructType(
StructField("a", DecimalType(21, 1), true) ::
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index a400940db9..13084ba4a7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -17,14 +17,13 @@
package org.apache.spark.sql.execution.datasources.json
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
private[json] trait TestJsonData {
protected def spark: SparkSession
- def primitiveFieldAndType: RDD[String] =
- spark.sparkContext.parallelize(
+ def primitiveFieldAndType: Dataset[String] =
+ spark.createDataset(spark.sparkContext.parallelize(
"""{"string":"this is a simple string.",
"integer":10,
"long":21474836470,
@@ -32,10 +31,10 @@ private[json] trait TestJsonData {
"double":1.7976931348623157E308,
"boolean":true,
"null":null
- }""" :: Nil)
+ }""" :: Nil))(Encoders.STRING)
- def primitiveFieldValueTypeConflict: RDD[String] =
- spark.sparkContext.parallelize(
+ def primitiveFieldValueTypeConflict: Dataset[String] =
+ spark.createDataset(spark.sparkContext.parallelize(
"""{"num_num_1":11, "num_num_2":null, "num_num_3": 1.1,
"num_bool":true, "num_str":13.1, "str_bool":"str1"}""" ::
"""{"num_num_1":null, "num_num_2":21474836470.9, "num_num_3": null,
@@ -44,16 +43,17 @@ private[json] trait TestJsonData {
"num_bool":false, "num_str":"str1", "str_bool":false}""" ::
"""{"num_num_1":21474836570, "num_num_2":1.1, "num_num_3": 21474836470,
"num_bool":null, "num_str":92233720368547758070, "str_bool":null}""" :: Nil)
+ )(Encoders.STRING)
- def jsonNullStruct: RDD[String] =
- spark.sparkContext.parallelize(
+ def jsonNullStruct: Dataset[String] =
+ spark.createDataset(spark.sparkContext.parallelize(
"""{"nullstr":"","ip":"27.31.100.29","headers":{"Host":"1.abc.com","Charset":"UTF-8"}}""" ::
"""{"nullstr":"","ip":"27.31.100.29","headers":{}}""" ::
"""{"nullstr":"","ip":"27.31.100.29","headers":""}""" ::
- """{"nullstr":null,"ip":"27.31.100.29","headers":null}""" :: Nil)
+ """{"nullstr":null,"ip":"27.31.100.29","headers":null}""" :: Nil))(Encoders.STRING)
- def complexFieldValueTypeConflict: RDD[String] =
- spark.sparkContext.parallelize(
+ def complexFieldValueTypeConflict: Dataset[String] =
+ spark.createDataset(spark.sparkContext.parallelize(
"""{"num_struct":11, "str_array":[1, 2, 3],
"array":[], "struct_array":[], "struct": {}}""" ::
"""{"num_struct":{"field":false}, "str_array":null,
@@ -62,24 +62,25 @@ private[json] trait TestJsonData {
"array":[4, 5, 6], "struct_array":[7, 8, 9], "struct": {"field":null}}""" ::
"""{"num_struct":{}, "str_array":["str1", "str2", 33],
"array":[7], "struct_array":{"field": true}, "struct": {"field": "str"}}""" :: Nil)
+ )(Encoders.STRING)
- def arrayElementTypeConflict: RDD[String] =
- spark.sparkContext.parallelize(
+ def arrayElementTypeConflict: Dataset[String] =
+ spark.createDataset(spark.sparkContext.parallelize(
"""{"array1": [1, 1.1, true, null, [], {}, [2,3,4], {"field":"str"}],
"array2": [{"field":214748364700}, {"field":1}]}""" ::
"""{"array3": [{"field":"str"}, {"field":1}]}""" ::
- """{"array3": [1, 2, 3]}""" :: Nil)
+ """{"array3": [1, 2, 3]}""" :: Nil))(Encoders.STRING)
- def missingFields: RDD[String] =
- spark.sparkContext.parallelize(
+ def missingFields: Dataset[String] =
+ spark.createDataset(spark.sparkContext.parallelize(
"""{"a":true}""" ::
"""{"b":21474836470}""" ::
"""{"c":[33, 44]}""" ::
"""{"d":{"field":true}}""" ::
- """{"e":"str"}""" :: Nil)
+ """{"e":"str"}""" :: Nil))(Encoders.STRING)
- def complexFieldAndType1: RDD[String] =
- spark.sparkContext.parallelize(
+ def complexFieldAndType1: Dataset[String] =
+ spark.createDataset(spark.sparkContext.parallelize(
"""{"struct":{"field1": true, "field2": 92233720368547758070},
"structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]},
"arrayOfString":["str1", "str2"],
@@ -92,10 +93,10 @@ private[json] trait TestJsonData {
"arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}],
"arrayOfArray1":[[1, 2, 3], ["str1", "str2"]],
"arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]]
- }""" :: Nil)
+ }""" :: Nil))(Encoders.STRING)
- def complexFieldAndType2: RDD[String] =
- spark.sparkContext.parallelize(
+ def complexFieldAndType2: Dataset[String] =
+ spark.createDataset(spark.sparkContext.parallelize(
"""{"arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}],
"complexArrayOfStruct": [
{
@@ -146,89 +147,90 @@ private[json] trait TestJsonData {
{"inner3": [[{"inner4": 2}]]}
]
]]
- }""" :: Nil)
+ }""" :: Nil))(Encoders.STRING)
- def mapType1: RDD[String] =
- spark.sparkContext.parallelize(
+ def mapType1: Dataset[String] =
+ spark.createDataset(spark.sparkContext.parallelize(
"""{"map": {"a": 1}}""" ::
"""{"map": {"b": 2}}""" ::
"""{"map": {"c": 3}}""" ::
"""{"map": {"c": 1, "d": 4}}""" ::
- """{"map": {"e": null}}""" :: Nil)
+ """{"map": {"e": null}}""" :: Nil))(Encoders.STRING)
- def mapType2: RDD[String] =
- spark.sparkContext.parallelize(
+ def mapType2: Dataset[String] =
+ spark.createDataset(spark.sparkContext.parallelize(
"""{"map": {"a": {"field1": [1, 2, 3, null]}}}""" ::
"""{"map": {"b": {"field2": 2}}}""" ::
"""{"map": {"c": {"field1": [], "field2": 4}}}""" ::
"""{"map": {"c": {"field2": 3}, "d": {"field1": [null]}}}""" ::
"""{"map": {"e": null}}""" ::
- """{"map": {"f": {"field1": null}}}""" :: Nil)
+ """{"map": {"f": {"field1": null}}}""" :: Nil))(Encoders.STRING)
- def nullsInArrays: RDD[String] =
- spark.sparkContext.parallelize(
+ def nullsInArrays: Dataset[String] =
+ spark.createDataset(spark.sparkContext.parallelize(
"""{"field1":[[null], [[["Test"]]]]}""" ::
"""{"field2":[null, [{"Test":1}]]}""" ::
"""{"field3":[[null], [{"Test":"2"}]]}""" ::
- """{"field4":[[null, [1,2,3]]]}""" :: Nil)
+ """{"field4":[[null, [1,2,3]]]}""" :: Nil))(Encoders.STRING)
- def jsonArray: RDD[String] =
- spark.sparkContext.parallelize(
+ def jsonArray: Dataset[String] =
+ spark.createDataset(spark.sparkContext.parallelize(
"""[{"a":"str_a_1"}]""" ::
"""[{"a":"str_a_2"}, {"b":"str_b_3"}]""" ::
"""{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" ::
- """[]""" :: Nil)
+ """[]""" :: Nil))(Encoders.STRING)
- def corruptRecords: RDD[String] =
- spark.sparkContext.parallelize(
+ def corruptRecords: Dataset[String] =
+ spark.createDataset(spark.sparkContext.parallelize(
"""{""" ::
"""""" ::
"""{"a":1, b:2}""" ::
"""{"a":{, b:3}""" ::
"""{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" ::
- """]""" :: Nil)
+ """]""" :: Nil))(Encoders.STRING)
- def additionalCorruptRecords: RDD[String] =
- spark.sparkContext.parallelize(
+ def additionalCorruptRecords: Dataset[String] =
+ spark.createDataset(spark.sparkContext.parallelize(
"""{"dummy":"test"}""" ::
"""[1,2,3]""" ::
"""":"test", "a":1}""" ::
"""42""" ::
- """ ","ian":"test"}""" :: Nil)
+ """ ","ian":"test"}""" :: Nil))(Encoders.STRING)
- def emptyRecords: RDD[String] =
- spark.sparkContext.parallelize(
+ def emptyRecords: Dataset[String] =
+ spark.createDataset(spark.sparkContext.parallelize(
"""{""" ::
"""""" ::
"""{"a": {}}""" ::
"""{"a": {"b": {}}}""" ::
"""{"b": [{"c": {}}]}""" ::
- """]""" :: Nil)
+ """]""" :: Nil))(Encoders.STRING)
- def timestampAsLong: RDD[String] =
- spark.sparkContext.parallelize(
- """{"ts":1451732645}""" :: Nil)
+ def timestampAsLong: Dataset[String] =
+ spark.createDataset(spark.sparkContext.parallelize(
+ """{"ts":1451732645}""" :: Nil))(Encoders.STRING)
- def arrayAndStructRecords: RDD[String] =
- spark.sparkContext.parallelize(
+ def arrayAndStructRecords: Dataset[String] =
+ spark.createDataset(spark.sparkContext.parallelize(
"""{"a": {"b": 1}}""" ::
- """{"a": []}""" :: Nil)
+ """{"a": []}""" :: Nil))(Encoders.STRING)
- def floatingValueRecords: RDD[String] =
- spark.sparkContext.parallelize(
- s"""{"a": 0.${"0" * 38}1, "b": 0.01}""" :: Nil)
+ def floatingValueRecords: Dataset[String] =
+ spark.createDataset(spark.sparkContext.parallelize(
+ s"""{"a": 0.${"0" * 38}1, "b": 0.01}""" :: Nil))(Encoders.STRING)
- def bigIntegerRecords: RDD[String] =
- spark.sparkContext.parallelize(
- s"""{"a": 1${"0" * 38}, "b": 92233720368547758070}""" :: Nil)
+ def bigIntegerRecords: Dataset[String] =
+ spark.createDataset(spark.sparkContext.parallelize(
+ s"""{"a": 1${"0" * 38}, "b": 92233720368547758070}""" :: Nil))(Encoders.STRING)
- def datesRecords: RDD[String] =
- spark.sparkContext.parallelize(
+ def datesRecords: Dataset[String] =
+ spark.createDataset(spark.sparkContext.parallelize(
"""{"date": "26/08/2015 18:00"}""" ::
"""{"date": "27/10/2014 18:30"}""" ::
- """{"date": "28/01/2016 20:00"}""" :: Nil)
+ """{"date": "28/01/2016 20:00"}""" :: Nil))(Encoders.STRING)
- lazy val singleRow: RDD[String] = spark.sparkContext.parallelize("""{"a":123}""" :: Nil)
+ lazy val singleRow: Dataset[String] =
+ spark.createDataset(spark.sparkContext.parallelize("""{"a":123}""" :: Nil))(Encoders.STRING)
- def empty: RDD[String] = spark.sparkContext.parallelize(Seq[String]())
+ def empty: Dataset[String] = spark.emptyDataset(Encoders.STRING)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 4a42f8ea79..916a01ee0c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -33,14 +33,15 @@ class CreateTableAsSelectSuite
extends DataSourceTest
with SharedSQLContext
with BeforeAndAfterEach {
+ import testImplicits._
protected override lazy val sql = spark.sql _
private var path: File = null
override def beforeAll(): Unit = {
super.beforeAll()
- val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
- spark.read.json(rdd).createOrReplaceTempView("jt")
+ val ds = (1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""").toDS()
+ spark.read.json(ds).createOrReplaceTempView("jt")
}
override def afterAll(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 4fc2f81f54..19835cd184 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -24,14 +24,16 @@ import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils
class InsertSuite extends DataSourceTest with SharedSQLContext {
+ import testImplicits._
+
protected override lazy val sql = spark.sql _
private var path: File = null
override def beforeAll(): Unit = {
super.beforeAll()
path = Utils.createTempDir()
- val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
- spark.read.json(rdd).createOrReplaceTempView("jt")
+ val ds = (1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""").toDS()
+ spark.read.json(ds).createOrReplaceTempView("jt")
sql(
s"""
|CREATE TEMPORARY VIEW jsonTable (a int, b string)
@@ -129,7 +131,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
// Writing the table to less part files.
val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""), 5)
- spark.read.json(rdd1).createOrReplaceTempView("jt1")
+ spark.read.json(rdd1.toDS()).createOrReplaceTempView("jt1")
sql(
s"""
|INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt1
@@ -141,7 +143,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
// Writing the table to more part files.
val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""), 10)
- spark.read.json(rdd2).createOrReplaceTempView("jt2")
+ spark.read.json(rdd1.toDS()).createOrReplaceTempView("jt2")
sql(
s"""
|INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt2
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
index b1756c27fa..773d34dfaf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
@@ -28,6 +28,8 @@ import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndAfter {
+ import testImplicits._
+
protected override lazy val sql = spark.sql _
private var originalDefaultSource: String = null
private var path: File = null
@@ -40,8 +42,8 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
path = Utils.createTempDir()
path.delete()
- val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
- df = spark.read.json(rdd)
+ val ds = (1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""").toDS()
+ df = spark.read.json(ds)
df.createOrReplaceTempView("jsonTable")
}
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
index f664d5a4cd..aefc9cc77d 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
@@ -26,7 +26,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
@@ -35,7 +34,6 @@ import org.apache.spark.sql.hive.test.TestHive$;
import org.apache.spark.sql.hive.aggregate.MyDoubleSum;
public class JavaDataFrameSuite {
- private transient JavaSparkContext sc;
private transient SQLContext hc;
Dataset<Row> df;
@@ -50,13 +48,11 @@ public class JavaDataFrameSuite {
@Before
public void setUp() throws IOException {
hc = TestHive$.MODULE$;
- sc = new JavaSparkContext(hc.sparkContext());
-
List<String> jsonObjects = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
jsonObjects.add("{\"key\":" + i + ", \"value\":\"str" + i + "\"}");
}
- df = hc.read().json(sc.parallelize(jsonObjects));
+ df = hc.read().json(hc.createDataset(jsonObjects, Encoders.STRING()));
df.createOrReplaceTempView("window_table");
}
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
index 061c7431a6..0b157a45e6 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
@@ -31,9 +31,9 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.QueryTest$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
@@ -81,8 +81,8 @@ public class JavaMetastoreDataSourcesSuite {
for (int i = 0; i < 10; i++) {
jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}");
}
- JavaRDD<String> rdd = sc.parallelize(jsonObjects);
- df = sqlContext.read().json(rdd);
+ Dataset<String> ds = sqlContext.createDataset(jsonObjects, Encoders.STRING());
+ df = sqlContext.read().json(ds);
df.createOrReplaceTempView("jsonTable");
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index e951bbe1dc..03ea0c8c77 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -511,9 +511,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
test("create external table") {
withTempPath { tempPath =>
withTable("savedJsonTable", "createdJsonTable") {
- val df = read.json(sparkContext.parallelize((1 to 10).map { i =>
+ val df = read.json((1 to 10).map { i =>
s"""{ "a": $i, "b": "str$i" }"""
- }))
+ }.toDS())
withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "not a source name") {
df.write
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
index cfca1d7983..8a37bc3665 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.test.SQLTestUtils
* A set of tests that validates support for Hive Explain command.
*/
class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+ import testImplicits._
test("show cost in explain command") {
// Only has sizeInBytes before ANALYZE command
@@ -92,8 +93,8 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
test("SPARK-17409: The EXPLAIN output of CTAS only shows the analyzed plan") {
withTempView("jt") {
- val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
- spark.read.json(rdd).createOrReplaceTempView("jt")
+ val ds = (1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""").toDS()
+ spark.read.json(ds).createOrReplaceTempView("jt")
val outputs = sql(
s"""
|EXPLAIN EXTENDED
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
index b2f19d7753..ce92fbf349 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
@@ -31,15 +31,15 @@ case class Data(a: Int, B: Int, n: Nested, nestedArray: Seq[Nested])
class HiveResolutionSuite extends HiveComparisonTest {
test("SPARK-3698: case insensitive test for nested data") {
- read.json(sparkContext.makeRDD(
- """{"a": [{"a": {"a": 1}}]}""" :: Nil)).createOrReplaceTempView("nested")
+ read.json(Seq("""{"a": [{"a": {"a": 1}}]}""").toDS())
+ .createOrReplaceTempView("nested")
// This should be successfully analyzed
sql("SELECT a[0].A.A from nested").queryExecution.analyzed
}
test("SPARK-5278: check ambiguous reference to fields") {
- read.json(sparkContext.makeRDD(
- """{"a": [{"b": 1, "B": 2}]}""" :: Nil)).createOrReplaceTempView("nested")
+ read.json(Seq("""{"a": [{"b": 1, "B": 2}]}""").toDS())
+ .createOrReplaceTempView("nested")
// there are 2 filed matching field name "b", we should report Ambiguous reference error
val exception = intercept[AnalysisException] {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index faed8b5046..9f6176339e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -973,30 +973,30 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
test("SPARK-4296 Grouping field with Hive UDF as sub expression") {
- val rdd = sparkContext.makeRDD( """{"a": "str", "b":"1", "c":"1970-01-01 00:00:00"}""" :: Nil)
- read.json(rdd).createOrReplaceTempView("data")
+ val ds = Seq("""{"a": "str", "b":"1", "c":"1970-01-01 00:00:00"}""").toDS()
+ read.json(ds).createOrReplaceTempView("data")
checkAnswer(
sql("SELECT concat(a, '-', b), year(c) FROM data GROUP BY concat(a, '-', b), year(c)"),
Row("str-1", 1970))
dropTempTable("data")
- read.json(rdd).createOrReplaceTempView("data")
+ read.json(ds).createOrReplaceTempView("data")
checkAnswer(sql("SELECT year(c) + 1 FROM data GROUP BY year(c) + 1"), Row(1971))
dropTempTable("data")
}
test("resolve udtf in projection #1") {
- val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
- read.json(rdd).createOrReplaceTempView("data")
+ val ds = (1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}""").toDS()
+ read.json(ds).createOrReplaceTempView("data")
val df = sql("SELECT explode(a) AS val FROM data")
val col = df("val")
}
test("resolve udtf in projection #2") {
- val rdd = sparkContext.makeRDD((1 to 2).map(i => s"""{"a":[$i, ${i + 1}]}"""))
- read.json(rdd).createOrReplaceTempView("data")
+ val ds = (1 to 2).map(i => s"""{"a":[$i, ${i + 1}]}""").toDS()
+ read.json(ds).createOrReplaceTempView("data")
checkAnswer(sql("SELECT explode(map(1, 1)) FROM data LIMIT 1"), Row(1, 1) :: Nil)
checkAnswer(sql("SELECT explode(map(1, 1)) as (k1, k2) FROM data LIMIT 1"), Row(1, 1) :: Nil)
intercept[AnalysisException] {
@@ -1010,8 +1010,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
// TGF with non-TGF in project is allowed in Spark SQL, but not in Hive
test("TGF with non-TGF in projection") {
- val rdd = sparkContext.makeRDD( """{"a": "1", "b":"1"}""" :: Nil)
- read.json(rdd).createOrReplaceTempView("data")
+ val ds = Seq("""{"a": "1", "b":"1"}""").toDS()
+ read.json(ds).createOrReplaceTempView("data")
checkAnswer(
sql("SELECT explode(map(a, b)) as (k1, k2), a, b FROM data"),
Row("1", "1", "1", "1") :: Nil)
@@ -1024,8 +1024,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
// is not in a valid state (cannot be executed). Because of this bug, the analysis rule of
// PreInsertionCasts will actually start to work before ImplicitGenerate and then
// generates an invalid query plan.
- val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
- read.json(rdd).createOrReplaceTempView("data")
+ val ds = (1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}""").toDS()
+ read.json(ds).createOrReplaceTempView("data")
withSQLConf(SQLConf.CONVERT_CTAS.key -> "false") {
sql("CREATE TABLE explodeTest (key bigInt)")
@@ -1262,9 +1262,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
test("SPARK-9371: fix the support for special chars in column names for hive context") {
- read.json(sparkContext.makeRDD(
- """{"a": {"c.b": 1}, "b.$q": [{"a@!.q": 1}], "q.w": {"w.i&": [1]}}""" :: Nil))
- .createOrReplaceTempView("t")
+ val ds = Seq("""{"a": {"c.b": 1}, "b.$q": [{"a@!.q": 1}], "q.w": {"w.i&": [1]}}""").toDS()
+ read.json(ds).createOrReplaceTempView("t")
checkAnswer(sql("SELECT a.`c.b`, `b.$q`[0].`a@!.q`, `q.w`.`w.i&`[0] FROM t"), Row(1, 1, 1))
}