author    Yin Huai <huai@cse.ohio-state.edu>    2014-06-17 19:14:59 -0700
committer Reynold Xin <rxin@apache.org>         2014-06-17 19:14:59 -0700
commit    d2f4f30b12f99358953e2781957468e2cfe3c916 (patch)
tree      405b949a2968dba2c73874bd2fefc9d10206e731
parent    b2ebf429e24566c29850c570f8d76943151ad78c (diff)
[SPARK-2060][SQL] Querying JSON Datasets with SQL and DSL in Spark SQL
JIRA: https://issues.apache.org/jira/browse/SPARK-2060
Programming guide: http://yhuai.github.io/site/sql-programming-guide.html
Scala doc of SQLContext: http://yhuai.github.io/site/api/scala/index.html#org.apache.spark.sql.SQLContext

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #999 from yhuai/newJson and squashes the following commits:

227e89e [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
ce8eedd [Yin Huai] rxin's comments.
bc9ac51 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
94ffdaa [Yin Huai] Remove "get" from method names.
ce31c81 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
e2773a6 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
79ea9ba [Yin Huai] Fix typos.
5428451 [Yin Huai] Newline
1f908ce [Yin Huai] Remove extra line.
d7a005c [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
7ea750e [Yin Huai] marmbrus's comments.
6a5f5ef [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
83013fb [Yin Huai] Update Java Example.
e7a6c19 [Yin Huai] SchemaRDD.javaToPython should convert a field with the StructType to a Map.
6d20b85 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
4fbddf0 [Yin Huai] Programming guide.
9df8c5a [Yin Huai] Python API.
7027634 [Yin Huai] Java API.
cff84cc [Yin Huai] Use a SchemaRDD for a JSON dataset.
d0bd412 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
ab810b0 [Yin Huai] Make JsonRDD private.
6df0891 [Yin Huai] Apache header.
8347f2e [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
66f9e76 [Yin Huai] Update docs and use the entire dataset to infer the schema.
8ffed79 [Yin Huai] Update the example.
a5a4b52 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
4325475 [Yin Huai] If a sampled dataset is used for schema inferring, update the schema of the JsonTable after first execution.
65b87f0 [Yin Huai] Fix sampling...
8846af5 [Yin Huai] API doc.
52a2275 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
0387523 [Yin Huai] Address PR comments.
666b957 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
a2313a6 [Yin Huai] Address PR comments.
f3ce176 [Yin Huai] After type conflict resolution, if a NullType is found, StringType is used.
0576406 [Yin Huai] Add Apache license header.
af91b23 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
f45583b [Yin Huai] Infer the schema of a JSON dataset (a text file with one JSON object per line or a RDD[String] with one JSON object per string) and returns a SchemaRDD.
f31065f [Yin Huai] A query plan or a SchemaRDD can print out its schema.
-rw-r--r-- .rat-excludes | 1
-rw-r--r-- docs/sql-programming-guide.md | 290
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java | 78
-rw-r--r-- examples/src/main/resources/people.json | 3
-rw-r--r-- project/SparkBuild.scala | 22
-rw-r--r-- python/pyspark/sql.py | 64
-rw-r--r-- sql/catalyst/pom.xml | 28
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala | 25
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 51
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala | 3
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala | 3
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala | 5
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala | 3
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala (renamed from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerTest.scala) | 9
-rw-r--r-- sql/core/pom.xml | 12
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 45
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala | 38
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala | 6
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala | 20
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala | 397
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala | 4
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala | 45
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala | 519
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala | 84
24 files changed, 1644 insertions, 111 deletions
diff --git a/.rat-excludes b/.rat-excludes
index 52b2dfac5c..15344dfb29 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -22,6 +22,7 @@ spark-env.sh.template
log4j-defaults.properties
sorttable.js
.*txt
+.*json
.*data
.*log
cloudpickle.py
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 4623bb4247..522c83884e 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -17,20 +17,20 @@ Spark. At the core of this component is a new type of RDD,
[Row](api/scala/index.html#org.apache.spark.sql.catalyst.expressions.Row) objects along with
a schema that describes the data types of each column in the row. A SchemaRDD is similar to a table
in a traditional relational database. A SchemaRDD can be created from an existing RDD, [Parquet](http://parquet.io)
-file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).
+file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).
All of the examples on this page use sample data included in the Spark distribution and can be run in the `spark-shell`.
</div>
<div data-lang="java" markdown="1">
-Spark SQL allows relational queries expressed in SQL, HiveQL, or Scala to be executed using
+Spark SQL allows relational queries expressed in SQL or HiveQL to be executed using
Spark. At the core of this component is a new type of RDD,
[JavaSchemaRDD](api/scala/index.html#org.apache.spark.sql.api.java.JavaSchemaRDD). JavaSchemaRDDs are composed
[Row](api/scala/index.html#org.apache.spark.sql.api.java.Row) objects along with
a schema that describes the data types of each column in the row. A JavaSchemaRDD is similar to a table
in a traditional relational database. A JavaSchemaRDD can be created from an existing RDD, [Parquet](http://parquet.io)
-file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).
+file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).
</div>
<div data-lang="python" markdown="1">
@@ -41,7 +41,7 @@ Spark. At the core of this component is a new type of RDD,
[Row](api/python/pyspark.sql.Row-class.html) objects along with
a schema that describes the data types of each column in the row. A SchemaRDD is similar to a table
in a traditional relational database. A SchemaRDD can be created from an existing RDD, [Parquet](http://parquet.io)
-file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).
+file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).
All of the examples on this page use sample data included in the Spark distribution and can be run in the `pyspark` shell.
</div>
@@ -64,8 +64,8 @@ descendants. To create a basic SQLContext, all you need is a SparkContext.
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
-import sqlContext._
+// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
+import sqlContext.createSchemaRDD
{% endhighlight %}
</div>
@@ -77,8 +77,8 @@ The entry point into all relational functionality in Spark is the
of its descendants. To create a basic JavaSQLContext, all you need is a JavaSparkContext.
{% highlight java %}
-JavaSparkContext ctx = ...; // An existing JavaSparkContext.
-JavaSQLContext sqlCtx = new org.apache.spark.sql.api.java.JavaSQLContext(ctx);
+JavaSparkContext sc = ...; // An existing JavaSparkContext.
+JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc);
{% endhighlight %}
</div>
@@ -91,14 +91,33 @@ of its decedents. To create a basic SQLContext, all you need is a SparkContext.
{% highlight python %}
from pyspark.sql import SQLContext
-sqlCtx = SQLContext(sc)
+sqlContext = SQLContext(sc)
{% endhighlight %}
</div>
</div>
-## Running SQL on RDDs
+# Data Sources
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+Spark SQL supports operating on a variety of data sources through the `SchemaRDD` interface.
+Once a dataset has been loaded, it can be registered as a table and even joined with data from other sources.
+</div>
+
+<div data-lang="java" markdown="1">
+Spark SQL supports operating on a variety of data sources through the `JavaSchemaRDD` interface.
+Once a dataset has been loaded, it can be registered as a table and even joined with data from other sources.
+</div>
+
+<div data-lang="python" markdown="1">
+Spark SQL supports operating on a variety of data sources through the `SchemaRDD` interface.
+Once a dataset has been loaded, it can be registered as a table and even joined with data from other sources.
+</div>
+</div>
+
+## RDDs
<div class="codetabs">
@@ -111,8 +130,10 @@ types such as Sequences or Arrays. This RDD can be implicitly converted to a Sch
registered as a table. Tables can be used in subsequent SQL statements.
{% highlight scala %}
+// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-import sqlContext._
+// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
+import sqlContext.createSchemaRDD
// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
@@ -124,7 +145,7 @@ val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split("
people.registerAsTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
-val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
+val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
@@ -170,12 +191,11 @@ A schema can be applied to an existing RDD by calling `applySchema` and providin
for the JavaBean.
{% highlight java %}
-
-JavaSparkContext ctx = ...; // An existing JavaSparkContext.
-JavaSQLContext sqlCtx = new org.apache.spark.sql.api.java.JavaSQLContext(ctx)
+// sc is an existing JavaSparkContext.
+JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc)
// Load a text file and convert each line to a JavaBean.
-JavaRDD<Person> people = ctx.textFile("examples/src/main/resources/people.txt").map(
+JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
new Function<String, Person>() {
public Person call(String line) throws Exception {
String[] parts = line.split(",");
@@ -189,11 +209,11 @@ JavaRDD<Person> people = ctx.textFile("examples/src/main/resources/people.txt").
});
// Apply a schema to an RDD of JavaBeans and register it as a table.
-JavaSchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class);
+JavaSchemaRDD schemaPeople = sqlContext.applySchema(people, Person.class);
schemaPeople.registerAsTable("people");
// SQL can be run over RDDs that have been registered as tables.
-JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
+JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
@@ -215,6 +235,10 @@ row. Any RDD of dictionaries can converted to a SchemaRDD and then registered as
can be used in subsequent SQL statements.
{% highlight python %}
+# sc is an existing SparkContext.
+from pyspark.sql import SQLContext
+sqlContext = SQLContext(sc)
+
# Load a text file and convert each line to a dictionary.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
@@ -223,14 +247,16 @@ people = parts.map(lambda p: {"name": p[0], "age": int(p[1])})
# Infer the schema, and register the SchemaRDD as a table.
# In future versions of PySpark we would like to add support for registering RDDs with other
# datatypes as tables
-peopleTable = sqlCtx.inferSchema(people)
-peopleTable.registerAsTable("people")
+schemaPeople = sqlContext.inferSchema(people)
+schemaPeople.registerAsTable("people")
# SQL can be run over SchemaRDDs that have been registered as a table.
-teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
+teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
# The results of SQL queries are RDDs and support all the normal RDD operations.
teenNames = teenagers.map(lambda p: "Name: " + p.name)
+for teenName in teenNames.collect():
+ print teenName
{% endhighlight %}
</div>
@@ -241,7 +267,7 @@ teenNames = teenagers.map(lambda p: "Name: " + p.name)
Users that want a more complete dialect of SQL should look at the HiveQL support provided by
`HiveContext`.
-## Using Parquet
+## Parquet Files
[Parquet](http://parquet.io) is a columnar format that is supported by many other data processing systems.
Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema
@@ -252,22 +278,23 @@ of the original data. Using the data from the above example:
<div data-lang="scala" markdown="1">
{% highlight scala %}
-val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-import sqlContext._
+// sqlContext from the previous example is used in this example.
+// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
+import sqlContext.createSchemaRDD
val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.
-// The RDD is implicitly converted to a SchemaRDD, allowing it to be stored using Parquet.
+// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")
// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
-// The result of loading a Parquet file is also a JavaSchemaRDD.
+// The result of loading a Parquet file is also a SchemaRDD.
val parquetFile = sqlContext.parquetFile("people.parquet")
//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerAsTable("parquetFile")
-val teenagers = sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
-teenagers.collect().foreach(println)
+val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
+teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
{% endhighlight %}
</div>
@@ -275,6 +302,7 @@ teenagers.collect().foreach(println)
<div data-lang="java" markdown="1">
{% highlight java %}
+// sqlContext from the previous example is used in this example.
JavaSchemaRDD schemaPeople = ... // The JavaSchemaRDD from the previous example.
@@ -283,13 +311,16 @@ schemaPeople.saveAsParquetFile("people.parquet");
// Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a parquet file is also a JavaSchemaRDD.
-JavaSchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet");
+JavaSchemaRDD parquetFile = sqlContext.parquetFile("people.parquet");
//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerAsTable("parquetFile");
-JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
-
-
+JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
+List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
+ public String call(Row row) {
+ return "Name: " + row.getString(0);
+ }
+}).collect();
{% endhighlight %}
</div>
@@ -297,50 +328,149 @@ JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >=
<div data-lang="python" markdown="1">
{% highlight python %}
+# sqlContext from the previous example is used in this example.
-peopleTable # The SchemaRDD from the previous example.
+schemaPeople # The SchemaRDD from the previous example.
# SchemaRDDs can be saved as Parquet files, maintaining the schema information.
-peopleTable.saveAsParquetFile("people.parquet")
+schemaPeople.saveAsParquetFile("people.parquet")
# Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a SchemaRDD.
-parquetFile = sqlCtx.parquetFile("people.parquet")
+parquetFile = sqlContext.parquetFile("people.parquet")
# Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerAsTable("parquetFile");
-teenagers = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
-
+teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
+teenNames = teenagers.map(lambda p: "Name: " + p.name)
+for teenName in teenNames.collect():
+ print teenName
{% endhighlight %}
</div>
</div>
-## Writing Language-Integrated Relational Queries
+## JSON Datasets
+<div class="codetabs">
-**Language-Integrated queries are currently only supported in Scala.**
+<div data-lang="scala" markdown="1">
+Spark SQL can automatically infer the schema of a JSON dataset and load it as a SchemaRDD.
+This conversion can be done using one of two methods in a SQLContext:
-Spark SQL also supports a domain specific language for writing queries. Once again,
-using the data from the above examples:
+* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object.
+* `jsonRDD` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.
{% highlight scala %}
+// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-import sqlContext._
-val people: RDD[Person] = ... // An RDD of case class objects, from the first example.
-// The following is the same as 'SELECT name FROM people WHERE age >= 10 AND age <= 19'
-val teenagers = people.where('age >= 10).where('age <= 19).select('name)
+// A JSON dataset is pointed to by path.
+// The path can be either a single text file or a directory storing text files.
+val path = "examples/src/main/resources/people.json"
+// Create a SchemaRDD from the file(s) pointed to by path
+val people = sqlContext.jsonFile(path)
+
+// The inferred schema can be visualized using the printSchema() method.
+people.printSchema()
+// root
+// |-- age: IntegerType
+// |-- name: StringType
+
+// Register this SchemaRDD as a table.
+people.registerAsTable("people")
+
+// SQL statements can be run by using the sql methods provided by sqlContext.
+val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
+
+// Alternatively, a SchemaRDD can be created for a JSON dataset represented by
+// an RDD[String] storing one JSON object per string.
+val anotherPeopleRDD = sc.parallelize(
+ """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
+val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
{% endhighlight %}
-The DSL uses Scala symbols to represent columns in the underlying table, which are identifiers
-prefixed with a tick (`'`). Implicit conversions turn these symbols into expressions that are
-evaluated by the SQL execution engine. A full list of the functions supported can be found in the
-[ScalaDoc](api/scala/index.html#org.apache.spark.sql.SchemaRDD).
+</div>
-<!-- TODO: Include the table of operations here. -->
+<div data-lang="java" markdown="1">
+Spark SQL can automatically infer the schema of a JSON dataset and load it as a JavaSchemaRDD.
+This conversion can be done using one of two methods in a JavaSQLContext:
-# Hive Support
+* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object.
+* `jsonRDD` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.
+
+{% highlight java %}
+// sc is an existing JavaSparkContext.
+JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc);
+
+// A JSON dataset is pointed to by path.
+// The path can be either a single text file or a directory storing text files.
+String path = "examples/src/main/resources/people.json";
+// Create a JavaSchemaRDD from the file(s) pointed to by path
+JavaSchemaRDD people = sqlContext.jsonFile(path);
+
+// The inferred schema can be visualized using the printSchema() method.
+people.printSchema();
+// root
+// |-- age: IntegerType
+// |-- name: StringType
+
+// Register this JavaSchemaRDD as a table.
+people.registerAsTable("people");
+
+// SQL statements can be run by using the sql methods provided by sqlContext.
+JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
+
+// Alternatively, a JavaSchemaRDD can be created for a JSON dataset represented by
+// an RDD[String] storing one JSON object per string.
+List<String> jsonData = Arrays.asList(
+ "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
+JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData);
+JavaSchemaRDD anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD);
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+Spark SQL can automatically infer the schema of a JSON dataset and load it as a SchemaRDD.
+This conversion can be done using one of two methods in a SQLContext:
+
+* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object.
+* `jsonRDD` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.
+
+{% highlight python %}
+# sc is an existing SparkContext.
+from pyspark.sql import SQLContext
+sqlContext = SQLContext(sc)
+
+# A JSON dataset is pointed to by path.
+# The path can be either a single text file or a directory storing text files.
+path = "examples/src/main/resources/people.json"
+# Create a SchemaRDD from the file(s) pointed to by path
+people = sqlContext.jsonFile(path)
+
+# The inferred schema can be visualized using the printSchema() method.
+people.printSchema()
+# root
+# |-- age: IntegerType
+# |-- name: StringType
+
+# Register this SchemaRDD as a table.
+people.registerAsTable("people")
+
+# SQL statements can be run by using the sql methods provided by sqlContext.
+teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
+
+# Alternatively, a SchemaRDD can be created for a JSON dataset represented by
+# an RDD[String] storing one JSON object per string.
+anotherPeopleRDD = sc.parallelize([
+ '{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}'])
+anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
+{% endhighlight %}
+</div>
+
+</div>
+
+## Hive Tables
Spark SQL also supports reading and writing data stored in [Apache Hive](http://hive.apache.org/).
However, since Hive has a large number of dependencies, it is not included in the default Spark assembly.
@@ -362,17 +492,14 @@ which is similar to `HiveContext`, but creates a local copy of the `metastore` a
automatically.
{% highlight scala %}
-val sc: SparkContext // An existing SparkContext.
+// sc is an existing SparkContext.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
-// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
-import hiveContext._
-
-hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
-hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
+hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+hiveContext.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// Queries are expressed in HiveQL
-hql("FROM src SELECT key, value").collect().foreach(println)
+hiveContext.hql("FROM src SELECT key, value").collect().foreach(println)
{% endhighlight %}
</div>
@@ -385,14 +512,14 @@ the `sql` method a `JavaHiveContext` also provides an `hql` methods, which allow
expressed in HiveQL.
{% highlight java %}
-JavaSparkContext ctx = ...; // An existing JavaSparkContext.
-JavaHiveContext hiveCtx = new org.apache.spark.sql.hive.api.java.HiveContext(ctx);
+// sc is an existing JavaSparkContext.
+JavaHiveContext hiveContext = new org.apache.spark.sql.hive.api.java.HiveContext(sc);
-hiveCtx.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
-hiveCtx.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
+hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
+hiveContext.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
// Queries are expressed in HiveQL.
-Row[] results = hiveCtx.hql("FROM src SELECT key, value").collect();
+Row[] results = hiveContext.hql("FROM src SELECT key, value").collect();
{% endhighlight %}
@@ -406,17 +533,44 @@ the `sql` method a `HiveContext` also provides an `hql` methods, which allows qu
expressed in HiveQL.
{% highlight python %}
-
+# sc is an existing SparkContext.
from pyspark.sql import HiveContext
-hiveCtx = HiveContext(sc)
+hiveContext = HiveContext(sc)
-hiveCtx.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
-hiveCtx.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
+hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+hiveContext.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries can be expressed in HiveQL.
-results = hiveCtx.hql("FROM src SELECT key, value").collect()
+results = hiveContext.hql("FROM src SELECT key, value").collect()
{% endhighlight %}
</div>
</div>
+
+
+# Writing Language-Integrated Relational Queries
+
+**Language-Integrated queries are currently only supported in Scala.**
+
+Spark SQL also supports a domain specific language for writing queries. Once again,
+using the data from the above examples:
+
+{% highlight scala %}
+// sc is an existing SparkContext.
+val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
+import sqlContext._
+val people: RDD[Person] = ... // An RDD of case class objects, from the first example.
+
+// The following is the same as 'SELECT name FROM people WHERE age >= 10 AND age <= 19'
+val teenagers = people.where('age >= 10).where('age <= 19).select('name)
+teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
+{% endhighlight %}
+
+The DSL uses Scala symbols to represent columns in the underlying table, which are identifiers
+prefixed with a tick (`'`). Implicit conversions turn these symbols into expressions that are
+evaluated by the SQL execution engine. A full list of the functions supported can be found in the
+[ScalaDoc](api/scala/index.html#org.apache.spark.sql.SchemaRDD).
+
+<!-- TODO: Include the table of operations here. -->
\ No newline at end of file
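
The data-sources sections above note that once a dataset has been loaded it can be registered as a table and even joined with data from other sources. A minimal Scala sketch of that idea, reusing table and column names from the guide's examples and assuming the basic `SQLContext` parser accepts an explicit `JOIN ... ON` clause:

```scala
// Sketch only: combines the JSON and Parquet examples from the guide above.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val jsonPeople = sqlContext.jsonFile("examples/src/main/resources/people.json")
jsonPeople.registerAsTable("jsonPeople")

val parquetPeople = sqlContext.parquetFile("people.parquet")
parquetPeople.registerAsTable("parquetPeople")

// Join the two sources in a single SQL query.
val joined = sqlContext.sql(
  "SELECT j.name, p.age FROM jsonPeople j JOIN parquetPeople p ON j.name = p.name")
joined.map(t => "Name: " + t(0) + ", Age: " + t(1)).collect().foreach(println)
```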
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
index ad5ec84b71..607df3eddd 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
@@ -18,6 +18,7 @@
package org.apache.spark.examples.sql;
import java.io.Serializable;
+import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
@@ -56,6 +57,7 @@ public class JavaSparkSQL {
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaSQLContext sqlCtx = new JavaSQLContext(ctx);
+ System.out.println("=== Data source: RDD ===");
// Load a text file and convert each line to a Java Bean.
JavaRDD<Person> people = ctx.textFile("examples/src/main/resources/people.txt").map(
new Function<String, Person>() {
@@ -84,16 +86,88 @@ public class JavaSparkSQL {
return "Name: " + row.getString(0);
}
}).collect();
+ for (String name: teenagerNames) {
+ System.out.println(name);
+ }
+ System.out.println("=== Data source: Parquet File ===");
// JavaSchemaRDDs can be saved as parquet files, maintaining the schema information.
schemaPeople.saveAsParquetFile("people.parquet");
- // Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
+ // Read in the parquet file created above.
+ // Parquet files are self-describing so the schema is preserved.
// The result of loading a parquet file is also a JavaSchemaRDD.
JavaSchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet");
//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerAsTable("parquetFile");
- JavaSchemaRDD teenagers2 = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
+ JavaSchemaRDD teenagers2 =
+ sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
+ teenagerNames = teenagers2.map(new Function<Row, String>() {
+ public String call(Row row) {
+ return "Name: " + row.getString(0);
+ }
+ }).collect();
+ for (String name: teenagerNames) {
+ System.out.println(name);
+ }
+
+ System.out.println("=== Data source: JSON Dataset ===");
+ // A JSON dataset is pointed to by path.
+ // The path can be either a single text file or a directory storing text files.
+ String path = "examples/src/main/resources/people.json";
+ // Create a JavaSchemaRDD from the file(s) pointed to by path
+ JavaSchemaRDD peopleFromJsonFile = sqlCtx.jsonFile(path);
+
+ // Because the schema of a JSON dataset is automatically inferred, it is better
+ // to take a look at the schema before writing queries.
+ peopleFromJsonFile.printSchema();
+ // The schema of people is ...
+ // root
+ // |-- age: IntegerType
+ // |-- name: StringType
+
+ // Register this JavaSchemaRDD as a table.
+ peopleFromJsonFile.registerAsTable("people");
+
+ // SQL statements can be run by using the sql methods provided by sqlCtx.
+ JavaSchemaRDD teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
+
+ // The results of SQL queries are JavaSchemaRDDs and support all the normal RDD operations.
+ // The columns of a row in the result can be accessed by ordinal.
+ teenagerNames = teenagers3.map(new Function<Row, String>() {
+ public String call(Row row) { return "Name: " + row.getString(0); }
+ }).collect();
+ for (String name: teenagerNames) {
+ System.out.println(name);
+ }
+
+ // Alternatively, a JavaSchemaRDD can be created for a JSON dataset represented by
+ // an RDD[String] storing one JSON object per string.
+ List<String> jsonData = Arrays.asList(
+ "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
+ JavaRDD<String> anotherPeopleRDD = ctx.parallelize(jsonData);
+ JavaSchemaRDD peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD);
+
+ // Take a look at the schema of this new JavaSchemaRDD.
+ peopleFromJsonRDD.printSchema();
+ // The schema of anotherPeople is ...
+ // root
+ // |-- address: StructType
+ // | |-- city: StringType
+ // | |-- state: StringType
+ // |-- name: StringType
+
+ peopleFromJsonRDD.registerAsTable("people2");
+
+ JavaSchemaRDD peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
+ List<String> nameAndCity = peopleWithCity.map(new Function<Row, String>() {
+ public String call(Row row) {
+ return "Name: " + row.getString(0) + ", City: " + row.getString(1);
+ }
+ }).collect();
+ for (String name: nameAndCity) {
+ System.out.println(name);
+ }
}
}
diff --git a/examples/src/main/resources/people.json b/examples/src/main/resources/people.json
new file mode 100644
index 0000000000..50a859cbd7
--- /dev/null
+++ b/examples/src/main/resources/people.json
@@ -0,0 +1,3 @@
+{"name":"Michael"}
+{"name":"Andy", "age":30}
+{"name":"Justin", "age":19}
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 2d60a44f04..7bb39dc771 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -76,7 +76,7 @@ object SparkBuild extends Build {
lazy val catalyst = Project("catalyst", file("sql/catalyst"), settings = catalystSettings) dependsOn(core)
- lazy val sql = Project("sql", file("sql/core"), settings = sqlCoreSettings) dependsOn(core, catalyst)
+ lazy val sql = Project("sql", file("sql/core"), settings = sqlCoreSettings) dependsOn(core) dependsOn(catalyst % "compile->compile;test->test")
lazy val hive = Project("hive", file("sql/hive"), settings = hiveSettings) dependsOn(sql)
@@ -501,9 +501,23 @@ object SparkBuild extends Build {
def sqlCoreSettings = sharedSettings ++ Seq(
name := "spark-sql",
libraryDependencies ++= Seq(
- "com.twitter" % "parquet-column" % parquetVersion,
- "com.twitter" % "parquet-hadoop" % parquetVersion
- )
+ "com.twitter" % "parquet-column" % parquetVersion,
+ "com.twitter" % "parquet-hadoop" % parquetVersion,
+ "com.fasterxml.jackson.core" % "jackson-databind" % "2.3.0" // json4s-jackson 3.2.6 requires jackson-databind 2.3.0.
+ ),
+ initialCommands in console :=
+ """
+ |import org.apache.spark.sql.catalyst.analysis._
+ |import org.apache.spark.sql.catalyst.dsl._
+ |import org.apache.spark.sql.catalyst.errors._
+ |import org.apache.spark.sql.catalyst.expressions._
+ |import org.apache.spark.sql.catalyst.plans.logical._
+ |import org.apache.spark.sql.catalyst.rules._
+ |import org.apache.spark.sql.catalyst.types._
+ |import org.apache.spark.sql.catalyst.util._
+ |import org.apache.spark.sql.execution
+ |import org.apache.spark.sql.test.TestSQLContext._
+ |import org.apache.spark.sql.parquet.ParquetTestData""".stripMargin
)
// Since we don't include hive in the main assembly this project also acts as an alternative
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index c31d49ce83..5051c82da3 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -15,7 +15,7 @@
# limitations under the License.
#
-from pyspark.rdd import RDD
+from pyspark.rdd import RDD, PipelinedRDD
from pyspark.serializers import BatchedSerializer, PickleSerializer
from py4j.protocol import Py4JError
@@ -137,6 +137,53 @@ class SQLContext:
jschema_rdd = self._ssql_ctx.parquetFile(path)
return SchemaRDD(jschema_rdd, self)
+
+ def jsonFile(self, path):
+ """Loads a text file storing one JSON object per line,
+ returning the result as a L{SchemaRDD}.
+ It goes through the entire dataset once to determine the schema.
+
+ >>> import tempfile, shutil
+ >>> jsonFile = tempfile.mkdtemp()
+ >>> shutil.rmtree(jsonFile)
+ >>> ofn = open(jsonFile, 'w')
+ >>> for json in jsonStrings:
+ ... print>>ofn, json
+ >>> ofn.close()
+ >>> srdd = sqlCtx.jsonFile(jsonFile)
+ >>> sqlCtx.registerRDDAsTable(srdd, "table1")
+ >>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2, field3 as f3 from table1")
+ >>> srdd2.collect() == [{"f1": 1, "f2": "row1", "f3":{"field4":11}},
+ ... {"f1": 2, "f2": "row2", "f3":{"field4":22}},
+ ... {"f1": 3, "f2": "row3", "f3":{"field4":33}}]
+ True
+ """
+ jschema_rdd = self._ssql_ctx.jsonFile(path)
+ return SchemaRDD(jschema_rdd, self)
+
+ def jsonRDD(self, rdd):
+ """Loads an RDD storing one JSON object per string, returning the result as a L{SchemaRDD}.
+ It goes through the entire dataset once to determine the schema.
+
+ >>> srdd = sqlCtx.jsonRDD(json)
+ >>> sqlCtx.registerRDDAsTable(srdd, "table1")
+ >>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2, field3 as f3 from table1")
+ >>> srdd2.collect() == [{"f1": 1, "f2": "row1", "f3":{"field4":11}},
+ ... {"f1": 2, "f2": "row2", "f3":{"field4":22}},
+ ... {"f1": 3, "f2": "row3", "f3":{"field4":33}}]
+ True
+ """
+ def func(split, iterator):
+ for x in iterator:
+ if not isinstance(x, basestring):
+ x = unicode(x)
+ yield x.encode("utf-8")
+ keyed = PipelinedRDD(rdd, func)
+ keyed._bypass_serializer = True
+ jrdd = keyed._jrdd.map(self._jvm.BytesToString())
+ jschema_rdd = self._ssql_ctx.jsonRDD(jrdd.rdd())
+ return SchemaRDD(jschema_rdd, self)
+
def sql(self, sqlQuery):
"""Return a L{SchemaRDD} representing the result of the given query.
@@ -265,7 +312,7 @@ class SchemaRDD(RDD):
For normal L{pyspark.rdd.RDD} operations (map, count, etc.) the
L{SchemaRDD} is not operated on directly, as it's underlying
- implementation is a RDD composed of Java objects. Instead it is
+ implementation is an RDD composed of Java objects. Instead it is
converted to a PythonRDD in the JVM, on which Python operations can
be done.
"""
@@ -337,6 +384,14 @@ class SchemaRDD(RDD):
"""Creates a new table with the contents of this SchemaRDD."""
self._jschema_rdd.saveAsTable(tableName)
+ def schemaString(self):
+ """Returns the output schema in the tree format."""
+ return self._jschema_rdd.schemaString()
+
+ def printSchema(self):
+ """Prints out the schema in the tree format."""
+ print self.schemaString()
+
def count(self):
"""Return the number of elements in this RDD.
@@ -436,6 +491,11 @@ def _test():
globs['sqlCtx'] = SQLContext(sc)
globs['rdd'] = sc.parallelize([{"field1" : 1, "field2" : "row1"},
{"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}])
+ jsonStrings = ['{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
+ '{"field1" : 2, "field2": "row2", "field3":{"field4":22}}',
+ '{"field1" : 3, "field2": "row3", "field3":{"field4":33}}']
+ globs['jsonStrings'] = jsonStrings
+ globs['json'] = sc.parallelize(jsonStrings)
globs['nestedRdd1'] = sc.parallelize([
{"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}},
{"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}])
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index 6c78c34486..01d7b56908 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -66,6 +66,34 @@
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
</plugin>
+
+ <!--
+ This plugin forces the generation of jar containing catalyst test classes,
+ so that the tests classes of external modules can use them. The two execution profiles
+ are necessary - first one for 'mvn package', second one for 'mvn compile'. Ideally,
+ 'mvn compile' should not compile test classes and therefore should not need this.
+ However, an open Maven bug (http://jira.codehaus.org/browse/MNG-3559)
+ causes the compilation to fail if catalyst test-jar is not generated. Hence, the
+ second execution profile for 'mvn compile'.
+ -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>test-jar-on-compile</id>
+ <phase>compile</phase>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
index d291814c8a..66bff660ca 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -22,6 +22,16 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.types._
+object HiveTypeCoercion {
+ // See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types.
+ // The conversion for integral and floating point types have a linear widening hierarchy:
+ val numericPrecedence =
+ Seq(NullType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType)
+ // Boolean is only wider than Void
+ val booleanPrecedence = Seq(NullType, BooleanType)
+ val allPromotions: Seq[Seq[DataType]] = numericPrecedence :: booleanPrecedence :: Nil
+}
+
/**
* A collection of [[catalyst.rules.Rule Rules]] that can be used to coerce differing types that
* participate in operations into compatible ones. Most of these rules are based on Hive semantics,
@@ -116,19 +126,18 @@ trait HiveTypeCoercion {
*
* Additionally, all types when UNION-ed with strings will be promoted to strings.
* Other string conversions are handled by PromoteStrings.
+ *
+ * Widening types might result in loss of precision in the following cases:
+ * - IntegerType to FloatType
+ * - LongType to FloatType
+ * - LongType to DoubleType
*/
object WidenTypes extends Rule[LogicalPlan] {
- // See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types.
- // The conversion for integral and floating point types have a linear widening hierarchy:
- val numericPrecedence =
- Seq(NullType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType)
- // Boolean is only wider than Void
- val booleanPrecedence = Seq(NullType, BooleanType)
- val allPromotions: Seq[Seq[DataType]] = numericPrecedence :: booleanPrecedence :: Nil
def findTightestCommonType(t1: DataType, t2: DataType): Option[DataType] = {
// Try and find a promotion rule that contains both types in question.
- val applicableConversion = allPromotions.find(p => p.contains(t1) && p.contains(t2))
+ val applicableConversion =
+ HiveTypeCoercion.allPromotions.find(p => p.contains(t1) && p.contains(t2))
// If found return the widest common type, otherwise None
applicableConversion.map(_.filter(t => t == t1 || t == t2).last)
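The `WidenTypes` rule (and the precedence lists moved into the shared `HiveTypeCoercion` object above) picks the tightest common type by finding a single precedence chain that contains both input types and taking the wider of the two. A small standalone sketch of that lookup, using plain strings in place of Catalyst `DataType`s:

```scala
// Sketch of findTightestCommonType's lookup over the precedence chains defined above.
val numericPrecedence =
  Seq("NullType", "ByteType", "ShortType", "IntegerType", "LongType",
      "FloatType", "DoubleType", "DecimalType")
val booleanPrecedence = Seq("NullType", "BooleanType")
val allPromotions = Seq(numericPrecedence, booleanPrecedence)

def tightestCommonType(t1: String, t2: String): Option[String] =
  allPromotions
    .find(p => p.contains(t1) && p.contains(t2))   // a chain containing both types
    .map(_.filter(t => t == t1 || t == t2).last)   // the wider of the two

// tightestCommonType("IntegerType", "LongType")    == Some("LongType")
// tightestCommonType("IntegerType", "BooleanType") == None (no common chain)
```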
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 8199a80f5d..00e2d3bc24 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -18,7 +18,9 @@
package org.apache.spark.sql.catalyst.plans
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.plans
import org.apache.spark.sql.catalyst.trees.TreeNode
+import org.apache.spark.sql.catalyst.types.{ArrayType, DataType, StructField, StructType}
abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] {
self: PlanType with Product =>
@@ -123,4 +125,53 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
case other => Nil
}.toSeq
}
+
+ protected def generateSchemaString(schema: Seq[Attribute]): String = {
+ val builder = new StringBuilder
+ builder.append("root\n")
+ val prefix = " |"
+ schema.foreach { attribute =>
+ val name = attribute.name
+ val dataType = attribute.dataType
+ dataType match {
+ case fields: StructType =>
+ builder.append(s"$prefix-- $name: $StructType\n")
+ generateSchemaString(fields, s"$prefix |", builder)
+ case ArrayType(fields: StructType) =>
+ builder.append(s"$prefix-- $name: $ArrayType[$StructType]\n")
+ generateSchemaString(fields, s"$prefix |", builder)
+ case ArrayType(elementType: DataType) =>
+ builder.append(s"$prefix-- $name: $ArrayType[$elementType]\n")
+ case _ => builder.append(s"$prefix-- $name: $dataType\n")
+ }
+ }
+
+ builder.toString()
+ }
+
+ protected def generateSchemaString(
+ schema: StructType,
+ prefix: String,
+ builder: StringBuilder): StringBuilder = {
+ schema.fields.foreach {
+ case StructField(name, fields: StructType, _) =>
+ builder.append(s"$prefix-- $name: $StructType\n")
+ generateSchemaString(fields, s"$prefix |", builder)
+ case StructField(name, ArrayType(fields: StructType), _) =>
+ builder.append(s"$prefix-- $name: $ArrayType[$StructType]\n")
+ generateSchemaString(fields, s"$prefix |", builder)
+ case StructField(name, ArrayType(elementType: DataType), _) =>
+ builder.append(s"$prefix-- $name: $ArrayType[$elementType]\n")
+ case StructField(name, fieldType: DataType, _) =>
+ builder.append(s"$prefix-- $name: $fieldType\n")
+ }
+
+ builder
+ }
+
+ /** Returns the output schema in the tree format. */
+ def schemaString: String = generateSchemaString(output)
+
+ /** Prints out the schema in the tree format */
+ def printSchema(): Unit = println(schemaString)
}
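For reference, the tree format produced by `generateSchemaString` is what `printSchema()` renders for users. On the nested JSON example used elsewhere in this patch (`anotherPeople`, a record with an `address` struct), the output looks roughly like this:

```scala
// anotherPeople is the SchemaRDD built from the nested JSON example in the guide above.
anotherPeople.printSchema()
// root
// |-- address: StructType
// |    |-- city: StringType
// |    |-- state: StringType
// |-- name: StringType
```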
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
index 714f01843c..4896f1b955 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
@@ -18,11 +18,12 @@
package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._
-class CombiningLimitsSuite extends OptimizerTest {
+class CombiningLimitsSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
index 6efc0e211e..cea97c584f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.types._
@@ -27,7 +28,7 @@ import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._
-class ConstantFoldingSuite extends OptimizerTest {
+class ConstantFoldingSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 1f67c80e54..ebb123c1f9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -20,13 +20,12 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.LeftOuter
-import org.apache.spark.sql.catalyst.plans.RightOuter
+import org.apache.spark.sql.catalyst.plans.{PlanTest, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._
-class FilterPushdownSuite extends OptimizerTest {
+class FilterPushdownSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala
index df1409fe7b..22992fb6f5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala
@@ -19,13 +19,14 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.rules._
/* Implicit conversions */
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
-class SimplifyCaseConversionExpressionsSuite extends OptimizerTest {
+class SimplifyCaseConversionExpressionsSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
index 89982d5cd8..7e9f47ef21 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -15,19 +15,18 @@
* limitations under the License.
*/
-package org.apache.spark.sql.catalyst.optimizer
+package org.apache.spark.sql.catalyst.plans
import org.scalatest.FunSuite
-import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.{ExprId, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
/**
- * Provides helper methods for comparing plans produced by optimization rules with the expected
- * result
+ * Provides helper methods for comparing plans.
*/
-class OptimizerTest extends FunSuite {
+class PlanTest extends FunSuite {
/**
* Since attribute references are given globally unique ids during analysis,
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index e65ca6be48..8210fd1f21 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -44,6 +44,13 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-column</artifactId>
<version>${parquet.version}</version>
@@ -54,6 +61,11 @@
<version>${parquet.version}</version>
</dependency>
<dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.3.0</version>
+ </dependency>
+ <dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 131c130bbb..f7e03323be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -22,24 +22,22 @@ import scala.reflect.runtime.universe.TypeTag
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.SparkContext
import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
import org.apache.spark.rdd.RDD
-
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
+import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.dsl.ExpressionConversions
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor
-
import org.apache.spark.sql.columnar.InMemoryRelation
-
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.SparkStrategies
-
+import org.apache.spark.sql.json._
import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.SparkContext
/**
* :: AlphaComponent ::
@@ -53,7 +51,7 @@ import org.apache.spark.sql.parquet.ParquetRelation
class SQLContext(@transient val sparkContext: SparkContext)
extends Logging
with SQLConf
- with dsl.ExpressionConversions
+ with ExpressionConversions
with Serializable {
self =>
@@ -99,6 +97,39 @@ class SQLContext(@transient val sparkContext: SparkContext)
new SchemaRDD(this, parquet.ParquetRelation(path))
/**
+ * Loads a JSON file (one object per line), returning the result as a [[SchemaRDD]].
+ * It goes through the entire dataset once to determine the schema.
+ *
+ * @group userf
+ */
+ def jsonFile(path: String): SchemaRDD = jsonFile(path, 1.0)
+
+ /**
+ * :: Experimental ::
+ */
+ @Experimental
+ def jsonFile(path: String, samplingRatio: Double): SchemaRDD = {
+ val json = sparkContext.textFile(path)
+ jsonRDD(json, samplingRatio)
+ }
+
+ /**
+ * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
+ * [[SchemaRDD]].
+ * It goes through the entire dataset once to determine the schema.
+ *
+ * @group userf
+ */
+ def jsonRDD(json: RDD[String]): SchemaRDD = jsonRDD(json, 1.0)
+
+ /**
+ * :: Experimental ::
+ */
+ @Experimental
+ def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD =
+ new SchemaRDD(this, JsonRDD.inferSchema(json, samplingRatio))
+
+ /**
* :: Experimental ::
* Creates an empty parquet file with the schema of class `A`, which can be registered as a table.
* This registered table can be used as the target of future `insertInto` operations.
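The new `jsonFile`/`jsonRDD` entry points above also come with experimental overloads that take a `samplingRatio`. A minimal usage sketch, assuming a ratio below 1.0 makes `JsonRDD.inferSchema` inspect only a sample of the records when inferring the schema:

```scala
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Default: infer the schema from the entire dataset (samplingRatio = 1.0).
val people = sqlContext.jsonFile("examples/src/main/resources/people.json")

// Experimental overload: infer the schema from roughly half of the records
// (assumed semantics of samplingRatio; see JsonRDD.inferSchema).
val sampledPeople = sqlContext.jsonFile("examples/src/main/resources/people.json", 0.5)

// The RDD[String] variants work the same way.
val jsonStrings = sc.parallelize("""{"name":"Yin","age":30}""" :: Nil)
val fromRdd = sqlContext.jsonRDD(jsonStrings, 0.5)
```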
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 89eaba2d19..7c0efb4566 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
-import org.apache.spark.sql.catalyst.types.BooleanType
+import org.apache.spark.sql.catalyst.types.{DataType, StructType, BooleanType}
import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
import org.apache.spark.api.java.JavaRDD
import java.util.{Map => JMap}
@@ -41,8 +41,10 @@ import java.util.{Map => JMap}
* whose elements are scala case classes into a SchemaRDD. This conversion can also be done
* explicitly using the `createSchemaRDD` function on a [[SQLContext]].
*
- * A `SchemaRDD` can also be created by loading data in from external sources, for example,
- * by using the `parquetFile` method on [[SQLContext]].
+ * A `SchemaRDD` can also be created by loading data in from external sources.
+ * Examples are loading data from Parquet files by using the
+ * `parquetFile` method on [[SQLContext]], and loading JSON datasets
+ * by using `jsonFile` and `jsonRDD` methods on [[SQLContext]].
*
* == SQL Queries ==
* A SchemaRDD can be registered as a table in the [[SQLContext]] that was used to create it. Once
@@ -341,14 +343,38 @@ class SchemaRDD(
*/
def toJavaSchemaRDD: JavaSchemaRDD = new JavaSchemaRDD(sqlContext, logicalPlan)
+ /**
+ * Converts a JavaRDD to a PythonRDD. It is used by pyspark.
+ */
private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
- val fieldNames: Seq[String] = this.queryExecution.analyzed.output.map(_.name)
+ def rowToMap(row: Row, structType: StructType): JMap[String, Any] = {
+ val fields = structType.fields.map(field => (field.name, field.dataType))
+ val map: JMap[String, Any] = new java.util.HashMap
+ row.zip(fields).foreach {
+ case (obj, (name, dataType)) =>
+ dataType match {
+ case struct: StructType => map.put(name, rowToMap(obj.asInstanceOf[Row], struct))
+ case other => map.put(name, obj)
+ }
+ }
+
+ map
+ }
+
+ // TODO: Actually, the schema of a row should be represented by a StructType instead of
+ // a Seq[Attribute]. Once we have finished that change, we can just use rowToMap to
+ // construct the Map for python.
+ val fields: Seq[(String, DataType)] = this.queryExecution.analyzed.output.map(
+ field => (field.name, field.dataType))
this.mapPartitions { iter =>
val pickle = new Pickler
iter.map { row =>
val map: JMap[String, Any] = new java.util.HashMap
- row.zip(fieldNames).foreach { case (obj, name) =>
- map.put(name, obj)
+ row.zip(fields).foreach { case (obj, (name, dataType)) =>
+ dataType match {
+ case struct: StructType => map.put(name, rowToMap(obj.asInstanceOf[Row], struct))
+ case other => map.put(name, obj)
+ }
}
map
}.grouped(10).map(batched => pickle.dumps(batched.toArray))
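The practical effect of this change: a field whose type is a StructType is converted into a nested java.util.Map before pickling, so it reaches PySpark as a plain dict instead of an opaque row. A rough standalone sketch of the recursion, using simplified stand-ins rather than Spark's Row and StructType:

    import java.util.{HashMap => JHashMap, Map => JMap}

    // Stand-in for a schema field: a name plus optional nested fields.
    case class Field(name: String, nested: Seq[Field] = Nil)

    def toJavaMap(values: Seq[Any], fields: Seq[Field]): JMap[String, Any] = {
      val map = new JHashMap[String, Any]()
      values.zip(fields).foreach {
        // A nested record becomes a nested map.
        case (v: Seq[_], f) if f.nested.nonEmpty =>
          map.put(f.name, toJavaMap(v.asInstanceOf[Seq[Any]], f.nested))
        // Other values are copied through unchanged.
        case (v, f) => map.put(f.name, v)
      }
      map
    }

    toJavaMap(
      Seq("Michael", Seq(true, 30)),
      Seq(Field("name"), Field("info", Seq(Field("active"), Field("age")))))
    // e.g. {name=Michael, info={active=true, age=30}} (entry order is unspecified)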
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
index 656be965a8..fe81721943 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -122,4 +122,10 @@ private[sql] trait SchemaRDDLike {
@Experimental
def saveAsTable(tableName: String): Unit =
sqlContext.executePlan(InsertIntoCreatedTable(None, tableName, logicalPlan)).toRdd
+
+ /** Returns the output schema in the tree format. */
+ def schemaString: String = queryExecution.analyzed.schemaString
+
+ /** Prints out the schema in the tree format. */
+ def printSchema(): Unit = println(schemaString)
}
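A short sketch of the two new helpers; the schema shown in the comments is illustrative of the tree format rather than an exact transcript:

    val people = sqlContext.jsonFile("path/to/people.json")  // path is illustrative
    people.printSchema()
    // root
    //  |-- age: IntegerType
    //  |-- name: StringType

    val tree: String = people.schemaString  // the same tree, returned as a String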
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index 352260fa15..ff9842267f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
+import org.apache.spark.sql.json.JsonRDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
import org.apache.spark.sql.catalyst.types._
@@ -101,6 +102,25 @@ class JavaSQLContext(val sqlContext: SQLContext) {
new JavaSchemaRDD(sqlContext, ParquetRelation(path))
/**
+ * Loads a JSON file (one object per line), returning the result as a [[JavaSchemaRDD]].
+ * It goes through the entire dataset once to determine the schema.
+ *
+ * @group userf
+ */
+ def jsonFile(path: String): JavaSchemaRDD =
+ jsonRDD(sqlContext.sparkContext.textFile(path))
+
+ /**
+ * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
+ * [[JavaSchemaRDD]].
+ * It goes through the entire dataset once to determine the schema.
+ *
+ * @group userf
+ */
+ def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD =
+ new JavaSchemaRDD(sqlContext, JsonRDD.inferSchema(json, 1.0))
+
+ /**
* Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
* during the lifetime of this instance of SQLContext.
*/
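These Java-facing methods delegate to the same JsonRDD.inferSchema path with a fixed sampling ratio of 1.0. A minimal sketch of calling them (written in Scala against the Java API; the wrapped SQLContext and the path are assumptions):

    import org.apache.spark.sql.api.java.JavaSQLContext

    val javaSqlCtx = new JavaSQLContext(sqlContext)  // sqlContext: an existing SQLContext
    val people = javaSqlCtx.jsonFile("path/to/people.json")
    people.registerAsTable("people")
    javaSqlCtx.sql("SELECT * FROM people").collect()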
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
new file mode 100644
index 0000000000..edf8677557
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.json
+
+import scala.collection.JavaConversions._
+import scala.math.BigDecimal
+
+import com.fasterxml.jackson.databind.ObjectMapper
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
+import org.apache.spark.sql.Logging
+
+private[sql] object JsonRDD extends Logging {
+
+ private[sql] def inferSchema(
+ json: RDD[String],
+ samplingRatio: Double = 1.0): LogicalPlan = {
+ require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0")
+ val schemaData = if (samplingRatio > 0.99) json else json.sample(false, samplingRatio, 1)
+ val allKeys = parseJson(schemaData).map(allKeysWithValueTypes).reduce(_ ++ _)
+ val baseSchema = createSchema(allKeys)
+
+ createLogicalPlan(json, baseSchema)
+ }
+
+ private def createLogicalPlan(
+ json: RDD[String],
+ baseSchema: StructType): LogicalPlan = {
+ val schema = nullTypeToStringType(baseSchema)
+
+ SparkLogicalPlan(ExistingRdd(asAttributes(schema), parseJson(json).map(asRow(_, schema))))
+ }
+
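+ // Builds a nested StructType from the flattened set of (backtick-quoted key path, DataType)
+ // pairs produced by allKeysWithValueTypes, resolving per-key type conflicts with compatibleType.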
+ private def createSchema(allKeys: Set[(String, DataType)]): StructType = {
+ // Resolve type conflicts
+ val resolved = allKeys.groupBy {
+ case (key, dataType) => key
+ }.map {
+ // Now, keys and types are organized in the format of
+ // key -> Set(type1, type2, ...).
+ case (key, typeSet) => {
+ val fieldName = key.substring(1, key.length - 1).split("`.`").toSeq
+ val dataType = typeSet.map {
+ case (_, dataType) => dataType
+ }.reduce((type1: DataType, type2: DataType) => compatibleType(type1, type2))
+
+ (fieldName, dataType)
+ }
+ }
+
+ def makeStruct(values: Seq[Seq[String]], prefix: Seq[String]): StructType = {
+ val (topLevel, structLike) = values.partition(_.size == 1)
+ val topLevelFields = topLevel.filter {
+ name => resolved.get(prefix ++ name).get match {
+ case ArrayType(StructType(Nil)) => false
+ case ArrayType(_) => true
+ case struct: StructType => false
+ case _ => true
+ }
+ }.map {
+ a => StructField(a.head, resolved.get(prefix ++ a).get, nullable = true)
+ }
+
+ val structFields: Seq[StructField] = structLike.groupBy(_(0)).map {
+ case (name, fields) => {
+ val nestedFields = fields.map(_.tail)
+ val structType = makeStruct(nestedFields, prefix :+ name)
+ val dataType = resolved.get(prefix :+ name).get
+ dataType match {
+ case array: ArrayType => Some(StructField(name, ArrayType(structType), nullable = true))
+ case struct: StructType => Some(StructField(name, structType, nullable = true))
+ // If dataType is StringType, a type conflict between primitive and complex types
+ // has been resolved, so the type of `name` has been relaxed to StringType.
+ // Such a field has already been put in topLevelFields.
+ case StringType => None
+ }
+ }
+ }.flatMap(field => field).toSeq
+
+ StructType(
+ (topLevelFields ++ structFields).sortBy {
+ case StructField(name, _, _) => name
+ })
+ }
+
+ makeStruct(resolved.keySet.toSeq, Nil)
+ }
+
+ /**
+ * Returns the most general data type for two given data types.
+ */
+ private[json] def compatibleType(t1: DataType, t2: DataType): DataType = {
+ // Try and find a promotion rule that contains both types in question.
+ val applicableConversion = HiveTypeCoercion.allPromotions.find(p => p.contains(t1) && p
+ .contains(t2))
+
+ // If found return the widest common type, otherwise None
+ val returnType = applicableConversion.map(_.filter(t => t == t1 || t == t2).last)
+
+ if (returnType.isDefined) {
+ returnType.get
+ } else {
+ // t1 or t2 is a StructType, ArrayType, or an unexpected type.
+ (t1, t2) match {
+ case (other: DataType, NullType) => other
+ case (NullType, other: DataType) => other
+ case (StructType(fields1), StructType(fields2)) => {
+ val newFields = (fields1 ++ fields2).groupBy(field => field.name).map {
+ case (name, fieldTypes) => {
+ val dataType = fieldTypes.map(field => field.dataType).reduce(
+ (type1: DataType, type2: DataType) => compatibleType(type1, type2))
+ StructField(name, dataType, true)
+ }
+ }
+ StructType(newFields.toSeq.sortBy {
+ case StructField(name, _, _) => name
+ })
+ }
+ case (ArrayType(elementType1), ArrayType(elementType2)) =>
+ ArrayType(compatibleType(elementType1, elementType2))
+ // TODO: We should use JsonObjectStringType to mark that the values of this field will be
+ // strings and every string is a JSON object.
+ case (_, _) => StringType
+ }
+ }
+ }
+
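+ // Maps a single parsed JSON value to the narrowest matching Catalyst primitive type.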
+ private def typeOfPrimitiveValue(value: Any): DataType = {
+ value match {
+ case value: java.lang.String => StringType
+ case value: java.lang.Integer => IntegerType
+ case value: java.lang.Long => LongType
+ // Since we do not have a data type backed by BigInteger,
+ // when we see a Java BigInteger, we use DecimalType.
+ case value: java.math.BigInteger => DecimalType
+ case value: java.lang.Double => DoubleType
+ case value: java.math.BigDecimal => DecimalType
+ case value: java.lang.Boolean => BooleanType
+ case null => NullType
+ // Unexpected data type.
+ case _ => StringType
+ }
+ }
+
+ /**
+ * Returns the element type of a JSON array. We go through all elements of this array
+ * to detect any possible type conflict. We use [[compatibleType]] to resolve
+ * type conflicts. Right now, when the element of an array is another array, we
+ * treat the element as String.
+ */
+ private def typeOfArray(l: Seq[Any]): ArrayType = {
+ val elements = l.flatMap(v => Option(v))
+ if (elements.isEmpty) {
+ // If this JSON array is empty, we use NullType as a placeholder.
+ // If this array is not empty in other JSON objects, we can resolve
+ // the type after we have passed through all JSON objects.
+ ArrayType(NullType)
+ } else {
+ val elementType = elements.map {
+ e => e match {
+ case map: Map[_, _] => StructType(Nil)
+ // We have an array of arrays. If those element arrays do not have the same
+ // element types, we will return ArrayType[StringType].
+ case seq: Seq[_] => typeOfArray(seq)
+ case value => typeOfPrimitiveValue(value)
+ }
+ }.reduce((type1: DataType, type2: DataType) => compatibleType(type1, type2))
+
+ ArrayType(elementType)
+ }
+ }
+
+ /**
+ * Figures out all key names and data types of values from a parsed JSON object
+ * (in the format of Map[String, Any]). When the value of a key is a JSON object, we
+ * only use a placeholder (StructType(Nil)) to mark that it should be a struct
+ * instead of getting all fields of this struct, because a field that does not appear
+ * in this JSON object may appear in other JSON objects.
+ */
+ private def allKeysWithValueTypes(m: Map[String, Any]): Set[(String, DataType)] = {
+ m.map{
+ // Quote the key with backticks to handle cases which have dots
+ // in the field name.
+ case (key, dataType) => (s"`$key`", dataType)
+ }.flatMap {
+ case (key: String, struct: Map[String, Any]) => {
+ // The value associated with the key is a JSON object.
+ allKeysWithValueTypes(struct).map {
+ case (k, dataType) => (s"$key.$k", dataType)
+ } ++ Set((key, StructType(Nil)))
+ }
+ case (key: String, array: List[Any]) => {
+ // The value associated with the key is an array.
+ typeOfArray(array) match {
+ case ArrayType(StructType(Nil)) => {
+ // The elements of this array are structs.
+ array.asInstanceOf[List[Map[String, Any]]].flatMap {
+ element => allKeysWithValueTypes(element)
+ }.map {
+ case (k, dataType) => (s"$key.$k", dataType)
+ } :+ (key, ArrayType(StructType(Nil)))
+ }
+ case ArrayType(elementType) => (key, ArrayType(elementType)) :: Nil
+ }
+ }
+ case (key: String, value) => (key, typeOfPrimitiveValue(value)) :: Nil
+ }.toSet
+ }
+
+ /**
+ * Converts a Java Map/List to a Scala Map/List.
+ * We do not use Jackson's Scala module here because
+ * DefaultScalaModule in jackson-module-scala will make
+ * the parsing very slow.
+ */
+ private def scalafy(obj: Any): Any = obj match {
+ case map: java.util.Map[String, Object] =>
+ // .map(identity) is used as a workaround of non-serializable Map
+ // generated by .mapValues.
+ // This issue is documented at https://issues.scala-lang.org/browse/SI-7005
+ map.toMap.mapValues(scalafy).map(identity)
+ case list: java.util.List[Object] =>
+ list.toList.map(scalafy)
+ case atom => atom
+ }
+
+ private def parseJson(json: RDD[String]): RDD[Map[String, Any]] = {
+ // According to [Jackson-72: https://jira.codehaus.org/browse/JACKSON-72],
+ // ObjectMapper will not return BigDecimal when
+ // "DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS" is disabled
+ // (see NumberDeserializer.deserialize for the logic).
+ // But, we do not want to enable this feature because it will use BigDecimal
+ // for every float number, which will be slow.
+ // So, right now, numbers that only fit in a BigDecimal will overflow to Infinity.
+ // TODO: Support BigDecimal.
+ json.mapPartitions(iter => {
+ // When there is a key appearing multiple times (a duplicate key),
+ // the ObjectMapper will take the last value associated with this duplicate key.
+ // For example: for {"key": 1, "key":2}, we will get "key"->2.
+ val mapper = new ObjectMapper()
+ iter.map(record => mapper.readValue(record, classOf[java.util.Map[String, Any]]))
+ }).map(scalafy).map(_.asInstanceOf[Map[String, Any]])
+ }
+
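+ // Widening conversions used by enforceCorrectType when the resolved column type is wider
+ // than the type of the parsed value (e.g. an Integer read into a LongType column).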
+ private def toLong(value: Any): Long = {
+ value match {
+ case value: java.lang.Integer => value.asInstanceOf[Int].toLong
+ case value: java.lang.Long => value.asInstanceOf[Long]
+ }
+ }
+
+ private def toDouble(value: Any): Double = {
+ value match {
+ case value: java.lang.Integer => value.asInstanceOf[Int].toDouble
+ case value: java.lang.Long => value.asInstanceOf[Long].toDouble
+ case value: java.lang.Double => value.asInstanceOf[Double]
+ }
+ }
+
+ private def toDecimal(value: Any): BigDecimal = {
+ value match {
+ case value: java.lang.Integer => BigDecimal(value)
+ case value: java.lang.Long => BigDecimal(value)
+ case value: java.math.BigInteger => BigDecimal(value)
+ case value: java.lang.Double => BigDecimal(value)
+ case value: java.math.BigDecimal => BigDecimal(value)
+ }
+ }
+
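+ // The helpers below turn parsed values back into JSON text. Complex values (maps and
+ // sequences) only reach them when a field's type has been relaxed to StringType by a
+ // type conflict.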
+ private def toJsonArrayString(seq: Seq[Any]): String = {
+ val builder = new StringBuilder
+ builder.append("[")
+ var count = 0
+ seq.foreach {
+ element =>
+ if (count > 0) builder.append(",")
+ count += 1
+ builder.append(toString(element))
+ }
+ builder.append("]")
+
+ builder.toString()
+ }
+
+ private def toJsonObjectString(map: Map[String, Any]): String = {
+ val builder = new StringBuilder
+ builder.append("{")
+ var count = 0
+ map.foreach {
+ case (key, value) =>
+ if (count > 0) builder.append(",")
+ count += 1
+ builder.append(s"""\"${key}\":${toString(value)}""")
+ }
+ builder.append("}")
+
+ builder.toString()
+ }
+
+ private def toString(value: Any): String = {
+ value match {
+ case value: Map[String, Any] => toJsonObjectString(value)
+ case value: Seq[Any] => toJsonArrayString(value)
+ case value => Option(value).map(_.toString).orNull
+ }
+ }
+
+ private[json] def enforceCorrectType(value: Any, desiredType: DataType): Any = {
+ if (value == null) {
+ null
+ } else {
+ desiredType match {
+ case ArrayType(elementType) =>
+ value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType))
+ case StringType => toString(value)
+ case IntegerType => value.asInstanceOf[IntegerType.JvmType]
+ case LongType => toLong(value)
+ case DoubleType => toDouble(value)
+ case DecimalType => toDecimal(value)
+ case BooleanType => value.asInstanceOf[BooleanType.JvmType]
+ case NullType => null
+ }
+ }
+ }
+
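+ // Converts a parsed JSON object into a Row that follows the given schema, recursing into
+ // nested structs and arrays of structs. Fields missing from this object become null.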
+ private def asRow(json: Map[String,Any], schema: StructType): Row = {
+ val row = new GenericMutableRow(schema.fields.length)
+ schema.fields.zipWithIndex.foreach {
+ // StructType
+ case (StructField(name, fields: StructType, _), i) =>
+ row.update(i, json.get(name).flatMap(v => Option(v)).map(
+ v => asRow(v.asInstanceOf[Map[String, Any]], fields)).orNull)
+
+ // ArrayType(StructType)
+ case (StructField(name, ArrayType(structType: StructType), _), i) =>
+ row.update(i,
+ json.get(name).flatMap(v => Option(v)).map(
+ v => v.asInstanceOf[Seq[Any]].map(
+ e => asRow(e.asInstanceOf[Map[String, Any]], structType))).orNull)
+
+ // Other cases
+ case (StructField(name, dataType, _), i) =>
+ row.update(i, json.get(name).flatMap(v => Option(v)).map(
+ enforceCorrectType(_, dataType)).getOrElse(null))
+ }
+
+ row
+ }
+
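+ // A field that was only ever seen as null still has NullType after conflict resolution;
+ // it is surfaced as StringType (and ArrayType(NullType) as ArrayType(StringType)).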
+ private def nullTypeToStringType(struct: StructType): StructType = {
+ val fields = struct.fields.map {
+ case StructField(fieldName, dataType, nullable) => {
+ val newType = dataType match {
+ case NullType => StringType
+ case ArrayType(NullType) => ArrayType(StringType)
+ case struct: StructType => nullTypeToStringType(struct)
+ case other: DataType => other
+ }
+ StructField(fieldName, newType, nullable)
+ }
+ }
+
+ StructType(fields)
+ }
+
+ private def asAttributes(struct: StructType): Seq[AttributeReference] = {
+ struct.fields.map(f => AttributeReference(f.name, f.dataType, nullable = true)())
+ }
+
+ private def asStruct(attributes: Seq[AttributeReference]): StructType = {
+ val fields = attributes.map {
+ case AttributeReference(name, dataType, nullable) => StructField(name, dataType, nullable)
+ }
+
+ StructType(fields)
+ }
+}
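To make the inference flow concrete, a small worked example (the records and the surrounding `sc`/`sqlContext` are illustrative, as in the earlier sketch). allKeysWithValueTypes flattens each object into backtick-quoted key paths with their observed types, createSchema groups the paths and resolves conflicts with compatibleType, and nullTypeToStringType replaces any remaining NullType:

    val conflicting = sc.parallelize(
      """{"a":1, "b":null}""" ::
      """{"a":1.5, "b":{"c":true}}""" :: Nil)

    val inferred = sqlContext.jsonRDD(conflicting)
    inferred.printSchema()
    // "a" is widened from IntegerType to DoubleType;
    // "b" is resolved to a struct with a single boolean field "c".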
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index d7f6abaf5d..ef84ead2e6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -17,12 +17,10 @@
package org.apache.spark.sql
-import org.scalatest.FunSuite
-
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.util._
-class QueryTest extends FunSuite {
+class QueryTest extends PlanTest {
/**
* Runs the plan and makes sure the answer matches the expected result.
* @param rdd the [[SchemaRDD]] to be executed
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala
index 9fff7222fe..020baf0c7e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala
@@ -22,6 +22,7 @@ import scala.beans.BeanProperty
import org.scalatest.FunSuite
import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.test.TestSQLContext
// Implicits
@@ -111,4 +112,48 @@ class JavaSQLSuite extends FunSuite {
""".stripMargin).collect.head.row ===
Seq.fill(8)(null))
}
+
+ test("loads JSON datasets") {
+ val jsonString =
+ """{"string":"this is a simple string.",
+ "integer":10,
+ "long":21474836470,
+ "bigInteger":92233720368547758070,
+ "double":1.7976931348623157E308,
+ "boolean":true,
+ "null":null
+ }""".replaceAll("\n", " ")
+ val rdd = javaCtx.parallelize(jsonString :: Nil)
+
+ var schemaRDD = javaSqlCtx.jsonRDD(rdd)
+
+ schemaRDD.registerAsTable("jsonTable1")
+
+ assert(
+ javaSqlCtx.sql("select * from jsonTable1").collect.head.row ===
+ Seq(BigDecimal("92233720368547758070"),
+ true,
+ 1.7976931348623157E308,
+ 10,
+ 21474836470L,
+ null,
+ "this is a simple string."))
+
+ val file = getTempFilePath("json")
+ val path = file.toString
+ rdd.saveAsTextFile(path)
+ schemaRDD = javaSqlCtx.jsonFile(path)
+
+ schemaRDD.registerAsTable("jsonTable2")
+
+ assert(
+ javaSqlCtx.sql("select * from jsonTable2").collect.head.row ===
+ Seq(BigDecimal("92233720368547758070"),
+ true,
+ 1.7976931348623157E308,
+ 10,
+ 21474836470L,
+ null,
+ "this is a simple string."))
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
new file mode 100644
index 0000000000..10bd9f08f0
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.json
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.json.JsonRDD.{enforceCorrectType, compatibleType}
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.TestSQLContext._
+
+protected case class Schema(output: Seq[Attribute]) extends LeafNode
+
+class JsonSuite extends QueryTest {
+ import TestJsonData._
+ TestJsonData
+
+ test("Type promotion") {
+ def checkTypePromotion(expected: Any, actual: Any) {
+ assert(expected.getClass == actual.getClass,
+ s"Failed to promote ${actual.getClass} to ${expected.getClass}.")
+ assert(expected == actual,
+ s"Promoted value ${actual}(${actual.getClass}) does not equal the expected value " +
+ s"${expected}(${expected.getClass}).")
+ }
+
+ val intNumber: Int = 2147483647
+ checkTypePromotion(intNumber, enforceCorrectType(intNumber, IntegerType))
+ checkTypePromotion(intNumber.toLong, enforceCorrectType(intNumber, LongType))
+ checkTypePromotion(intNumber.toDouble, enforceCorrectType(intNumber, DoubleType))
+ checkTypePromotion(BigDecimal(intNumber), enforceCorrectType(intNumber, DecimalType))
+
+ val longNumber: Long = 9223372036854775807L
+ checkTypePromotion(longNumber, enforceCorrectType(longNumber, LongType))
+ checkTypePromotion(longNumber.toDouble, enforceCorrectType(longNumber, DoubleType))
+ checkTypePromotion(BigDecimal(longNumber), enforceCorrectType(longNumber, DecimalType))
+
+ val doubleNumber: Double = 1.7976931348623157E308d
+ checkTypePromotion(doubleNumber.toDouble, enforceCorrectType(doubleNumber, DoubleType))
+ checkTypePromotion(BigDecimal(doubleNumber), enforceCorrectType(doubleNumber, DecimalType))
+ }
+
+ test("Get compatible type") {
+ def checkDataType(t1: DataType, t2: DataType, expected: DataType) {
+ var actual = compatibleType(t1, t2)
+ assert(actual == expected,
+ s"Expected $expected as the most general data type for $t1 and $t2, found $actual")
+ actual = compatibleType(t2, t1)
+ assert(actual == expected,
+ s"Expected $expected as the most general data type for $t1 and $t2, found $actual")
+ }
+
+ // NullType
+ checkDataType(NullType, BooleanType, BooleanType)
+ checkDataType(NullType, IntegerType, IntegerType)
+ checkDataType(NullType, LongType, LongType)
+ checkDataType(NullType, DoubleType, DoubleType)
+ checkDataType(NullType, DecimalType, DecimalType)
+ checkDataType(NullType, StringType, StringType)
+ checkDataType(NullType, ArrayType(IntegerType), ArrayType(IntegerType))
+ checkDataType(NullType, StructType(Nil), StructType(Nil))
+ checkDataType(NullType, NullType, NullType)
+
+ // BooleanType
+ checkDataType(BooleanType, BooleanType, BooleanType)
+ checkDataType(BooleanType, IntegerType, StringType)
+ checkDataType(BooleanType, LongType, StringType)
+ checkDataType(BooleanType, DoubleType, StringType)
+ checkDataType(BooleanType, DecimalType, StringType)
+ checkDataType(BooleanType, StringType, StringType)
+ checkDataType(BooleanType, ArrayType(IntegerType), StringType)
+ checkDataType(BooleanType, StructType(Nil), StringType)
+
+ // IntegerType
+ checkDataType(IntegerType, IntegerType, IntegerType)
+ checkDataType(IntegerType, LongType, LongType)
+ checkDataType(IntegerType, DoubleType, DoubleType)
+ checkDataType(IntegerType, DecimalType, DecimalType)
+ checkDataType(IntegerType, StringType, StringType)
+ checkDataType(IntegerType, ArrayType(IntegerType), StringType)
+ checkDataType(IntegerType, StructType(Nil), StringType)
+
+ // LongType
+ checkDataType(LongType, LongType, LongType)
+ checkDataType(LongType, DoubleType, DoubleType)
+ checkDataType(LongType, DecimalType, DecimalType)
+ checkDataType(LongType, StringType, StringType)
+ checkDataType(LongType, ArrayType(IntegerType), StringType)
+ checkDataType(LongType, StructType(Nil), StringType)
+
+ // DoubleType
+ checkDataType(DoubleType, DoubleType, DoubleType)
+ checkDataType(DoubleType, DecimalType, DecimalType)
+ checkDataType(DoubleType, StringType, StringType)
+ checkDataType(DoubleType, ArrayType(IntegerType), StringType)
+ checkDataType(DoubleType, StructType(Nil), StringType)
+
+ // DecimalType
+ checkDataType(DecimalType, DecimalType, DecimalType)
+ checkDataType(DecimalType, StringType, StringType)
+ checkDataType(DecimalType, ArrayType(IntegerType), StringType)
+ checkDataType(DecimalType, StructType(Nil), StringType)
+
+ // StringType
+ checkDataType(StringType, StringType, StringType)
+ checkDataType(StringType, ArrayType(IntegerType), StringType)
+ checkDataType(StringType, StructType(Nil), StringType)
+
+ // ArrayType
+ checkDataType(ArrayType(IntegerType), ArrayType(IntegerType), ArrayType(IntegerType))
+ checkDataType(ArrayType(IntegerType), ArrayType(LongType), ArrayType(LongType))
+ checkDataType(ArrayType(IntegerType), ArrayType(StringType), ArrayType(StringType))
+ checkDataType(ArrayType(IntegerType), StructType(Nil), StringType)
+
+ // StructType
+ checkDataType(StructType(Nil), StructType(Nil), StructType(Nil))
+ checkDataType(
+ StructType(StructField("f1", IntegerType, true) :: Nil),
+ StructType(StructField("f1", IntegerType, true) :: Nil),
+ StructType(StructField("f1", IntegerType, true) :: Nil))
+ checkDataType(
+ StructType(StructField("f1", IntegerType, true) :: Nil),
+ StructType(Nil),
+ StructType(StructField("f1", IntegerType, true) :: Nil))
+ checkDataType(
+ StructType(
+ StructField("f1", IntegerType, true) ::
+ StructField("f2", IntegerType, true) :: Nil),
+ StructType(StructField("f1", LongType, true) :: Nil) ,
+ StructType(
+ StructField("f1", LongType, true) ::
+ StructField("f2", IntegerType, true) :: Nil))
+ checkDataType(
+ StructType(
+ StructField("f1", IntegerType, true) :: Nil),
+ StructType(
+ StructField("f2", IntegerType, true) :: Nil),
+ StructType(
+ StructField("f1", IntegerType, true) ::
+ StructField("f2", IntegerType, true) :: Nil))
+ checkDataType(
+ StructType(
+ StructField("f1", IntegerType, true) :: Nil),
+ DecimalType,
+ StringType)
+ }
+
+ test("Primitive field and type inferring") {
+ val jsonSchemaRDD = jsonRDD(primitiveFieldAndType)
+
+ val expectedSchema =
+ AttributeReference("bigInteger", DecimalType, true)() ::
+ AttributeReference("boolean", BooleanType, true)() ::
+ AttributeReference("double", DoubleType, true)() ::
+ AttributeReference("integer", IntegerType, true)() ::
+ AttributeReference("long", LongType, true)() ::
+ AttributeReference("null", StringType, true)() ::
+ AttributeReference("string", StringType, true)() :: Nil
+
+ comparePlans(Schema(expectedSchema), Schema(jsonSchemaRDD.logicalPlan.output))
+
+ jsonSchemaRDD.registerAsTable("jsonTable")
+
+ checkAnswer(
+ sql("select * from jsonTable"),
+ (BigDecimal("92233720368547758070"),
+ true,
+ 1.7976931348623157E308,
+ 10,
+ 21474836470L,
+ null,
+ "this is a simple string.") :: Nil
+ )
+ }
+
+ test("Complex field and type inferring") {
+ val jsonSchemaRDD = jsonRDD(complexFieldAndType)
+
+ val expectedSchema =
+ AttributeReference("arrayOfArray1", ArrayType(ArrayType(StringType)), true)() ::
+ AttributeReference("arrayOfArray2", ArrayType(ArrayType(DoubleType)), true)() ::
+ AttributeReference("arrayOfBigInteger", ArrayType(DecimalType), true)() ::
+ AttributeReference("arrayOfBoolean", ArrayType(BooleanType), true)() ::
+ AttributeReference("arrayOfDouble", ArrayType(DoubleType), true)() ::
+ AttributeReference("arrayOfInteger", ArrayType(IntegerType), true)() ::
+ AttributeReference("arrayOfLong", ArrayType(LongType), true)() ::
+ AttributeReference("arrayOfNull", ArrayType(StringType), true)() ::
+ AttributeReference("arrayOfString", ArrayType(StringType), true)() ::
+ AttributeReference("arrayOfStruct", ArrayType(
+ StructType(StructField("field1", BooleanType, true) ::
+ StructField("field2", StringType, true) :: Nil)), true)() ::
+ AttributeReference("struct", StructType(
+ StructField("field1", BooleanType, true) ::
+ StructField("field2", DecimalType, true) :: Nil), true)() ::
+ AttributeReference("structWithArrayFields", StructType(
+ StructField("field1", ArrayType(IntegerType), true) ::
+ StructField("field2", ArrayType(StringType), true) :: Nil), true)() :: Nil
+
+ comparePlans(Schema(expectedSchema), Schema(jsonSchemaRDD.logicalPlan.output))
+
+ jsonSchemaRDD.registerAsTable("jsonTable")
+
+ // Access elements of a primitive array.
+ checkAnswer(
+ sql("select arrayOfString[0], arrayOfString[1], arrayOfString[2] from jsonTable"),
+ ("str1", "str2", null) :: Nil
+ )
+
+ // Access an array of null values.
+ checkAnswer(
+ sql("select arrayOfNull from jsonTable"),
+ Seq(Seq(null, null, null, null)) :: Nil
+ )
+
+ // Access elements of a BigInteger array (we use DecimalType internally).
+ checkAnswer(
+ sql("select arrayOfBigInteger[0], arrayOfBigInteger[1], arrayOfBigInteger[2] from jsonTable"),
+ (BigDecimal("922337203685477580700"), BigDecimal("-922337203685477580800"), null) :: Nil
+ )
+
+ // Access elements of an array of arrays.
+ checkAnswer(
+ sql("select arrayOfArray1[0], arrayOfArray1[1] from jsonTable"),
+ (Seq("1", "2", "3"), Seq("str1", "str2")) :: Nil
+ )
+
+ // Access elements of an array of arrays.
+ checkAnswer(
+ sql("select arrayOfArray2[0], arrayOfArray2[1] from jsonTable"),
+ (Seq(1.0, 2.0, 3.0), Seq(1.1, 2.1, 3.1)) :: Nil
+ )
+
+ // Access elements of an array inside a field with the type of ArrayType(ArrayType).
+ checkAnswer(
+ sql("select arrayOfArray1[1][1], arrayOfArray2[1][1] from jsonTable"),
+ ("str2", 2.1) :: Nil
+ )
+
+ // Access elements of an array of structs.
+ checkAnswer(
+ sql("select arrayOfStruct[0], arrayOfStruct[1], arrayOfStruct[2] from jsonTable"),
+ (true :: "str1" :: Nil, false :: null :: Nil, null) :: Nil
+ )
+
+ // Access a struct and fields inside of it.
+ checkAnswer(
+ sql("select struct, struct.field1, struct.field2 from jsonTable"),
+ (
+ Seq(true, BigDecimal("92233720368547758070")),
+ true,
+ BigDecimal("92233720368547758070")) :: Nil
+ )
+
+ // Access an array field of a struct.
+ checkAnswer(
+ sql("select structWithArrayFields.field1, structWithArrayFields.field2 from jsonTable"),
+ (Seq(4, 5, 6), Seq("str1", "str2")) :: Nil
+ )
+
+ // Access elements of an array field of a struct.
+ checkAnswer(
+ sql("select structWithArrayFields.field1[1], structWithArrayFields.field2[3] from jsonTable"),
+ (5, null) :: Nil
+ )
+ }
+
+ ignore("Complex field and type inferring (Ignored)") {
+ val jsonSchemaRDD = jsonRDD(complexFieldAndType)
+ jsonSchemaRDD.registerAsTable("jsonTable")
+
+ // Right now, "field1" and "field2" are treated as aliases. We should fix it.
+ checkAnswer(
+ sql("select arrayOfStruct[0].field1, arrayOfStruct[0].field2 from jsonTable"),
+ (true, "str1") :: Nil
+ )
+
+ // Right now, the analyzer cannot resolve arrayOfStruct.field1 and arrayOfStruct.field2.
+ // Getting all values of a specific field from an array of structs.
+ checkAnswer(
+ sql("select arrayOfStruct.field1, arrayOfStruct.field2 from jsonTable"),
+ (Seq(true, false), Seq("str1", null)) :: Nil
+ )
+ }
+
+ test("Type conflict in primitive field values") {
+ val jsonSchemaRDD = jsonRDD(primitiveFieldValueTypeConflict)
+
+ val expectedSchema =
+ AttributeReference("num_bool", StringType, true)() ::
+ AttributeReference("num_num_1", LongType, true)() ::
+ AttributeReference("num_num_2", DecimalType, true)() ::
+ AttributeReference("num_num_3", DoubleType, true)() ::
+ AttributeReference("num_str", StringType, true)() ::
+ AttributeReference("str_bool", StringType, true)() :: Nil
+
+ comparePlans(Schema(expectedSchema), Schema(jsonSchemaRDD.logicalPlan.output))
+
+ jsonSchemaRDD.registerAsTable("jsonTable")
+
+ checkAnswer(
+ sql("select * from jsonTable"),
+ ("true", 11L, null, 1.1, "13.1", "str1") ::
+ ("12", null, BigDecimal("21474836470.9"), null, null, "true") ::
+ ("false", 21474836470L, BigDecimal("92233720368547758070"), 100, "str1", "false") ::
+ (null, 21474836570L, BigDecimal(1.1), 21474836470L, "92233720368547758070", null) :: Nil
+ )
+
+ // Number and Boolean conflict: resolve the type as number in this query.
+ checkAnswer(
+ sql("select num_bool - 10 from jsonTable where num_bool > 11"),
+ 2
+ )
+
+ // Widening to LongType
+ checkAnswer(
+ sql("select num_num_1 - 100 from jsonTable where num_num_1 > 11"),
+ Seq(21474836370L) :: Seq(21474836470L) :: Nil
+ )
+
+ checkAnswer(
+ sql("select num_num_1 - 100 from jsonTable where num_num_1 > 10"),
+ Seq(-89) :: Seq(21474836370L) :: Seq(21474836470L) :: Nil
+ )
+
+ // Widening to DecimalType
+ checkAnswer(
+ sql("select num_num_2 + 1.2 from jsonTable where num_num_2 > 1.1"),
+ Seq(BigDecimal("21474836472.1")) :: Seq(BigDecimal("92233720368547758071.2")) :: Nil
+ )
+
+ // Widening to DoubleType
+ checkAnswer(
+ sql("select num_num_3 + 1.2 from jsonTable where num_num_3 > 1.1"),
+ Seq(101.2) :: Seq(21474836471.2) :: Nil
+ )
+
+ // Number and String conflict: resolve the type as number in this query.
+ checkAnswer(
+ sql("select num_str + 1.2 from jsonTable where num_str > 14"),
+ 92233720368547758071.2
+ )
+
+ // String and Boolean conflict: resolve the type as string.
+ checkAnswer(
+ sql("select * from jsonTable where str_bool = 'str1'"),
+ ("true", 11L, null, 1.1, "13.1", "str1") :: Nil
+ )
+ }
+
+ ignore("Type conflict in primitive field values (Ignored)") {
+ val jsonSchemaRDD = jsonRDD(primitiveFieldValueTypeConflict)
+ jsonSchemaRDD.registerAsTable("jsonTable")
+
+ // Right now, the analyzer does not promote strings in a boolean expression.
+ // Number and Boolean conflict: resolve the type as boolean in this query.
+ checkAnswer(
+ sql("select num_bool from jsonTable where NOT num_bool"),
+ false
+ )
+
+ checkAnswer(
+ sql("select str_bool from jsonTable where NOT str_bool"),
+ false
+ )
+
+ // Right now, the analyzer does not know that num_bool should be treated as a boolean.
+ // Number and Boolean conflict: resolve the type as boolean in this query.
+ checkAnswer(
+ sql("select num_bool from jsonTable where num_bool"),
+ true
+ )
+
+ checkAnswer(
+ sql("select str_bool from jsonTable where str_bool"),
+ false
+ )
+
+ // Right now, we have a parsing error.
+ // Number and String conflict: resolve the type as number in this query.
+ checkAnswer(
+ sql("select num_str + 1.2 from jsonTable where num_str > 92233720368547758060"),
+ BigDecimal("92233720368547758061.2")
+ )
+
+ // The plan of the following DSL is
+ // Project [(CAST(num_str#65:4, DoubleType) + 1.2) AS num#78]
+ // Filter (CAST(CAST(num_str#65:4, DoubleType), DecimalType) > 92233720368547758060)
+ // ExistingRdd [num_bool#61,num_num_1#62L,num_num_2#63,num_num_3#64,num_str#65,str_bool#66]
+ // We should directly cast num_str to DecimalType and also need to do the right type promotion
+ // in the Project.
+ checkAnswer(
+ jsonSchemaRDD.
+ where('num_str > BigDecimal("92233720368547758060")).
+ select('num_str + 1.2 as Symbol("num")),
+ BigDecimal("92233720368547758061.2")
+ )
+
+ // The following test will fail. The type of num_str is StringType.
+ // So, to evaluate num_str + 1.2, we first need to use Cast to convert the type.
+ // In our test data, one value of num_str is 13.1.
+ // The result of (CAST(num_str#65:4, DoubleType) + 1.2) for this value is 14.299999999999999,
+ // which is not 14.3.
+ // Number and String conflict: resolve the type as number in this query.
+ checkAnswer(
+ sql("select num_str + 1.2 from jsonTable where num_str > 13"),
+ Seq(14.3) :: Seq(92233720368547758071.2) :: Nil
+ )
+ }
+
+ test("Type conflict in complex field values") {
+ val jsonSchemaRDD = jsonRDD(complexFieldValueTypeConflict)
+
+ val expectedSchema =
+ AttributeReference("array", ArrayType(IntegerType), true)() ::
+ AttributeReference("num_struct", StringType, true)() ::
+ AttributeReference("str_array", StringType, true)() ::
+ AttributeReference("struct", StructType(
+ StructField("field", StringType, true) :: Nil), true)() ::
+ AttributeReference("struct_array", StringType, true)() :: Nil
+
+ comparePlans(Schema(expectedSchema), Schema(jsonSchemaRDD.logicalPlan.output))
+
+ jsonSchemaRDD.registerAsTable("jsonTable")
+
+ checkAnswer(
+ sql("select * from jsonTable"),
+ (Seq(), "11", "[1,2,3]", Seq(null), "[]") ::
+ (null, """{"field":false}""", null, null, "{}") ::
+ (Seq(4, 5, 6), null, "str", Seq(null), "[7,8,9]") ::
+ (Seq(7), "{}","[str1,str2,33]", Seq("str"), """{"field":true}""") :: Nil
+ )
+ }
+
+ test("Type conflict in array elements") {
+ val jsonSchemaRDD = jsonRDD(arrayElementTypeConflict)
+
+ val expectedSchema =
+ AttributeReference("array", ArrayType(StringType), true)() :: Nil
+
+ comparePlans(Schema(expectedSchema), Schema(jsonSchemaRDD.logicalPlan.output))
+
+ jsonSchemaRDD.registerAsTable("jsonTable")
+
+ checkAnswer(
+ sql("select * from jsonTable"),
+ Seq(Seq("1", "1.1", "true", null, "[]", "{}", "[2,3,4]",
+ """{"field":str}""")) :: Nil
+ )
+
+ // Treat an element as a number.
+ checkAnswer(
+ sql("select array[0] + 1 from jsonTable"),
+ 2
+ )
+ }
+
+ test("Handling missing fields") {
+ val jsonSchemaRDD = jsonRDD(missingFields)
+
+ val expectedSchema =
+ AttributeReference("a", BooleanType, true)() ::
+ AttributeReference("b", LongType, true)() ::
+ AttributeReference("c", ArrayType(IntegerType), true)() ::
+ AttributeReference("d", StructType(
+ StructField("field", BooleanType, true) :: Nil), true)() ::
+ AttributeReference("e", StringType, true)() :: Nil
+
+ comparePlans(Schema(expectedSchema), Schema(jsonSchemaRDD.logicalPlan.output))
+
+ jsonSchemaRDD.registerAsTable("jsonTable")
+ }
+
+ test("Loading a JSON dataset from a text file") {
+ val file = getTempFilePath("json")
+ val path = file.toString
+ primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+ val jsonSchemaRDD = jsonFile(path)
+
+ val expectedSchema =
+ AttributeReference("bigInteger", DecimalType, true)() ::
+ AttributeReference("boolean", BooleanType, true)() ::
+ AttributeReference("double", DoubleType, true)() ::
+ AttributeReference("integer", IntegerType, true)() ::
+ AttributeReference("long", LongType, true)() ::
+ AttributeReference("null", StringType, true)() ::
+ AttributeReference("string", StringType, true)() :: Nil
+
+ comparePlans(Schema(expectedSchema), Schema(jsonSchemaRDD.logicalPlan.output))
+
+ jsonSchemaRDD.registerAsTable("jsonTable")
+
+ checkAnswer(
+ sql("select * from jsonTable"),
+ (BigDecimal("92233720368547758070"),
+ true,
+ 1.7976931348623157E308,
+ 10,
+ 21474836470L,
+ null,
+ "this is a simple string.") :: Nil
+ )
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
new file mode 100644
index 0000000000..065e04046e
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.json
+
+import org.apache.spark.sql.test.TestSQLContext
+
+object TestJsonData {
+
+ val primitiveFieldAndType =
+ TestSQLContext.sparkContext.parallelize(
+ """{"string":"this is a simple string.",
+ "integer":10,
+ "long":21474836470,
+ "bigInteger":92233720368547758070,
+ "double":1.7976931348623157E308,
+ "boolean":true,
+ "null":null
+ }""" :: Nil)
+
+ val complexFieldAndType =
+ TestSQLContext.sparkContext.parallelize(
+ """{"struct":{"field1": true, "field2": 92233720368547758070},
+ "structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]},
+ "arrayOfString":["str1", "str2"],
+ "arrayOfInteger":[1, 2147483647, -2147483648],
+ "arrayOfLong":[21474836470, 9223372036854775807, -9223372036854775808],
+ "arrayOfBigInteger":[922337203685477580700, -922337203685477580800],
+ "arrayOfDouble":[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308],
+ "arrayOfBoolean":[true, false, true],
+ "arrayOfNull":[null, null, null, null],
+ "arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}],
+ "arrayOfArray1":[[1, 2, 3], ["str1", "str2"]],
+ "arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]]
+ }""" :: Nil)
+
+ val primitiveFieldValueTypeConflict =
+ TestSQLContext.sparkContext.parallelize(
+ """{"num_num_1":11, "num_num_2":null, "num_num_3": 1.1,
+ "num_bool":true, "num_str":13.1, "str_bool":"str1"}""" ::
+ """{"num_num_1":null, "num_num_2":21474836470.9, "num_num_3": null,
+ "num_bool":12, "num_str":null, "str_bool":true}""" ::
+ """{"num_num_1":21474836470, "num_num_2":92233720368547758070, "num_num_3": 100,
+ "num_bool":false, "num_str":"str1", "str_bool":false}""" ::
+ """{"num_num_1":21474836570, "num_num_2":1.1, "num_num_3": 21474836470,
+ "num_bool":null, "num_str":92233720368547758070, "str_bool":null}""" :: Nil)
+
+ val complexFieldValueTypeConflict =
+ TestSQLContext.sparkContext.parallelize(
+ """{"num_struct":11, "str_array":[1, 2, 3],
+ "array":[], "struct_array":[], "struct": {}}""" ::
+ """{"num_struct":{"field":false}, "str_array":null,
+ "array":null, "struct_array":{}, "struct": null}""" ::
+ """{"num_struct":null, "str_array":"str",
+ "array":[4, 5, 6], "struct_array":[7, 8, 9], "struct": {"field":null}}""" ::
+ """{"num_struct":{}, "str_array":["str1", "str2", 33],
+ "array":[7], "struct_array":{"field": true}, "struct": {"field": "str"}}""" :: Nil)
+
+ val arrayElementTypeConflict =
+ TestSQLContext.sparkContext.parallelize(
+ """{"array": [1, 1.1, true, null, [], {}, [2,3,4], {"field":"str"}]}""" :: Nil)
+
+ val missingFields =
+ TestSQLContext.sparkContext.parallelize(
+ """{"a":true}""" ::
+ """{"b":21474836470}""" ::
+ """{"c":[33, 44]}""" ::
+ """{"d":{"field":true}}""" ::
+ """{"e":"str"}""" :: Nil)
+}