Diffstat (limited to 'docs')

 docs/sql-programming-guide.md | 290 ++++++++++++++++++++++++++++------------
 1 file changed, 222 insertions(+), 68 deletions(-)
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 4623bb4247..522c83884e 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -17,20 +17,20 @@ Spark.
 At the core of this component is a new type of RDD,
 [Row](api/scala/index.html#org.apache.spark.sql.catalyst.expressions.Row) objects along with
 a schema that describes the data types of each column in the
 row. A SchemaRDD is similar to a table in a traditional relational database. A SchemaRDD can
 be created from an existing RDD, [Parquet](http://parquet.io)
-file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).
+file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).

 All of the examples on this page use sample data included in the Spark distribution and can be run
 in the `spark-shell`.
 </div>

 <div data-lang="java" markdown="1">
-Spark SQL allows relational queries expressed in SQL, HiveQL, or Scala to be executed using
+Spark SQL allows relational queries expressed in SQL or HiveQL to be executed using
 Spark. At the core of this component is a new type of RDD,
 [JavaSchemaRDD](api/scala/index.html#org.apache.spark.sql.api.java.JavaSchemaRDD). JavaSchemaRDDs are composed
 [Row](api/scala/index.html#org.apache.spark.sql.api.java.Row) objects along with
 a schema that describes the data types of each column in the
 row. A JavaSchemaRDD is similar to a table in a traditional relational database. A JavaSchemaRDD can
 be created from an existing RDD, [Parquet](http://parquet.io)
-file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).
+file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).
 </div>

 <div data-lang="python" markdown="1">
@@ -41,7 +41,7 @@ Spark.
 At the core of this component is a new type of RDD,
 [Row](api/python/pyspark.sql.Row-class.html) objects along with
 a schema that describes the data types of each column in the
 row. A SchemaRDD is similar to a table in a traditional relational database. A SchemaRDD can
 be created from an existing RDD, [Parquet](http://parquet.io)
-file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).
+file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).

 All of the examples on this page use sample data included in the Spark distribution and can be run
 in the `pyspark` shell.
 </div>
@@ -64,8 +64,8 @@ descendants. To create a basic SQLContext, all you need is a SparkContext.
 val sc: SparkContext // An existing SparkContext.
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)

-// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
-import sqlContext._
+// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
+import sqlContext.createSchemaRDD
 {% endhighlight %}
 </div>
@@ -77,8 +77,8 @@ The entry point into all relational functionality in Spark is the
 of its descendants. To create a basic JavaSQLContext, all you need is a JavaSparkContext.

 {% highlight java %}
-JavaSparkContext ctx = ...; // An existing JavaSparkContext.
-JavaSQLContext sqlCtx = new org.apache.spark.sql.api.java.JavaSQLContext(ctx);
+JavaSparkContext sc = ...; // An existing JavaSparkContext.
+JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc);
 {% endhighlight %}
 </div>
@@ -91,14 +91,33 @@ of its decedents. To create a basic SQLContext, all you need is a SparkContext.
 {% highlight python %}
 from pyspark.sql import SQLContext
-sqlCtx = SQLContext(sc)
+sqlContext = SQLContext(sc)
 {% endhighlight %}
 </div>

 </div>

-## Running SQL on RDDs
+# Data Sources
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+Spark SQL supports operating on a variety of data sources through the `SchemaRDD` interface.
+Once a dataset has been loaded, it can be registered as a table and even joined with data from other sources.
+</div>
+
+<div data-lang="java" markdown="1">
+Spark SQL supports operating on a variety of data sources through the `JavaSchemaRDD` interface.
+Once a dataset has been loaded, it can be registered as a table and even joined with data from other sources.
+</div>
+
+<div data-lang="python" markdown="1">
+Spark SQL supports operating on a variety of data sources through the `SchemaRDD` interface.
+Once a dataset has been loaded, it can be registered as a table and even joined with data from other sources.
+</div>
+</div>
+
+## RDDs

 <div class="codetabs">
@@ -111,8 +130,10 @@ types such as Sequences or Arrays. This RDD can be implicitly converted to a SchemaRDD and then be
 registered as a table. Tables can be used in subsequent SQL statements.

 {% highlight scala %}
+// sc is an existing SparkContext.
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-import sqlContext._
+// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
+import sqlContext.createSchemaRDD

 // Define the schema using a case class.
 // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
@@ -124,7 +145,7 @@ val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
 people.registerAsTable("people")

 // SQL statements can be run by using the sql methods provided by sqlContext.
-val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
+val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

 // The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
 // The columns of a row in the result can be accessed by ordinal.
@@ -170,12 +191,11 @@ A schema can be applied to an existing RDD by calling `applySchema` and providing the Class object
 for the JavaBean.

 {% highlight java %}
-
-JavaSparkContext ctx = ...; // An existing JavaSparkContext.
-JavaSQLContext sqlCtx = new org.apache.spark.sql.api.java.JavaSQLContext(ctx)
+// sc is an existing JavaSparkContext.
+JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc)

 // Load a text file and convert each line to a JavaBean.
-JavaRDD<Person> people = ctx.textFile("examples/src/main/resources/people.txt").map(
+JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
   new Function<String, Person>() {
     public Person call(String line) throws Exception {
       String[] parts = line.split(",");
@@ -189,11 +209,11 @@ JavaRDD<Person> people = ctx.textFile("examples/src/main/resources/people.txt").map(
   });

 // Apply a schema to an RDD of JavaBeans and register it as a table.
-JavaSchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class);
+JavaSchemaRDD schemaPeople = sqlContext.applySchema(people, Person.class);
 schemaPeople.registerAsTable("people");

 // SQL can be run over RDDs that have been registered as tables.
-JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
+JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

 // The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
 // The columns of a row in the result can be accessed by ordinal.
@@ -215,6 +235,10 @@ row. Any RDD of dictionaries can converted to a SchemaRDD and then registered as
 can be used in subsequent SQL statements.
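[Editor's note] The Scala, Java, and Python hunks above all register a `people` table and run the same teenager query. Stripped of Spark, the query's semantics are just a filter and a projection over records; the sketch below shows that in plain Python (no Spark required). The three rows mirror the `people.txt` sample shipped with Spark; everything else here is illustrative, not Spark API.

```python
# Plain-Python sketch of the query the diff's examples register and run:
#   SELECT name FROM people WHERE age >= 13 AND age <= 19
# "people" stands in for the RDD of records built in the examples.
people = [
    {"name": "Michael", "age": 29},
    {"name": "Andy", "age": 30},
    {"name": "Justin", "age": 19},
]

# WHERE clause -> filter; SELECT name -> projection.
teenagers = [p["name"] for p in people if 13 <= p["age"] <= 19]

for name in teenagers:
    print("Name: " + name)  # prints "Name: Justin"
```

The same shape (filter, then project) is what `sqlContext.sql(...)` plans over a SchemaRDD, just distributed across partitions.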
 {% highlight python %}
+# sc is an existing SparkContext.
+from pyspark.sql import SQLContext
+sqlContext = SQLContext(sc)
+
 # Load a text file and convert each line to a dictionary.
 lines = sc.textFile("examples/src/main/resources/people.txt")
 parts = lines.map(lambda l: l.split(","))
@@ -223,14 +247,16 @@ people = parts.map(lambda p: {"name": p[0], "age": int(p[1])})
 # Infer the schema, and register the SchemaRDD as a table.
 # In future versions of PySpark we would like to add support for registering RDDs with other
 # datatypes as tables
-peopleTable = sqlCtx.inferSchema(people)
-peopleTable.registerAsTable("people")
+schemaPeople = sqlContext.inferSchema(people)
+schemaPeople.registerAsTable("people")

 # SQL can be run over SchemaRDDs that have been registered as a table.
-teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
+teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

 # The results of SQL queries are RDDs and support all the normal RDD operations.
 teenNames = teenagers.map(lambda p: "Name: " + p.name)
+for teenName in teenNames.collect():
+  print teenName
 {% endhighlight %}
 </div>
@@ -241,7 +267,7 @@ teenNames = teenagers.map(lambda p: "Name: " + p.name)

 Users that want a more complete dialect of SQL should look at the HiveQL support provided by
 `HiveContext`.

-## Using Parquet
+## Parquet Files

 [Parquet](http://parquet.io) is a columnar format that is supported by many other data processing systems.
 Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema
@@ -252,22 +278,23 @@ of the original data. Using the data from the above example:

 <div data-lang="scala" markdown="1">

 {% highlight scala %}
-val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-import sqlContext._
+// sqlContext from the previous example is used in this example.
+// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
+import sqlContext.createSchemaRDD

 val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.

-// The RDD is implicitly converted to a SchemaRDD, allowing it to be stored using Parquet.
+// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
 people.saveAsParquetFile("people.parquet")

 // Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
-// The result of loading a Parquet file is also a JavaSchemaRDD.
+// The result of loading a Parquet file is also a SchemaRDD.
 val parquetFile = sqlContext.parquetFile("people.parquet")

 //Parquet files can also be registered as tables and then used in SQL statements.
 parquetFile.registerAsTable("parquetFile")
-val teenagers = sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
-teenagers.collect().foreach(println)
+val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
+teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
 {% endhighlight %}

 </div>
@@ -275,6 +302,7 @@ teenagers.collect().foreach(println)
 <div data-lang="java" markdown="1">

 {% highlight java %}
+// sqlContext from the previous example is used in this example.

 JavaSchemaRDD schemaPeople = ... // The JavaSchemaRDD from the previous example.
@@ -283,13 +311,16 @@ schemaPeople.saveAsParquetFile("people.parquet");

 // Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
 // The result of loading a parquet file is also a JavaSchemaRDD.
-JavaSchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet");
+JavaSchemaRDD parquetFile = sqlContext.parquetFile("people.parquet");

 //Parquet files can also be registered as tables and then used in SQL statements.
 parquetFile.registerAsTable("parquetFile");
-JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
-
-
+JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
+List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
+  public String call(Row row) {
+    return "Name: " + row.getString(0);
+  }
+}).collect();
 {% endhighlight %}

 </div>
@@ -297,50 +328,149 @@ JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");

 <div data-lang="python" markdown="1">

 {% highlight python %}
+# sqlContext from the previous example is used in this example.

-peopleTable # The SchemaRDD from the previous example.
+schemaPeople # The SchemaRDD from the previous example.

 # SchemaRDDs can be saved as Parquet files, maintaining the schema information.
-peopleTable.saveAsParquetFile("people.parquet")
+schemaPeople.saveAsParquetFile("people.parquet")

 # Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
 # The result of loading a parquet file is also a SchemaRDD.
-parquetFile = sqlCtx.parquetFile("people.parquet")
+parquetFile = sqlContext.parquetFile("people.parquet")

 # Parquet files can also be registered as tables and then used in SQL statements.
 parquetFile.registerAsTable("parquetFile");
-teenagers = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
-
+teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
+teenNames = teenagers.map(lambda p: "Name: " + p.name)
+for teenName in teenNames.collect():
+  print teenName
 {% endhighlight %}

 </div>

 </div>

-## Writing Language-Integrated Relational Queries
+## JSON Datasets
+<div class="codetabs">

-**Language-Integrated queries are currently only supported in Scala.**
+<div data-lang="scala" markdown="1">
+Spark SQL can automatically infer the schema of a JSON dataset and load it as a SchemaRDD.
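[Editor's note] The schema inference the JSON methods in the next hunk perform can be pictured with a plain-Python sketch: parse each line as one JSON object and take the union of the observed fields and their types. This is an illustration only; Spark's real inference also reconciles conflicting and nested types. The sample lines mirror Spark's `people.json`.

```python
import json

def infer_schema(lines):
    """Infer a flat {field: type-name} schema from one JSON object per line."""
    schema = {}
    for line in lines:
        for field, value in json.loads(line).items():
            # First-seen type wins in this simplified sketch.
            schema.setdefault(field, type(value).__name__)
    return schema

lines = [
    '{"name":"Michael"}',
    '{"name":"Andy", "age":30}',
    '{"name":"Justin", "age":19}',
]
print(infer_schema(lines))  # {'name': 'str', 'age': 'int'}
```

Note that the first object has no `age` field at all; inference over the whole dataset is what lets Spark still produce a complete schema, which is why the methods scan every line rather than just the first.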
+This conversion can be done using one of two methods in a SQLContext:

-Spark SQL also supports a domain specific language for writing queries. Once again,
-using the data from the above examples:
+* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object.
+* `jsonRDD` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.

 {% highlight scala %}
+// sc is an existing SparkContext.
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-import sqlContext._

-val people: RDD[Person] = ... // An RDD of case class objects, from the first example.
-
-// The following is the same as 'SELECT name FROM people WHERE age >= 10 AND age <= 19'
-val teenagers = people.where('age >= 10).where('age <= 19).select('name)
+// A JSON dataset is pointed to by path.
+// The path can be either a single text file or a directory storing text files.
+val path = "examples/src/main/resources/people.json"
+// Create a SchemaRDD from the file(s) pointed to by path
+val people = sqlContext.jsonFile(path)
+
+// The inferred schema can be visualized using the printSchema() method.
+people.printSchema()
+// root
+//  |-- age: IntegerType
+//  |-- name: StringType
+
+// Register this SchemaRDD as a table.
+people.registerAsTable("people")
+
+// SQL statements can be run by using the sql methods provided by sqlContext.
+val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
+
+// Alternatively, a SchemaRDD can be created for a JSON dataset represented by
+// an RDD[String] storing one JSON object per string.
+val anotherPeopleRDD = sc.parallelize(
+  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
+val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
 {% endhighlight %}

-The DSL uses Scala symbols to represent columns in the underlying table, which are identifiers
-prefixed with a tick (`'`). Implicit conversions turn these symbols into expressions that are
-evaluated by the SQL execution engine. A full list of the functions supported can be found in the
-[ScalaDoc](api/scala/index.html#org.apache.spark.sql.SchemaRDD).
+</div>

-<!-- TODO: Include the table of operations here. -->
+<div data-lang="java" markdown="1">
+Spark SQL can automatically infer the schema of a JSON dataset and load it as a JavaSchemaRDD.
+This conversion can be done using one of two methods in a JavaSQLContext:

-# Hive Support
+* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object.
+* `jsonRDD` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.
+
+{% highlight java %}
+// sc is an existing JavaSparkContext.
+JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc);
+
+// A JSON dataset is pointed to by path.
+// The path can be either a single text file or a directory storing text files.
+String path = "examples/src/main/resources/people.json";
+// Create a JavaSchemaRDD from the file(s) pointed to by path
+JavaSchemaRDD people = sqlContext.jsonFile(path);
+
+// The inferred schema can be visualized using the printSchema() method.
+people.printSchema();
+// root
+//  |-- age: IntegerType
+//  |-- name: StringType
+
+// Register this JavaSchemaRDD as a table.
+people.registerAsTable("people");
+
+// SQL statements can be run by using the sql methods provided by sqlContext.
+JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
+
+// Alternatively, a JavaSchemaRDD can be created for a JSON dataset represented by
+// an RDD[String] storing one JSON object per string.
+List<String> jsonData = Arrays.asList(
+  "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
+JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData);
+JavaSchemaRDD anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD);
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+Spark SQL can automatically infer the schema of a JSON dataset and load it as a SchemaRDD.
+This conversion can be done using one of two methods in a SQLContext:
+
+* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object.
+* `jsonRDD` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.
+
+{% highlight python %}
+# sc is an existing SparkContext.
+from pyspark.sql import SQLContext
+sqlContext = SQLContext(sc)
+
+# A JSON dataset is pointed to by path.
+# The path can be either a single text file or a directory storing text files.
+path = "examples/src/main/resources/people.json"
+# Create a SchemaRDD from the file(s) pointed to by path
+people = sqlContext.jsonFile(path)
+
+# The inferred schema can be visualized using the printSchema() method.
+people.printSchema()
+# root
+#  |-- age: IntegerType
+#  |-- name: StringType
+
+# Register this SchemaRDD as a table.
+people.registerAsTable("people")
+
+# SQL statements can be run by using the sql methods provided by sqlContext.
+teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
+
+# Alternatively, a SchemaRDD can be created for a JSON dataset represented by
+# an RDD[String] storing one JSON object per string.
+anotherPeopleRDD = sc.parallelize([
+  '{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}'])
+anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
+{% endhighlight %}
+</div>
+
+</div>
+
+## Hive Tables

 Spark SQL also supports reading and writing data stored in [Apache Hive](http://hive.apache.org/).
 However, since Hive has a large number of dependencies, it is not included in the default Spark assembly.
@@ -362,17 +492,14 @@ which is similar to `HiveContext`, but creates a local copy of the `metastore` and `warehouse`
 automatically.

 {% highlight scala %}
-val sc: SparkContext // An existing SparkContext.
+// sc is an existing SparkContext.
 val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

-// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
-import hiveContext._
-
-hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
-hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
+hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+hiveContext.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

 // Queries are expressed in HiveQL
-hql("FROM src SELECT key, value").collect().foreach(println)
+hiveContext.hql("FROM src SELECT key, value").collect().foreach(println)
 {% endhighlight %}
 </div>
@@ -385,14 +512,14 @@ the `sql` method a `JavaHiveContext` also provides an `hql` methods, which allows queries to be
 expressed in HiveQL.

 {% highlight java %}
-JavaSparkContext ctx = ...; // An existing JavaSparkContext.
-JavaHiveContext hiveCtx = new org.apache.spark.sql.hive.api.java.HiveContext(ctx);
+// sc is an existing JavaSparkContext.
+JavaHiveContext hiveContext = new org.apache.spark.sql.hive.api.java.HiveContext(sc);

-hiveCtx.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
-hiveCtx.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
+hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
+hiveContext.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

 // Queries are expressed in HiveQL.
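[Editor's note] The HiveQL in these hunks is close to plain SQL. Readers without a Hive deployment can see the same table shape and query with the stdlib `sqlite3` module; this is a sketch only (`hql` runs on Hive, not SQLite), and the three rows are assumed stand-ins for the start of the `kv1.txt` sample file.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Analogue of: CREATE TABLE IF NOT EXISTS src (key INT, value STRING)
conn.execute("CREATE TABLE IF NOT EXISTS src (key INT, value TEXT)")

# Stand-in for: LOAD DATA LOCAL INPATH '.../kv1.txt' INTO TABLE src
conn.executemany("INSERT INTO src VALUES (?, ?)",
                 [(238, "val_238"), (86, "val_86"), (311, "val_311")])

# Analogue of: FROM src SELECT key, value
for row in conn.execute("SELECT key, value FROM src ORDER BY key"):
    print(row)  # (86, 'val_86'), (238, 'val_238'), (311, 'val_311')
```

HiveQL's `FROM src SELECT key, value` clause order is Hive-specific; the `SELECT ... FROM` form above is the ANSI equivalent.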
-Row[] results = hiveCtx.hql("FROM src SELECT key, value").collect();
+Row[] results = hiveContext.hql("FROM src SELECT key, value").collect();

 {% endhighlight %}

 </div>
@@ -406,17 +533,44 @@ the `sql` method a `HiveContext` also provides an `hql` methods, which allows queries to be
 expressed in HiveQL.

 {% highlight python %}
-
+# sc is an existing SparkContext.
 from pyspark.sql import HiveContext
-hiveCtx = HiveContext(sc)
+hiveContext = HiveContext(sc)

-hiveCtx.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
-hiveCtx.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
+hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+hiveContext.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

 # Queries can be expressed in HiveQL.
-results = hiveCtx.hql("FROM src SELECT key, value").collect()
+results = hiveContext.hql("FROM src SELECT key, value").collect()

 {% endhighlight %}

 </div>

 </div>
+
+
+# Writing Language-Integrated Relational Queries
+
+**Language-Integrated queries are currently only supported in Scala.**
+
+Spark SQL also supports a domain specific language for writing queries. Once again,
+using the data from the above examples:
+
+{% highlight scala %}
+// sc is an existing SparkContext.
+val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
+import sqlContext._
+val people: RDD[Person] = ... // An RDD of case class objects, from the first example.
+
+// The following is the same as 'SELECT name FROM people WHERE age >= 10 AND age <= 19'
+val teenagers = people.where('age >= 10).where('age <= 19).select('name)
+teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
+{% endhighlight %}
+
+The DSL uses Scala symbols to represent columns in the underlying table, which are identifiers
+prefixed with a tick (`'`). Implicit conversions turn these symbols into expressions that are
+evaluated by the SQL execution engine. A full list of the functions supported can be found in the
+[ScalaDoc](api/scala/index.html#org.apache.spark.sql.SchemaRDD).
+
+<!-- TODO: Include the table of operations here. -->
\ No newline at end of file
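[Editor's note] The chained `where(...).where(...).select(...)` calls in the diff's final Scala DSL section can be mimicked with a tiny hypothetical Python class, to show why each call can return a new queryable object. This is not Spark's API; `Table`, `where`, and `select` here are illustrative names only.

```python
# Hypothetical plain-Python analogue of the Scala DSL chain:
#   people.where('age >= 10).where('age <= 19).select('name)
class Table:
    def __init__(self, rows):
        self.rows = rows

    def where(self, pred):
        # Each call returns a new Table, so calls can be chained.
        return Table([r for r in self.rows if pred(r)])

    def select(self, *cols):
        return Table([{c: r[c] for c in cols} for r in self.rows])

people = Table([{"name": "Michael", "age": 29},
                {"name": "Andy", "age": 30},
                {"name": "Justin", "age": 19}])

teenagers = (people.where(lambda r: r["age"] >= 10)
                   .where(lambda r: r["age"] <= 19)
                   .select("name"))
print(teenagers.rows)  # [{'name': 'Justin'}]
```

In the real DSL, Scala's implicit conversions turn tick-prefixed symbols like `'age` into column expressions, so the predicates are built from operators rather than lambdas, but the fluent structure is the same.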