21 files changed, 189 insertions, 201 deletions
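Every file below applies the same mechanical change: the deprecated `DataFrameReader.json(RDD[String])` overload is replaced with `json(Dataset[String])`. As a reading aid only (not part of the commit), here is a minimal before/after sketch, assuming Spark 2.2+ where both overloads exist; the object and variable names are illustrative:

import org.apache.spark.sql.{Dataset, Row, SparkSession}

object JsonReadMigrationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._  // brings .toDS() into scope for Seq[String]

    val jsonData = Seq("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""")

    // Before: materialize an RDD[String] and pass it to the deprecated json(RDD) overload.
    val before: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(jsonData))

    // After: build a Dataset[String] and use the json(Dataset[String]) overload instead.
    val after: Dataset[Row] = spark.read.json(jsonData.toDS())

    before.show()
    after.show()
    spark.stop()
  }
}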
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
index adb96dd8bf..82bb284ea3 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
@@ -25,8 +25,6 @@ import java.util.List;
 import java.util.Properties;
 
 // $example on:basic_parquet_example$
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Encoders;
 // $example on:schema_merging$
@@ -217,12 +215,11 @@ public class JavaSQLDataSourceExample {
     // +------+
 
     // Alternatively, a DataFrame can be created for a JSON dataset represented by
-    // an RDD[String] storing one JSON object per string.
+    // a Dataset[String] storing one JSON object per string.
     List<String> jsonData = Arrays.asList(
       "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
-    JavaRDD<String> anotherPeopleRDD =
-      new JavaSparkContext(spark.sparkContext()).parallelize(jsonData);
-    Dataset<Row> anotherPeople = spark.read().json(anotherPeopleRDD);
+    Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
+    Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
     anotherPeople.show();
     // +---------------+----+
     // |        address|name|
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
index 66f7cb1b53..381e69cda8 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
@@ -111,6 +111,10 @@ object SQLDataSourceExample {
 
   private def runJsonDatasetExample(spark: SparkSession): Unit = {
     // $example on:json_dataset$
+    // Primitive types (Int, String, etc) and Product types (case classes) encoders are
+    // supported by importing this when creating a Dataset.
+    import spark.implicits._
+
     // A JSON dataset is pointed to by path.
     // The path can be either a single text file or a directory storing text files
     val path = "examples/src/main/resources/people.json"
@@ -135,10 +139,10 @@ object SQLDataSourceExample {
     // +------+
 
     // Alternatively, a DataFrame can be created for a JSON dataset represented by
-    // an RDD[String] storing one JSON object per string
-    val otherPeopleRDD = spark.sparkContext.makeRDD(
+    // a Dataset[String] storing one JSON object per string
+    val otherPeopleDataset = spark.createDataset(
       """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
-    val otherPeople = spark.read.json(otherPeopleRDD)
+    val otherPeople = spark.read.json(otherPeopleDataset)
     otherPeople.show()
     // +---------------+----+
     // |        address|name|
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
index bf8ff61eae..eb4d76c6ab 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
@@ -31,6 +31,7 @@ import org.junit.Test;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SparkSession;
@@ -146,13 +147,13 @@ public class JavaApplySchemaSuite implements Serializable {
 
   @Test
   public void applySchemaToJSON() {
-    JavaRDD<String> jsonRDD = jsc.parallelize(Arrays.asList(
+    Dataset<String> jsonDS = spark.createDataset(Arrays.asList(
       "{\"string\":\"this is a simple string.\", \"integer\":10, \"long\":21474836470, " +
         "\"bigInteger\":92233720368547758070, \"double\":1.7976931348623157E308, " +
         "\"boolean\":true, \"null\":null}",
       "{\"string\":\"this is another simple string.\", \"integer\":11, \"long\":21474836469, " +
         "\"bigInteger\":92233720368547758069, \"double\":1.7976931348623157E305, " +
-        "\"boolean\":false, \"null\":null}"));
+        "\"boolean\":false, \"null\":null}"), Encoders.STRING());
     List<StructField> fields = new ArrayList<>(7);
     fields.add(DataTypes.createStructField("bigInteger", DataTypes.createDecimalType(20, 0), true));
@@ -183,14 +184,14 @@ public class JavaApplySchemaSuite implements Serializable {
         null,
         "this is another simple string."));
 
-    Dataset<Row> df1 = spark.read().json(jsonRDD);
+    Dataset<Row> df1 = spark.read().json(jsonDS);
     StructType actualSchema1 = df1.schema();
     Assert.assertEquals(expectedSchema, actualSchema1);
     df1.createOrReplaceTempView("jsonTable1");
     List<Row> actual1 = spark.sql("select * from jsonTable1").collectAsList();
     Assert.assertEquals(expectedResult, actual1);
 
-    Dataset<Row> df2 = spark.read().schema(expectedSchema).json(jsonRDD);
+    Dataset<Row> df2 = spark.read().schema(expectedSchema).json(jsonDS);
     StructType actualSchema2 = df2.schema();
     Assert.assertEquals(expectedSchema, actualSchema2);
     df2.createOrReplaceTempView("jsonTable2");
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index a8f814bfae..be8d95d0d9 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -414,4 +414,13 @@ public class JavaDataFrameSuite {
     Assert.assertEquals(df.schema().length(), 0);
     Assert.assertEquals(df.collectAsList().size(), 1);
   }
+
+  @Test
+  public void testJsonRDDToDataFrame() {
+    // This is a test for the deprecated API in SPARK-15615.
+    JavaRDD<String> rdd = jsc.parallelize(Arrays.asList("{\"a\": 2}"));
+    Dataset<Row> df = spark.read().json(rdd);
+    Assert.assertEquals(1L, df.count());
+    Assert.assertEquals(2L, df.collectAsList().get(0).getLong(0));
+  }
 }
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaSaveLoadSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaSaveLoadSuite.java
index 6941c86dfc..127d272579 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaSaveLoadSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaSaveLoadSuite.java
@@ -29,8 +29,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.*;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
@@ -40,7 +38,6 @@ import org.apache.spark.util.Utils;
 public class JavaSaveLoadSuite {
 
   private transient SparkSession spark;
-  private transient JavaSparkContext jsc;
 
   File path;
   Dataset<Row> df;
@@ -58,7 +55,6 @@ public class JavaSaveLoadSuite {
       .master("local[*]")
       .appName("testing")
       .getOrCreate();
-    jsc = new JavaSparkContext(spark.sparkContext());
 
     path = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "datasource").getCanonicalFile();
@@ -70,8 +66,8 @@ public class JavaSaveLoadSuite {
     for (int i = 0; i < 10; i++) {
       jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}");
     }
-    JavaRDD<String> rdd = jsc.parallelize(jsonObjects);
-    df = spark.read().json(rdd);
+    Dataset<String> ds = spark.createDataset(jsonObjects, Encoders.STRING());
+    df = spark.read().json(ds);
     df.createOrReplaceTempView("jsonTable");
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 5e65436079..19c2d5532d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -914,15 +914,13 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-7551: support backticks for DataFrame attribute resolution") {
-    val df = spark.read.json(sparkContext.makeRDD(
-      """{"a.b": {"c": {"d..e": {"f": 1}}}}""" :: Nil))
+    val df = spark.read.json(Seq("""{"a.b": {"c": {"d..e": {"f": 1}}}}""").toDS())
     checkAnswer(
       df.select(df("`a.b`.c.`d..e`.`f`")),
       Row(1)
     )
 
-    val df2 = spark.read.json(sparkContext.makeRDD(
-      """{"a b": {"c": {"d e": {"f": 1}}}}""" :: Nil))
+    val df2 = spark.read.json(Seq("""{"a b": {"c": {"d e": {"f": 1}}}}""").toDS())
     checkAnswer(
       df2.select(df2("`a b`.c.d e.f")),
       Row(1)
@@ -1110,8 +1108,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-9323: DataFrame.orderBy should support nested column name") {
-    val df = spark.read.json(sparkContext.makeRDD(
-      """{"a": {"b": 1}}""" :: Nil))
+    val df = spark.read.json(Seq("""{"a": {"b": 1}}""").toDS())
     checkAnswer(df.orderBy("a.b"), Row(Row(1)))
   }
 
@@ -1164,8 +1161,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-10316: respect non-deterministic expressions in PhysicalOperation") {
-    val input = spark.read.json(spark.sparkContext.makeRDD(
-      (1 to 10).map(i => s"""{"id": $i}""")))
+    val input = spark.read.json((1 to 10).map(i => s"""{"id": $i}""").toDS())
 
     val df = input.select($"id", rand(0).as('r))
     df.as("a").join(df.filter($"r" < 0.5).as("b"), $"a.id" === $"b.id").collect().foreach { row =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 03cdfccdda..468ea05512 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -211,8 +211,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("grouping on nested fields") {
-    spark.read.json(sparkContext.parallelize(
-      """{"nested": {"attribute": 1}, "value": 2}""" :: Nil))
+    spark.read
+      .json(Seq("""{"nested": {"attribute": 1}, "value": 2}""").toDS())
      .createOrReplaceTempView("rows")
 
     checkAnswer(
@@ -229,9 +229,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-6201 IN type conversion") {
-    spark.read.json(
-      sparkContext.parallelize(
-        Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}")))
+    spark.read
+      .json(Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}").toDS())
       .createOrReplaceTempView("d")
 
     checkAnswer(
@@ -240,9 +239,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-11226 Skip empty line in json file") {
-    spark.read.json(
-      sparkContext.parallelize(
-        Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}", "")))
+    spark.read
+      .json(Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}", "").toDS())
       .createOrReplaceTempView("d")
 
     checkAnswer(
@@ -1214,8 +1212,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-3483 Special chars in column names") {
-    val data = sparkContext.parallelize(
-      Seq("""{"key?number1": "value1", "key.number2": "value2"}"""))
+    val data = Seq("""{"key?number1": "value1", "key.number2": "value2"}""").toDS()
     spark.read.json(data).createOrReplaceTempView("records")
     sql("SELECT `key?number1`, `key.number2` FROM records")
   }
@@ -1257,13 +1254,13 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-4322 Grouping field with struct field as sub expression") {
-    spark.read.json(sparkContext.makeRDD("""{"a": {"b": [{"c": 1}]}}""" :: Nil))
+    spark.read.json(Seq("""{"a": {"b": [{"c": 1}]}}""").toDS())
       .createOrReplaceTempView("data")
     checkAnswer(sql("SELECT a.b[0].c FROM data GROUP BY a.b[0].c"), Row(1))
     spark.catalog.dropTempView("data")
 
-    spark.read.json(
-      sparkContext.makeRDD("""{"a": {"b": 1}}""" :: Nil)).createOrReplaceTempView("data")
+    spark.read.json(Seq("""{"a": {"b": 1}}""").toDS())
+      .createOrReplaceTempView("data")
     checkAnswer(sql("SELECT a.b + 1 FROM data GROUP BY a.b + 1"), Row(2))
     spark.catalog.dropTempView("data")
   }
@@ -1311,8 +1308,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-6145: ORDER BY test for nested fields") {
-    spark.read.json(sparkContext.makeRDD(
-      """{"a": {"b": 1, "a": {"a": 1}}, "c": [{"d": 1}]}""" :: Nil))
+    spark.read
+      .json(Seq("""{"a": {"b": 1, "a": {"a": 1}}, "c": [{"d": 1}]}""").toDS())
       .createOrReplaceTempView("nestedOrder")
 
     checkAnswer(sql("SELECT 1 FROM nestedOrder ORDER BY a.b"), Row(1))
@@ -1325,7 +1322,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
 
   test("SPARK-6145: special cases") {
     spark.read
-      .json(sparkContext.makeRDD("""{"a": {"b": [1]}, "b": [{"a": 1}], "_c0": {"a": 1}}""" :: Nil))
+      .json(Seq("""{"a": {"b": [1]}, "b": [{"a": 1}], "_c0": {"a": 1}}""").toDS())
       .createOrReplaceTempView("t")
 
     checkAnswer(sql("SELECT a.b[0] FROM t ORDER BY _c0.a"), Row(1))
@@ -1333,8 +1330,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-6898: complete support for special chars in column names") {
-    spark.read.json(sparkContext.makeRDD(
-      """{"a": {"c.b": 1}, "b.$q": [{"a@!.q": 1}], "q.w": {"w.i&": [1]}}""" :: Nil))
+    spark.read
+      .json(Seq("""{"a": {"c.b": 1}, "b.$q": [{"a@!.q": 1}], "q.w": {"w.i&": [1]}}""").toDS())
       .createOrReplaceTempView("t")
 
     checkAnswer(sql("SELECT a.`c.b`, `b.$q`[0].`a@!.q`, `q.w`.`w.i&`[0] FROM t"), Row(1, 1, 1))
@@ -1437,8 +1434,9 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
 
   test("SPARK-7067: order by queries for complex ExtractValue chain") {
     withTempView("t") {
-      spark.read.json(sparkContext.makeRDD(
-        """{"a": {"b": [{"c": 1}]}, "b": [{"d": 1}]}""" :: Nil)).createOrReplaceTempView("t")
+      spark.read
+        .json(Seq("""{"a": {"b": [{"c": 1}]}, "b": [{"d": 1}]}""").toDS())
+        .createOrReplaceTempView("t")
       checkAnswer(sql("SELECT a.b FROM t ORDER BY b[0].d"), Row(Seq(Row(1))))
     }
   }
@@ -2109,8 +2107,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
           |"a": [{"count": 3}], "b": [{"e": "test", "count": 1}]}}}'
           |
         """.stripMargin
-      val rdd = sparkContext.parallelize(Array(json))
-      spark.read.json(rdd).write.mode("overwrite").parquet(dir.toString)
+      spark.read.json(Seq(json).toDS()).write.mode("overwrite").parquet(dir.toString)
       spark.read.parquet(dir.toString).collect()
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
index c7a77daaca..b096a6db85 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
@@ -221,8 +221,7 @@ class UserDefinedTypeSuite extends QueryTest with SharedSQLContext with ParquetT
       StructField("vec", new UDT.MyDenseVectorUDT, false)
     ))
 
-    val stringRDD = sparkContext.parallelize(data)
-    val jsonRDD = spark.read.schema(schema).json(stringRDD)
+    val jsonRDD = spark.read.schema(schema).json(data.toDS())
     checkAnswer(
       jsonRDD,
       Row(1, new UDT.MyDenseVector(Array(1.1, 2.2, 3.3, 4.4))) ::
@@ -242,8 +241,7 @@ class UserDefinedTypeSuite extends QueryTest with SharedSQLContext with ParquetT
       StructField("vec", new UDT.MyDenseVectorUDT, false)
     ))
 
-    val stringRDD = sparkContext.parallelize(data)
-    val jsonDataset = spark.read.schema(schema).json(stringRDD)
+    val jsonDataset = spark.read.schema(schema).json(data.toDS())
       .as[(Int, UDT.MyDenseVector)]
     checkDataset(
       jsonDataset,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala
index d2704b3d3f..a42891e55a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala
@@ -140,7 +140,7 @@ class WideSchemaBenchmark extends SparkFunSuite with BeforeAndAfterEach {
       }
       datum += "}"
       datum = s"""{"a": {"b": {"c": $datum, "d": $datum}, "e": $datum}}"""
-      val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum).rdd).cache()
+      val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum)).cache()
      df.count()  // force caching
       addCases(benchmark, df, s"$width wide x $numRows rows", "a.b.c.value_1")
     }
@@ -157,7 +157,7 @@ class WideSchemaBenchmark extends SparkFunSuite with BeforeAndAfterEach {
       datum = "{\"value\": " + datum + "}"
".value" } - val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum).rdd).cache() + val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum)).cache() df.count() // force caching addCases(benchmark, df, s"$depth deep x $numRows rows", selector) } @@ -180,7 +180,7 @@ class WideSchemaBenchmark extends SparkFunSuite with BeforeAndAfterEach { } // TODO(ekl) seems like the json parsing is actually the majority of the time, perhaps // we should benchmark that too separately. - val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum).rdd).cache() + val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum)).cache() df.count() // force caching addCases(benchmark, df, s"$numNodes x $depth deep x $numRows rows", selector) } @@ -200,7 +200,7 @@ class WideSchemaBenchmark extends SparkFunSuite with BeforeAndAfterEach { } } datum += "]}" - val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum).rdd).cache() + val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum)).cache() df.count() // force caching addCases(benchmark, df, s"$width wide x $numRows rows", "value[0]") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala index 0b72da5f37..6e2b4f0df5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala @@ -25,19 +25,18 @@ import org.apache.spark.sql.test.SharedSQLContext * Test cases for various [[JSONOptions]]. */ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext { + import testImplicits._ test("allowComments off") { val str = """{'name': /* hello */ 'Reynold Xin'}""" - val rdd = spark.sparkContext.parallelize(Seq(str)) - val df = spark.read.json(rdd) + val df = spark.read.json(Seq(str).toDS()) assert(df.schema.head.name == "_corrupt_record") } test("allowComments on") { val str = """{'name': /* hello */ 'Reynold Xin'}""" - val rdd = spark.sparkContext.parallelize(Seq(str)) - val df = spark.read.option("allowComments", "true").json(rdd) + val df = spark.read.option("allowComments", "true").json(Seq(str).toDS()) assert(df.schema.head.name == "name") assert(df.first().getString(0) == "Reynold Xin") @@ -45,16 +44,14 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext { test("allowSingleQuotes off") { val str = """{'name': 'Reynold Xin'}""" - val rdd = spark.sparkContext.parallelize(Seq(str)) - val df = spark.read.option("allowSingleQuotes", "false").json(rdd) + val df = spark.read.option("allowSingleQuotes", "false").json(Seq(str).toDS()) assert(df.schema.head.name == "_corrupt_record") } test("allowSingleQuotes on") { val str = """{'name': 'Reynold Xin'}""" - val rdd = spark.sparkContext.parallelize(Seq(str)) - val df = spark.read.json(rdd) + val df = spark.read.json(Seq(str).toDS()) assert(df.schema.head.name == "name") assert(df.first().getString(0) == "Reynold Xin") @@ -62,16 +59,14 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext { test("allowUnquotedFieldNames off") { val str = """{name: 'Reynold Xin'}""" - val rdd = spark.sparkContext.parallelize(Seq(str)) - val df = spark.read.json(rdd) + val df = spark.read.json(Seq(str).toDS()) assert(df.schema.head.name == "_corrupt_record") } 
test("allowUnquotedFieldNames on") { val str = """{name: 'Reynold Xin'}""" - val rdd = spark.sparkContext.parallelize(Seq(str)) - val df = spark.read.option("allowUnquotedFieldNames", "true").json(rdd) + val df = spark.read.option("allowUnquotedFieldNames", "true").json(Seq(str).toDS()) assert(df.schema.head.name == "name") assert(df.first().getString(0) == "Reynold Xin") @@ -79,16 +74,14 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext { test("allowNumericLeadingZeros off") { val str = """{"age": 0018}""" - val rdd = spark.sparkContext.parallelize(Seq(str)) - val df = spark.read.json(rdd) + val df = spark.read.json(Seq(str).toDS()) assert(df.schema.head.name == "_corrupt_record") } test("allowNumericLeadingZeros on") { val str = """{"age": 0018}""" - val rdd = spark.sparkContext.parallelize(Seq(str)) - val df = spark.read.option("allowNumericLeadingZeros", "true").json(rdd) + val df = spark.read.option("allowNumericLeadingZeros", "true").json(Seq(str).toDS()) assert(df.schema.head.name == "age") assert(df.first().getLong(0) == 18) @@ -98,16 +91,14 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext { // JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS. ignore("allowNonNumericNumbers off") { val str = """{"age": NaN}""" - val rdd = spark.sparkContext.parallelize(Seq(str)) - val df = spark.read.json(rdd) + val df = spark.read.json(Seq(str).toDS()) assert(df.schema.head.name == "_corrupt_record") } ignore("allowNonNumericNumbers on") { val str = """{"age": NaN}""" - val rdd = spark.sparkContext.parallelize(Seq(str)) - val df = spark.read.option("allowNonNumericNumbers", "true").json(rdd) + val df = spark.read.option("allowNonNumericNumbers", "true").json(Seq(str).toDS()) assert(df.schema.head.name == "age") assert(df.first().getDouble(0).isNaN) @@ -115,16 +106,14 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext { test("allowBackslashEscapingAnyCharacter off") { val str = """{"name": "Cazen Lee", "price": "\$10"}""" - val rdd = spark.sparkContext.parallelize(Seq(str)) - val df = spark.read.option("allowBackslashEscapingAnyCharacter", "false").json(rdd) + val df = spark.read.option("allowBackslashEscapingAnyCharacter", "false").json(Seq(str).toDS()) assert(df.schema.head.name == "_corrupt_record") } test("allowBackslashEscapingAnyCharacter on") { val str = """{"name": "Cazen Lee", "price": "\$10"}""" - val rdd = spark.sparkContext.parallelize(Seq(str)) - val df = spark.read.option("allowBackslashEscapingAnyCharacter", "true").json(rdd) + val df = spark.read.option("allowBackslashEscapingAnyCharacter", "true").json(Seq(str).toDS()) assert(df.schema.head.name == "name") assert(df.schema.last.name == "price") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 0e01be2410..0aaf148dac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -590,7 +590,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val dir = Utils.createTempDir() dir.delete() val path = dir.getCanonicalPath - primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path) + primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path) val jsonDF = spark.read.json(path) val expectedSchema = StructType( @@ -622,7 
@@ -622,7 +622,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val dir = Utils.createTempDir()
     dir.delete()
     val path = dir.getCanonicalPath
-    primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+    primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
     val jsonDF = spark.read.option("primitivesAsString", "true").json(path)
 
     val expectedSchema = StructType(
@@ -777,9 +777,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("Find compatible types even if inferred DecimalType is not capable of other IntegralType") {
-    val mixedIntegerAndDoubleRecords = sparkContext.parallelize(
-      """{"a": 3, "b": 1.1}""" ::
-      s"""{"a": 3.1, "b": 0.${"0" * 38}1}""" :: Nil)
+    val mixedIntegerAndDoubleRecords = Seq(
+      """{"a": 3, "b": 1.1}""",
+      s"""{"a": 3.1, "b": 0.${"0" * 38}1}""").toDS()
     val jsonDF = spark.read
       .option("prefersDecimal", "true")
       .json(mixedIntegerAndDoubleRecords)
@@ -828,7 +828,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 
     val mergedJsonDF = spark.read
       .option("prefersDecimal", "true")
-      .json(floatingValueRecords ++ bigIntegerRecords)
+      .json(floatingValueRecords.union(bigIntegerRecords))
 
     val expectedMergedSchema = StructType(
       StructField("a", DoubleType, true) ::
@@ -846,7 +846,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val dir = Utils.createTempDir()
     dir.delete()
     val path = dir.toURI.toString
-    primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+    primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
 
     sql(
       s"""
@@ -873,7 +873,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val dir = Utils.createTempDir()
     dir.delete()
     val path = dir.getCanonicalPath
-    primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+    primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
 
     val schema = StructType(
       StructField("bigInteger", DecimalType.SYSTEM_DEFAULT, true) ::
@@ -1263,7 +1263,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     assert(result2(3) === "{\"f1\":{\"f11\":4,\"f12\":true},\"f2\":{\"D4\":2147483644}}")
 
     val jsonDF = spark.read.json(primitiveFieldAndType)
-    val primTable = spark.read.json(jsonDF.toJSON.rdd)
+    val primTable = spark.read.json(jsonDF.toJSON)
     primTable.createOrReplaceTempView("primitiveTable")
     checkAnswer(
         sql("select * from primitiveTable"),
@@ -1276,7 +1276,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     )
 
     val complexJsonDF = spark.read.json(complexFieldAndType1)
-    val compTable = spark.read.json(complexJsonDF.toJSON.rdd)
+    val compTable = spark.read.json(complexJsonDF.toJSON)
     compTable.createOrReplaceTempView("complexTable")
     // Access elements of a primitive array.
     checkAnswer(
@@ -1364,10 +1364,10 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     })
   }
 
-  test("SPARK-6245 JsonRDD.inferSchema on empty RDD") {
+  test("SPARK-6245 JsonInferSchema.infer on empty RDD") {
     // This is really a test that it doesn't throw an exception
     val emptySchema = JsonInferSchema.infer(
-      empty,
+      empty.rdd,
       new JSONOptions(Map.empty[String, String], "GMT"),
       CreateJacksonParser.string)
     assert(StructType(Seq()) === emptySchema)
@@ -1394,7 +1394,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 
   test("SPARK-8093 Erase empty structs") {
     val emptySchema = JsonInferSchema.infer(
-      emptyRecords,
+      emptyRecords.rdd,
       new JSONOptions(Map.empty[String, String], "GMT"),
       CreateJacksonParser.string)
     assert(StructType(Seq()) === emptySchema)
@@ -1592,7 +1592,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val dir = Utils.createTempDir()
     dir.delete()
     val path = dir.getCanonicalPath
-    arrayAndStructRecords.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+    arrayAndStructRecords.map(record => record.replaceAll("\n", " ")).write.text(path)
 
     val schema =
       StructType(
@@ -1609,7 +1609,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val dir = Utils.createTempDir()
     dir.delete()
     val path = dir.getCanonicalPath
-    primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+    primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
 
     val jsonDF = spark.read.json(path)
     val jsonDir = new File(dir, "json").getCanonicalPath
@@ -1645,7 +1645,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     dir.delete()
     val path = dir.getCanonicalPath
 
-    primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+    primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
 
     val jsonDF = spark.read.json(path)
     val jsonDir = new File(dir, "json").getCanonicalPath
@@ -1693,8 +1693,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       val json = s"""
         |{"a": [{$nested}], "b": [{$nested}]}
       """.stripMargin
-      val rdd = spark.sparkContext.makeRDD(Seq(json))
-      val df = spark.read.json(rdd)
+      val df = spark.read.json(Seq(json).toDS())
       assert(df.schema.size === 2)
       df.collect()
     }
@@ -1794,8 +1793,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
-    val records = sparkContext
-      .parallelize("""{"a": 3, "b": 1.1}""" :: """{"a": 3.1, "b": 0.000001}""" :: Nil)
+    val records = Seq("""{"a": 3, "b": 1.1}""", """{"a": 3.1, "b": 0.000001}""").toDS()
 
     val schema = StructType(
       StructField("a", DecimalType(21, 1), true) ::
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index a400940db9..13084ba4a7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -17,14 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources.json
 
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
 
 private[json] trait TestJsonData {
   protected def spark: SparkSession
 
-  def primitiveFieldAndType: RDD[String] =
-    spark.sparkContext.parallelize(
+  def primitiveFieldAndType: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
       """{"string":"this is a simple string.",
           "integer":10,
           "long":21474836470,
@@ -32,10 +31,10 @@ private[json] trait TestJsonData {
           "double":1.7976931348623157E308,
           "boolean":true,
           "null":null
-      }""" :: Nil)
+      }""" :: Nil))(Encoders.STRING)
 
-  def primitiveFieldValueTypeConflict: RDD[String] =
-    spark.sparkContext.parallelize(
+  def primitiveFieldValueTypeConflict: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
       """{"num_num_1":11, "num_num_2":null, "num_num_3": 1.1,
           "num_bool":true, "num_str":13.1, "str_bool":"str1"}""" ::
       """{"num_num_1":null, "num_num_2":21474836470.9, "num_num_3": null,
@@ -44,16 +43,17 @@ private[json] trait TestJsonData {
           "num_bool":false, "num_str":"str1", "str_bool":false}""" ::
       """{"num_num_1":21474836570, "num_num_2":1.1, "num_num_3": 21474836470,
           "num_bool":null, "num_str":92233720368547758070, "str_bool":null}""" :: Nil)
+    )(Encoders.STRING)
 
-  def jsonNullStruct: RDD[String] =
-    spark.sparkContext.parallelize(
+  def jsonNullStruct: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
       """{"nullstr":"","ip":"27.31.100.29","headers":{"Host":"1.abc.com","Charset":"UTF-8"}}""" ::
       """{"nullstr":"","ip":"27.31.100.29","headers":{}}""" ::
       """{"nullstr":"","ip":"27.31.100.29","headers":""}""" ::
-      """{"nullstr":null,"ip":"27.31.100.29","headers":null}""" :: Nil)
+      """{"nullstr":null,"ip":"27.31.100.29","headers":null}""" :: Nil))(Encoders.STRING)
 
-  def complexFieldValueTypeConflict: RDD[String] =
-    spark.sparkContext.parallelize(
+  def complexFieldValueTypeConflict: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
       """{"num_struct":11, "str_array":[1, 2, 3],
           "array":[], "struct_array":[], "struct": {}}""" ::
       """{"num_struct":{"field":false}, "str_array":null,
@@ -62,24 +62,25 @@ private[json] trait TestJsonData {
           "array":[4, 5, 6], "struct_array":[7, 8, 9], "struct": {"field":null}}""" ::
       """{"num_struct":{}, "str_array":["str1", "str2", 33],
           "array":[7], "struct_array":{"field": true}, "struct": {"field": "str"}}""" :: Nil)
+    )(Encoders.STRING)
 
-  def arrayElementTypeConflict: RDD[String] =
-    spark.sparkContext.parallelize(
+  def arrayElementTypeConflict: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
       """{"array1": [1, 1.1, true, null, [], {}, [2,3,4], {"field":"str"}],
           "array2": [{"field":214748364700}, {"field":1}]}""" ::
       """{"array3": [{"field":"str"}, {"field":1}]}""" ::
-      """{"array3": [1, 2, 3]}""" :: Nil)
+      """{"array3": [1, 2, 3]}""" :: Nil))(Encoders.STRING)
 
-  def missingFields: RDD[String] =
-    spark.sparkContext.parallelize(
+  def missingFields: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
       """{"a":true}""" ::
       """{"b":21474836470}""" ::
       """{"c":[33, 44]}""" ::
       """{"d":{"field":true}}""" ::
-      """{"e":"str"}""" :: Nil)
+      """{"e":"str"}""" :: Nil))(Encoders.STRING)
 
-  def complexFieldAndType1: RDD[String] =
-    spark.sparkContext.parallelize(
+  def complexFieldAndType1: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
       """{"struct":{"field1": true, "field2": 92233720368547758070},
          "structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]},
          "arrayOfString":["str1", "str2"],
@@ -92,10 +93,10 @@ private[json] trait TestJsonData {
          "arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}],
"str2"]], "arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]] - }""" :: Nil) + }""" :: Nil))(Encoders.STRING) - def complexFieldAndType2: RDD[String] = - spark.sparkContext.parallelize( + def complexFieldAndType2: Dataset[String] = + spark.createDataset(spark.sparkContext.parallelize( """{"arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}], "complexArrayOfStruct": [ { @@ -146,89 +147,90 @@ private[json] trait TestJsonData { {"inner3": [[{"inner4": 2}]]} ] ]] - }""" :: Nil) + }""" :: Nil))(Encoders.STRING) - def mapType1: RDD[String] = - spark.sparkContext.parallelize( + def mapType1: Dataset[String] = + spark.createDataset(spark.sparkContext.parallelize( """{"map": {"a": 1}}""" :: """{"map": {"b": 2}}""" :: """{"map": {"c": 3}}""" :: """{"map": {"c": 1, "d": 4}}""" :: - """{"map": {"e": null}}""" :: Nil) + """{"map": {"e": null}}""" :: Nil))(Encoders.STRING) - def mapType2: RDD[String] = - spark.sparkContext.parallelize( + def mapType2: Dataset[String] = + spark.createDataset(spark.sparkContext.parallelize( """{"map": {"a": {"field1": [1, 2, 3, null]}}}""" :: """{"map": {"b": {"field2": 2}}}""" :: """{"map": {"c": {"field1": [], "field2": 4}}}""" :: """{"map": {"c": {"field2": 3}, "d": {"field1": [null]}}}""" :: """{"map": {"e": null}}""" :: - """{"map": {"f": {"field1": null}}}""" :: Nil) + """{"map": {"f": {"field1": null}}}""" :: Nil))(Encoders.STRING) - def nullsInArrays: RDD[String] = - spark.sparkContext.parallelize( + def nullsInArrays: Dataset[String] = + spark.createDataset(spark.sparkContext.parallelize( """{"field1":[[null], [[["Test"]]]]}""" :: """{"field2":[null, [{"Test":1}]]}""" :: """{"field3":[[null], [{"Test":"2"}]]}""" :: - """{"field4":[[null, [1,2,3]]]}""" :: Nil) + """{"field4":[[null, [1,2,3]]]}""" :: Nil))(Encoders.STRING) - def jsonArray: RDD[String] = - spark.sparkContext.parallelize( + def jsonArray: Dataset[String] = + spark.createDataset(spark.sparkContext.parallelize( """[{"a":"str_a_1"}]""" :: """[{"a":"str_a_2"}, {"b":"str_b_3"}]""" :: """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" :: - """[]""" :: Nil) + """[]""" :: Nil))(Encoders.STRING) - def corruptRecords: RDD[String] = - spark.sparkContext.parallelize( + def corruptRecords: Dataset[String] = + spark.createDataset(spark.sparkContext.parallelize( """{""" :: """""" :: """{"a":1, b:2}""" :: """{"a":{, b:3}""" :: """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" :: - """]""" :: Nil) + """]""" :: Nil))(Encoders.STRING) - def additionalCorruptRecords: RDD[String] = - spark.sparkContext.parallelize( + def additionalCorruptRecords: Dataset[String] = + spark.createDataset(spark.sparkContext.parallelize( """{"dummy":"test"}""" :: """[1,2,3]""" :: """":"test", "a":1}""" :: """42""" :: - """ ","ian":"test"}""" :: Nil) + """ ","ian":"test"}""" :: Nil))(Encoders.STRING) - def emptyRecords: RDD[String] = - spark.sparkContext.parallelize( + def emptyRecords: Dataset[String] = + spark.createDataset(spark.sparkContext.parallelize( """{""" :: """""" :: """{"a": {}}""" :: """{"a": {"b": {}}}""" :: """{"b": [{"c": {}}]}""" :: - """]""" :: Nil) + """]""" :: Nil))(Encoders.STRING) - def timestampAsLong: RDD[String] = - spark.sparkContext.parallelize( - """{"ts":1451732645}""" :: Nil) + def timestampAsLong: Dataset[String] = + spark.createDataset(spark.sparkContext.parallelize( + """{"ts":1451732645}""" :: Nil))(Encoders.STRING) - def arrayAndStructRecords: RDD[String] = - spark.sparkContext.parallelize( + def arrayAndStructRecords: Dataset[String] = + 
+    spark.createDataset(spark.sparkContext.parallelize(
       """{"a": {"b": 1}}""" ::
-      """{"a": []}""" :: Nil)
+      """{"a": []}""" :: Nil))(Encoders.STRING)
 
-  def floatingValueRecords: RDD[String] =
-    spark.sparkContext.parallelize(
-      s"""{"a": 0.${"0" * 38}1, "b": 0.01}""" :: Nil)
+  def floatingValueRecords: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
+      s"""{"a": 0.${"0" * 38}1, "b": 0.01}""" :: Nil))(Encoders.STRING)
 
-  def bigIntegerRecords: RDD[String] =
-    spark.sparkContext.parallelize(
-      s"""{"a": 1${"0" * 38}, "b": 92233720368547758070}""" :: Nil)
+  def bigIntegerRecords: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
+      s"""{"a": 1${"0" * 38}, "b": 92233720368547758070}""" :: Nil))(Encoders.STRING)
 
-  def datesRecords: RDD[String] =
-    spark.sparkContext.parallelize(
+  def datesRecords: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
       """{"date": "26/08/2015 18:00"}""" ::
       """{"date": "27/10/2014 18:30"}""" ::
-      """{"date": "28/01/2016 20:00"}""" :: Nil)
+      """{"date": "28/01/2016 20:00"}""" :: Nil))(Encoders.STRING)
 
-  lazy val singleRow: RDD[String] = spark.sparkContext.parallelize("""{"a":123}""" :: Nil)
+  lazy val singleRow: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize("""{"a":123}""" :: Nil))(Encoders.STRING)
 
-  def empty: RDD[String] = spark.sparkContext.parallelize(Seq[String]())
+  def empty: Dataset[String] = spark.emptyDataset(Encoders.STRING)
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 4a42f8ea79..916a01ee0c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -33,14 +33,15 @@ class CreateTableAsSelectSuite
   extends DataSourceTest
   with SharedSQLContext
   with BeforeAndAfterEach {
+  import testImplicits._
 
   protected override lazy val sql = spark.sql _
   private var path: File = null
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
-    spark.read.json(rdd).createOrReplaceTempView("jt")
+    val ds = (1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""").toDS()
+    spark.read.json(ds).createOrReplaceTempView("jt")
   }
 
   override def afterAll(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 4fc2f81f54..19835cd184 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -24,14 +24,16 @@ import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils
 
 class InsertSuite extends DataSourceTest with SharedSQLContext {
+  import testImplicits._
+
   protected override lazy val sql = spark.sql _
   private var path: File = null
 
   override def beforeAll(): Unit = {
     super.beforeAll()
     path = Utils.createTempDir()
-    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
-    spark.read.json(rdd).createOrReplaceTempView("jt")
+    val ds = (1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""").toDS()
+    spark.read.json(ds).createOrReplaceTempView("jt")
     sql(
       s"""
         |CREATE TEMPORARY VIEW jsonTable (a int, b string)
@@ -129,7 +131,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
     // Writing the table to less part files.
     val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""), 5)
-    spark.read.json(rdd1).createOrReplaceTempView("jt1")
+    spark.read.json(rdd1.toDS()).createOrReplaceTempView("jt1")
     sql(
       s"""
         |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt1
@@ -141,7 +143,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
 
     // Writing the table to more part files.
     val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""), 10)
-    spark.read.json(rdd2).createOrReplaceTempView("jt2")
+    spark.read.json(rdd2.toDS()).createOrReplaceTempView("jt2")
     sql(
       s"""
         |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt2
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
index b1756c27fa..773d34dfaf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
@@ -28,6 +28,8 @@ import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
 class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndAfter {
+  import testImplicits._
+
   protected override lazy val sql = spark.sql _
   private var originalDefaultSource: String = null
   private var path: File = null
@@ -40,8 +42,8 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
     path = Utils.createTempDir()
     path.delete()
 
-    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
-    df = spark.read.json(rdd)
+    val ds = (1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""").toDS()
+    df = spark.read.json(ds)
     df.createOrReplaceTempView("jsonTable")
   }
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
index f664d5a4cd..aefc9cc77d 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
@@ -26,7 +26,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.*;
 import org.apache.spark.sql.expressions.Window;
 import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
@@ -35,7 +34,6 @@ import org.apache.spark.sql.hive.test.TestHive$;
 import org.apache.spark.sql.hive.aggregate.MyDoubleSum;
 
 public class JavaDataFrameSuite {
-  private transient JavaSparkContext sc;
   private transient SQLContext hc;
 
   Dataset<Row> df;
@@ -50,13 +48,11 @@ public class JavaDataFrameSuite {
   @Before
   public void setUp() throws IOException {
     hc = TestHive$.MODULE$;
-    sc = new JavaSparkContext(hc.sparkContext());
-
     List<String> jsonObjects = new ArrayList<>(10);
     for (int i = 0; i < 10; i++) {
       jsonObjects.add("{\"key\":" + i + ", \"value\":\"str" + i + "\"}");
     }
-    df = hc.read().json(sc.parallelize(jsonObjects));
+    df = hc.read().json(hc.createDataset(jsonObjects, Encoders.STRING()));
     df.createOrReplaceTempView("window_table");
   }
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
index 061c7431a6..0b157a45e6 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
@@ -31,9 +31,9 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.QueryTest$;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
@@ -81,8 +81,8 @@ public class JavaMetastoreDataSourcesSuite {
     for (int i = 0; i < 10; i++) {
       jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}");
     }
-    JavaRDD<String> rdd = sc.parallelize(jsonObjects);
-    df = sqlContext.read().json(rdd);
+    Dataset<String> ds = sqlContext.createDataset(jsonObjects, Encoders.STRING());
+    df = sqlContext.read().json(ds);
     df.createOrReplaceTempView("jsonTable");
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index e951bbe1dc..03ea0c8c77 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -511,9 +511,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
   test("create external table") {
     withTempPath { tempPath =>
       withTable("savedJsonTable", "createdJsonTable") {
-        val df = read.json(sparkContext.parallelize((1 to 10).map { i =>
+        val df = read.json((1 to 10).map { i =>
           s"""{ "a": $i, "b": "str$i" }"""
-        }))
+        }.toDS())
 
         withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "not a source name") {
           df.write
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
index cfca1d7983..8a37bc3665 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.test.SQLTestUtils
  * A set of tests that validates support for Hive Explain command.
  */
 class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+  import testImplicits._
 
   test("show cost in explain command") {
     // Only has sizeInBytes before ANALYZE command
@@ -92,8 +93,8 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
 
   test("SPARK-17409: The EXPLAIN output of CTAS only shows the analyzed plan") {
     withTempView("jt") {
-      val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
-      spark.read.json(rdd).createOrReplaceTempView("jt")
+      val ds = (1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""").toDS()
+      spark.read.json(ds).createOrReplaceTempView("jt")
       val outputs = sql(
         s"""
            |EXPLAIN EXTENDED
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
index b2f19d7753..ce92fbf349 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
@@ -31,15 +31,15 @@ case class Data(a: Int, B: Int, n: Nested, nestedArray: Seq[Nested])
 class HiveResolutionSuite extends HiveComparisonTest {
 
   test("SPARK-3698: case insensitive test for nested data") {
-    read.json(sparkContext.makeRDD(
-      """{"a": [{"a": {"a": 1}}]}""" :: Nil)).createOrReplaceTempView("nested")
+    read.json(Seq("""{"a": [{"a": {"a": 1}}]}""").toDS())
+      .createOrReplaceTempView("nested")
     // This should be successfully analyzed
     sql("SELECT a[0].A.A from nested").queryExecution.analyzed
   }
 
   test("SPARK-5278: check ambiguous reference to fields") {
-    read.json(sparkContext.makeRDD(
-      """{"a": [{"b": 1, "B": 2}]}""" :: Nil)).createOrReplaceTempView("nested")
+    read.json(Seq("""{"a": [{"b": 1, "B": 2}]}""").toDS())
+      .createOrReplaceTempView("nested")
     // there are 2 filed matching field name "b", we should report Ambiguous reference error
     val exception = intercept[AnalysisException] {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index faed8b5046..9f6176339e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -973,30 +973,30 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   }
 
   test("SPARK-4296 Grouping field with Hive UDF as sub expression") {
-    val rdd = sparkContext.makeRDD( """{"a": "str", "b":"1", "c":"1970-01-01 00:00:00"}""" :: Nil)
-    read.json(rdd).createOrReplaceTempView("data")
+    val ds = Seq("""{"a": "str", "b":"1", "c":"1970-01-01 00:00:00"}""").toDS()
+    read.json(ds).createOrReplaceTempView("data")
     checkAnswer(
       sql("SELECT concat(a, '-', b), year(c) FROM data GROUP BY concat(a, '-', b), year(c)"),
       Row("str-1", 1970))
     dropTempTable("data")
 
-    read.json(rdd).createOrReplaceTempView("data")
+    read.json(ds).createOrReplaceTempView("data")
     checkAnswer(sql("SELECT year(c) + 1 FROM data GROUP BY year(c) + 1"), Row(1971))
     dropTempTable("data")
   }
 
   test("resolve udtf in projection #1") {
-    val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
-    read.json(rdd).createOrReplaceTempView("data")
+    val ds = (1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}""").toDS()
+    read.json(ds).createOrReplaceTempView("data")
     val df = sql("SELECT explode(a) AS val FROM data")
     val col = df("val")
   }
 
  test("resolve udtf in projection #2") {
-    val rdd = sparkContext.makeRDD((1 to 2).map(i => s"""{"a":[$i, ${i + 1}]}"""))
-    read.json(rdd).createOrReplaceTempView("data")
+    val ds = (1 to 2).map(i => s"""{"a":[$i, ${i + 1}]}""").toDS()
+    read.json(ds).createOrReplaceTempView("data")
     checkAnswer(sql("SELECT explode(map(1, 1)) FROM data LIMIT 1"), Row(1, 1) :: Nil)
     checkAnswer(sql("SELECT explode(map(1, 1)) as (k1, k2) FROM data LIMIT 1"), Row(1, 1) :: Nil)
     intercept[AnalysisException] {
@@ -1010,8 +1010,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
   // TGF with non-TGF in project is allowed in Spark SQL, but not in Hive
   test("TGF with non-TGF in projection") {
-    val rdd = sparkContext.makeRDD( """{"a": "1", "b":"1"}""" :: Nil)
-    read.json(rdd).createOrReplaceTempView("data")
+    val ds = Seq("""{"a": "1", "b":"1"}""").toDS()
+    read.json(ds).createOrReplaceTempView("data")
     checkAnswer(
       sql("SELECT explode(map(a, b)) as (k1, k2), a, b FROM data"),
       Row("1", "1", "1", "1") :: Nil)
@@ -1024,8 +1024,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     // is not in a valid state (cannot be executed). Because of this bug, the analysis rule of
     // PreInsertionCasts will actually start to work before ImplicitGenerate and then
     // generates an invalid query plan.
-    val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
-    read.json(rdd).createOrReplaceTempView("data")
+    val ds = (1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}""").toDS()
+    read.json(ds).createOrReplaceTempView("data")
     withSQLConf(SQLConf.CONVERT_CTAS.key -> "false") {
       sql("CREATE TABLE explodeTest (key bigInt)")
@@ -1262,9 +1262,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   }
 
   test("SPARK-9371: fix the support for special chars in column names for hive context") {
-    read.json(sparkContext.makeRDD(
-      """{"a": {"c.b": 1}, "b.$q": [{"a@!.q": 1}], "q.w": {"w.i&": [1]}}""" :: Nil))
-      .createOrReplaceTempView("t")
+    val ds = Seq("""{"a": {"c.b": 1}, "b.$q": [{"a@!.q": 1}], "q.w": {"w.i&": [1]}}""").toDS()
+    read.json(ds).createOrReplaceTempView("t")
 
     checkAnswer(sql("SELECT a.`c.b`, `b.$q`[0].`a@!.q`, `q.w`.`w.i&`[0] FROM t"), Row(1, 1, 1))
   }
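The Java suites and TestJsonData cannot rely on `import spark.implicits._`, so the commit passes an explicit encoder instead. A sketch of that variant, again with illustrative names (`toJsonDataset`, the sample records) and assuming Spark 2.2+:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}

object ExplicitEncoderSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()

    // From a plain collection, mirroring the Java createDataset(..., Encoders.STRING()) calls.
    val fromSeq: Dataset[String] =
      spark.createDataset(Seq("""{"a":1}""", """{"a":2}"""))(Encoders.STRING)

    // Wrapping an existing RDD[String], mirroring the TestJsonData helpers,
    // which keep the RDD but convert it before handing it to the reader.
    val rdd: RDD[String] = spark.sparkContext.parallelize(Seq("""{"a":3}"""))
    val fromRdd: Dataset[String] = spark.createDataset(rdd)(Encoders.STRING)

    spark.read.json(fromSeq.union(fromRdd)).show()
    spark.stop()
  }
}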