From d1e22b386839e6f81cfd83b1903b9dc8c4bbef64 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Tue, 17 Jun 2014 19:14:59 -0700 Subject: [SPARK-2060][SQL] Querying JSON Datasets with SQL and DSL in Spark SQL JIRA: https://issues.apache.org/jira/browse/SPARK-2060 Programming guide: http://yhuai.github.io/site/sql-programming-guide.html Scala doc of SQLContext: http://yhuai.github.io/site/api/scala/index.html#org.apache.spark.sql.SQLContext Author: Yin Huai Closes #999 from yhuai/newJson and squashes the following commits: 227e89e [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson ce8eedd [Yin Huai] rxin's comments. bc9ac51 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson 94ffdaa [Yin Huai] Remove "get" from method names. ce31c81 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson e2773a6 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson 79ea9ba [Yin Huai] Fix typos. 5428451 [Yin Huai] Newline 1f908ce [Yin Huai] Remove extra line. d7a005c [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson 7ea750e [Yin Huai] marmbrus's comments. 6a5f5ef [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson 83013fb [Yin Huai] Update Java Example. e7a6c19 [Yin Huai] SchemaRDD.javaToPython should convert a field with the StructType to a Map. 6d20b85 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson 4fbddf0 [Yin Huai] Programming guide. 9df8c5a [Yin Huai] Python API. 7027634 [Yin Huai] Java API. cff84cc [Yin Huai] Use a SchemaRDD for a JSON dataset. d0bd412 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson ab810b0 [Yin Huai] Make JsonRDD private. 6df0891 [Yin Huai] Apache header. 8347f2e [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson 66f9e76 [Yin Huai] Update docs and use the entire dataset to infer the schema. 8ffed79 [Yin Huai] Update the example. a5a4b52 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson 4325475 [Yin Huai] If a sampled dataset is used for schema inferring, update the schema of the JsonTable after first execution. 65b87f0 [Yin Huai] Fix sampling... 8846af5 [Yin Huai] API doc. 52a2275 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson 0387523 [Yin Huai] Address PR comments. 666b957 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson a2313a6 [Yin Huai] Address PR comments. f3ce176 [Yin Huai] After type conflict resolution, if a NullType is found, StringType is used. 0576406 [Yin Huai] Add Apache license header. af91b23 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson f45583b [Yin Huai] Infer the schema of a JSON dataset (a text file with one JSON object per line or a RDD[String] with one JSON object per string) and returns a SchemaRDD. f31065f [Yin Huai] A query plan or a SchemaRDD can print out its schema. (cherry picked from commit d2f4f30b12f99358953e2781957468e2cfe3c916) Signed-off-by: Reynold Xin --- docs/sql-programming-guide.md | 290 ++++++++++++++++++++++++++++++++---------- 1 file changed, 222 insertions(+), 68 deletions(-) (limited to 'docs') diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 4623bb4247..522c83884e 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -17,20 +17,20 @@ Spark. 
At the core of this component is a new type of RDD, [Row](api/scala/index.html#org.apache.spark.sql.catalyst.expressions.Row) objects along with a schema that describes the data types of each column in the row. A SchemaRDD is similar to a table in a traditional relational database. A SchemaRDD can be created from an existing RDD, [Parquet](http://parquet.io) -file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). +file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). All of the examples on this page use sample data included in the Spark distribution and can be run in the `spark-shell`.
-Spark SQL allows relational queries expressed in SQL, HiveQL, or Scala to be executed using +Spark SQL allows relational queries expressed in SQL or HiveQL to be executed using Spark. At the core of this component is a new type of RDD, [JavaSchemaRDD](api/scala/index.html#org.apache.spark.sql.api.java.JavaSchemaRDD). JavaSchemaRDDs are composed [Row](api/scala/index.html#org.apache.spark.sql.api.java.Row) objects along with a schema that describes the data types of each column in the row. A JavaSchemaRDD is similar to a table in a traditional relational database. A JavaSchemaRDD can be created from an existing RDD, [Parquet](http://parquet.io) -file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). +file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).
@@ -41,7 +41,7 @@ Spark. At the core of this component is a new type of RDD, [Row](api/python/pyspark.sql.Row-class.html) objects along with a schema that describes the data types of each column in the row. A SchemaRDD is similar to a table in a traditional relational database. A SchemaRDD can be created from an existing RDD, [Parquet](http://parquet.io) -file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). +file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). All of the examples on this page use sample data included in the Spark distribution and can be run in the `pyspark` shell.
@@ -64,8 +64,8 @@ descendants. To create a basic SQLContext, all you need is a SparkContext. val sc: SparkContext // An existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) -// Importing the SQL context gives access to all the public SQL functions and implicit conversions. -import sqlContext._ +// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD. +import sqlContext.createSchemaRDD {% endhighlight %} @@ -77,8 +77,8 @@ The entry point into all relational functionality in Spark is the of its descendants. To create a basic JavaSQLContext, all you need is a JavaSparkContext. {% highlight java %} -JavaSparkContext ctx = ...; // An existing JavaSparkContext. -JavaSQLContext sqlCtx = new org.apache.spark.sql.api.java.JavaSQLContext(ctx); +JavaSparkContext sc = ...; // An existing JavaSparkContext. +JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc); {% endhighlight %} @@ -91,14 +91,33 @@ of its decedents. To create a basic SQLContext, all you need is a SparkContext. {% highlight python %} from pyspark.sql import SQLContext -sqlCtx = SQLContext(sc) +sqlContext = SQLContext(sc) {% endhighlight %} -## Running SQL on RDDs +# Data Sources + +
+
+Spark SQL supports operating on a variety of data sources through the `SchemaRDD` interface. +Once a dataset has been loaded, it can be registered as a table and even joined with data from other sources. +
+ +
+Spark SQL supports operating on a variety of data sources through the `JavaSchemaRDD` interface. +Once a dataset has been loaded, it can be registered as a table and even joined with data from other sources. +
+ +
+Spark SQL supports operating on a variety of data sources through the `SchemaRDD` interface. +Once a dataset has been loaded, it can be registered as a table and even joined with data from other sources. +
+
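The Data Sources overview above notes that once a dataset has been loaded it can be registered as a table and even joined with data from other sources. A minimal Scala sketch of that idea, assuming a hypothetical `users.parquet` file and a hypothetical `events.json` file that both contain a `name` column (and the JSON file an `action` column), using only the `parquetFile`, `jsonFile`, `registerAsTable`, and `sql` methods described in this guide:

{% highlight scala %}
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Hypothetical inputs: a Parquet file and a JSON file that share a "name" column.
val users = sqlContext.parquetFile("examples/users.parquet")
val events = sqlContext.jsonFile("examples/events.json")

// Register both SchemaRDDs as tables so they can be referenced from SQL.
users.registerAsTable("users")
events.registerAsTable("events")

// Tables backed by different sources can be joined in a single query.
val joined = sqlContext.sql(
  "SELECT users.name, events.action FROM users JOIN events ON users.name = events.name")
joined.collect().foreach(println)
{% endhighlight %}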
+ +## RDDs
@@ -111,8 +130,10 @@ types such as Sequences or Arrays. This RDD can be implicitly converted to a Sch registered as a table. Tables can be used in subsequent SQL statements. {% highlight scala %} +// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) -import sqlContext._ +// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD. +import sqlContext.createSchemaRDD // Define the schema using a case class. // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit, @@ -124,7 +145,7 @@ val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(" people.registerAsTable("people") // SQL statements can be run by using the sql methods provided by sqlContext. -val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") +val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // The results of SQL queries are SchemaRDDs and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. @@ -170,12 +191,11 @@ A schema can be applied to an existing RDD by calling `applySchema` and providin for the JavaBean. {% highlight java %} - -JavaSparkContext ctx = ...; // An existing JavaSparkContext. -JavaSQLContext sqlCtx = new org.apache.spark.sql.api.java.JavaSQLContext(ctx) +// sc is an existing JavaSparkContext. +JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc) // Load a text file and convert each line to a JavaBean. -JavaRDD people = ctx.textFile("examples/src/main/resources/people.txt").map( +JavaRDD people = sc.textFile("examples/src/main/resources/people.txt").map( new Function() { public Person call(String line) throws Exception { String[] parts = line.split(","); @@ -189,11 +209,11 @@ JavaRDD people = ctx.textFile("examples/src/main/resources/people.txt"). }); // Apply a schema to an RDD of JavaBeans and register it as a table. -JavaSchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class); +JavaSchemaRDD schemaPeople = sqlContext.applySchema(people, Person.class); schemaPeople.registerAsTable("people"); // SQL can be run over RDDs that have been registered as tables. -JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") +JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // The results of SQL queries are SchemaRDDs and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. @@ -215,6 +235,10 @@ row. Any RDD of dictionaries can converted to a SchemaRDD and then registered as can be used in subsequent SQL statements. {% highlight python %} +# sc is an existing SparkContext. +from pyspark.sql import SQLContext +sqlContext = SQLContext(sc) + # Load a text file and convert each line to a dictionary. lines = sc.textFile("examples/src/main/resources/people.txt") parts = lines.map(lambda l: l.split(",")) @@ -223,14 +247,16 @@ people = parts.map(lambda p: {"name": p[0], "age": int(p[1])}) # Infer the schema, and register the SchemaRDD as a table. # In future versions of PySpark we would like to add support for registering RDDs with other # datatypes as tables -peopleTable = sqlCtx.inferSchema(people) -peopleTable.registerAsTable("people") +schemaPeople = sqlContext.inferSchema(people) +schemaPeople.registerAsTable("people") # SQL can be run over SchemaRDDs that have been registered as a table. 
-teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") +teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") # The results of SQL queries are RDDs and support all the normal RDD operations. teenNames = teenagers.map(lambda p: "Name: " + p.name) +for teenName in teenNames.collect(): + print teenName {% endhighlight %}
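The Scala note above points out that case classes in Scala 2.10 are limited to 22 fields and that custom classes implementing the `Product` interface can be used instead. A minimal sketch of that workaround, using a hypothetical two-field `Record` class only to keep the example short (a real use would have more than 22 fields):

{% highlight scala %}
// sc and sqlContext are from the Scala example above.
// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
import sqlContext.createSchemaRDD

// A plain class that implements Product by hand; the values returned by productElement
// must follow the order of the constructor parameters.
class Record(val name: String, val age: Int) extends Product with Serializable {
  def canEqual(that: Any) = that.isInstanceOf[Record]
  def productArity = 2
  def productElement(n: Int): Any = n match {
    case 0 => name
    case 1 => age
  }
}

// Build the RDD exactly as in the case class example and register it as a table.
val records = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => new Record(p(0), p(1).trim.toInt))
records.registerAsTable("records")

val names = sqlContext.sql("SELECT name FROM records WHERE age >= 13 AND age <= 19")
names.map(t => "Name: " + t(0)).collect().foreach(println)
{% endhighlight %}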
@@ -241,7 +267,7 @@ teenNames = teenagers.map(lambda p: "Name: " + p.name) Users that want a more complete dialect of SQL should look at the HiveQL support provided by `HiveContext`. -## Using Parquet +## Parquet Files [Parquet](http://parquet.io) is a columnar format that is supported by many other data processing systems. Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema @@ -252,22 +278,23 @@ of the original data. Using the data from the above example:
{% highlight scala %} -val sqlContext = new org.apache.spark.sql.SQLContext(sc) -import sqlContext._ +// sqlContext from the previous example is used in this example. +// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD. +import sqlContext.createSchemaRDD val people: RDD[Person] = ... // An RDD of case class objects, from the previous example. -// The RDD is implicitly converted to a SchemaRDD, allowing it to be stored using Parquet. +// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet. people.saveAsParquetFile("people.parquet") // Read in the parquet file created above. Parquet files are self-describing so the schema is preserved. -// The result of loading a Parquet file is also a JavaSchemaRDD. +// The result of loading a Parquet file is also a SchemaRDD. val parquetFile = sqlContext.parquetFile("people.parquet") //Parquet files can also be registered as tables and then used in SQL statements. parquetFile.registerAsTable("parquetFile") -val teenagers = sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") -teenagers.collect().foreach(println) +val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") +teenagers.map(t => "Name: " + t(0)).collect().foreach(println) {% endhighlight %}
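Since this patch also lets a SchemaRDD print its schema, the claim that the schema survives the Parquet round trip is easy to check; a small follow-up sketch using the `parquetFile` SchemaRDD created above:

{% highlight scala %}
// parquetFile is the SchemaRDD loaded in the example above.
// Parquet files are self-describing, so the schema written from the Person case class
// should come back intact; printSchema() shows the column names and types that were read.
parquetFile.printSchema()
{% endhighlight %}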
@@ -275,6 +302,7 @@ teenagers.collect().foreach(println)
{% highlight java %} +// sqlContext from the previous example is used in this example. JavaSchemaRDD schemaPeople = ... // The JavaSchemaRDD from the previous example. @@ -283,13 +311,16 @@ schemaPeople.saveAsParquetFile("people.parquet"); // Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved. // The result of loading a parquet file is also a JavaSchemaRDD. -JavaSchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet"); +JavaSchemaRDD parquetFile = sqlContext.parquetFile("people.parquet"); //Parquet files can also be registered as tables and then used in SQL statements. parquetFile.registerAsTable("parquetFile"); -JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); - - +JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); +List<String> teenagerNames = teenagers.map(new Function<Row, String>() { + public String call(Row row) { + return "Name: " + row.getString(0); + } +}).collect(); {% endhighlight %}
@@ -297,50 +328,149 @@ JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >=
{% highlight python %} +# sqlContext from the previous example is used in this example. -peopleTable # The SchemaRDD from the previous example. +schemaPeople # The SchemaRDD from the previous example. # SchemaRDDs can be saved as Parquet files, maintaining the schema information. -peopleTable.saveAsParquetFile("people.parquet") +schemaPeople.saveAsParquetFile("people.parquet") # Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved. # The result of loading a parquet file is also a SchemaRDD. -parquetFile = sqlCtx.parquetFile("people.parquet") +parquetFile = sqlContext.parquetFile("people.parquet") # Parquet files can also be registered as tables and then used in SQL statements. parquetFile.registerAsTable("parquetFile"); -teenagers = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") - +teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") +teenNames = teenagers.map(lambda p: "Name: " + p.name) +for teenName in teenNames.collect(): + print teenName {% endhighlight %}
-## Writing Language-Integrated Relational Queries +## JSON Datasets +
-**Language-Integrated queries are currently only supported in Scala.** +
+Spark SQL can automatically infer the schema of a JSON dataset and load it as a SchemaRDD. +This conversion can be done using one of two methods in a SQLContext: -Spark SQL also supports a domain specific language for writing queries. Once again, -using the data from the above examples: +* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object. +* `jsonRDD` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object. {% highlight scala %} +// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) -import sqlContext._ -val people: RDD[Person] = ... // An RDD of case class objects, from the first example. -// The following is the same as 'SELECT name FROM people WHERE age >= 10 AND age <= 19' -val teenagers = people.where('age >= 10).where('age <= 19).select('name) +// A JSON dataset is pointed to by path. +// The path can be either a single text file or a directory storing text files. +val path = "examples/src/main/resources/people.json" +// Create a SchemaRDD from the file(s) pointed to by path +val people = sqlContext.jsonFile(path) + +// The inferred schema can be visualized using the printSchema() method. +people.printSchema() +// root +// |-- age: IntegerType +// |-- name: StringType + +// Register this SchemaRDD as a table. +people.registerAsTable("people") + +// SQL statements can be run by using the sql methods provided by sqlContext. +val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") + +// Alternatively, a SchemaRDD can be created for a JSON dataset represented by +// an RDD[String] storing one JSON object per string. +val anotherPeopleRDD = sc.parallelize( + """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) +val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD) {% endhighlight %} -The DSL uses Scala symbols to represent columns in the underlying table, which are identifiers -prefixed with a tick (`'`). Implicit conversions turn these symbols into expressions that are -evaluated by the SQL execution engine. A full list of the functions supported can be found in the -[ScalaDoc](api/scala/index.html#org.apache.spark.sql.SchemaRDD). +
- +
+Spark SQL can automatically infer the schema of a JSON dataset and load it as a JavaSchemaRDD. +This conversion can be done using one of two methods in a JavaSQLContext: -# Hive Support +* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object. +* `jsonRDD` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object. + +{% highlight java %} +// sc is an existing JavaSparkContext. +JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc); + +// A JSON dataset is pointed to by path. +// The path can be either a single text file or a directory storing text files. +String path = "examples/src/main/resources/people.json"; +// Create a JavaSchemaRDD from the file(s) pointed to by path +JavaSchemaRDD people = sqlContext.jsonFile(path); + +// The inferred schema can be visualized using the printSchema() method. +people.printSchema(); +// root +// |-- age: IntegerType +// |-- name: StringType + +// Register this JavaSchemaRDD as a table. +people.registerAsTable("people"); + +// SQL statements can be run by using the sql methods provided by sqlContext. +JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19"); + +// Alternatively, a JavaSchemaRDD can be created for a JSON dataset represented by +// an RDD[String] storing one JSON object per string. +List<String> jsonData = Arrays.asList( + "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}"); +JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData); +JavaSchemaRDD anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD); +{% endhighlight %} +
+ +
+Spark SQL can automatically infer the schema of a JSON dataset and load it as a SchemaRDD. +This conversion can be done using one of two methods in a SQLContext: + +* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object. +* `jsonRDD` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object. + +{% highlight python %} +# sc is an existing SparkContext. +from pyspark.sql import SQLContext +sqlContext = SQLContext(sc) + +# A JSON dataset is pointed to by path. +# The path can be either a single text file or a directory storing text files. +path = "examples/src/main/resources/people.json" +# Create a SchemaRDD from the file(s) pointed to by path +people = sqlContext.jsonFile(path) + +# The inferred schema can be visualized using the printSchema() method. +people.printSchema() +# root +# |-- age: IntegerType +# |-- name: StringType + +# Register this SchemaRDD as a table. +people.registerAsTable("people") + +# SQL statements can be run by using the sql methods provided by sqlContext. +teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") + +# Alternatively, a SchemaRDD can be created for a JSON dataset represented by +# an RDD[String] storing one JSON object per string. +anotherPeopleRDD = sc.parallelize([ + '{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']) +anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD) +{% endhighlight %} +
+ +
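The `anotherPeople` dataset built from the nested JSON object above is a convenient way to see how nested structures are inferred. A short Scala sketch, continuing from the variables in the Scala tab and using only the `printSchema`, `registerAsTable`, and `sql` methods shown in this section:

{% highlight scala %}
// anotherPeople is the SchemaRDD created from the nested JSON string above.
// Nested JSON objects are inferred as nested fields rather than being flattened,
// so the address object shows up as a field that itself has city and state fields.
anotherPeople.printSchema()

// Like any other SchemaRDD, it can be registered as a table and queried.
anotherPeople.registerAsTable("anotherPeople")
sqlContext.sql("SELECT name FROM anotherPeople").collect().foreach(println)
{% endhighlight %}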
+ +## Hive Tables Spark SQL also supports reading and writing data stored in [Apache Hive](http://hive.apache.org/). However, since Hive has a large number of dependencies, it is not included in the default Spark assembly. @@ -362,17 +492,14 @@ which is similar to `HiveContext`, but creates a local copy of the `metastore` a automatically. {% highlight scala %} -val sc: SparkContext // An existing SparkContext. +// sc is an existing SparkContext. val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) -// Importing the SQL context gives access to all the public SQL functions and implicit conversions. -import hiveContext._ - -hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") -hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") +hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") +hiveContext.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL -hql("FROM src SELECT key, value").collect().foreach(println) +hiveContext.hql("FROM src SELECT key, value").collect().foreach(println) {% endhighlight %} @@ -385,14 +512,14 @@ the `sql` method a `JavaHiveContext` also provides an `hql` methods, which allow expressed in HiveQL. {% highlight java %} -JavaSparkContext ctx = ...; // An existing JavaSparkContext. -JavaHiveContext hiveCtx = new org.apache.spark.sql.hive.api.java.HiveContext(ctx); +// sc is an existing JavaSparkContext. +JavaHiveContext hiveContext = new org.apache.spark.sql.hive.api.java.HiveContext(sc); -hiveCtx.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"); -hiveCtx.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"); +hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"); +hiveContext.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"); // Queries are expressed in HiveQL. -Row[] results = hiveCtx.hql("FROM src SELECT key, value").collect(); +Row[] results = hiveContext.hql("FROM src SELECT key, value").collect(); {% endhighlight %} @@ -406,17 +533,44 @@ the `sql` method a `HiveContext` also provides an `hql` methods, which allows qu expressed in HiveQL. {% highlight python %} - +# sc is an existing SparkContext. from pyspark.sql import HiveContext -hiveCtx = HiveContext(sc) +hiveContext = HiveContext(sc) -hiveCtx.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") -hiveCtx.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") +hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") +hiveContext.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") # Queries can be expressed in HiveQL. -results = hiveCtx.hql("FROM src SELECT key, value").collect() +results = hiveContext.hql("FROM src SELECT key, value").collect() {% endhighlight %} + + +# Writing Language-Integrated Relational Queries + +**Language-Integrated queries are currently only supported in Scala.** + +Spark SQL also supports a domain specific language for writing queries. Once again, +using the data from the above examples: + +{% highlight scala %} +// sc is an existing SparkContext. +val sqlContext = new org.apache.spark.sql.SQLContext(sc) +// Importing the SQL context gives access to all the public SQL functions and implicit conversions. +import sqlContext._ +val people: RDD[Person] = ... // An RDD of case class objects, from the first example. 
+ +// The following is the same as 'SELECT name FROM people WHERE age >= 10 AND age <= 19' +val teenagers = people.where('age >= 10).where('age <= 19).select('name) +teenagers.map(t => "Name: " + t(0)).collect().foreach(println) +{% endhighlight %} + +The DSL uses Scala symbols to represent columns in the underlying table, which are identifiers +prefixed with a tick (`'`). Implicit conversions turn these symbols into expressions that are +evaluated by the SQL execution engine. A full list of the functions supported can be found in the +[ScalaDoc](api/scala/index.html#org.apache.spark.sql.SchemaRDD). + + \ No newline at end of file -- cgit v1.2.3