From d1e22b386839e6f81cfd83b1903b9dc8c4bbef64 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Tue, 17 Jun 2014 19:14:59 -0700 Subject: [SPARK-2060][SQL] Querying JSON Datasets with SQL and DSL in Spark SQL JIRA: https://issues.apache.org/jira/browse/SPARK-2060 Programming guide: http://yhuai.github.io/site/sql-programming-guide.html Scala doc of SQLContext: http://yhuai.github.io/site/api/scala/index.html#org.apache.spark.sql.SQLContext Author: Yin Huai Closes #999 from yhuai/newJson and squashes the following commits: 227e89e [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson ce8eedd [Yin Huai] rxin's comments. bc9ac51 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson 94ffdaa [Yin Huai] Remove "get" from method names. ce31c81 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson e2773a6 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson 79ea9ba [Yin Huai] Fix typos. 5428451 [Yin Huai] Newline 1f908ce [Yin Huai] Remove extra line. d7a005c [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson 7ea750e [Yin Huai] marmbrus's comments. 6a5f5ef [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson 83013fb [Yin Huai] Update Java Example. e7a6c19 [Yin Huai] SchemaRDD.javaToPython should convert a field with the StructType to a Map. 6d20b85 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson 4fbddf0 [Yin Huai] Programming guide. 9df8c5a [Yin Huai] Python API. 7027634 [Yin Huai] Java API. cff84cc [Yin Huai] Use a SchemaRDD for a JSON dataset. d0bd412 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson ab810b0 [Yin Huai] Make JsonRDD private. 6df0891 [Yin Huai] Apache header. 8347f2e [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson 66f9e76 [Yin Huai] Update docs and use the entire dataset to infer the schema. 8ffed79 [Yin Huai] Update the example. a5a4b52 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson 4325475 [Yin Huai] If a sampled dataset is used for schema inferring, update the schema of the JsonTable after first execution. 65b87f0 [Yin Huai] Fix sampling... 8846af5 [Yin Huai] API doc. 52a2275 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson 0387523 [Yin Huai] Address PR comments. 666b957 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson a2313a6 [Yin Huai] Address PR comments. f3ce176 [Yin Huai] After type conflict resolution, if a NullType is found, StringType is used. 0576406 [Yin Huai] Add Apache license header. af91b23 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson f45583b [Yin Huai] Infer the schema of a JSON dataset (a text file with one JSON object per line or a RDD[String] with one JSON object per string) and returns a SchemaRDD. f31065f [Yin Huai] A query plan or a SchemaRDD can print out its schema. (cherry picked from commit d2f4f30b12f99358953e2781957468e2cfe3c916) Signed-off-by: Reynold Xin --- .../apache/spark/examples/sql/JavaSparkSQL.java | 78 +++++++++++++++++++++- examples/src/main/resources/people.json | 3 + 2 files changed, 79 insertions(+), 2 deletions(-) create mode 100644 examples/src/main/resources/people.json (limited to 'examples') diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java index ad5ec84b71..607df3eddd 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java @@ -18,6 +18,7 @@ package org.apache.spark.examples.sql; import java.io.Serializable; +import java.util.Arrays; import java.util.List; import org.apache.spark.SparkConf; @@ -56,6 +57,7 @@ public class JavaSparkSQL { JavaSparkContext ctx = new JavaSparkContext(sparkConf); JavaSQLContext sqlCtx = new JavaSQLContext(ctx); + System.out.println("=== Data source: RDD ==="); // Load a text file and convert each line to a Java Bean. JavaRDD people = ctx.textFile("examples/src/main/resources/people.txt").map( new Function() { @@ -84,16 +86,88 @@ public class JavaSparkSQL { return "Name: " + row.getString(0); } }).collect(); + for (String name: teenagerNames) { + System.out.println(name); + } + System.out.println("=== Data source: Parquet File ==="); // JavaSchemaRDDs can be saved as parquet files, maintaining the schema information. schemaPeople.saveAsParquetFile("people.parquet"); - // Read in the parquet file created above. Parquet files are self-describing so the schema is preserved. + // Read in the parquet file created above. + // Parquet files are self-describing so the schema is preserved. // The result of loading a parquet file is also a JavaSchemaRDD. JavaSchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet"); //Parquet files can also be registered as tables and then used in SQL statements. parquetFile.registerAsTable("parquetFile"); - JavaSchemaRDD teenagers2 = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); + JavaSchemaRDD teenagers2 = + sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); + teenagerNames = teenagers2.map(new Function() { + public String call(Row row) { + return "Name: " + row.getString(0); + } + }).collect(); + for (String name: teenagerNames) { + System.out.println(name); + } + + System.out.println("=== Data source: JSON Dataset ==="); + // A JSON dataset is pointed by path. + // The path can be either a single text file or a directory storing text files. + String path = "examples/src/main/resources/people.json"; + // Create a JavaSchemaRDD from the file(s) pointed by path + JavaSchemaRDD peopleFromJsonFile = sqlCtx.jsonFile(path); + + // Because the schema of a JSON dataset is automatically inferred, to write queries, + // it is better to take a look at what is the schema. + peopleFromJsonFile.printSchema(); + // The schema of people is ... + // root + // |-- age: IntegerType + // |-- name: StringType + + // Register this JavaSchemaRDD as a table. + peopleFromJsonFile.registerAsTable("people"); + + // SQL statements can be run by using the sql methods provided by sqlCtx. + JavaSchemaRDD teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19"); + + // The results of SQL queries are JavaSchemaRDDs and support all the normal RDD operations. + // The columns of a row in the result can be accessed by ordinal. + teenagerNames = teenagers3.map(new Function() { + public String call(Row row) { return "Name: " + row.getString(0); } + }).collect(); + for (String name: teenagerNames) { + System.out.println(name); + } + + // Alternatively, a JavaSchemaRDD can be created for a JSON dataset represented by + // a RDD[String] storing one JSON object per string. + List jsonData = Arrays.asList( + "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}"); + JavaRDD anotherPeopleRDD = ctx.parallelize(jsonData); + JavaSchemaRDD peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD); + + // Take a look at the schema of this new JavaSchemaRDD. + peopleFromJsonRDD.printSchema(); + // The schema of anotherPeople is ... + // root + // |-- address: StructType + // | |-- city: StringType + // | |-- state: StringType + // |-- name: StringType + + peopleFromJsonRDD.registerAsTable("people2"); + + JavaSchemaRDD peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2"); + List nameAndCity = peopleWithCity.map(new Function() { + public String call(Row row) { + return "Name: " + row.getString(0) + ", City: " + row.getString(1); + } + }).collect(); + for (String name: nameAndCity) { + System.out.println(name); + } } } diff --git a/examples/src/main/resources/people.json b/examples/src/main/resources/people.json new file mode 100644 index 0000000000..50a859cbd7 --- /dev/null +++ b/examples/src/main/resources/people.json @@ -0,0 +1,3 @@ +{"name":"Michael"} +{"name":"Andy", "age":30} +{"name":"Justin", "age":19} -- cgit v1.2.3