From d1e22b386839e6f81cfd83b1903b9dc8c4bbef64 Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Tue, 17 Jun 2014 19:14:59 -0700
Subject: [SPARK-2060][SQL] Querying JSON Datasets with SQL and DSL in Spark SQL

JIRA: https://issues.apache.org/jira/browse/SPARK-2060

Programming guide: http://yhuai.github.io/site/sql-programming-guide.html

Scala doc of SQLContext: http://yhuai.github.io/site/api/scala/index.html#org.apache.spark.sql.SQLContext

Author: Yin Huai

Closes #999 from yhuai/newJson and squashes the following commits:

227e89e [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
ce8eedd [Yin Huai] rxin's comments.
bc9ac51 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
94ffdaa [Yin Huai] Remove "get" from method names.
ce31c81 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
e2773a6 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
79ea9ba [Yin Huai] Fix typos.
5428451 [Yin Huai] Newline
1f908ce [Yin Huai] Remove extra line.
d7a005c [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
7ea750e [Yin Huai] marmbrus's comments.
6a5f5ef [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
83013fb [Yin Huai] Update Java Example.
e7a6c19 [Yin Huai] SchemaRDD.javaToPython should convert a field with the StructType to a Map.
6d20b85 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
4fbddf0 [Yin Huai] Programming guide.
9df8c5a [Yin Huai] Python API.
7027634 [Yin Huai] Java API.
cff84cc [Yin Huai] Use a SchemaRDD for a JSON dataset.
d0bd412 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
ab810b0 [Yin Huai] Make JsonRDD private.
6df0891 [Yin Huai] Apache header.
8347f2e [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
66f9e76 [Yin Huai] Update docs and use the entire dataset to infer the schema.
8ffed79 [Yin Huai] Update the example.
a5a4b52 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
4325475 [Yin Huai] If a sampled dataset is used for schema inferring, update the schema of the JsonTable after first execution.
65b87f0 [Yin Huai] Fix sampling...
8846af5 [Yin Huai] API doc.
52a2275 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
0387523 [Yin Huai] Address PR comments.
666b957 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
a2313a6 [Yin Huai] Address PR comments.
f3ce176 [Yin Huai] After type conflict resolution, if a NullType is found, StringType is used.
0576406 [Yin Huai] Add Apache license header.
af91b23 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
f45583b [Yin Huai] Infer the schema of a JSON dataset (a text file with one JSON object per line or a RDD[String] with one JSON object per string) and returns a SchemaRDD.
f31065f [Yin Huai] A query plan or a SchemaRDD can print out its schema.
(cherry picked from commit d2f4f30b12f99358953e2781957468e2cfe3c916) Signed-off-by: Reynold Xin --- .rat-excludes | 1 + docs/sql-programming-guide.md | 290 +++++++++--- .../apache/spark/examples/sql/JavaSparkSQL.java | 78 +++- examples/src/main/resources/people.json | 3 + project/SparkBuild.scala | 22 +- python/pyspark/sql.py | 64 ++- sql/catalyst/pom.xml | 28 ++ .../sql/catalyst/analysis/HiveTypeCoercion.scala | 25 +- .../spark/sql/catalyst/plans/QueryPlan.scala | 51 ++ .../catalyst/optimizer/CombiningLimitsSuite.scala | 3 +- .../catalyst/optimizer/ConstantFoldingSuite.scala | 3 +- .../catalyst/optimizer/FilterPushdownSuite.scala | 5 +- .../sql/catalyst/optimizer/OptimizerTest.scala | 55 --- .../SimplifyCaseConversionExpressionsSuite.scala | 3 +- .../apache/spark/sql/catalyst/plans/PlanTest.scala | 54 +++ sql/core/pom.xml | 12 + .../scala/org/apache/spark/sql/SQLContext.scala | 45 +- .../scala/org/apache/spark/sql/SchemaRDD.scala | 38 +- .../scala/org/apache/spark/sql/SchemaRDDLike.scala | 6 + .../apache/spark/sql/api/java/JavaSQLContext.scala | 20 + .../scala/org/apache/spark/sql/json/JsonRDD.scala | 397 ++++++++++++++++ .../scala/org/apache/spark/sql/QueryTest.scala | 4 +- .../apache/spark/sql/api/java/JavaSQLSuite.scala | 45 ++ .../org/apache/spark/sql/json/JsonSuite.scala | 519 +++++++++++++++++++++ .../org/apache/spark/sql/json/TestJsonData.scala | 84 ++++ 25 files changed, 1694 insertions(+), 161 deletions(-) create mode 100644 examples/src/main/resources/people.json delete mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerTest.scala create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala diff --git a/.rat-excludes b/.rat-excludes index 15589702c5..4c5d560822 100644 --- a/.rat-excludes +++ b/.rat-excludes @@ -21,6 +21,7 @@ spark-env.sh.template log4j-defaults.properties sorttable.js .*txt +.*json .*data .*log cloudpickle.py diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 4623bb4247..522c83884e 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -17,20 +17,20 @@ Spark. At the core of this component is a new type of RDD, [Row](api/scala/index.html#org.apache.spark.sql.catalyst.expressions.Row) objects along with a schema that describes the data types of each column in the row. A SchemaRDD is similar to a table in a traditional relational database. A SchemaRDD can be created from an existing RDD, [Parquet](http://parquet.io) -file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). +file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). All of the examples on this page use sample data included in the Spark distribution and can be run in the `spark-shell`.
-Spark SQL allows relational queries expressed in SQL, HiveQL, or Scala to be executed using +Spark SQL allows relational queries expressed in SQL or HiveQL to be executed using Spark. At the core of this component is a new type of RDD, [JavaSchemaRDD](api/scala/index.html#org.apache.spark.sql.api.java.JavaSchemaRDD). JavaSchemaRDDs are composed [Row](api/scala/index.html#org.apache.spark.sql.api.java.Row) objects along with a schema that describes the data types of each column in the row. A JavaSchemaRDD is similar to a table in a traditional relational database. A JavaSchemaRDD can be created from an existing RDD, [Parquet](http://parquet.io) -file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). +file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).
@@ -41,7 +41,7 @@ Spark. At the core of this component is a new type of RDD, [Row](api/python/pyspark.sql.Row-class.html) objects along with a schema that describes the data types of each column in the row. A SchemaRDD is similar to a table in a traditional relational database. A SchemaRDD can be created from an existing RDD, [Parquet](http://parquet.io) -file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). +file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). All of the examples on this page use sample data included in the Spark distribution and can be run in the `pyspark` shell.
@@ -64,8 +64,8 @@ descendants. To create a basic SQLContext, all you need is a SparkContext. val sc: SparkContext // An existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) -// Importing the SQL context gives access to all the public SQL functions and implicit conversions. -import sqlContext._ +// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD. +import sqlContext.createSchemaRDD {% endhighlight %} @@ -77,8 +77,8 @@ The entry point into all relational functionality in Spark is the of its descendants. To create a basic JavaSQLContext, all you need is a JavaSparkContext. {% highlight java %} -JavaSparkContext ctx = ...; // An existing JavaSparkContext. -JavaSQLContext sqlCtx = new org.apache.spark.sql.api.java.JavaSQLContext(ctx); +JavaSparkContext sc = ...; // An existing JavaSparkContext. +JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc); {% endhighlight %} @@ -91,14 +91,33 @@ of its decedents. To create a basic SQLContext, all you need is a SparkContext. {% highlight python %} from pyspark.sql import SQLContext -sqlCtx = SQLContext(sc) +sqlContext = SQLContext(sc) {% endhighlight %} -## Running SQL on RDDs +# Data Sources + +
+
+Spark SQL supports operating on a variety of data sources through the `SchemaRDD` interface. +Once a dataset has been loaded, it can be registered as a table and even joined with data from other sources. +
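For example, once two datasets from different sources are registered as tables, they can be joined in a single SQL statement. The sketch below is illustrative only: the file paths and column names are hypothetical, and the `parquetFile`, `jsonFile`, and `registerAsTable` methods it uses are described in the sections that follow.

{% highlight scala %}
// Illustrative sketch only: the paths and column names below are hypothetical.
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Load one table from a Parquet file and another from a JSON dataset.
val users = sqlContext.parquetFile("examples/users.parquet")
val events = sqlContext.jsonFile("examples/events.json")

// Register both as tables so they can be referenced from SQL.
users.registerAsTable("users")
events.registerAsTable("events")

// Data from the two sources can now be joined in one query.
val joined = sqlContext.sql(
  "SELECT users.name, events.action FROM users JOIN events ON users.id = events.userId")
joined.collect().foreach(println)
{% endhighlight %}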
+ +
+Spark SQL supports operating on a variety of data sources through the `JavaSchemaRDD` interface. +Once a dataset has been loaded, it can be registered as a table and even joined with data from other sources. +
+ +
+Spark SQL supports operating on a variety of data sources through the `SchemaRDD` interface. +Once a dataset has been loaded, it can be registered as a table and even joined with data from other sources. +
+
+ +## RDDs
@@ -111,8 +130,10 @@ types such as Sequences or Arrays. This RDD can be implicitly converted to a Sch registered as a table. Tables can be used in subsequent SQL statements. {% highlight scala %} +// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) -import sqlContext._ +// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD. +import sqlContext.createSchemaRDD // Define the schema using a case class. // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit, @@ -124,7 +145,7 @@ val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(" people.registerAsTable("people") // SQL statements can be run by using the sql methods provided by sqlContext. -val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") +val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // The results of SQL queries are SchemaRDDs and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. @@ -170,12 +191,11 @@ A schema can be applied to an existing RDD by calling `applySchema` and providin for the JavaBean. {% highlight java %} - -JavaSparkContext ctx = ...; // An existing JavaSparkContext. -JavaSQLContext sqlCtx = new org.apache.spark.sql.api.java.JavaSQLContext(ctx) +// sc is an existing JavaSparkContext. +JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc) // Load a text file and convert each line to a JavaBean. -JavaRDD people = ctx.textFile("examples/src/main/resources/people.txt").map( +JavaRDD people = sc.textFile("examples/src/main/resources/people.txt").map( new Function() { public Person call(String line) throws Exception { String[] parts = line.split(","); @@ -189,11 +209,11 @@ JavaRDD people = ctx.textFile("examples/src/main/resources/people.txt"). }); // Apply a schema to an RDD of JavaBeans and register it as a table. -JavaSchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class); +JavaSchemaRDD schemaPeople = sqlContext.applySchema(people, Person.class); schemaPeople.registerAsTable("people"); // SQL can be run over RDDs that have been registered as tables. -JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") +JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // The results of SQL queries are SchemaRDDs and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. @@ -215,6 +235,10 @@ row. Any RDD of dictionaries can converted to a SchemaRDD and then registered as can be used in subsequent SQL statements. {% highlight python %} +# sc is an existing SparkContext. +from pyspark.sql import SQLContext +sqlContext = SQLContext(sc) + # Load a text file and convert each line to a dictionary. lines = sc.textFile("examples/src/main/resources/people.txt") parts = lines.map(lambda l: l.split(",")) @@ -223,14 +247,16 @@ people = parts.map(lambda p: {"name": p[0], "age": int(p[1])}) # Infer the schema, and register the SchemaRDD as a table. # In future versions of PySpark we would like to add support for registering RDDs with other # datatypes as tables -peopleTable = sqlCtx.inferSchema(people) -peopleTable.registerAsTable("people") +schemaPeople = sqlContext.inferSchema(people) +schemaPeople.registerAsTable("people") # SQL can be run over SchemaRDDs that have been registered as a table. 
-teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") +teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") # The results of SQL queries are RDDs and support all the normal RDD operations. teenNames = teenagers.map(lambda p: "Name: " + p.name) +for teenName in teenNames.collect(): + print teenName {% endhighlight %}
@@ -241,7 +267,7 @@ teenNames = teenagers.map(lambda p: "Name: " + p.name) Users that want a more complete dialect of SQL should look at the HiveQL support provided by `HiveContext`. -## Using Parquet +## Parquet Files [Parquet](http://parquet.io) is a columnar format that is supported by many other data processing systems. Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema @@ -252,22 +278,23 @@ of the original data. Using the data from the above example:
{% highlight scala %} -val sqlContext = new org.apache.spark.sql.SQLContext(sc) -import sqlContext._ +// sqlContext from the previous example is used in this example. +// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD. +import sqlContext.createSchemaRDD val people: RDD[Person] = ... // An RDD of case class objects, from the previous example. -// The RDD is implicitly converted to a SchemaRDD, allowing it to be stored using Parquet. +// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet. people.saveAsParquetFile("people.parquet") // Read in the parquet file created above. Parquet files are self-describing so the schema is preserved. -// The result of loading a Parquet file is also a JavaSchemaRDD. +// The result of loading a Parquet file is also a SchemaRDD. val parquetFile = sqlContext.parquetFile("people.parquet") //Parquet files can also be registered as tables and then used in SQL statements. parquetFile.registerAsTable("parquetFile") -val teenagers = sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") -teenagers.collect().foreach(println) +val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") +teenagers.map(t => "Name: " + t(0)).collect().foreach(println) {% endhighlight %}
@@ -275,6 +302,7 @@ teenagers.collect().foreach(println)
{% highlight java %} +// sqlContext from the previous example is used in this example. JavaSchemaRDD schemaPeople = ... // The JavaSchemaRDD from the previous example. @@ -283,13 +311,16 @@ schemaPeople.saveAsParquetFile("people.parquet"); // Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved. // The result of loading a parquet file is also a JavaSchemaRDD. -JavaSchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet"); +JavaSchemaRDD parquetFile = sqlContext.parquetFile("people.parquet"); //Parquet files can also be registered as tables and then used in SQL statements. parquetFile.registerAsTable("parquetFile"); -JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); - - +JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); +List teenagerNames = teenagers.map(new Function() { + public String call(Row row) { + return "Name: " + row.getString(0); + } +}).collect(); {% endhighlight %}
@@ -297,50 +328,149 @@ JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >=
{% highlight python %} +# sqlContext from the previous example is used in this example. -peopleTable # The SchemaRDD from the previous example. +schemaPeople # The SchemaRDD from the previous example. # SchemaRDDs can be saved as Parquet files, maintaining the schema information. -peopleTable.saveAsParquetFile("people.parquet") +schemaPeople.saveAsParquetFile("people.parquet") # Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved. # The result of loading a parquet file is also a SchemaRDD. -parquetFile = sqlCtx.parquetFile("people.parquet") +parquetFile = sqlContext.parquetFile("people.parquet") # Parquet files can also be registered as tables and then used in SQL statements. parquetFile.registerAsTable("parquetFile"); -teenagers = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") - +teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") +teenNames = teenagers.map(lambda p: "Name: " + p.name) +for teenName in teenNames.collect(): + print teenName {% endhighlight %}
-## Writing Language-Integrated Relational Queries +## JSON Datasets +
-**Language-Integrated queries are currently only supported in Scala.** +
+Spark SQL can automatically infer the schema of a JSON dataset and load it as a SchemaRDD. +This conversion can be done using one of two methods in a SQLContext: -Spark SQL also supports a domain specific language for writing queries. Once again, -using the data from the above examples: +* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object. +* `jsonRdd` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object. {% highlight scala %} +// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) -import sqlContext._ -val people: RDD[Person] = ... // An RDD of case class objects, from the first example. -// The following is the same as 'SELECT name FROM people WHERE age >= 10 AND age <= 19' -val teenagers = people.where('age >= 10).where('age <= 19).select('name) +// A JSON dataset is pointed to by path. +// The path can be either a single text file or a directory storing text files. +val path = "examples/src/main/resources/people.json" +// Create a SchemaRDD from the file(s) pointed to by path +val people = sqlContext.jsonFile(path) + +// The inferred schema can be visualized using the printSchema() method. +people.printSchema() +// root +// |-- age: IntegerType +// |-- name: StringType + +// Register this SchemaRDD as a table. +people.registerAsTable("people") + +// SQL statements can be run by using the sql methods provided by sqlContext. +val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") + +// Alternatively, a SchemaRDD can be created for a JSON dataset represented by +// an RDD[String] storing one JSON object per string. +val anotherPeopleRDD = sc.parallelize( + """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) +val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD) {% endhighlight %} -The DSL uses Scala symbols to represent columns in the underlying table, which are identifiers -prefixed with a tick (`'`). Implicit conversions turn these symbols into expressions that are -evaluated by the SQL execution engine. A full list of the functions supported can be found in the -[ScalaDoc](api/scala/index.html#org.apache.spark.sql.SchemaRDD). +
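Nested JSON objects are inferred as struct fields and can be referenced from SQL with dot notation, as the Java example bundled with this patch does. A short, illustrative sketch continuing from the `anotherPeople` SchemaRDD above (the table name is arbitrary):

{% highlight scala %}
// Continuing the example above: the nested "address" object was inferred as a struct.
anotherPeople.printSchema()
// root
// |-- address: StructType
// |    |-- city: StringType
// |    |-- state: StringType
// |-- name: StringType

anotherPeople.registerAsTable("anotherPeople")

// Nested fields are addressed with dot notation in SQL.
val cities = sqlContext.sql("SELECT name, address.city FROM anotherPeople")
cities.map(t => "Name: " + t(0) + ", City: " + t(1)).collect().foreach(println)
{% endhighlight %}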
- +
+Spark SQL can automatically infer the schema of a JSON dataset and load it as a JavaSchemaRDD. +This conversion can be done using one of two methods in a JavaSQLContext : -# Hive Support +* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object. +* `jsonRdd` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object. + +{% highlight java %} +// sc is an existing JavaSparkContext. +JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc); + +// A JSON dataset is pointed to by path. +// The path can be either a single text file or a directory storing text files. +String path = "examples/src/main/resources/people.json"; +// Create a JavaSchemaRDD from the file(s) pointed to by path +JavaSchemaRDD people = sqlContext.jsonFile(path); + +// The inferred schema can be visualized using the printSchema() method. +people.printSchema(); +// root +// |-- age: IntegerType +// |-- name: StringType + +// Register this JavaSchemaRDD as a table. +people.registerAsTable("people"); + +// SQL statements can be run by using the sql methods provided by sqlContext. +JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19"); + +// Alternatively, a JavaSchemaRDD can be created for a JSON dataset represented by +// an RDD[String] storing one JSON object per string. +List jsonData = Arrays.asList( + "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}"); +JavaRDD anotherPeopleRDD = sc.parallelize(jsonData); +JavaSchemaRDD anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD); +{% endhighlight %} +
+ +
+Spark SQL can automatically infer the schema of a JSON dataset and load it as a SchemaRDD. +This conversion can be done using one of two methods in a SQLContext: + +* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object. +* `jsonRdd` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object. + +{% highlight python %} +# sc is an existing SparkContext. +from pyspark.sql import SQLContext +sqlContext = SQLContext(sc) + +# A JSON dataset is pointed to by path. +# The path can be either a single text file or a directory storing text files. +path = "examples/src/main/resources/people.json" +# Create a SchemaRDD from the file(s) pointed to by path +people = sqlContext.jsonFile(path) + +# The inferred schema can be visualized using the printSchema() method. +people.printSchema() +# root +# |-- age: IntegerType +# |-- name: StringType + +# Register this SchemaRDD as a table. +people.registerAsTable("people") + +# SQL statements can be run by using the sql methods provided by sqlContext. +teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") + +# Alternatively, a SchemaRDD can be created for a JSON dataset represented by +# an RDD[String] storing one JSON object per string. +anotherPeopleRDD = sc.parallelize([ + '{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']) +anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD) +{% endhighlight %} +
+ +
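By default, schema inference makes one pass over the entire dataset. For large inputs, this patch also adds experimental Scala overloads of `jsonFile` and `jsonRDD` that take a sampling ratio and infer the schema from a subset of the records. A minimal sketch, reusing the Scala example above (the 0.1 ratio is only an example value):

{% highlight scala %}
// Experimental overloads added by this patch: infer the schema from a sample of the data.
// The path and the 0.1 sampling ratio below are illustrative.
val sampledPeople = sqlContext.jsonFile("examples/src/main/resources/people.json", 0.1)

// jsonRDD has a matching overload for an RDD[String] of JSON objects.
val sampledOther = sqlContext.jsonRDD(anotherPeopleRDD, 0.1)
{% endhighlight %}

According to the commit notes, when a sampled dataset is used for inference, the schema of the JSON table is updated after the first execution.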
+ +## Hive Tables Spark SQL also supports reading and writing data stored in [Apache Hive](http://hive.apache.org/). However, since Hive has a large number of dependencies, it is not included in the default Spark assembly. @@ -362,17 +492,14 @@ which is similar to `HiveContext`, but creates a local copy of the `metastore` a automatically. {% highlight scala %} -val sc: SparkContext // An existing SparkContext. +// sc is an existing SparkContext. val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) -// Importing the SQL context gives access to all the public SQL functions and implicit conversions. -import hiveContext._ - -hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") -hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") +hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") +hiveContext.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL -hql("FROM src SELECT key, value").collect().foreach(println) +hiveContext.hql("FROM src SELECT key, value").collect().foreach(println) {% endhighlight %} @@ -385,14 +512,14 @@ the `sql` method a `JavaHiveContext` also provides an `hql` methods, which allow expressed in HiveQL. {% highlight java %} -JavaSparkContext ctx = ...; // An existing JavaSparkContext. -JavaHiveContext hiveCtx = new org.apache.spark.sql.hive.api.java.HiveContext(ctx); +// sc is an existing JavaSparkContext. +JavaHiveContext hiveContext = new org.apache.spark.sql.hive.api.java.HiveContext(sc); -hiveCtx.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"); -hiveCtx.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"); +hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"); +hiveContext.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"); // Queries are expressed in HiveQL. -Row[] results = hiveCtx.hql("FROM src SELECT key, value").collect(); +Row[] results = hiveContext.hql("FROM src SELECT key, value").collect(); {% endhighlight %} @@ -406,17 +533,44 @@ the `sql` method a `HiveContext` also provides an `hql` methods, which allows qu expressed in HiveQL. {% highlight python %} - +# sc is an existing SparkContext. from pyspark.sql import HiveContext -hiveCtx = HiveContext(sc) +hiveContext = HiveContext(sc) -hiveCtx.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") -hiveCtx.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") +hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") +hiveContext.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") # Queries can be expressed in HiveQL. -results = hiveCtx.hql("FROM src SELECT key, value").collect() +results = hiveContext.hql("FROM src SELECT key, value").collect() {% endhighlight %} + + +# Writing Language-Integrated Relational Queries + +**Language-Integrated queries are currently only supported in Scala.** + +Spark SQL also supports a domain specific language for writing queries. Once again, +using the data from the above examples: + +{% highlight scala %} +// sc is an existing SparkContext. +val sqlContext = new org.apache.spark.sql.SQLContext(sc) +// Importing the SQL context gives access to all the public SQL functions and implicit conversions. +import sqlContext._ +val people: RDD[Person] = ... // An RDD of case class objects, from the first example. 
+ +// The following is the same as 'SELECT name FROM people WHERE age >= 10 AND age <= 19' +val teenagers = people.where('age >= 10).where('age <= 19).select('name) +teenagers.map(t => "Name: " + t(0)).collect().foreach(println) +{% endhighlight %} + +The DSL uses Scala symbols to represent columns in the underlying table, which are identifiers +prefixed with a tick (`'`). Implicit conversions turn these symbols into expressions that are +evaluated by the SQL execution engine. A full list of the functions supported can be found in the +[ScalaDoc](api/scala/index.html#org.apache.spark.sql.SchemaRDD). + + \ No newline at end of file diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java index ad5ec84b71..607df3eddd 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java @@ -18,6 +18,7 @@ package org.apache.spark.examples.sql; import java.io.Serializable; +import java.util.Arrays; import java.util.List; import org.apache.spark.SparkConf; @@ -56,6 +57,7 @@ public class JavaSparkSQL { JavaSparkContext ctx = new JavaSparkContext(sparkConf); JavaSQLContext sqlCtx = new JavaSQLContext(ctx); + System.out.println("=== Data source: RDD ==="); // Load a text file and convert each line to a Java Bean. JavaRDD people = ctx.textFile("examples/src/main/resources/people.txt").map( new Function() { @@ -84,16 +86,88 @@ public class JavaSparkSQL { return "Name: " + row.getString(0); } }).collect(); + for (String name: teenagerNames) { + System.out.println(name); + } + System.out.println("=== Data source: Parquet File ==="); // JavaSchemaRDDs can be saved as parquet files, maintaining the schema information. schemaPeople.saveAsParquetFile("people.parquet"); - // Read in the parquet file created above. Parquet files are self-describing so the schema is preserved. + // Read in the parquet file created above. + // Parquet files are self-describing so the schema is preserved. // The result of loading a parquet file is also a JavaSchemaRDD. JavaSchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet"); //Parquet files can also be registered as tables and then used in SQL statements. parquetFile.registerAsTable("parquetFile"); - JavaSchemaRDD teenagers2 = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); + JavaSchemaRDD teenagers2 = + sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); + teenagerNames = teenagers2.map(new Function() { + public String call(Row row) { + return "Name: " + row.getString(0); + } + }).collect(); + for (String name: teenagerNames) { + System.out.println(name); + } + + System.out.println("=== Data source: JSON Dataset ==="); + // A JSON dataset is pointed by path. + // The path can be either a single text file or a directory storing text files. + String path = "examples/src/main/resources/people.json"; + // Create a JavaSchemaRDD from the file(s) pointed by path + JavaSchemaRDD peopleFromJsonFile = sqlCtx.jsonFile(path); + + // Because the schema of a JSON dataset is automatically inferred, to write queries, + // it is better to take a look at what is the schema. + peopleFromJsonFile.printSchema(); + // The schema of people is ... + // root + // |-- age: IntegerType + // |-- name: StringType + + // Register this JavaSchemaRDD as a table. 
+ peopleFromJsonFile.registerAsTable("people"); + + // SQL statements can be run by using the sql methods provided by sqlCtx. + JavaSchemaRDD teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19"); + + // The results of SQL queries are JavaSchemaRDDs and support all the normal RDD operations. + // The columns of a row in the result can be accessed by ordinal. + teenagerNames = teenagers3.map(new Function() { + public String call(Row row) { return "Name: " + row.getString(0); } + }).collect(); + for (String name: teenagerNames) { + System.out.println(name); + } + + // Alternatively, a JavaSchemaRDD can be created for a JSON dataset represented by + // a RDD[String] storing one JSON object per string. + List jsonData = Arrays.asList( + "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}"); + JavaRDD anotherPeopleRDD = ctx.parallelize(jsonData); + JavaSchemaRDD peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD); + + // Take a look at the schema of this new JavaSchemaRDD. + peopleFromJsonRDD.printSchema(); + // The schema of anotherPeople is ... + // root + // |-- address: StructType + // | |-- city: StringType + // | |-- state: StringType + // |-- name: StringType + + peopleFromJsonRDD.registerAsTable("people2"); + + JavaSchemaRDD peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2"); + List nameAndCity = peopleWithCity.map(new Function() { + public String call(Row row) { + return "Name: " + row.getString(0) + ", City: " + row.getString(1); + } + }).collect(); + for (String name: nameAndCity) { + System.out.println(name); + } } } diff --git a/examples/src/main/resources/people.json b/examples/src/main/resources/people.json new file mode 100644 index 0000000000..50a859cbd7 --- /dev/null +++ b/examples/src/main/resources/people.json @@ -0,0 +1,3 @@ +{"name":"Michael"} +{"name":"Andy", "age":30} +{"name":"Justin", "age":19} diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index c0e3bbaf90..d71771ae18 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -72,7 +72,7 @@ object SparkBuild extends Build { lazy val catalyst = Project("catalyst", file("sql/catalyst"), settings = catalystSettings) dependsOn(core) - lazy val sql = Project("sql", file("sql/core"), settings = sqlCoreSettings) dependsOn(core, catalyst) + lazy val sql = Project("sql", file("sql/core"), settings = sqlCoreSettings) dependsOn(core) dependsOn(catalyst % "compile->compile;test->test") lazy val hive = Project("hive", file("sql/hive"), settings = hiveSettings) dependsOn(sql) @@ -484,9 +484,23 @@ object SparkBuild extends Build { def sqlCoreSettings = sharedSettings ++ Seq( name := "spark-sql", libraryDependencies ++= Seq( - "com.twitter" % "parquet-column" % parquetVersion, - "com.twitter" % "parquet-hadoop" % parquetVersion - ) + "com.twitter" % "parquet-column" % parquetVersion, + "com.twitter" % "parquet-hadoop" % parquetVersion, + "com.fasterxml.jackson.core" % "jackson-databind" % "2.3.0" // json4s-jackson 3.2.6 requires jackson-databind 2.3.0. 
+ ), + initialCommands in console := + """ + |import org.apache.spark.sql.catalyst.analysis._ + |import org.apache.spark.sql.catalyst.dsl._ + |import org.apache.spark.sql.catalyst.errors._ + |import org.apache.spark.sql.catalyst.expressions._ + |import org.apache.spark.sql.catalyst.plans.logical._ + |import org.apache.spark.sql.catalyst.rules._ + |import org.apache.spark.sql.catalyst.types._ + |import org.apache.spark.sql.catalyst.util._ + |import org.apache.spark.sql.execution + |import org.apache.spark.sql.test.TestSQLContext._ + |import org.apache.spark.sql.parquet.ParquetTestData""".stripMargin ) // Since we don't include hive in the main assembly this project also acts as an alternative diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index c31d49ce83..5051c82da3 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -15,7 +15,7 @@ # limitations under the License. # -from pyspark.rdd import RDD +from pyspark.rdd import RDD, PipelinedRDD from pyspark.serializers import BatchedSerializer, PickleSerializer from py4j.protocol import Py4JError @@ -137,6 +137,53 @@ class SQLContext: jschema_rdd = self._ssql_ctx.parquetFile(path) return SchemaRDD(jschema_rdd, self) + + def jsonFile(self, path): + """Loads a text file storing one JSON object per line, + returning the result as a L{SchemaRDD}. + It goes through the entire dataset once to determine the schema. + + >>> import tempfile, shutil + >>> jsonFile = tempfile.mkdtemp() + >>> shutil.rmtree(jsonFile) + >>> ofn = open(jsonFile, 'w') + >>> for json in jsonStrings: + ... print>>ofn, json + >>> ofn.close() + >>> srdd = sqlCtx.jsonFile(jsonFile) + >>> sqlCtx.registerRDDAsTable(srdd, "table1") + >>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2, field3 as f3 from table1") + >>> srdd2.collect() == [{"f1": 1, "f2": "row1", "f3":{"field4":11}}, + ... {"f1": 2, "f2": "row2", "f3":{"field4":22}}, + ... {"f1": 3, "f2": "row3", "f3":{"field4":33}}] + True + """ + jschema_rdd = self._ssql_ctx.jsonFile(path) + return SchemaRDD(jschema_rdd, self) + + def jsonRDD(self, rdd): + """Loads an RDD storing one JSON object per string, returning the result as a L{SchemaRDD}. + It goes through the entire dataset once to determine the schema. + + >>> srdd = sqlCtx.jsonRDD(json) + >>> sqlCtx.registerRDDAsTable(srdd, "table1") + >>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2, field3 as f3 from table1") + >>> srdd2.collect() == [{"f1": 1, "f2": "row1", "f3":{"field4":11}}, + ... {"f1": 2, "f2": "row2", "f3":{"field4":22}}, + ... {"f1": 3, "f2": "row3", "f3":{"field4":33}}] + True + """ + def func(split, iterator): + for x in iterator: + if not isinstance(x, basestring): + x = unicode(x) + yield x.encode("utf-8") + keyed = PipelinedRDD(rdd, func) + keyed._bypass_serializer = True + jrdd = keyed._jrdd.map(self._jvm.BytesToString()) + jschema_rdd = self._ssql_ctx.jsonRDD(jrdd.rdd()) + return SchemaRDD(jschema_rdd, self) + def sql(self, sqlQuery): """Return a L{SchemaRDD} representing the result of the given query. @@ -265,7 +312,7 @@ class SchemaRDD(RDD): For normal L{pyspark.rdd.RDD} operations (map, count, etc.) the L{SchemaRDD} is not operated on directly, as it's underlying - implementation is a RDD composed of Java objects. Instead it is + implementation is an RDD composed of Java objects. Instead it is converted to a PythonRDD in the JVM, on which Python operations can be done. 
""" @@ -337,6 +384,14 @@ class SchemaRDD(RDD): """Creates a new table with the contents of this SchemaRDD.""" self._jschema_rdd.saveAsTable(tableName) + def schemaString(self): + """Returns the output schema in the tree format.""" + return self._jschema_rdd.schemaString() + + def printSchema(self): + """Prints out the schema in the tree format.""" + print self.schemaString() + def count(self): """Return the number of elements in this RDD. @@ -436,6 +491,11 @@ def _test(): globs['sqlCtx'] = SQLContext(sc) globs['rdd'] = sc.parallelize([{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]) + jsonStrings = ['{"field1": 1, "field2": "row1", "field3":{"field4":11}}', + '{"field1" : 2, "field2": "row2", "field3":{"field4":22}}', + '{"field1" : 3, "field2": "row3", "field3":{"field4":33}}'] + globs['jsonStrings'] = jsonStrings + globs['json'] = sc.parallelize(jsonStrings) globs['nestedRdd1'] = sc.parallelize([ {"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}}, {"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}]) diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml index d3f65de16f..fc6d3d7be9 100644 --- a/sql/catalyst/pom.xml +++ b/sql/catalyst/pom.xml @@ -65,6 +65,34 @@ org.scalatest scalatest-maven-plugin + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + test-jar-on-compile + compile + + test-jar + + + + diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala index d291814c8a..66bff660ca 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala @@ -22,6 +22,16 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.types._ +object HiveTypeCoercion { + // See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types. + // The conversion for integral and floating point types have a linear widening hierarchy: + val numericPrecedence = + Seq(NullType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType) + // Boolean is only wider than Void + val booleanPrecedence = Seq(NullType, BooleanType) + val allPromotions: Seq[Seq[DataType]] = numericPrecedence :: booleanPrecedence :: Nil +} + /** * A collection of [[catalyst.rules.Rule Rules]] that can be used to coerce differing types that * participate in operations into compatible ones. Most of these rules are based on Hive semantics, @@ -116,19 +126,18 @@ trait HiveTypeCoercion { * * Additionally, all types when UNION-ed with strings will be promoted to strings. * Other string conversions are handled by PromoteStrings. + * + * Widening types might result in loss of precision in the following cases: + * - IntegerType to FloatType + * - LongType to FloatType + * - LongType to DoubleType */ object WidenTypes extends Rule[LogicalPlan] { - // See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types. 
- // The conversion for integral and floating point types have a linear widening hierarchy: - val numericPrecedence = - Seq(NullType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType) - // Boolean is only wider than Void - val booleanPrecedence = Seq(NullType, BooleanType) - val allPromotions: Seq[Seq[DataType]] = numericPrecedence :: booleanPrecedence :: Nil def findTightestCommonType(t1: DataType, t2: DataType): Option[DataType] = { // Try and find a promotion rule that contains both types in question. - val applicableConversion = allPromotions.find(p => p.contains(t1) && p.contains(t2)) + val applicableConversion = + HiveTypeCoercion.allPromotions.find(p => p.contains(t1) && p.contains(t2)) // If found return the widest common type, otherwise None applicableConversion.map(_.filter(t => t == t1 || t == t2).last) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 8199a80f5d..00e2d3bc24 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -18,7 +18,9 @@ package org.apache.spark.sql.catalyst.plans import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.plans import org.apache.spark.sql.catalyst.trees.TreeNode +import org.apache.spark.sql.catalyst.types.{ArrayType, DataType, StructField, StructType} abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] { self: PlanType with Product => @@ -123,4 +125,53 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy case other => Nil }.toSeq } + + protected def generateSchemaString(schema: Seq[Attribute]): String = { + val builder = new StringBuilder + builder.append("root\n") + val prefix = " |" + schema.foreach { attribute => + val name = attribute.name + val dataType = attribute.dataType + dataType match { + case fields: StructType => + builder.append(s"$prefix-- $name: $StructType\n") + generateSchemaString(fields, s"$prefix |", builder) + case ArrayType(fields: StructType) => + builder.append(s"$prefix-- $name: $ArrayType[$StructType]\n") + generateSchemaString(fields, s"$prefix |", builder) + case ArrayType(elementType: DataType) => + builder.append(s"$prefix-- $name: $ArrayType[$elementType]\n") + case _ => builder.append(s"$prefix-- $name: $dataType\n") + } + } + + builder.toString() + } + + protected def generateSchemaString( + schema: StructType, + prefix: String, + builder: StringBuilder): StringBuilder = { + schema.fields.foreach { + case StructField(name, fields: StructType, _) => + builder.append(s"$prefix-- $name: $StructType\n") + generateSchemaString(fields, s"$prefix |", builder) + case StructField(name, ArrayType(fields: StructType), _) => + builder.append(s"$prefix-- $name: $ArrayType[$StructType]\n") + generateSchemaString(fields, s"$prefix |", builder) + case StructField(name, ArrayType(elementType: DataType), _) => + builder.append(s"$prefix-- $name: $ArrayType[$elementType]\n") + case StructField(name, fieldType: DataType, _) => + builder.append(s"$prefix-- $name: $fieldType\n") + } + + builder + } + + /** Returns the output schema in the tree format. 
*/ + def schemaString: String = generateSchemaString(output) + + /** Prints out the schema in the tree format */ + def printSchema(): Unit = println(schemaString) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala index 714f01843c..4896f1b955 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala @@ -18,11 +18,12 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.dsl.expressions._ -class CombiningLimitsSuite extends OptimizerTest { +class CombiningLimitsSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala index 6efc0e211e..cea97c584f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.catalyst.types._ @@ -27,7 +28,7 @@ import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.dsl.expressions._ -class ConstantFoldingSuite extends OptimizerTest { +class ConstantFoldingSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 1f67c80e54..ebb123c1f9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -20,13 +20,12 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.plans.LeftOuter -import org.apache.spark.sql.catalyst.plans.RightOuter +import org.apache.spark.sql.catalyst.plans.{PlanTest, LeftOuter, RightOuter} import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.dsl.expressions._ -class FilterPushdownSuite extends OptimizerTest { +class FilterPushdownSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerTest.scala deleted file mode 100644 index 89982d5cd8..0000000000 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerTest.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.optimizer - -import org.scalatest.FunSuite - -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.util._ - -/** - * Provides helper methods for comparing plans produced by optimization rules with the expected - * result - */ -class OptimizerTest extends FunSuite { - - /** - * Since attribute references are given globally unique ids during analysis, - * we must normalize them to check if two different queries are identical. - */ - protected def normalizeExprIds(plan: LogicalPlan) = { - val minId = plan.flatMap(_.expressions.flatMap(_.references).map(_.exprId.id)).min - plan transformAllExpressions { - case a: AttributeReference => - AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(a.exprId.id - minId)) - } - } - - /** Fails the test if the two plans do not match */ - protected def comparePlans(plan1: LogicalPlan, plan2: LogicalPlan) { - val normalized1 = normalizeExprIds(plan1) - val normalized2 = normalizeExprIds(plan2) - if (normalized1 != normalized2) - fail( - s""" - |== FAIL: Plans do not match === - |${sideBySide(normalized1.treeString, normalized2.treeString).mkString("\n")} - """.stripMargin) - } -} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala index df1409fe7b..22992fb6f5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala @@ -19,13 +19,14 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.rules._ /* Implicit conversions */ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -class SimplifyCaseConversionExpressionsSuite extends OptimizerTest { +class SimplifyCaseConversionExpressionsSuite extends PlanTest { object Optimize extends 
RuleExecutor[LogicalPlan] { val batches = diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala new file mode 100644 index 0000000000..7e9f47ef21 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans + +import org.scalatest.FunSuite + +import org.apache.spark.sql.catalyst.expressions.{ExprId, AttributeReference} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.util._ + +/** + * Provides helper methods for comparing plans. + */ +class PlanTest extends FunSuite { + + /** + * Since attribute references are given globally unique ids during analysis, + * we must normalize them to check if two different queries are identical. + */ + protected def normalizeExprIds(plan: LogicalPlan) = { + val minId = plan.flatMap(_.expressions.flatMap(_.references).map(_.exprId.id)).min + plan transformAllExpressions { + case a: AttributeReference => + AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(a.exprId.id - minId)) + } + } + + /** Fails the test if the two plans do not match */ + protected def comparePlans(plan1: LogicalPlan, plan2: LogicalPlan) { + val normalized1 = normalizeExprIds(plan1) + val normalized2 = normalizeExprIds(plan2) + if (normalized1 != normalized2) + fail( + s""" + |== FAIL: Plans do not match === + |${sideBySide(normalized1.treeString, normalized2.treeString).mkString("\n")} + """.stripMargin) + } +} diff --git a/sql/core/pom.xml b/sql/core/pom.xml index 6afed6df7e..21302b3319 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -42,6 +42,13 @@ spark-catalyst_${scala.binary.version} ${project.version} + + org.apache.spark + spark-catalyst_${scala.binary.version} + ${project.version} + test-jar + test + com.twitter parquet-column @@ -52,6 +59,11 @@ parquet-hadoop ${parquet.version} + + com.fasterxml.jackson.core + jackson-databind + 2.3.0 + org.scalatest scalatest_${scala.binary.version} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 131c130bbb..f7e03323be 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -22,24 +22,22 @@ import scala.reflect.runtime.universe.TypeTag import org.apache.hadoop.conf.Configuration -import org.apache.spark.SparkContext import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental} import org.apache.spark.rdd.RDD - import 
org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.{ScalaReflection, dsl} +import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.dsl.ExpressionConversions import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.RuleExecutor - import org.apache.spark.sql.columnar.InMemoryRelation - import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.SparkStrategies - +import org.apache.spark.sql.json._ import org.apache.spark.sql.parquet.ParquetRelation +import org.apache.spark.SparkContext /** * :: AlphaComponent :: @@ -53,7 +51,7 @@ import org.apache.spark.sql.parquet.ParquetRelation class SQLContext(@transient val sparkContext: SparkContext) extends Logging with SQLConf - with dsl.ExpressionConversions + with ExpressionConversions with Serializable { self => @@ -98,6 +96,39 @@ class SQLContext(@transient val sparkContext: SparkContext) def parquetFile(path: String): SchemaRDD = new SchemaRDD(this, parquet.ParquetRelation(path)) + /** + * Loads a JSON file (one object per line), returning the result as a [[SchemaRDD]]. + * It goes through the entire dataset once to determine the schema. + * + * @group userf + */ + def jsonFile(path: String): SchemaRDD = jsonFile(path, 1.0) + + /** + * :: Experimental :: + */ + @Experimental + def jsonFile(path: String, samplingRatio: Double): SchemaRDD = { + val json = sparkContext.textFile(path) + jsonRDD(json, samplingRatio) + } + + /** + * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a + * [[SchemaRDD]]. + * It goes through the entire dataset once to determine the schema. + * + * @group userf + */ + def jsonRDD(json: RDD[String]): SchemaRDD = jsonRDD(json, 1.0) + + /** + * :: Experimental :: + */ + @Experimental + def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD = + new SchemaRDD(this, JsonRDD.inferSchema(json, samplingRatio)) + /** * :: Experimental :: * Creates an empty parquet file with the schema of class `A`, which can be registered as a table. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index 89eaba2d19..7c0efb4566 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.{Inner, JoinType} -import org.apache.spark.sql.catalyst.types.BooleanType +import org.apache.spark.sql.catalyst.types.{DataType, StructType, BooleanType} import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan} import org.apache.spark.api.java.JavaRDD import java.util.{Map => JMap} @@ -41,8 +41,10 @@ import java.util.{Map => JMap} * whose elements are scala case classes into a SchemaRDD. This conversion can also be done * explicitly using the `createSchemaRDD` function on a [[SQLContext]]. * - * A `SchemaRDD` can also be created by loading data in from external sources, for example, - * by using the `parquetFile` method on [[SQLContext]]. + * A `SchemaRDD` can also be created by loading data in from external sources. 
+ * Examples include loading data from Parquet files by using the + * `parquetFile` method on [[SQLContext]], and loading JSON datasets + * by using the `jsonFile` and `jsonRDD` methods on [[SQLContext]]. + * * == SQL Queries == * A SchemaRDD can be registered as a table in the [[SQLContext]] that was used to create it. Once @@ -341,14 +343,38 @@ class SchemaRDD( */ def toJavaSchemaRDD: JavaSchemaRDD = new JavaSchemaRDD(sqlContext, logicalPlan) + /** + * Converts a JavaRDD to a PythonRDD. It is used by pyspark. + */ private[sql] def javaToPython: JavaRDD[Array[Byte]] = { - val fieldNames: Seq[String] = this.queryExecution.analyzed.output.map(_.name) + def rowToMap(row: Row, structType: StructType): JMap[String, Any] = { + val fields = structType.fields.map(field => (field.name, field.dataType)) + val map: JMap[String, Any] = new java.util.HashMap + row.zip(fields).foreach { + case (obj, (name, dataType)) => + dataType match { + case struct: StructType => map.put(name, rowToMap(obj.asInstanceOf[Row], struct)) + case other => map.put(name, obj) + } + } + + map + } + + // TODO: Actually, the schema of a row should be represented by a StructType instead of + // a Seq[Attribute]. Once we have finished that change, we can just use rowToMap to + // construct the Map for python. + val fields: Seq[(String, DataType)] = this.queryExecution.analyzed.output.map( + field => (field.name, field.dataType)) this.mapPartitions { iter => val pickle = new Pickler iter.map { row => val map: JMap[String, Any] = new java.util.HashMap - row.zip(fieldNames).foreach { case (obj, name) => - map.put(name, obj) + row.zip(fields).foreach { case (obj, (name, dataType)) => + dataType match { + case struct: StructType => map.put(name, rowToMap(obj.asInstanceOf[Row], struct)) + case other => map.put(name, obj) + } } map }.grouped(10).map(batched => pickle.dumps(batched.toArray)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala index 656be965a8..fe81721943 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala @@ -122,4 +122,10 @@ private[sql] trait SchemaRDDLike { @Experimental def saveAsTable(tableName: String): Unit = sqlContext.executePlan(InsertIntoCreatedTable(None, tableName, logicalPlan)).toRdd + + /** Returns the output schema in the tree format. */ + def schemaString: String = queryExecution.analyzed.schemaString + + /** Prints out the schema in the tree format. 
*/ + def printSchema(): Unit = println(schemaString) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala index 352260fa15..ff9842267f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} +import org.apache.spark.sql.json.JsonRDD import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow} import org.apache.spark.sql.catalyst.types._ @@ -100,6 +101,25 @@ class JavaSQLContext(val sqlContext: SQLContext) { def parquetFile(path: String): JavaSchemaRDD = new JavaSchemaRDD(sqlContext, ParquetRelation(path)) + /** + * Loads a JSON file (one object per line), returning the result as a [[JavaSchemaRDD]]. + * It goes through the entire dataset once to determine the schema. + * + * @group userf + */ + def jsonFile(path: String): JavaSchemaRDD = + jsonRDD(sqlContext.sparkContext.textFile(path)) + + /** + * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a + * [[JavaSchemaRDD]]. + * It goes through the entire dataset once to determine the schema. + * + * @group userf + */ + def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD = + new JavaSchemaRDD(sqlContext, JsonRDD.inferSchema(json, 1.0)) + /** * Registers the given RDD as a temporary table in the catalog. Temporary tables exist only * during the lifetime of this instance of SQLContext. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala new file mode 100644 index 0000000000..edf8677557 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.json + +import scala.collection.JavaConversions._ +import scala.math.BigDecimal + +import com.fasterxml.jackson.databind.ObjectMapper + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan} +import org.apache.spark.sql.Logging + +private[sql] object JsonRDD extends Logging { + + private[sql] def inferSchema( + json: RDD[String], + samplingRatio: Double = 1.0): LogicalPlan = { + require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0") + val schemaData = if (samplingRatio > 0.99) json else json.sample(false, samplingRatio, 1) + val allKeys = parseJson(schemaData).map(allKeysWithValueTypes).reduce(_ ++ _) + val baseSchema = createSchema(allKeys) + + createLogicalPlan(json, baseSchema) + } + + private def createLogicalPlan( + json: RDD[String], + baseSchema: StructType): LogicalPlan = { + val schema = nullTypeToStringType(baseSchema) + + SparkLogicalPlan(ExistingRdd(asAttributes(schema), parseJson(json).map(asRow(_, schema)))) + } + + private def createSchema(allKeys: Set[(String, DataType)]): StructType = { + // Resolve type conflicts + val resolved = allKeys.groupBy { + case (key, dataType) => key + }.map { + // Now, keys and types are organized in the format of + // key -> Set(type1, type2, ...). + case (key, typeSet) => { + val fieldName = key.substring(1, key.length - 1).split("`.`").toSeq + val dataType = typeSet.map { + case (_, dataType) => dataType + }.reduce((type1: DataType, type2: DataType) => compatibleType(type1, type2)) + + (fieldName, dataType) + } + } + + def makeStruct(values: Seq[Seq[String]], prefix: Seq[String]): StructType = { + val (topLevel, structLike) = values.partition(_.size == 1) + val topLevelFields = topLevel.filter { + name => resolved.get(prefix ++ name).get match { + case ArrayType(StructType(Nil)) => false + case ArrayType(_) => true + case struct: StructType => false + case _ => true + } + }.map { + a => StructField(a.head, resolved.get(prefix ++ a).get, nullable = true) + } + + val structFields: Seq[StructField] = structLike.groupBy(_(0)).map { + case (name, fields) => { + val nestedFields = fields.map(_.tail) + val structType = makeStruct(nestedFields, prefix :+ name) + val dataType = resolved.get(prefix :+ name).get + dataType match { + case array: ArrayType => Some(StructField(name, ArrayType(structType), nullable = true)) + case struct: StructType => Some(StructField(name, structType, nullable = true)) + // dataType is StringType means that we have resolved type conflicts involving + // primitive types and complex types. So, the type of name has been relaxed to + // StringType. Also, this field should have already been put in topLevelFields. + case StringType => None + } + } + }.flatMap(field => field).toSeq + + StructType( + (topLevelFields ++ structFields).sortBy { + case StructField(name, _, _) => name + }) + } + + makeStruct(resolved.keySet.toSeq, Nil) + } + + /** + * Returns the most general data type for two given data types. + */ + private[json] def compatibleType(t1: DataType, t2: DataType): DataType = { + // Try and find a promotion rule that contains both types in question. 
+ val applicableConversion = HiveTypeCoercion.allPromotions.find(p => p.contains(t1) && p + .contains(t2)) + + // If found, return the widest common type; otherwise None. + val returnType = applicableConversion.map(_.filter(t => t == t1 || t == t2).last) + + if (returnType.isDefined) { + returnType.get + } else { + // t1 or t2 is a StructType, ArrayType, or an unexpected type. + (t1, t2) match { + case (other: DataType, NullType) => other + case (NullType, other: DataType) => other + case (StructType(fields1), StructType(fields2)) => { + val newFields = (fields1 ++ fields2).groupBy(field => field.name).map { + case (name, fieldTypes) => { + val dataType = fieldTypes.map(field => field.dataType).reduce( + (type1: DataType, type2: DataType) => compatibleType(type1, type2)) + StructField(name, dataType, true) + } + } + StructType(newFields.toSeq.sortBy { + case StructField(name, _, _) => name + }) + } + case (ArrayType(elementType1), ArrayType(elementType2)) => + ArrayType(compatibleType(elementType1, elementType2)) + // TODO: We should use JsonObjectStringType to mark that values of field will be + // strings and every string is a Json object. + case (_, _) => StringType + } + } + } + + private def typeOfPrimitiveValue(value: Any): DataType = { + value match { + case value: java.lang.String => StringType + case value: java.lang.Integer => IntegerType + case value: java.lang.Long => LongType + // Since we do not have a data type backed by BigInteger, + // when we see a Java BigInteger, we use DecimalType. + case value: java.math.BigInteger => DecimalType + case value: java.lang.Double => DoubleType + case value: java.math.BigDecimal => DecimalType + case value: java.lang.Boolean => BooleanType + case null => NullType + // Unexpected data type. + case _ => StringType + } + } + + /** + * Returns the element type of a JSON array. We go through all elements of this array + * to detect any possible type conflict. We use [[compatibleType]] to resolve + * type conflicts. Right now, when the element of an array is another array, we + * treat the element as String. + */ + private def typeOfArray(l: Seq[Any]): ArrayType = { + val elements = l.flatMap(v => Option(v)) + if (elements.isEmpty) { + // If this JSON array is empty, we use NullType as a placeholder. + // If this array is not empty in other JSON objects, we can resolve + // the type after we have passed through all JSON objects. + ArrayType(NullType) + } else { + val elementType = elements.map { + e => e match { + case map: Map[_, _] => StructType(Nil) + // We have an array of arrays. If those element arrays do not have the same + // element types, we will return ArrayType[StringType]. + case seq: Seq[_] => typeOfArray(seq) + case value => typeOfPrimitiveValue(value) + } + }.reduce((type1: DataType, type2: DataType) => compatibleType(type1, type2)) + + ArrayType(elementType) + } + } + + /** + * Figures out all key names and data types of values from a parsed JSON object + * (in the format of Map[String, Any]). When the value of a key is a JSON object, we + * only use a placeholder (StructType(Nil)) to mark that it should be a struct + * instead of getting all fields of this struct, because a field that does not appear + * in this JSON object may appear in other JSON objects. + */ + private def allKeysWithValueTypes(m: Map[String, Any]): Set[(String, DataType)] = { + m.map{ + // Quote the key with backticks to handle cases which have dots + // in the field name. 
+ case (key, dataType) => (s"`$key`", dataType) + }.flatMap { + case (key: String, struct: Map[String, Any]) => { + // The value associated with the key is a JSON object. + allKeysWithValueTypes(struct).map { + case (k, dataType) => (s"$key.$k", dataType) + } ++ Set((key, StructType(Nil))) + } + case (key: String, array: List[Any]) => { + // The value associated with the key is an array. + typeOfArray(array) match { + case ArrayType(StructType(Nil)) => { + // The elements of this array are structs. + array.asInstanceOf[List[Map[String, Any]]].flatMap { + element => allKeysWithValueTypes(element) + }.map { + case (k, dataType) => (s"$key.$k", dataType) + } :+ (key, ArrayType(StructType(Nil))) + } + case ArrayType(elementType) => (key, ArrayType(elementType)) :: Nil + } + } + case (key: String, value) => (key, typeOfPrimitiveValue(value)) :: Nil + }.toSet + } + + /** + * Converts a Java Map/List to a Scala Map/List. + * We do not use Jackson's scala module here because + * DefaultScalaModule in jackson-module-scala will make + * the parsing very slow. + */ + private def scalafy(obj: Any): Any = obj match { + case map: java.util.Map[String, Object] => + // .map(identity) is used as a workaround for the non-serializable Map + // generated by .mapValues. + // This issue is documented at https://issues.scala-lang.org/browse/SI-7005 + map.toMap.mapValues(scalafy).map(identity) + case list: java.util.List[Object] => + list.toList.map(scalafy) + case atom => atom + } + + private def parseJson(json: RDD[String]): RDD[Map[String, Any]] = { + // According to [Jackson-72: https://jira.codehaus.org/browse/JACKSON-72], + // ObjectMapper will not return BigDecimal when + // "DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS" is disabled + // (see NumberDeserializer.deserialize for the logic). + // But, we do not want to enable this feature because it will use BigDecimal + // for every float number, which will be slow. + // So, right now, we will have Infinity for those BigDecimal numbers. + // TODO: Support BigDecimal. + json.mapPartitions(iter => { + // When there is a key appearing multiple times (a duplicate key), + // the ObjectMapper will take the last value associated with this duplicate key. + // For example: for {"key": 1, "key":2}, we will get "key"->2. 
+ val mapper = new ObjectMapper() + iter.map(record => mapper.readValue(record, classOf[java.util.Map[String, Any]])) + }).map(scalafy).map(_.asInstanceOf[Map[String, Any]]) + } + + private def toLong(value: Any): Long = { + value match { + case value: java.lang.Integer => value.asInstanceOf[Int].toLong + case value: java.lang.Long => value.asInstanceOf[Long] + } + } + + private def toDouble(value: Any): Double = { + value match { + case value: java.lang.Integer => value.asInstanceOf[Int].toDouble + case value: java.lang.Long => value.asInstanceOf[Long].toDouble + case value: java.lang.Double => value.asInstanceOf[Double] + } + } + + private def toDecimal(value: Any): BigDecimal = { + value match { + case value: java.lang.Integer => BigDecimal(value) + case value: java.lang.Long => BigDecimal(value) + case value: java.math.BigInteger => BigDecimal(value) + case value: java.lang.Double => BigDecimal(value) + case value: java.math.BigDecimal => BigDecimal(value) + } + } + + private def toJsonArrayString(seq: Seq[Any]): String = { + val builder = new StringBuilder + builder.append("[") + var count = 0 + seq.foreach { + element => + if (count > 0) builder.append(",") + count += 1 + builder.append(toString(element)) + } + builder.append("]") + + builder.toString() + } + + private def toJsonObjectString(map: Map[String, Any]): String = { + val builder = new StringBuilder + builder.append("{") + var count = 0 + map.foreach { + case (key, value) => + if (count > 0) builder.append(",") + count += 1 + builder.append(s"""\"${key}\":${toString(value)}""") + } + builder.append("}") + + builder.toString() + } + + private def toString(value: Any): String = { + value match { + case value: Map[String, Any] => toJsonObjectString(value) + case value: Seq[Any] => toJsonArrayString(value) + case value => Option(value).map(_.toString).orNull + } + } + + private[json] def enforceCorrectType(value: Any, desiredType: DataType): Any ={ + if (value == null) { + null + } else { + desiredType match { + case ArrayType(elementType) => + value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType)) + case StringType => toString(value) + case IntegerType => value.asInstanceOf[IntegerType.JvmType] + case LongType => toLong(value) + case DoubleType => toDouble(value) + case DecimalType => toDecimal(value) + case BooleanType => value.asInstanceOf[BooleanType.JvmType] + case NullType => null + } + } + } + + private def asRow(json: Map[String,Any], schema: StructType): Row = { + val row = new GenericMutableRow(schema.fields.length) + schema.fields.zipWithIndex.foreach { + // StructType + case (StructField(name, fields: StructType, _), i) => + row.update(i, json.get(name).flatMap(v => Option(v)).map( + v => asRow(v.asInstanceOf[Map[String, Any]], fields)).orNull) + + // ArrayType(StructType) + case (StructField(name, ArrayType(structType: StructType), _), i) => + row.update(i, + json.get(name).flatMap(v => Option(v)).map( + v => v.asInstanceOf[Seq[Any]].map( + e => asRow(e.asInstanceOf[Map[String, Any]], structType))).orNull) + + // Other cases + case (StructField(name, dataType, _), i) => + row.update(i, json.get(name).flatMap(v => Option(v)).map( + enforceCorrectType(_, dataType)).getOrElse(null)) + } + + row + } + + private def nullTypeToStringType(struct: StructType): StructType = { + val fields = struct.fields.map { + case StructField(fieldName, dataType, nullable) => { + val newType = dataType match { + case NullType => StringType + case ArrayType(NullType) => ArrayType(StringType) + case struct: StructType => 
nullTypeToStringType(struct) + case other: DataType => other + } + StructField(fieldName, newType, nullable) + } + } + + StructType(fields) + } + + private def asAttributes(struct: StructType): Seq[AttributeReference] = { + struct.fields.map(f => AttributeReference(f.name, f.dataType, nullable = true)()) + } + + private def asStruct(attributes: Seq[AttributeReference]): StructType = { + val fields = attributes.map { + case AttributeReference(name, dataType, nullable) => StructField(name, dataType, nullable) + } + + StructType(fields) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index d7f6abaf5d..ef84ead2e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -17,12 +17,10 @@ package org.apache.spark.sql -import org.scalatest.FunSuite - import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.util._ -class QueryTest extends FunSuite { +class QueryTest extends PlanTest { /** * Runs the plan and makes sure the answer matches the expected result. * @param rdd the [[SchemaRDD]] to be executed diff --git a/sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala index 9fff7222fe..020baf0c7e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala @@ -22,6 +22,7 @@ import scala.beans.BeanProperty import org.scalatest.FunSuite import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.test.TestSQLContext // Implicits @@ -111,4 +112,48 @@ class JavaSQLSuite extends FunSuite { """.stripMargin).collect.head.row === Seq.fill(8)(null)) } + + test("loads JSON datasets") { + val jsonString = + """{"string":"this is a simple string.", + "integer":10, + "long":21474836470, + "bigInteger":92233720368547758070, + "double":1.7976931348623157E308, + "boolean":true, + "null":null + }""".replaceAll("\n", " ") + val rdd = javaCtx.parallelize(jsonString :: Nil) + + var schemaRDD = javaSqlCtx.jsonRDD(rdd) + + schemaRDD.registerAsTable("jsonTable1") + + assert( + javaSqlCtx.sql("select * from jsonTable1").collect.head.row === + Seq(BigDecimal("92233720368547758070"), + true, + 1.7976931348623157E308, + 10, + 21474836470L, + null, + "this is a simple string.")) + + val file = getTempFilePath("json") + val path = file.toString + rdd.saveAsTextFile(path) + schemaRDD = javaSqlCtx.jsonFile(path) + + schemaRDD.registerAsTable("jsonTable2") + + assert( + javaSqlCtx.sql("select * from jsonTable2").collect.head.row === + Seq(BigDecimal("92233720368547758070"), + true, + 1.7976931348623157E308, + 10, + 21474836470L, + null, + "this is a simple string.")) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala new file mode 100644 index 0000000000..10bd9f08f0 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala @@ -0,0 +1,519 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.json + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.plans.logical.LeafNode +import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.json.JsonRDD.{enforceCorrectType, compatibleType} +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.test.TestSQLContext._ + +protected case class Schema(output: Seq[Attribute]) extends LeafNode + +class JsonSuite extends QueryTest { + import TestJsonData._ + TestJsonData + + test("Type promotion") { + def checkTypePromotion(expected: Any, actual: Any) { + assert(expected.getClass == actual.getClass, + s"Failed to promote ${actual.getClass} to ${expected.getClass}.") + assert(expected == actual, + s"Promoted value ${actual}(${actual.getClass}) does not equal the expected value " + + s"${expected}(${expected.getClass}).") + } + + val intNumber: Int = 2147483647 + checkTypePromotion(intNumber, enforceCorrectType(intNumber, IntegerType)) + checkTypePromotion(intNumber.toLong, enforceCorrectType(intNumber, LongType)) + checkTypePromotion(intNumber.toDouble, enforceCorrectType(intNumber, DoubleType)) + checkTypePromotion(BigDecimal(intNumber), enforceCorrectType(intNumber, DecimalType)) + + val longNumber: Long = 9223372036854775807L + checkTypePromotion(longNumber, enforceCorrectType(longNumber, LongType)) + checkTypePromotion(longNumber.toDouble, enforceCorrectType(longNumber, DoubleType)) + checkTypePromotion(BigDecimal(longNumber), enforceCorrectType(longNumber, DecimalType)) + + val doubleNumber: Double = 1.7976931348623157E308d + checkTypePromotion(doubleNumber.toDouble, enforceCorrectType(doubleNumber, DoubleType)) + checkTypePromotion(BigDecimal(doubleNumber), enforceCorrectType(doubleNumber, DecimalType)) + } + + test("Get compatible type") { + def checkDataType(t1: DataType, t2: DataType, expected: DataType) { + var actual = compatibleType(t1, t2) + assert(actual == expected, + s"Expected $expected as the most general data type for $t1 and $t2, found $actual") + actual = compatibleType(t2, t1) + assert(actual == expected, + s"Expected $expected as the most general data type for $t1 and $t2, found $actual") + } + + // NullType + checkDataType(NullType, BooleanType, BooleanType) + checkDataType(NullType, IntegerType, IntegerType) + checkDataType(NullType, LongType, LongType) + checkDataType(NullType, DoubleType, DoubleType) + checkDataType(NullType, DecimalType, DecimalType) + checkDataType(NullType, StringType, StringType) + checkDataType(NullType, ArrayType(IntegerType), ArrayType(IntegerType)) + checkDataType(NullType, StructType(Nil), StructType(Nil)) + checkDataType(NullType, NullType, NullType) + + // BooleanType + checkDataType(BooleanType, BooleanType, BooleanType) + checkDataType(BooleanType, IntegerType, StringType) + checkDataType(BooleanType, LongType, StringType) + 
checkDataType(BooleanType, DoubleType, StringType) + checkDataType(BooleanType, DecimalType, StringType) + checkDataType(BooleanType, StringType, StringType) + checkDataType(BooleanType, ArrayType(IntegerType), StringType) + checkDataType(BooleanType, StructType(Nil), StringType) + + // IntegerType + checkDataType(IntegerType, IntegerType, IntegerType) + checkDataType(IntegerType, LongType, LongType) + checkDataType(IntegerType, DoubleType, DoubleType) + checkDataType(IntegerType, DecimalType, DecimalType) + checkDataType(IntegerType, StringType, StringType) + checkDataType(IntegerType, ArrayType(IntegerType), StringType) + checkDataType(IntegerType, StructType(Nil), StringType) + + // LongType + checkDataType(LongType, LongType, LongType) + checkDataType(LongType, DoubleType, DoubleType) + checkDataType(LongType, DecimalType, DecimalType) + checkDataType(LongType, StringType, StringType) + checkDataType(LongType, ArrayType(IntegerType), StringType) + checkDataType(LongType, StructType(Nil), StringType) + + // DoubleType + checkDataType(DoubleType, DoubleType, DoubleType) + checkDataType(DoubleType, DecimalType, DecimalType) + checkDataType(DoubleType, StringType, StringType) + checkDataType(DoubleType, ArrayType(IntegerType), StringType) + checkDataType(DoubleType, StructType(Nil), StringType) + + // DecimalType + checkDataType(DecimalType, DecimalType, DecimalType) + checkDataType(DecimalType, StringType, StringType) + checkDataType(DecimalType, ArrayType(IntegerType), StringType) + checkDataType(DecimalType, StructType(Nil), StringType) + + // StringType + checkDataType(StringType, StringType, StringType) + checkDataType(StringType, ArrayType(IntegerType), StringType) + checkDataType(StringType, StructType(Nil), StringType) + + // ArrayType + checkDataType(ArrayType(IntegerType), ArrayType(IntegerType), ArrayType(IntegerType)) + checkDataType(ArrayType(IntegerType), ArrayType(LongType), ArrayType(LongType)) + checkDataType(ArrayType(IntegerType), ArrayType(StringType), ArrayType(StringType)) + checkDataType(ArrayType(IntegerType), StructType(Nil), StringType) + + // StructType + checkDataType(StructType(Nil), StructType(Nil), StructType(Nil)) + checkDataType( + StructType(StructField("f1", IntegerType, true) :: Nil), + StructType(StructField("f1", IntegerType, true) :: Nil), + StructType(StructField("f1", IntegerType, true) :: Nil)) + checkDataType( + StructType(StructField("f1", IntegerType, true) :: Nil), + StructType(Nil), + StructType(StructField("f1", IntegerType, true) :: Nil)) + checkDataType( + StructType( + StructField("f1", IntegerType, true) :: + StructField("f2", IntegerType, true) :: Nil), + StructType(StructField("f1", LongType, true) :: Nil) , + StructType( + StructField("f1", LongType, true) :: + StructField("f2", IntegerType, true) :: Nil)) + checkDataType( + StructType( + StructField("f1", IntegerType, true) :: Nil), + StructType( + StructField("f2", IntegerType, true) :: Nil), + StructType( + StructField("f1", IntegerType, true) :: + StructField("f2", IntegerType, true) :: Nil)) + checkDataType( + StructType( + StructField("f1", IntegerType, true) :: Nil), + DecimalType, + StringType) + } + + test("Primitive field and type inferring") { + val jsonSchemaRDD = jsonRDD(primitiveFieldAndType) + + val expectedSchema = + AttributeReference("bigInteger", DecimalType, true)() :: + AttributeReference("boolean", BooleanType, true)() :: + AttributeReference("double", DoubleType, true)() :: + AttributeReference("integer", IntegerType, true)() :: + AttributeReference("long", 
LongType, true)() :: + AttributeReference("null", StringType, true)() :: + AttributeReference("string", StringType, true)() :: Nil + + comparePlans(Schema(expectedSchema), Schema(jsonSchemaRDD.logicalPlan.output)) + + jsonSchemaRDD.registerAsTable("jsonTable") + + checkAnswer( + sql("select * from jsonTable"), + (BigDecimal("92233720368547758070"), + true, + 1.7976931348623157E308, + 10, + 21474836470L, + null, + "this is a simple string.") :: Nil + ) + } + + test("Complex field and type inferring") { + val jsonSchemaRDD = jsonRDD(complexFieldAndType) + + val expectedSchema = + AttributeReference("arrayOfArray1", ArrayType(ArrayType(StringType)), true)() :: + AttributeReference("arrayOfArray2", ArrayType(ArrayType(DoubleType)), true)() :: + AttributeReference("arrayOfBigInteger", ArrayType(DecimalType), true)() :: + AttributeReference("arrayOfBoolean", ArrayType(BooleanType), true)() :: + AttributeReference("arrayOfDouble", ArrayType(DoubleType), true)() :: + AttributeReference("arrayOfInteger", ArrayType(IntegerType), true)() :: + AttributeReference("arrayOfLong", ArrayType(LongType), true)() :: + AttributeReference("arrayOfNull", ArrayType(StringType), true)() :: + AttributeReference("arrayOfString", ArrayType(StringType), true)() :: + AttributeReference("arrayOfStruct", ArrayType( + StructType(StructField("field1", BooleanType, true) :: + StructField("field2", StringType, true) :: Nil)), true)() :: + AttributeReference("struct", StructType( + StructField("field1", BooleanType, true) :: + StructField("field2", DecimalType, true) :: Nil), true)() :: + AttributeReference("structWithArrayFields", StructType( + StructField("field1", ArrayType(IntegerType), true) :: + StructField("field2", ArrayType(StringType), true) :: Nil), true)() :: Nil + + comparePlans(Schema(expectedSchema), Schema(jsonSchemaRDD.logicalPlan.output)) + + jsonSchemaRDD.registerAsTable("jsonTable") + + // Access elements of a primitive array. + checkAnswer( + sql("select arrayOfString[0], arrayOfString[1], arrayOfString[2] from jsonTable"), + ("str1", "str2", null) :: Nil + ) + + // Access an array of null values. + checkAnswer( + sql("select arrayOfNull from jsonTable"), + Seq(Seq(null, null, null, null)) :: Nil + ) + + // Access elements of a BigInteger array (we use DecimalType internally). + checkAnswer( + sql("select arrayOfBigInteger[0], arrayOfBigInteger[1], arrayOfBigInteger[2] from jsonTable"), + (BigDecimal("922337203685477580700"), BigDecimal("-922337203685477580800"), null) :: Nil + ) + + // Access elements of an array of arrays. + checkAnswer( + sql("select arrayOfArray1[0], arrayOfArray1[1] from jsonTable"), + (Seq("1", "2", "3"), Seq("str1", "str2")) :: Nil + ) + + // Access elements of an array of arrays. + checkAnswer( + sql("select arrayOfArray2[0], arrayOfArray2[1] from jsonTable"), + (Seq(1.0, 2.0, 3.0), Seq(1.1, 2.1, 3.1)) :: Nil + ) + + // Access elements of an array inside a field with the type of ArrayType(ArrayType). + checkAnswer( + sql("select arrayOfArray1[1][1], arrayOfArray2[1][1] from jsonTable"), + ("str2", 2.1) :: Nil + ) + + // Access elements of an array of structs. + checkAnswer( + sql("select arrayOfStruct[0], arrayOfStruct[1], arrayOfStruct[2] from jsonTable"), + (true :: "str1" :: Nil, false :: null :: Nil, null) :: Nil + ) + + // Access a struct and fields inside of it. 
+ checkAnswer( + sql("select struct, struct.field1, struct.field2 from jsonTable"), + ( + Seq(true, BigDecimal("92233720368547758070")), + true, + BigDecimal("92233720368547758070")) :: Nil + ) + + // Access an array field of a struct. + checkAnswer( + sql("select structWithArrayFields.field1, structWithArrayFields.field2 from jsonTable"), + (Seq(4, 5, 6), Seq("str1", "str2")) :: Nil + ) + + // Access elements of an array field of a struct. + checkAnswer( + sql("select structWithArrayFields.field1[1], structWithArrayFields.field2[3] from jsonTable"), + (5, null) :: Nil + ) + } + + ignore("Complex field and type inferring (Ignored)") { + val jsonSchemaRDD = jsonRDD(complexFieldAndType) + jsonSchemaRDD.registerAsTable("jsonTable") + + // Right now, "field1" and "field2" are treated as aliases. We should fix it. + checkAnswer( + sql("select arrayOfStruct[0].field1, arrayOfStruct[0].field2 from jsonTable"), + (true, "str1") :: Nil + ) + + // Right now, the analyzer cannot resolve arrayOfStruct.field1 and arrayOfStruct.field2. + // Getting all values of a specific field from an array of structs. + checkAnswer( + sql("select arrayOfStruct.field1, arrayOfStruct.field2 from jsonTable"), + (Seq(true, false), Seq("str1", null)) :: Nil + ) + } + + test("Type conflict in primitive field values") { + val jsonSchemaRDD = jsonRDD(primitiveFieldValueTypeConflict) + + val expectedSchema = + AttributeReference("num_bool", StringType, true)() :: + AttributeReference("num_num_1", LongType, true)() :: + AttributeReference("num_num_2", DecimalType, true)() :: + AttributeReference("num_num_3", DoubleType, true)() :: + AttributeReference("num_str", StringType, true)() :: + AttributeReference("str_bool", StringType, true)() :: Nil + + comparePlans(Schema(expectedSchema), Schema(jsonSchemaRDD.logicalPlan.output)) + + jsonSchemaRDD.registerAsTable("jsonTable") + + checkAnswer( + sql("select * from jsonTable"), + ("true", 11L, null, 1.1, "13.1", "str1") :: + ("12", null, BigDecimal("21474836470.9"), null, null, "true") :: + ("false", 21474836470L, BigDecimal("92233720368547758070"), 100, "str1", "false") :: + (null, 21474836570L, BigDecimal(1.1), 21474836470L, "92233720368547758070", null) :: Nil + ) + + // Number and Boolean conflict: resolve the type as number in this query. + checkAnswer( + sql("select num_bool - 10 from jsonTable where num_bool > 11"), + 2 + ) + + // Widening to LongType + checkAnswer( + sql("select num_num_1 - 100 from jsonTable where num_num_1 > 11"), + Seq(21474836370L) :: Seq(21474836470L) :: Nil + ) + + checkAnswer( + sql("select num_num_1 - 100 from jsonTable where num_num_1 > 10"), + Seq(-89) :: Seq(21474836370L) :: Seq(21474836470L) :: Nil + ) + + // Widening to DecimalType + checkAnswer( + sql("select num_num_2 + 1.2 from jsonTable where num_num_2 > 1.1"), + Seq(BigDecimal("21474836472.1")) :: Seq(BigDecimal("92233720368547758071.2")) :: Nil + ) + + // Widening to DoubleType + checkAnswer( + sql("select num_num_3 + 1.2 from jsonTable where num_num_3 > 1.1"), + Seq(101.2) :: Seq(21474836471.2) :: Nil + ) + + // Number and String conflict: resolve the type as number in this query. + checkAnswer( + sql("select num_str + 1.2 from jsonTable where num_str > 14"), + 92233720368547758071.2 + ) + + // String and Boolean conflict: resolve the type as string. 
+ checkAnswer( + sql("select * from jsonTable where str_bool = 'str1'"), + ("true", 11L, null, 1.1, "13.1", "str1") :: Nil + ) + } + + ignore("Type conflict in primitive field values (Ignored)") { + val jsonSchemaRDD = jsonRDD(primitiveFieldValueTypeConflict) + jsonSchemaRDD.registerAsTable("jsonTable") + + // Right now, the analyzer does not promote strings in a boolean expression. + // Number and Boolean conflict: resolve the type as boolean in this query. + checkAnswer( + sql("select num_bool from jsonTable where NOT num_bool"), + false + ) + + checkAnswer( + sql("select str_bool from jsonTable where NOT str_bool"), + false + ) + + // Right now, the analyzer does not know that num_bool should be treated as a boolean. + // Number and Boolean conflict: resolve the type as boolean in this query. + checkAnswer( + sql("select num_bool from jsonTable where num_bool"), + true + ) + + checkAnswer( + sql("select str_bool from jsonTable where str_bool"), + false + ) + + // Right now, we have a parsing error. + // Number and String conflict: resolve the type as number in this query. + checkAnswer( + sql("select num_str + 1.2 from jsonTable where num_str > 92233720368547758060"), + BigDecimal("92233720368547758061.2") + ) + + // The plan of the following DSL is + // Project [(CAST(num_str#65:4, DoubleType) + 1.2) AS num#78] + // Filter (CAST(CAST(num_str#65:4, DoubleType), DecimalType) > 92233720368547758060) + // ExistingRdd [num_bool#61,num_num_1#62L,num_num_2#63,num_num_3#64,num_str#65,str_bool#66] + // We should directly cast num_str to DecimalType and also need to do the right type promotion + // in the Project. + checkAnswer( + jsonSchemaRDD. + where('num_str > BigDecimal("92233720368547758060")). + select('num_str + 1.2 as Symbol("num")), + BigDecimal("92233720368547758061.2") + ) + + // The following test will fail. The type of num_str is StringType. + // So, to evaluate num_str + 1.2, we first need to use Cast to convert the type. + // In our test data, one value of num_str is 13.1. + // The result of (CAST(num_str#65:4, DoubleType) + 1.2) for this value is 14.299999999999999, + // which is not 14.3. + // Number and String conflict: resolve the type as number in this query. 
+ checkAnswer( + sql("select num_str + 1.2 from jsonTable where num_str > 13"), + Seq(14.3) :: Seq(92233720368547758071.2) :: Nil + ) + } + + test("Type conflict in complex field values") { + val jsonSchemaRDD = jsonRDD(complexFieldValueTypeConflict) + + val expectedSchema = + AttributeReference("array", ArrayType(IntegerType), true)() :: + AttributeReference("num_struct", StringType, true)() :: + AttributeReference("str_array", StringType, true)() :: + AttributeReference("struct", StructType( + StructField("field", StringType, true) :: Nil), true)() :: + AttributeReference("struct_array", StringType, true)() :: Nil + + comparePlans(Schema(expectedSchema), Schema(jsonSchemaRDD.logicalPlan.output)) + + jsonSchemaRDD.registerAsTable("jsonTable") + + checkAnswer( + sql("select * from jsonTable"), + (Seq(), "11", "[1,2,3]", Seq(null), "[]") :: + (null, """{"field":false}""", null, null, "{}") :: + (Seq(4, 5, 6), null, "str", Seq(null), "[7,8,9]") :: + (Seq(7), "{}","[str1,str2,33]", Seq("str"), """{"field":true}""") :: Nil + ) + } + + test("Type conflict in array elements") { + val jsonSchemaRDD = jsonRDD(arrayElementTypeConflict) + + val expectedSchema = + AttributeReference("array", ArrayType(StringType), true)() :: Nil + + comparePlans(Schema(expectedSchema), Schema(jsonSchemaRDD.logicalPlan.output)) + + jsonSchemaRDD.registerAsTable("jsonTable") + + checkAnswer( + sql("select * from jsonTable"), + Seq(Seq("1", "1.1", "true", null, "[]", "{}", "[2,3,4]", + """{"field":str}""")) :: Nil + ) + + // Treat an element as a number. + checkAnswer( + sql("select array[0] + 1 from jsonTable"), + 2 + ) + } + + test("Handling missing fields") { + val jsonSchemaRDD = jsonRDD(missingFields) + + val expectedSchema = + AttributeReference("a", BooleanType, true)() :: + AttributeReference("b", LongType, true)() :: + AttributeReference("c", ArrayType(IntegerType), true)() :: + AttributeReference("d", StructType( + StructField("field", BooleanType, true) :: Nil), true)() :: + AttributeReference("e", StringType, true)() :: Nil + + comparePlans(Schema(expectedSchema), Schema(jsonSchemaRDD.logicalPlan.output)) + + jsonSchemaRDD.registerAsTable("jsonTable") + } + + test("Loading a JSON dataset from a text file") { + val file = getTempFilePath("json") + val path = file.toString + primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path) + val jsonSchemaRDD = jsonFile(path) + + val expectedSchema = + AttributeReference("bigInteger", DecimalType, true)() :: + AttributeReference("boolean", BooleanType, true)() :: + AttributeReference("double", DoubleType, true)() :: + AttributeReference("integer", IntegerType, true)() :: + AttributeReference("long", LongType, true)() :: + AttributeReference("null", StringType, true)() :: + AttributeReference("string", StringType, true)() :: Nil + + comparePlans(Schema(expectedSchema), Schema(jsonSchemaRDD.logicalPlan.output)) + + jsonSchemaRDD.registerAsTable("jsonTable") + + checkAnswer( + sql("select * from jsonTable"), + (BigDecimal("92233720368547758070"), + true, + 1.7976931348623157E308, + 10, + 21474836470L, + null, + "this is a simple string.") :: Nil + ) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala new file mode 100644 index 0000000000..065e04046e --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.json + +import org.apache.spark.sql.test.TestSQLContext + +object TestJsonData { + + val primitiveFieldAndType = + TestSQLContext.sparkContext.parallelize( + """{"string":"this is a simple string.", + "integer":10, + "long":21474836470, + "bigInteger":92233720368547758070, + "double":1.7976931348623157E308, + "boolean":true, + "null":null + }""" :: Nil) + + val complexFieldAndType = + TestSQLContext.sparkContext.parallelize( + """{"struct":{"field1": true, "field2": 92233720368547758070}, + "structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]}, + "arrayOfString":["str1", "str2"], + "arrayOfInteger":[1, 2147483647, -2147483648], + "arrayOfLong":[21474836470, 9223372036854775807, -9223372036854775808], + "arrayOfBigInteger":[922337203685477580700, -922337203685477580800], + "arrayOfDouble":[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308], + "arrayOfBoolean":[true, false, true], + "arrayOfNull":[null, null, null, null], + "arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}], + "arrayOfArray1":[[1, 2, 3], ["str1", "str2"]], + "arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]] + }""" :: Nil) + + val primitiveFieldValueTypeConflict = + TestSQLContext.sparkContext.parallelize( + """{"num_num_1":11, "num_num_2":null, "num_num_3": 1.1, + "num_bool":true, "num_str":13.1, "str_bool":"str1"}""" :: + """{"num_num_1":null, "num_num_2":21474836470.9, "num_num_3": null, + "num_bool":12, "num_str":null, "str_bool":true}""" :: + """{"num_num_1":21474836470, "num_num_2":92233720368547758070, "num_num_3": 100, + "num_bool":false, "num_str":"str1", "str_bool":false}""" :: + """{"num_num_1":21474836570, "num_num_2":1.1, "num_num_3": 21474836470, + "num_bool":null, "num_str":92233720368547758070, "str_bool":null}""" :: Nil) + + val complexFieldValueTypeConflict = + TestSQLContext.sparkContext.parallelize( + """{"num_struct":11, "str_array":[1, 2, 3], + "array":[], "struct_array":[], "struct": {}}""" :: + """{"num_struct":{"field":false}, "str_array":null, + "array":null, "struct_array":{}, "struct": null}""" :: + """{"num_struct":null, "str_array":"str", + "array":[4, 5, 6], "struct_array":[7, 8, 9], "struct": {"field":null}}""" :: + """{"num_struct":{}, "str_array":["str1", "str2", 33], + "array":[7], "struct_array":{"field": true}, "struct": {"field": "str"}}""" :: Nil) + + val arrayElementTypeConflict = + TestSQLContext.sparkContext.parallelize( + """{"array": [1, 1.1, true, null, [], {}, [2,3,4], {"field":"str"}]}""" :: Nil) + + val missingFields = + TestSQLContext.sparkContext.parallelize( + """{"a":true}""" :: + """{"b":21474836470}""" :: + """{"c":[33, 44]}""" :: + """{"d":{"field":true}}""" :: + """{"e":"str"}""" :: Nil) +} -- cgit v1.2.3
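
For reference, a minimal sketch of how the JSON API added by this patch can be exercised from the Scala shell. It assumes an existing SparkContext named `sc` and a hypothetical newline-delimited JSON file `people.json`; the record contents and field names below are made up for illustration.

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)  // sc is an assumed, pre-existing SparkContext

    // Load a text file where each line is a JSON object; the schema is inferred
    // by scanning the dataset once (or a sample, via the experimental
    // samplingRatio overload of jsonFile).
    val people = sqlContext.jsonFile("people.json")  // hypothetical path

    // Print the inferred schema as a tree.
    people.printSchema()

    // Register the result as a table and query it with SQL.
    people.registerAsTable("people")
    sqlContext.sql("SELECT * FROM people").collect().foreach(println)

    // The same works for an RDD[String] whose records are JSON objects.
    val jsonStrings = sc.parallelize("""{"name":"Yin","age":30}""" :: Nil)  // made-up record
    val another = sqlContext.jsonRDD(jsonStrings)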
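The schema inference in JsonRDD.scala resolves per-key type conflicts by repeatedly widening with compatibleType, which first consults HiveTypeCoercion.allPromotions and otherwise falls back to struct/array merging or StringType. As a rough, self-contained illustration of that widening idea only (a toy model, not the code in this patch), consider:

    // Toy model of per-key type widening during JSON schema inference.
    sealed trait ToyType
    case object IntT extends ToyType
    case object LongT extends ToyType
    case object DoubleT extends ToyType
    case object DecimalT extends ToyType
    case object StringT extends ToyType
    case object NullT extends ToyType

    object ToyWidening {
      // Promotion chain: Int -> Long -> Double -> Decimal.
      private val numericOrder = Seq(IntT, LongT, DoubleT, DecimalT)

      def widen(t1: ToyType, t2: ToyType): ToyType = (t1, t2) match {
        case (NullT, other) => other
        case (other, NullT) => other
        case _ if t1 == t2 => t1
        case _ if numericOrder.contains(t1) && numericOrder.contains(t2) =>
          // Take whichever numeric type sits later in the promotion chain.
          numericOrder(math.max(numericOrder.indexOf(t1), numericOrder.indexOf(t2)))
        case _ => StringT // conflicting kinds (e.g. number vs. boolean) relax to string
      }

      def main(args: Array[String]): Unit = {
        // A key observed as an int, a long and a string across records ends up as a
        // string, mirroring the "Type conflict in primitive field values" test above.
        println(Seq(IntT, LongT, StringT).reduce(widen)) // prints StringT
      }
    }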