-rw-r--r--  docs/sql-programming-guide.md | 572
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java | 186
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSqlExample.java | 336
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/sql/JavaSqlDataSourceExample.java | 217
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java | 131
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/sql/SparkSqlExample.scala | 254
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/sql/SqlDataSourceExample.scala | 148
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala | 83
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala | 107
9 files changed, 1228 insertions(+), 806 deletions(-)
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index e838a13af7..2076b29a86 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -65,14 +65,14 @@ Throughout this document, we will often refer to Scala/Java Datasets of `Row`s a
The entry point into all functionality in Spark is the [`SparkSession`](api/scala/index.html#org.apache.spark.sql.SparkSession) class. To create a basic `SparkSession`, just use `SparkSession.builder()`:
-{% include_example init_session scala/org/apache/spark/examples/sql/RDDRelation.scala %}
+{% include_example init_session scala/org/apache/spark/examples/sql/SparkSqlExample.scala %}
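For reference, the included `init_session` example builds the session roughly as follows (a minimal sketch mirroring the new SparkSqlExample.scala added in this patch; the app name and config option are illustrative):

{% highlight scala %}
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL Example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
{% endhighlight %}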
</div>
<div data-lang="java" markdown="1">
The entry point into all functionality in Spark is the [`SparkSession`](api/java/index.html#org.apache.spark.sql.SparkSession) class. To create a basic `SparkSession`, just use `SparkSession.builder()`:
-{% include_example init_session java/org/apache/spark/examples/sql/JavaSparkSQL.java %}
+{% include_example init_session java/org/apache/spark/examples/sql/JavaSparkSqlExample.java %}
</div>
<div data-lang="python" markdown="1">
@@ -105,14 +105,7 @@ from a Hive table, or from [Spark data sources](#data-sources).
As an example, the following creates a DataFrame based on the content of a JSON file:
-{% highlight scala %}
-val spark: SparkSession // An existing SparkSession.
-val df = spark.read.json("examples/src/main/resources/people.json")
-
-// Displays the content of the DataFrame to stdout
-df.show()
-{% endhighlight %}
-
+{% include_example create_df scala/org/apache/spark/examples/sql/SparkSqlExample.scala %}
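For reference, a minimal sketch of what the `create_df` example covers, assuming the `spark` session built above:

{% highlight scala %}
val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
{% endhighlight %}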
</div>
<div data-lang="java" markdown="1">
@@ -121,14 +114,7 @@ from a Hive table, or from [Spark data sources](#data-sources).
As an example, the following creates a DataFrame based on the content of a JSON file:
-{% highlight java %}
-SparkSession spark = ...; // An existing SparkSession.
-Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
-
-// Displays the content of the DataFrame to stdout
-df.show();
-{% endhighlight %}
-
+{% include_example create_df java/org/apache/spark/examples/sql/JavaSparkSqlExample.java %}
</div>
<div data-lang="python" markdown="1">
@@ -169,110 +155,20 @@ Here we include some basic examples of structured data processing using Datasets
<div class="codetabs">
<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val spark: SparkSession // An existing SparkSession
-
-// Create the DataFrame
-val df = spark.read.json("examples/src/main/resources/people.json")
-
-// Show the content of the DataFrame
-df.show()
-// age name
-// null Michael
-// 30 Andy
-// 19 Justin
-
-// Print the schema in a tree format
-df.printSchema()
-// root
-// |-- age: long (nullable = true)
-// |-- name: string (nullable = true)
-
-// Select only the "name" column
-df.select("name").show()
-// name
-// Michael
-// Andy
-// Justin
-
-// Select everybody, but increment the age by 1
-df.select(df("name"), df("age") + 1).show()
-// name (age + 1)
-// Michael null
-// Andy 31
-// Justin 20
-
-// Select people older than 21
-df.filter(df("age") > 21).show()
-// age name
-// 30 Andy
-
-// Count people by age
-df.groupBy("age").count().show()
-// age count
-// null 1
-// 19 1
-// 30 1
-{% endhighlight %}
+{% include_example untyped_ops scala/org/apache/spark/examples/sql/SparkSqlExample.scala %}
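A condensed sketch of the untyped operations the included example walks through, assuming the DataFrame `df` created above (printed output omitted here):

{% highlight scala %}
// Print the schema in a tree format
df.printSchema()

// Select only the "name" column
df.select("name").show()

// Select everybody, but increment the age by 1
df.select(df("name"), df("age") + 1).show()

// Select people older than 21
df.filter(df("age") > 21).show()

// Count people by age
df.groupBy("age").count().show()
{% endhighlight %}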
For a complete list of the types of operations that can be performed on a Dataset refer to the [API Documentation](api/scala/index.html#org.apache.spark.sql.Dataset).
In addition to simple column references and expressions, Datasets also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the [DataFrame Function Reference](api/scala/index.html#org.apache.spark.sql.functions$).
-
-
</div>
<div data-lang="java" markdown="1">
-{% highlight java %}
-SparkSession spark = ...; // An existing SparkSession
-
-// Create the DataFrame
-Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
-
-// Show the content of the DataFrame
-df.show();
-// age name
-// null Michael
-// 30 Andy
-// 19 Justin
-
-// Print the schema in a tree format
-df.printSchema();
-// root
-// |-- age: long (nullable = true)
-// |-- name: string (nullable = true)
-
-// Select only the "name" column
-df.select("name").show();
-// name
-// Michael
-// Andy
-// Justin
-
-// Select everybody, but increment the age by 1
-df.select(df.col("name"), df.col("age").plus(1)).show();
-// name (age + 1)
-// Michael null
-// Andy 31
-// Justin 20
-
-// Select people older than 21
-df.filter(df.col("age").gt(21)).show();
-// age name
-// 30 Andy
-
-// Count people by age
-df.groupBy("age").count().show();
-// age count
-// null 1
-// 19 1
-// 30 1
-{% endhighlight %}
+
+{% include_example untyped_ops java/org/apache/spark/examples/sql/JavaSparkSqlExample.java %}
For a complete list of the types of operations that can be performed on a Dataset refer to the [API Documentation](api/java/org/apache/spark/sql/Dataset.html).
In addition to simple column references and expressions, Datasets also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the [DataFrame Function Reference](api/java/org/apache/spark/sql/functions.html).
-
</div>
<div data-lang="python" markdown="1">
@@ -353,19 +249,13 @@ In addition to simple column references and expressions, DataFrames also have a
<div data-lang="scala" markdown="1">
The `sql` function on a `SparkSession` enables applications to run SQL queries programmatically and returns the result as a `DataFrame`.
-{% highlight scala %}
-val spark = ... // An existing SparkSession
-val df = spark.sql("SELECT * FROM table")
-{% endhighlight %}
+{% include_example run_sql scala/org/apache/spark/examples/sql/SparkSqlExample.scala %}
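A minimal sketch of running SQL programmatically, assuming `spark` and the `df` from the snippets above (the temporary view name is illustrative, following the pattern used in the new example files):

{% highlight scala %}
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
{% endhighlight %}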
</div>
<div data-lang="java" markdown="1">
The `sql` function on a `SparkSession` enables applications to run SQL queries programmatically and returns the result as a `Dataset<Row>`.
-{% highlight java %}
-SparkSession spark = ... // An existing SparkSession
-Dataset<Row> df = spark.sql("SELECT * FROM table")
-{% endhighlight %}
+{% include_example run_sql java/org/apache/spark/examples/sql/JavaSparkSqlExample.java %}
</div>
<div data-lang="python" markdown="1">
@@ -397,53 +287,11 @@ the bytes back into an object.
<div class="codetabs">
<div data-lang="scala" markdown="1">
-
-{% highlight scala %}
-// Encoders for most common types are automatically provided by importing spark.implicits._
-val ds = Seq(1, 2, 3).toDS()
-ds.map(_ + 1).collect() // Returns: Array(2, 3, 4)
-
-// Encoders are also created for case classes.
-case class Person(name: String, age: Long)
-val ds = Seq(Person("Andy", 32)).toDS()
-
-// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name.
-val path = "examples/src/main/resources/people.json"
-val people = spark.read.json(path).as[Person]
-
-{% endhighlight %}
-
+{% include_example create_ds scala/org/apache/spark/examples/sql/SparkSqlExample.scala %}
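For reference, a minimal sketch of what the `create_ds` example covers, assuming the `spark` session and `import spark.implicits._` from above:

{% highlight scala %}
// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// Encoders are also created for case classes
case class Person(name: String, age: Long)
val caseClassDS = Seq(Person("Andy", 32)).toDS()

// DataFrames can be converted to a Dataset by providing a class. Mapping is done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
{% endhighlight %}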
</div>
<div data-lang="java" markdown="1">
-
-{% highlight java %}
-SparkSession spark = ... // An existing SparkSession
-
-// Encoders for most common types are provided in class Encoders.
-Dataset<Integer> ds = spark.createDataset(Arrays.asList(1, 2, 3), Encoders.INT());
-ds.map(new MapFunction<Integer, Integer>() {
- @Override
- public Integer call(Integer value) throws Exception {
- return value + 1;
- }
-}, Encoders.INT()); // Returns: [2, 3, 4]
-
-Person person = new Person();
-person.setName("Andy");
-person.setAge(32);
-
-// Encoders are also created for Java beans.
-Dataset<Person> ds = spark.createDataset(
- Collections.singletonList(person),
- Encoders.bean(Person.class)
-);
-
-// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name.
-String path = "examples/src/main/resources/people.json";
-Dataset<Person> people = spark.read().json(path).as(Encoders.bean(Person.class));
-{% endhighlight %}
-
+{% include_example create_ds java/org/apache/spark/examples/sql/JavaSparkSqlExample.java %}
</div>
</div>
@@ -470,38 +318,7 @@ reflection and become the names of the columns. Case classes can also be nested
types such as `Seq`s or `Array`s. This RDD can be implicitly converted to a DataFrame and then be
registered as a table. Tables can be used in subsequent SQL statements.
-{% highlight scala %}
-val spark: SparkSession // An existing SparkSession
-// this is used to implicitly convert an RDD to a DataFrame.
-import spark.implicits._
-
-// Define the schema using a case class.
-// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
-// you can use custom classes that implement the Product interface.
-case class Person(name: String, age: Int)
-
-// Create an RDD of Person objects and register it as a temporary view.
-val people = sc
- .textFile("examples/src/main/resources/people.txt")
- .map(_.split(","))
- .map(p => Person(p(0), p(1).trim.toInt))
- .toDF()
-people.createOrReplaceTempView("people")
-
-// SQL statements can be run by using the sql methods provided by spark.
-val teenagers = spark.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
-
-// The columns of a row in the result can be accessed by field index:
-teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
-
-// or by field name:
-teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)
-
-// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
-teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
-// Map("name" -> "Justin", "age" -> 19)
-{% endhighlight %}
-
+{% include_example schema_inferring scala/org/apache/spark/examples/sql/SparkSqlExample.scala %}
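For reference, a minimal sketch of reflection-based schema inference, assuming the `Person(name: String, age: Long)` case class, the `spark` session, and `import spark.implicits._` from the earlier snippets:

{% highlight scala %}
// Create an RDD of Person objects from a text file, convert it to a DataFrame
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toLong))
  .toDF()

// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
{% endhighlight %}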
</div>
<div data-lang="java" markdown="1">
@@ -513,68 +330,7 @@ does not support JavaBeans that contain `Map` field(s). Nested JavaBeans and `Li
fields are supported though. You can create a JavaBean by creating a class that implements
Serializable and has getters and setters for all of its fields.
-{% highlight java %}
-
-public static class Person implements Serializable {
- private String name;
- private int age;
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public int getAge() {
- return age;
- }
-
- public void setAge(int age) {
- this.age = age;
- }
-}
-
-{% endhighlight %}
-
-
-A schema can be applied to an existing RDD by calling `createDataFrame` and providing the Class object
-for the JavaBean.
-
-{% highlight java %}
-SparkSession spark = ...; // An existing SparkSession
-
-// Load a text file and convert each line to a JavaBean.
-JavaRDD<Person> people = spark.sparkContext.textFile("examples/src/main/resources/people.txt").map(
- new Function<String, Person>() {
- public Person call(String line) throws Exception {
- String[] parts = line.split(",");
-
- Person person = new Person();
- person.setName(parts[0]);
- person.setAge(Integer.parseInt(parts[1].trim()));
-
- return person;
- }
- });
-
-// Apply a schema to an RDD of JavaBeans and register it as a table.
-Dataset<Row> schemaPeople = spark.createDataFrame(people, Person.class);
-schemaPeople.createOrReplaceTempView("people");
-
-// SQL can be run over RDDs that have been registered as tables.
-Dataset<Row> teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
-
-// The columns of a row in the result can be accessed by ordinal.
-List<String> teenagerNames = teenagers.map(new MapFunction<Row, String>() {
- public String call(Row row) {
- return "Name: " + row.getString(0);
- }
-}).collectAsList();
-
-{% endhighlight %}
-
+{% include_example schema_inferring java/org/apache/spark/examples/sql/JavaSparkSqlExample.java %}
</div>
<div data-lang="python" markdown="1">
@@ -628,43 +384,8 @@ a `DataFrame` can be created programmatically with three steps.
by `SparkSession`.
For example:
-{% highlight scala %}
-val spark: SparkSession // An existing SparkSession
-
-// Create an RDD
-val people = sc.textFile("examples/src/main/resources/people.txt")
-
-// The schema is encoded in a string
-val schemaString = "name age"
-
-// Import Row.
-import org.apache.spark.sql.Row;
-
-// Import Spark SQL data types
-import org.apache.spark.sql.types.{StructType, StructField, StringType};
-
-// Generate the schema based on the string of schema
-val schema = StructType(schemaString.split(" ").map { fieldName =>
- StructField(fieldName, StringType, true)
-})
-
-// Convert records of the RDD (people) to Rows.
-val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
-
-// Apply the schema to the RDD.
-val peopleDataFrame = spark.createDataFrame(rowRDD, schema)
-
-// Creates a temporary view using the DataFrame.
-peopleDataFrame.createOrReplaceTempView("people")
-
-// SQL statements can be run by using the sql methods provided by spark.
-val results = spark.sql("SELECT name FROM people")
-
-// The columns of a row in the result can be accessed by field index or by field name.
-results.map(t => "Name: " + t(0)).collect().foreach(println)
-{% endhighlight %}
-
+{% include_example programmatic_schema scala/org/apache/spark/examples/sql/SparkSqlExample.scala %}
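For reference, a minimal sketch of the three steps, assuming the `spark` session from above:

{% highlight scala %}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")
results.show()
{% endhighlight %}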
</div>
<div data-lang="java" markdown="1">
@@ -681,62 +402,8 @@ a `Dataset<Row>` can be created programmatically with three steps.
by `SparkSession`.
For example:
-{% highlight java %}
-import org.apache.spark.api.java.function.Function;
-// Import factory methods provided by DataTypes.
-import org.apache.spark.sql.types.DataTypes;
-// Import StructType and StructField
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.types.StructField;
-// Import Row.
-import org.apache.spark.sql.Row;
-// Import RowFactory.
-import org.apache.spark.sql.RowFactory;
-
-SparkSession spark = ...; // An existing SparkSession.
-JavaSparkContext sc = spark.sparkContext
-
-// Load a text file and convert each line to a JavaBean.
-JavaRDD<String> people = sc.textFile("examples/src/main/resources/people.txt");
-
-// The schema is encoded in a string
-String schemaString = "name age";
-
-// Generate the schema based on the string of schema
-List<StructField> fields = new ArrayList<>();
-for (String fieldName: schemaString.split(" ")) {
- fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
-}
-StructType schema = DataTypes.createStructType(fields);
-
-// Convert records of the RDD (people) to Rows.
-JavaRDD<Row> rowRDD = people.map(
- new Function<String, Row>() {
- public Row call(String record) throws Exception {
- String[] fields = record.split(",");
- return RowFactory.create(fields[0], fields[1].trim());
- }
- });
-
-// Apply the schema to the RDD.
-Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);
-
-// Creates a temporary view using the DataFrame.
-peopleDataFrame.createOrReplaceTempView("people");
-
-// SQL can be run over a temporary view created using DataFrames.
-Dataset<Row> results = spark.sql("SELECT name FROM people");
-
-// The results of SQL queries are DataFrames and support all the normal RDD operations.
-// The columns of a row in the result can be accessed by ordinal.
-List<String> names = results.javaRDD().map(new Function<Row, String>() {
- public String call(Row row) {
- return "Name: " + row.getString(0);
- }
-}).collect();
-
-{% endhighlight %}
+{% include_example programmatic_schema java/org/apache/spark/examples/sql/JavaSparkSqlExample.java %}
</div>
<div data-lang="python" markdown="1">
@@ -805,23 +472,11 @@ In the simplest form, the default data source (`parquet` unless otherwise config
<div class="codetabs">
<div data-lang="scala" markdown="1">
-
-{% highlight scala %}
-val df = spark.read.load("examples/src/main/resources/users.parquet")
-df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
-{% endhighlight %}
-
+{% include_example generic_load_save_functions scala/org/apache/spark/examples/sql/SqlDataSourceExample.scala %}
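A minimal sketch of the generic load/save calls, assuming the `spark` session from above:

{% highlight scala %}
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
{% endhighlight %}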
</div>
<div data-lang="java" markdown="1">
-
-{% highlight java %}
-
-Dataset<Row> df = spark.read().load("examples/src/main/resources/users.parquet");
-df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
-
-{% endhighlight %}
-
+{% include_example generic_load_save_functions java/org/apache/spark/examples/sql/JavaSqlDataSourceExample.java %}
</div>
<div data-lang="python" markdown="1">
@@ -852,23 +507,11 @@ using this syntax.
<div class="codetabs">
<div data-lang="scala" markdown="1">
-
-{% highlight scala %}
-val df = spark.read.format("json").load("examples/src/main/resources/people.json")
-df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
-{% endhighlight %}
-
+{% include_example manual_load_options scala/org/apache/spark/examples/sql/SqlDataSourceExample.scala %}
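A minimal sketch of manually specifying the data source format, assuming the `spark` session from above:

{% highlight scala %}
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
{% endhighlight %}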
</div>
<div data-lang="java" markdown="1">
-
-{% highlight java %}
-
-Dataset<Row> df = spark.read().format("json").load("examples/src/main/resources/people.json");
-df.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
-
-{% endhighlight %}
-
+{% include_example manual_load_options java/org/apache/spark/examples/sql/JavaSqlDataSourceExample.java %}
</div>
<div data-lang="python" markdown="1">
@@ -895,18 +538,11 @@ file directly with SQL.
<div class="codetabs">
<div data-lang="scala" markdown="1">
-
-{% highlight scala %}
-val df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
-{% endhighlight %}
-
+{% include_example direct_sql scala/org/apache/spark/examples/sql/SqlDataSourceExample.scala %}
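A minimal sketch of querying a file directly with SQL, assuming the `spark` session from above:

{% highlight scala %}
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
{% endhighlight %}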
</div>
<div data-lang="java" markdown="1">
-
-{% highlight java %}
-Dataset<Row> df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
-{% endhighlight %}
+{% include_example direct_sql java/org/apache/spark/examples/sql/JavaSqlDataSourceExample.java %}
</div>
<div data-lang="python" markdown="1">
@@ -997,53 +633,11 @@ Using the data from the above example:
<div class="codetabs">
<div data-lang="scala" markdown="1">
-
-{% highlight scala %}
-// spark from the previous example is used in this example.
-// This is used to implicitly convert an RDD to a DataFrame.
-import spark.implicits._
-
-val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.
-
-// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.
-people.write.parquet("people.parquet")
-
-// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
-// The result of loading a Parquet file is also a DataFrame.
-val parquetFile = spark.read.parquet("people.parquet")
-
-// Parquet files can also be used to create a temporary view and then used in SQL statements.
-parquetFile.createOrReplaceTempView("parquetFile")
-val teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
-teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
-{% endhighlight %}
-
+{% include_example basic_parquet_example scala/org/apache/spark/examples/sql/SqlDataSourceExample.scala %}
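For reference, a minimal sketch of the Parquet round trip, assuming the `spark` session, `import spark.implicits._`, and the `peopleDF` DataFrame loaded from JSON earlier:

{% highlight scala %}
// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")

// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved
val parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
{% endhighlight %}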
</div>
<div data-lang="java" markdown="1">
-
-{% highlight java %}
-// spark from the previous example is used in this example.
-
-Dataset<Row> schemaPeople = ... // The DataFrame from the previous example.
-
-// DataFrames can be saved as Parquet files, maintaining the schema information.
-schemaPeople.write().parquet("people.parquet");
-
-// Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
-// The result of loading a parquet file is also a DataFrame.
-Dataset<Row> parquetFile = spark.read().parquet("people.parquet");
-
-// Parquet files can also be used to create a temporary view and then used in SQL statements.
-parquetFile.createOrReplaceTempView("parquetFile");
-Dataset<Row> teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
-List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
- public String call(Row row) {
- return "Name: " + row.getString(0);
- }
-}).collect();
-{% endhighlight %}
-
+{% include_example basic_parquet_example java/org/apache/spark/examples/sql/JavaSqlDataSourceExample.java %}
</div>
<div data-lang="python" markdown="1">
@@ -1172,34 +766,11 @@ turned it off by default starting from 1.5.0. You may enable it by
<div class="codetabs">
<div data-lang="scala" markdown="1">
+{% include_example schema_merging scala/org/apache/spark/examples/sql/SqlDataSourceExample.scala %}
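For reference, a minimal sketch of schema merging, assuming the `spark` session and `import spark.implicits._` from above. The value/square/cube columns mirror the new example files; the inline snippet this patch removes used single/double/triple instead:

{% highlight scala %}
// Create a simple DataFrame, store it in a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")

// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appearing in the partition directory paths
{% endhighlight %}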
+</div>
-{% highlight scala %}
-// spark from the previous example is used in this example.
-// This is used to implicitly convert an RDD to a DataFrame.
-import spark.implicits._
-
-// Create a simple DataFrame, stored into a partition directory
-val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
-df1.write.parquet("data/test_table/key=1")
-
-// Create another DataFrame in a new partition directory,
-// adding a new column and dropping an existing column
-val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
-df2.write.parquet("data/test_table/key=2")
-
-// Read the partitioned table
-val df3 = spark.read.option("mergeSchema", "true").parquet("data/test_table")
-df3.printSchema()
-
-// The final schema consists of all 3 columns in the Parquet files together
-// with the partitioning column appeared in the partition directory paths.
-// root
-// |-- single: int (nullable = true)
-// |-- double: int (nullable = true)
-// |-- triple: int (nullable = true)
-// |-- key : int (nullable = true)
-{% endhighlight %}
-
+<div data-lang="java" markdown="1">
+{% include_example schema_merging java/org/apache/spark/examples/sql/JavaSqlDataSourceExample.java %}
</div>
<div data-lang="python" markdown="1">
@@ -1280,8 +851,8 @@ metadata.
<div data-lang="scala" markdown="1">
{% highlight scala %}
-// spark is an existing HiveContext
-spark.refreshTable("my_table")
+// spark is an existing SparkSession
+spark.catalog.refreshTable("my_table")
{% endhighlight %}
</div>
@@ -1289,8 +860,8 @@ spark.refreshTable("my_table")
<div data-lang="java" markdown="1">
{% highlight java %}
-// spark is an existing HiveContext
-spark.refreshTable("my_table")
+// spark is an existing SparkSession
+spark.catalog().refreshTable("my_table");
{% endhighlight %}
</div>
@@ -1402,33 +973,7 @@ Note that the file that is offered as _a json file_ is not a typical JSON file.
line must contain a separate, self-contained valid JSON object. As a consequence,
a regular multi-line JSON file will most often fail.
-{% highlight scala %}
-val spark: SparkSession // An existing SparkSession
-
-// A JSON dataset is pointed to by path.
-// The path can be either a single text file or a directory storing text files.
-val path = "examples/src/main/resources/people.json"
-val people = spark.read.json(path)
-
-// The inferred schema can be visualized using the printSchema() method.
-people.printSchema()
-// root
-// |-- age: long (nullable = true)
-// |-- name: string (nullable = true)
-
-// Creates a temporary view using the DataFrame
-people.createOrReplaceTempView("people")
-
-// SQL statements can be run by using the sql methods provided by spark.
-val teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
-
-// Alternatively, a DataFrame can be created for a JSON dataset represented by
-// an RDD[String] storing one JSON object per string.
-val anotherPeopleRDD = sc.parallelize(
- """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
-val anotherPeople = spark.read.json(anotherPeopleRDD)
-{% endhighlight %}
-
+{% include_example json_dataset scala/org/apache/spark/examples/sql/SqlDataSourceExample.scala %}
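For reference, a minimal sketch of loading a JSON dataset, assuming the `spark` session from above:

{% highlight scala %}
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)

// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string
val otherPeopleRDD = spark.sparkContext.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()
{% endhighlight %}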
</div>
<div data-lang="java" markdown="1">
@@ -1440,33 +985,7 @@ Note that the file that is offered as _a json file_ is not a typical JSON file.
line must contain a separate, self-contained valid JSON object. As a consequence,
a regular multi-line JSON file will most often fail.
-{% highlight java %}
-// sc is an existing JavaSparkContext.
-SparkSession spark = new org.apache.spark.sql.SparkSession(sc);
-
-// A JSON dataset is pointed to by path.
-// The path can be either a single text file or a directory storing text files.
-Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");
-
-// The inferred schema can be visualized using the printSchema() method.
-people.printSchema();
-// root
-// |-- age: long (nullable = true)
-// |-- name: string (nullable = true)
-
-// Creates a temporary view using the DataFrame
-people.createOrReplaceTempView("people");
-
-// SQL statements can be run by using the sql methods provided by spark.
-Dataset<Row> teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
-
-// Alternatively, a DataFrame can be created for a JSON dataset represented by
-// an RDD[String] storing one JSON object per string.
-List<String> jsonData = Arrays.asList(
- "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
-JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData);
-Dataset<Row> anotherPeople = spark.read().json(anotherPeopleRDD);
-{% endhighlight %}
+{% include_example json_dataset java/org/apache/spark/examples/sql/JavaSqlDataSourceExample.java %}
</div>
<div data-lang="python" markdown="1">
@@ -1561,18 +1080,7 @@ the `hive.metastore.warehouse.dir` property in `hive-site.xml` is deprecated sin
Instead, use `spark.sql.warehouse.dir` to specify the default location of database in warehouse.
You may need to grant write privilege to the user who starts the spark application.
-{% highlight scala %}
-// warehouse_location points to the default location for managed databases and tables
-val conf = new SparkConf().setAppName("HiveFromSpark").set("spark.sql.warehouse.dir", warehouse_location)
-val spark = SparkSession.builder.config(conf).enableHiveSupport().getOrCreate()
-
-spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
-spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
-
-// Queries are expressed in HiveQL
-spark.sql("FROM src SELECT key, value").collect().foreach(println)
-{% endhighlight %}
-
+{% include_example spark_hive scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala %}
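For reference, a minimal sketch of enabling Hive support, mirroring the new Hive examples added in this patch; the app name and warehouse path are illustrative:

{% highlight scala %}
import org.apache.spark.sql.SparkSession

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = "file:" + System.getProperty("user.dir") + "/spark-warehouse"

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show()
{% endhighlight %}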
</div>
<div data-lang="java" markdown="1">
@@ -1587,17 +1095,7 @@ the `hive.metastore.warehouse.dir` property in `hive-site.xml` is deprecated sin
Instead, use `spark.sql.warehouse.dir` to specify the default location of database in warehouse.
You may need to grant write privilege to the user who starts the spark application.
-{% highlight java %}
-SparkSession spark = SparkSession.builder().appName("JavaSparkSQL").getOrCreate();
-
-spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
-spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
-
-// Queries are expressed in HiveQL.
-List<Row> results = spark.sql("FROM src SELECT key, value").collectAsList();
-
-{% endhighlight %}
-
+{% include_example spark_hive java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java %}
</div>
<div data-lang="python" markdown="1">
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
deleted file mode 100644
index 7fc6c007b6..0000000000
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.sql;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
-
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-// $example on:init_session$
-import org.apache.spark.sql.SparkSession;
-// $example off:init_session$
-
-public class JavaSparkSQL {
- public static class Person implements Serializable {
- private String name;
- private int age;
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public int getAge() {
- return age;
- }
-
- public void setAge(int age) {
- this.age = age;
- }
- }
-
- public static void main(String[] args) throws Exception {
- // $example on:init_session$
- SparkSession spark = SparkSession
- .builder()
- .appName("JavaSparkSQL")
- .config("spark.some.config.option", "some-value")
- .getOrCreate();
- // $example off:init_session$
-
- System.out.println("=== Data source: RDD ===");
- // Load a text file and convert each line to a Java Bean.
- String file = "examples/src/main/resources/people.txt";
- JavaRDD<Person> people = spark.read().textFile(file).javaRDD().map(
- new Function<String, Person>() {
- @Override
- public Person call(String line) {
- String[] parts = line.split(",");
-
- Person person = new Person();
- person.setName(parts[0]);
- person.setAge(Integer.parseInt(parts[1].trim()));
-
- return person;
- }
- });
-
- // Apply a schema to an RDD of Java Beans and create a temporary view
- Dataset<Row> schemaPeople = spark.createDataFrame(people, Person.class);
- schemaPeople.createOrReplaceTempView("people");
-
- // SQL can be run over RDDs which backs a temporary view.
- Dataset<Row> teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
-
- // The results of SQL queries are DataFrames and support all the normal RDD operations.
- // The columns of a row in the result can be accessed by ordinal.
- List<String> teenagerNames = teenagers.toJavaRDD().map(new Function<Row, String>() {
- @Override
- public String call(Row row) {
- return "Name: " + row.getString(0);
- }
- }).collect();
- for (String name: teenagerNames) {
- System.out.println(name);
- }
-
- System.out.println("=== Data source: Parquet File ===");
- // DataFrames can be saved as parquet files, maintaining the schema information.
- schemaPeople.write().parquet("people.parquet");
-
- // Read in the parquet file created above.
- // Parquet files are self-describing so the schema is preserved.
- // The result of loading a parquet file is also a DataFrame.
- Dataset<Row> parquetFile = spark.read().parquet("people.parquet");
-
- // A temporary view can be created by using Parquet files and then used in SQL statements.
- parquetFile.createOrReplaceTempView("parquetFile");
- Dataset<Row> teenagers2 =
- spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
- teenagerNames = teenagers2.toJavaRDD().map(new Function<Row, String>() {
- @Override
- public String call(Row row) {
- return "Name: " + row.getString(0);
- }
- }).collect();
- for (String name: teenagerNames) {
- System.out.println(name);
- }
-
- System.out.println("=== Data source: JSON Dataset ===");
- // A JSON dataset is pointed by path.
- // The path can be either a single text file or a directory storing text files.
- String path = "examples/src/main/resources/people.json";
- // Create a DataFrame from the file(s) pointed by path
- Dataset<Row> peopleFromJsonFile = spark.read().json(path);
-
- // Because the schema of a JSON dataset is automatically inferred, to write queries,
- // it is better to take a look at what is the schema.
- peopleFromJsonFile.printSchema();
- // The schema of people is ...
- // root
- // |-- age: IntegerType
- // |-- name: StringType
-
- // Creates a temporary view using the DataFrame
- peopleFromJsonFile.createOrReplaceTempView("people");
-
- // SQL statements can be run by using the sql methods provided by `spark`
- Dataset<Row> teenagers3 = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
-
- // The results of SQL queries are DataFrame and support all the normal RDD operations.
- // The columns of a row in the result can be accessed by ordinal.
- teenagerNames = teenagers3.toJavaRDD().map(new Function<Row, String>() {
- @Override
- public String call(Row row) { return "Name: " + row.getString(0); }
- }).collect();
- for (String name: teenagerNames) {
- System.out.println(name);
- }
-
- // Alternatively, a DataFrame can be created for a JSON dataset represented by
- // a RDD[String] storing one JSON object per string.
- List<String> jsonData = Arrays.asList(
- "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
- JavaRDD<String> anotherPeopleRDD = spark
- .createDataFrame(jsonData, String.class).toJSON().javaRDD();
- Dataset<Row> peopleFromJsonRDD = spark.read().json(anotherPeopleRDD);
-
- // Take a look at the schema of this new DataFrame.
- peopleFromJsonRDD.printSchema();
- // The schema of anotherPeople is ...
- // root
- // |-- address: StructType
- // | |-- city: StringType
- // | |-- state: StringType
- // |-- name: StringType
-
- peopleFromJsonRDD.createOrReplaceTempView("people2");
-
- Dataset<Row> peopleWithCity = spark.sql("SELECT name, address.city FROM people2");
- List<String> nameAndCity = peopleWithCity.toJavaRDD().map(new Function<Row, String>() {
- @Override
- public String call(Row row) {
- return "Name: " + row.getString(0) + ", City: " + row.getString(1);
- }
- }).collect();
- for (String name: nameAndCity) {
- System.out.println(name);
- }
-
- spark.stop();
- }
-}
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSqlExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSqlExample.java
new file mode 100644
index 0000000000..586d6e3a3e
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSqlExample.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql;
+
+// $example on:programmatic_schema$
+import java.util.ArrayList;
+import java.util.List;
+// $example off:programmatic_schema$
+// $example on:create_ds$
+import java.util.Arrays;
+import java.util.Collections;
+import java.io.Serializable;
+// $example off:create_ds$
+
+// $example on:schema_inferring$
+// $example on:programmatic_schema$
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+// $example off:programmatic_schema$
+// $example on:create_ds$
+import org.apache.spark.api.java.function.MapFunction;
+// $example on:create_df$
+// $example on:run_sql$
+// $example on:programmatic_schema$
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+// $example off:programmatic_schema$
+// $example off:create_df$
+// $example off:run_sql$
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+// $example off:create_ds$
+// $example off:schema_inferring$
+import org.apache.spark.sql.RowFactory;
+// $example on:init_session$
+import org.apache.spark.sql.SparkSession;
+// $example off:init_session$
+// $example on:programmatic_schema$
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+// $example off:programmatic_schema$
+
+// $example on:untyped_ops$
+// col("...") is preferable to df.col("...")
+import static org.apache.spark.sql.functions.col;
+// $example off:untyped_ops$
+
+public class JavaSparkSqlExample {
+ // $example on:create_ds$
+ public static class Person implements Serializable {
+ private String name;
+ private int age;
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public int getAge() {
+ return age;
+ }
+
+ public void setAge(int age) {
+ this.age = age;
+ }
+ }
+ // $example off:create_ds$
+
+ public static void main(String[] args) {
+ // $example on:init_session$
+ SparkSession spark = SparkSession
+ .builder()
+ .appName("Java Spark SQL Example")
+ .config("spark.some.config.option", "some-value")
+ .getOrCreate();
+ // $example off:init_session$
+
+ runBasicDataFrameExample(spark);
+ runDatasetCreationExample(spark);
+ runInferSchemaExample(spark);
+ runProgrammaticSchemaExample(spark);
+
+ spark.stop();
+ }
+
+ private static void runBasicDataFrameExample(SparkSession spark) {
+ // $example on:create_df$
+ Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
+
+ // Displays the content of the DataFrame to stdout
+ df.show();
+ // +----+-------+
+ // | age| name|
+ // +----+-------+
+ // |null|Michael|
+ // | 30| Andy|
+ // | 19| Justin|
+ // +----+-------+
+ // $example off:create_df$
+
+ // $example on:untyped_ops$
+ // Print the schema in a tree format
+ df.printSchema();
+ // root
+ // |-- age: long (nullable = true)
+ // |-- name: string (nullable = true)
+
+ // Select only the "name" column
+ df.select("name").show();
+ // +-------+
+ // | name|
+ // +-------+
+ // |Michael|
+ // | Andy|
+ // | Justin|
+ // +-------+
+
+ // Select everybody, but increment the age by 1
+ df.select(col("name"), col("age").plus(1)).show();
+ // +-------+---------+
+ // | name|(age + 1)|
+ // +-------+---------+
+ // |Michael| null|
+ // | Andy| 31|
+ // | Justin| 20|
+ // +-------+---------+
+
+ // Select people older than 21
+ df.filter(col("age").gt(21)).show();
+ // +---+----+
+ // |age|name|
+ // +---+----+
+ // | 30|Andy|
+ // +---+----+
+
+ // Count people by age
+ df.groupBy("age").count().show();
+ // +----+-----+
+ // | age|count|
+ // +----+-----+
+ // | 19| 1|
+ // |null| 1|
+ // | 30| 1|
+ // +----+-----+
+ // $example off:untyped_ops$
+
+ // $example on:run_sql$
+ // Register the DataFrame as a SQL temporary view
+ df.createOrReplaceTempView("people");
+
+ Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
+ sqlDF.show();
+ // +----+-------+
+ // | age| name|
+ // +----+-------+
+ // |null|Michael|
+ // | 30| Andy|
+ // | 19| Justin|
+ // +----+-------+
+ // $example off:run_sql$
+ }
+
+ private static void runDatasetCreationExample(SparkSession spark) {
+ // $example on:create_ds$
+ // Create an instance of a Bean class
+ Person person = new Person();
+ person.setName("Andy");
+ person.setAge(32);
+
+ // Encoders are created for Java beans
+ Encoder<Person> personEncoder = Encoders.bean(Person.class);
+ Dataset<Person> javaBeanDS = spark.createDataset(
+ Collections.singletonList(person),
+ personEncoder
+ );
+ javaBeanDS.show();
+ // +---+----+
+ // |age|name|
+ // +---+----+
+ // | 32|Andy|
+ // +---+----+
+
+ // Encoders for most common types are provided in class Encoders
+ Encoder<Integer> integerEncoder = Encoders.INT();
+ Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
+ Dataset<Integer> transformedDS = primitiveDS.map(new MapFunction<Integer, Integer>() {
+ @Override
+ public Integer call(Integer value) throws Exception {
+ return value + 1;
+ }
+ }, integerEncoder);
+ transformedDS.collect(); // Returns [2, 3, 4]
+
+ // DataFrames can be converted to a Dataset by providing a class. Mapping based on name
+ String path = "examples/src/main/resources/people.json";
+ Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
+ peopleDS.show();
+ // +----+-------+
+ // | age| name|
+ // +----+-------+
+ // |null|Michael|
+ // | 30| Andy|
+ // | 19| Justin|
+ // +----+-------+
+ // $example off:create_ds$
+ }
+
+ private static void runInferSchemaExample(SparkSession spark) {
+ // $example on:schema_inferring$
+ // Create an RDD of Person objects from a text file
+ JavaRDD<Person> peopleRDD = spark.read()
+ .textFile("examples/src/main/resources/people.txt")
+ .javaRDD()
+ .map(new Function<String, Person>() {
+ @Override
+ public Person call(String line) throws Exception {
+ String[] parts = line.split(",");
+ Person person = new Person();
+ person.setName(parts[0]);
+ person.setAge(Integer.parseInt(parts[1].trim()));
+ return person;
+ }
+ });
+
+ // Apply a schema to an RDD of JavaBeans to get a DataFrame
+ Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
+ // Register the DataFrame as a temporary view
+ peopleDF.createOrReplaceTempView("people");
+
+ // SQL statements can be run by using the sql methods provided by spark
+ Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
+
+ // The columns of a row in the result can be accessed by field index
+ Encoder<String> stringEncoder = Encoders.STRING();
+ Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(new MapFunction<Row, String>() {
+ @Override
+ public String call(Row row) throws Exception {
+ return "Name: " + row.getString(0);
+ }
+ }, stringEncoder);
+ teenagerNamesByIndexDF.show();
+ // +------------+
+ // | value|
+ // +------------+
+ // |Name: Justin|
+ // +------------+
+
+ // or by field name
+ Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(new MapFunction<Row, String>() {
+ @Override
+ public String call(Row row) throws Exception {
+ return "Name: " + row.<String>getAs("name");
+ }
+ }, stringEncoder);
+ teenagerNamesByFieldDF.show();
+ // +------------+
+ // | value|
+ // +------------+
+ // |Name: Justin|
+ // +------------+
+ // $example off:schema_inferring$
+ }
+
+ private static void runProgrammaticSchemaExample(SparkSession spark) {
+ // $example on:programmatic_schema$
+ // Create an RDD
+ JavaRDD<String> peopleRDD = spark.sparkContext()
+ .textFile("examples/src/main/resources/people.txt", 1)
+ .toJavaRDD();
+
+ // The schema is encoded in a string
+ String schemaString = "name age";
+
+ // Generate the schema based on the string of schema
+ List<StructField> fields = new ArrayList<>();
+ for (String fieldName : schemaString.split(" ")) {
+ StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
+ fields.add(field);
+ }
+ StructType schema = DataTypes.createStructType(fields);
+
+ // Convert records of the RDD (people) to Rows
+ JavaRDD<Row> rowRDD = peopleRDD.map(new Function<String, Row>() {
+ @Override
+ public Row call(String record) throws Exception {
+ String[] attributes = record.split(",");
+ return RowFactory.create(attributes[0], attributes[1].trim());
+ }
+ });
+
+ // Apply the schema to the RDD
+ Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);
+
+ // Creates a temporary view using the DataFrame
+ peopleDataFrame.createOrReplaceTempView("people");
+
+ // SQL can be run over a temporary view created using DataFrames
+ Dataset<Row> results = spark.sql("SELECT name FROM people");
+
+ // The results of SQL queries are DataFrames and support all the normal RDD operations
+ // The columns of a row in the result can be accessed by field index or by field name
+ Dataset<String> namesDS = results.map(new MapFunction<Row, String>() {
+ @Override
+ public String call(Row row) throws Exception {
+ return "Name: " + row.getString(0);
+ }
+ }, Encoders.STRING());
+ namesDS.show();
+ // +-------------+
+ // | value|
+ // +-------------+
+ // |Name: Michael|
+ // | Name: Andy|
+ // | Name: Justin|
+ // +-------------+
+ // $example off:programmatic_schema$
+ }
+}
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSqlDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSqlDataSourceExample.java
new file mode 100644
index 0000000000..4db5e1b0af
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSqlDataSourceExample.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql;
+
+// $example on:schema_merging$
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+// $example off:schema_merging$
+
+// $example on:basic_parquet_example$
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Encoders;
+// $example on:schema_merging$
+// $example on:json_dataset$
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+// $example off:json_dataset$
+// $example off:schema_merging$
+// $example off:basic_parquet_example$
+import org.apache.spark.sql.SparkSession;
+
+public class JavaSqlDataSourceExample {
+
+ // $example on:schema_merging$
+ public static class Square implements Serializable {
+ private int value;
+ private int square;
+
+ // Getters and setters...
+ // $example off:schema_merging$
+ public int getValue() {
+ return value;
+ }
+
+ public void setValue(int value) {
+ this.value = value;
+ }
+
+ public int getSquare() {
+ return square;
+ }
+
+ public void setSquare(int square) {
+ this.square = square;
+ }
+ // $example on:schema_merging$
+ }
+ // $example off:schema_merging$
+
+ // $example on:schema_merging$
+ public static class Cube implements Serializable {
+ private int value;
+ private int cube;
+
+ // Getters and setters...
+ // $example off:schema_merging$
+ public int getValue() {
+ return value;
+ }
+
+ public void setValue(int value) {
+ this.value = value;
+ }
+
+ public int getCube() {
+ return cube;
+ }
+
+ public void setCube(int cube) {
+ this.cube = cube;
+ }
+ // $example on:schema_merging$
+ }
+ // $example off:schema_merging$
+
+ public static void main(String[] args) {
+ SparkSession spark = SparkSession
+ .builder()
+ .appName("Java Spark SQL Data Sources Example")
+ .config("spark.some.config.option", "some-value")
+ .getOrCreate();
+
+ runBasicDataSourceExample(spark);
+ runBasicParquetExample(spark);
+ runParquetSchemaMergingExample(spark);
+ runJsonDatasetExample(spark);
+
+ spark.stop();
+ }
+
+ private static void runBasicDataSourceExample(SparkSession spark) {
+ // $example on:generic_load_save_functions$
+ Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
+ usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
+ // $example off:generic_load_save_functions$
+ // $example on:manual_load_options$
+ Dataset<Row> peopleDF = spark.read().format("json").load("examples/src/main/resources/people.json");
+ peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
+ // $example off:manual_load_options$
+ // $example on:direct_sql$
+ Dataset<Row> sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
+ // $example off:direct_sql$
+ }
+
+ private static void runBasicParquetExample(SparkSession spark) {
+ // $example on:basic_parquet_example$
+ Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");
+
+ // DataFrames can be saved as Parquet files, maintaining the schema information
+ peopleDF.write().parquet("people.parquet");
+
+ // Read in the Parquet file created above.
+ // Parquet files are self-describing so the schema is preserved
+ // The result of loading a parquet file is also a DataFrame
+ Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");
+
+ // Parquet files can also be used to create a temporary view and then used in SQL statements
+ parquetFileDF.createOrReplaceTempView("parquetFile");
+ Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
+ Dataset<String> namesDS = namesDF.map(new MapFunction<Row, String>() {
+ public String call(Row row) {
+ return "Name: " + row.getString(0);
+ }
+ }, Encoders.STRING());
+ namesDS.show();
+ // +------------+
+ // | value|
+ // +------------+
+ // |Name: Justin|
+ // +------------+
+ // $example off:basic_parquet_example$
+ }
+
+ private static void runParquetSchemaMergingExample(SparkSession spark) {
+ // $example on:schema_merging$
+ List<Square> squares = new ArrayList<>();
+ for (int value = 1; value <= 5; value++) {
+ Square square = new Square();
+ square.setValue(value);
+ square.setSquare(value * value);
+ squares.add(square);
+ }
+
+ // Create a simple DataFrame, store into a partition directory
+ Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class);
+ squaresDF.write().parquet("data/test_table/key=1");
+
+ List<Cube> cubes = new ArrayList<>();
+ for (int value = 6; value <= 10; value++) {
+ Cube cube = new Cube();
+ cube.setValue(value);
+ cube.setCube(value * value * value);
+ cubes.add(cube);
+ }
+
+ // Create another DataFrame in a new partition directory,
+ // adding a new column and dropping an existing column
+ Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class);
+ cubesDF.write().parquet("data/test_table/key=2");
+
+ // Read the partitioned table
+ Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table");
+ mergedDF.printSchema();
+
+ // The final schema consists of all 3 columns in the Parquet files together
+    // with the partitioning column appearing in the partition directory paths
+ // root
+ // |-- value: int (nullable = true)
+ // |-- square: int (nullable = true)
+ // |-- cube: int (nullable = true)
+    // |-- key: int (nullable = true)
+ // $example off:schema_merging$
+ }
+
+ private static void runJsonDatasetExample(SparkSession spark) {
+ // $example on:json_dataset$
+ // A JSON dataset is pointed to by path.
+ // The path can be either a single text file or a directory storing text files
+ Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");
+
+ // The inferred schema can be visualized using the printSchema() method
+ people.printSchema();
+ // root
+ // |-- age: long (nullable = true)
+ // |-- name: string (nullable = true)
+
+ // Creates a temporary view using the DataFrame
+ people.createOrReplaceTempView("people");
+
+ // SQL statements can be run by using the sql methods provided by spark
+ Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
+ namesDF.show();
+ // +------+
+ // | name|
+ // +------+
+ // |Justin|
+ // +------+
+ // $example off:json_dataset$
+ }
+
+}
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java b/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java
new file mode 100644
index 0000000000..493d759a91
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql.hive;
+
+// $example on:spark_hive$
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+// $example off:spark_hive$
+
+public class JavaSparkHiveExample {
+
+ // $example on:spark_hive$
+ public static class Record implements Serializable {
+ private int key;
+ private String value;
+
+ public int getKey() {
+ return key;
+ }
+
+ public void setKey(int key) {
+ this.key = key;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+ }
+ // $example off:spark_hive$
+
+ public static void main(String[] args) {
+ // $example on:spark_hive$
+ // warehouseLocation points to the default location for managed databases and tables
+    String warehouseLocation = "file:" + System.getProperty("user.dir") + "/spark-warehouse";
+ SparkSession spark = SparkSession
+ .builder()
+ .appName("Java Spark Hive Example")
+ .config("spark.sql.warehouse.dir", warehouseLocation)
+ .enableHiveSupport()
+ .getOrCreate();
+
+ spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
+ spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
+
+ // Queries are expressed in HiveQL
+ spark.sql("SELECT * FROM src").show();
+ // +---+-------+
+ // |key| value|
+ // +---+-------+
+ // |238|val_238|
+ // | 86| val_86|
+ // |311|val_311|
+ // ...
+
+ // Aggregation queries are also supported.
+ spark.sql("SELECT COUNT(*) FROM src").show();
+ // +--------+
+ // |count(1)|
+ // +--------+
+    // |     500|
+ // +--------+
+
+ // The results of SQL queries are themselves DataFrames and support all normal functions.
+ Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");
+
+    // The items in DataFrames are of type Row, which lets you access each column by ordinal.
+ Dataset<String> stringsDS = sqlDF.map(new MapFunction<Row, String>() {
+ @Override
+ public String call(Row row) throws Exception {
+ return "Key: " + row.get(0) + ", Value: " + row.get(1);
+ }
+ }, Encoders.STRING());
+ stringsDS.show();
+ // +--------------------+
+ // | value|
+ // +--------------------+
+ // |Key: 0, Value: val_0|
+ // |Key: 0, Value: val_0|
+ // |Key: 0, Value: val_0|
+ // ...
+
+    // You can also use DataFrames to create temporary views within a SparkSession.
+ List<Record> records = new ArrayList<>();
+ for (int key = 1; key < 100; key++) {
+ Record record = new Record();
+ record.setKey(key);
+ record.setValue("val_" + key);
+ records.add(record);
+ }
+ Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
+ recordsDF.createOrReplaceTempView("records");
+
+    // Queries can then join DataFrame data with data stored in Hive.
+ spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
+ // +---+------+---+------+
+ // |key| value|key| value|
+ // +---+------+---+------+
+ // | 2| val_2| 2| val_2|
+ // | 2| val_2| 2| val_2|
+ // | 4| val_4| 4| val_4|
+ // ...
+ // $example off:spark_hive$
+
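+    // A hedged sketch beyond the original example: the joined result could also be persisted
+    // back to a managed Hive table through the DataFrameWriter API; the table name
+    // "records_src_joined" is illustrative only.
+    Dataset<Row> joinedDF =
+      spark.sql("SELECT r.key, r.value FROM records r JOIN src s ON r.key = s.key");
+    joinedDF.write().mode("overwrite").saveAsTable("records_src_joined");
+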
+ spark.stop();
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SparkSqlExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SparkSqlExample.scala
new file mode 100644
index 0000000000..cf3f864267
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/SparkSqlExample.scala
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql
+
+// $example on:schema_inferring$
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.Encoder
+// $example off:schema_inferring$
+import org.apache.spark.sql.Row
+// $example on:init_session$
+import org.apache.spark.sql.SparkSession
+// $example off:init_session$
+// $example on:programmatic_schema$
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.types.StructField
+import org.apache.spark.sql.types.StructType
+// $example off:programmatic_schema$
+
+object SparkSqlExample {
+
+ // $example on:create_ds$
+ // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
+ // you can use custom classes that implement the Product interface
+ case class Person(name: String, age: Long)
+ // $example off:create_ds$
+
+ def main(args: Array[String]) {
+ // $example on:init_session$
+ val spark = SparkSession
+ .builder()
+ .appName("Spark SQL Example")
+ .config("spark.some.config.option", "some-value")
+ .getOrCreate()
+
+ // For implicit conversions like converting RDDs to DataFrames
+ import spark.implicits._
+ // $example off:init_session$
+
+ runBasicDataFrameExample(spark)
+ runDatasetCreationExample(spark)
+ runInferSchemaExample(spark)
+ runProgrammaticSchemaExample(spark)
+
+ spark.stop()
+ }
+
+ private def runBasicDataFrameExample(spark: SparkSession): Unit = {
+ // $example on:create_df$
+ val df = spark.read.json("examples/src/main/resources/people.json")
+
+ // Displays the content of the DataFrame to stdout
+ df.show()
+ // +----+-------+
+ // | age| name|
+ // +----+-------+
+ // |null|Michael|
+ // | 30| Andy|
+ // | 19| Justin|
+ // +----+-------+
+ // $example off:create_df$
+
+ // $example on:untyped_ops$
+ // This import is needed to use the $-notation
+ import spark.implicits._
+ // Print the schema in a tree format
+ df.printSchema()
+ // root
+ // |-- age: long (nullable = true)
+ // |-- name: string (nullable = true)
+
+ // Select only the "name" column
+ df.select("name").show()
+ // +-------+
+ // | name|
+ // +-------+
+ // |Michael|
+ // | Andy|
+ // | Justin|
+ // +-------+
+
+ // Select everybody, but increment the age by 1
+ df.select($"name", $"age" + 1).show()
+ // +-------+---------+
+ // | name|(age + 1)|
+ // +-------+---------+
+ // |Michael| null|
+ // | Andy| 31|
+ // | Justin| 20|
+ // +-------+---------+
+
+ // Select people older than 21
+ df.filter($"age" > 21).show()
+ // +---+----+
+ // |age|name|
+ // +---+----+
+ // | 30|Andy|
+ // +---+----+
+
+ // Count people by age
+ df.groupBy("age").count().show()
+ // +----+-----+
+ // | age|count|
+ // +----+-----+
+ // | 19| 1|
+ // |null| 1|
+ // | 30| 1|
+ // +----+-----+
+ // $example off:untyped_ops$
+
+ // $example on:run_sql$
+ // Register the DataFrame as a SQL temporary view
+ df.createOrReplaceTempView("people")
+
+ val sqlDF = spark.sql("SELECT * FROM people")
+ sqlDF.show()
+ // +----+-------+
+ // | age| name|
+ // +----+-------+
+ // |null|Michael|
+ // | 30| Andy|
+ // | 19| Justin|
+ // +----+-------+
+ // $example off:run_sql$
+ }
+
+ private def runDatasetCreationExample(spark: SparkSession): Unit = {
+ import spark.implicits._
+ // $example on:create_ds$
+ // Encoders are created for case classes
+ val caseClassDS = Seq(Person("Andy", 32)).toDS()
+ caseClassDS.show()
+ // +----+---+
+ // |name|age|
+ // +----+---+
+ // |Andy| 32|
+ // +----+---+
+
+ // Encoders for most common types are automatically provided by importing spark.implicits._
+ val primitiveDS = Seq(1, 2, 3).toDS()
+ primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
+
+ // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
+ val path = "examples/src/main/resources/people.json"
+ val peopleDS = spark.read.json(path).as[Person]
+ peopleDS.show()
+ // +----+-------+
+ // | age| name|
+ // +----+-------+
+ // |null|Michael|
+ // | 30| Andy|
+ // | 19| Justin|
+ // +----+-------+
+ // $example off:create_ds$
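+
+    // A hedged sketch beyond the original snippet: Datasets also support typed
+    // transformations that work directly against the Person case class.
+    caseClassDS.map(person => person.name.toUpperCase).show()
+    // +-----+
+    // |value|
+    // +-----+
+    // | ANDY|
+    // +-----+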
+ }
+
+ private def runInferSchemaExample(spark: SparkSession): Unit = {
+ // $example on:schema_inferring$
+ // For implicit conversions from RDDs to DataFrames
+ import spark.implicits._
+
+    // Create an RDD of Person objects from a text file, convert it to a DataFrame
+ val peopleDF = spark.sparkContext
+ .textFile("examples/src/main/resources/people.txt")
+ .map(_.split(","))
+ .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
+ .toDF()
+ // Register the DataFrame as a temporary view
+ peopleDF.createOrReplaceTempView("people")
+
+ // SQL statements can be run by using the sql methods provided by Spark
+ val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
+
+ // The columns of a row in the result can be accessed by field index
+ teenagersDF.map(teenager => "Name: " + teenager(0)).show()
+ // +------------+
+ // | value|
+ // +------------+
+ // |Name: Justin|
+ // +------------+
+
+ // or by field name
+ teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
+ // +------------+
+ // | value|
+ // +------------+
+ // |Name: Justin|
+ // +------------+
+
+ // No pre-defined encoders for Dataset[Map[K,V]], define explicitly
+ implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
+    // Encoders for primitive types and case classes can also be defined explicitly, for example
+ implicit val stringIntMapEncoder: Encoder[Map[String, Int]] = ExpressionEncoder()
+
+ // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
+ teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
+ // Array(Map("name" -> "Justin", "age" -> 19))
+ // $example off:schema_inferring$
+ }
+
+ private def runProgrammaticSchemaExample(spark: SparkSession): Unit = {
+ import spark.implicits._
+ // $example on:programmatic_schema$
+ // Create an RDD
+ val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
+
+ // The schema is encoded in a string
+ val schemaString = "name age"
+
+    // Generate the schema based on the schema string
+ val fields = schemaString.split(" ")
+ .map(fieldName => StructField(fieldName, StringType, nullable = true))
+ val schema = StructType(fields)
+
+ // Convert records of the RDD (people) to Rows
+ val rowRDD = peopleRDD
+ .map(_.split(","))
+ .map(attributes => Row(attributes(0), attributes(1).trim))
+
+ // Apply the schema to the RDD
+ val peopleDF = spark.createDataFrame(rowRDD, schema)
+
+ // Creates a temporary view using the DataFrame
+ peopleDF.createOrReplaceTempView("people")
+
+ // SQL can be run over a temporary view created using DataFrames
+ val results = spark.sql("SELECT name FROM people")
+
+ // The results of SQL queries are DataFrames and support all the normal RDD operations
+ // The columns of a row in the result can be accessed by field index or by field name
+ results.map(attributes => "Name: " + attributes(0)).show()
+ // +-------------+
+ // | value|
+ // +-------------+
+ // |Name: Michael|
+ // | Name: Andy|
+ // | Name: Justin|
+ // +-------------+
+ // $example off:programmatic_schema$
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SqlDataSourceExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SqlDataSourceExample.scala
new file mode 100644
index 0000000000..61dea6ad2c
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/SqlDataSourceExample.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql
+
+import org.apache.spark.sql.SparkSession
+
+object SqlDataSourceExample {
+
+ case class Person(name: String, age: Long)
+
+ def main(args: Array[String]) {
+ val spark = SparkSession
+ .builder()
+ .appName("Spark SQL Data Soures Example")
+ .config("spark.some.config.option", "some-value")
+ .getOrCreate()
+
+ runBasicDataSourceExample(spark)
+ runBasicParquetExample(spark)
+ runParquetSchemaMergingExample(spark)
+ runJsonDatasetExample(spark)
+
+ spark.stop()
+ }
+
+ private def runBasicDataSourceExample(spark: SparkSession): Unit = {
+ // $example on:generic_load_save_functions$
+ val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
+ usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
+ // $example off:generic_load_save_functions$
+ // $example on:manual_load_options$
+ val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
+ peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
+ // $example off:manual_load_options$
+ // $example on:direct_sql$
+ val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
+ // $example off:direct_sql$
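+
+    // A hedged sketch beyond the original snippet: a save mode can be passed to the writer to
+    // control what happens when the target path already exists (here it is overwritten).
+    usersDF.select("name", "favorite_color").write
+      .mode("overwrite").save("namesAndFavColors.parquet")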
+ }
+
+ private def runBasicParquetExample(spark: SparkSession): Unit = {
+ // $example on:basic_parquet_example$
+ // Encoders for most common types are automatically provided by importing spark.implicits._
+ import spark.implicits._
+
+ val peopleDF = spark.read.json("examples/src/main/resources/people.json")
+
+ // DataFrames can be saved as Parquet files, maintaining the schema information
+ peopleDF.write.parquet("people.parquet")
+
+ // Read in the parquet file created above
+ // Parquet files are self-describing so the schema is preserved
+ // The result of loading a Parquet file is also a DataFrame
+ val parquetFileDF = spark.read.parquet("people.parquet")
+
+ // Parquet files can also be used to create a temporary view and then used in SQL statements
+ parquetFileDF.createOrReplaceTempView("parquetFile")
+ val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
+ namesDF.map(attributes => "Name: " + attributes(0)).show()
+ // +------------+
+ // | value|
+ // +------------+
+ // |Name: Justin|
+ // +------------+
+ // $example off:basic_parquet_example$
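+
+    // A hedged sketch beyond the original snippet: Parquet output can also be partitioned by a
+    // column at write time; the output path "people_partitioned.parquet" is illustrative only.
+    peopleDF.write.partitionBy("age").parquet("people_partitioned.parquet")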
+ }
+
+ private def runParquetSchemaMergingExample(spark: SparkSession): Unit = {
+ // $example on:schema_merging$
+ // This is used to implicitly convert an RDD to a DataFrame.
+ import spark.implicits._
+
+ // Create a simple DataFrame, store into a partition directory
+ val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
+ squaresDF.write.parquet("data/test_table/key=1")
+
+ // Create another DataFrame in a new partition directory,
+ // adding a new column and dropping an existing column
+ val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
+ cubesDF.write.parquet("data/test_table/key=2")
+
+ // Read the partitioned table
+ val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
+ mergedDF.printSchema()
+
+    // The final schema consists of all 3 columns in the Parquet files together
+    // with the partitioning column that appears in the partition directory paths
+ // root
+ // |-- value: int (nullable = true)
+ // |-- square: int (nullable = true)
+ // |-- cube: int (nullable = true)
+    // |-- key: int (nullable = true)
+ // $example off:schema_merging$
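+
+    // A hedged note beyond the original snippet: schema merging can also be enabled globally
+    // through the "spark.sql.parquet.mergeSchema" configuration key instead of a per-read option.
+    spark.conf.set("spark.sql.parquet.mergeSchema", "true")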
+ }
+
+ private def runJsonDatasetExample(spark: SparkSession): Unit = {
+ // $example on:json_dataset$
+ // A JSON dataset is pointed to by path.
+ // The path can be either a single text file or a directory storing text files
+ val path = "examples/src/main/resources/people.json"
+ val peopleDF = spark.read.json(path)
+
+ // The inferred schema can be visualized using the printSchema() method
+ peopleDF.printSchema()
+ // root
+ // |-- age: long (nullable = true)
+ // |-- name: string (nullable = true)
+
+ // Creates a temporary view using the DataFrame
+ peopleDF.createOrReplaceTempView("people")
+
+    // SQL statements can be run by using the sql methods provided by Spark
+ val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
+ teenagerNamesDF.show()
+ // +------+
+ // | name|
+ // +------+
+ // |Justin|
+ // +------+
+
+ // Alternatively, a DataFrame can be created for a JSON dataset represented by
+ // an RDD[String] storing one JSON object per string
+ val otherPeopleRDD = spark.sparkContext.makeRDD(
+ """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
+ val otherPeople = spark.read.json(otherPeopleRDD)
+ otherPeople.show()
+ // +---------------+----+
+ // | address|name|
+ // +---------------+----+
+ // |[Columbus,Ohio]| Yin|
+ // +---------------+----+
+ // $example off:json_dataset$
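+
+    // A hedged sketch beyond the original snippet: nested JSON fields can be selected with
+    // dot notation in untyped column expressions.
+    otherPeople.select("name", "address.city").show()
+    // +----+--------+
+    // |name|    city|
+    // +----+--------+
+    // | Yin|Columbus|
+    // +----+--------+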
+ }
+
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
deleted file mode 100644
index 2343f98c8d..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.sql.hive
-
-import java.io.File
-
-import com.google.common.io.{ByteStreams, Files}
-
-import org.apache.spark.sql._
-
-object HiveFromSpark {
- case class Record(key: Int, value: String)
-
- // Copy kv1.txt file from classpath to temporary directory
- val kv1Stream = HiveFromSpark.getClass.getResourceAsStream("/kv1.txt")
- val kv1File = File.createTempFile("kv1", "txt")
- kv1File.deleteOnExit()
- ByteStreams.copy(kv1Stream, Files.newOutputStreamSupplier(kv1File))
-
- def main(args: Array[String]) {
- // When working with Hive, one must instantiate `SparkSession` with Hive support, including
- // connectivity to a persistent Hive metastore, support for Hive serdes, and Hive user-defined
- // functions. Users who do not have an existing Hive deployment can still enable Hive support.
- // When not configured by the hive-site.xml, the context automatically creates `metastore_db`
- // in the current directory and creates a directory configured by `spark.sql.warehouse.dir`,
- // which defaults to the directory `spark-warehouse` in the current directory that the spark
- // application is started.
- val spark = SparkSession.builder
- .appName("HiveFromSpark")
- .enableHiveSupport()
- .getOrCreate()
-
- import spark.implicits._
- import spark.sql
-
- sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
- sql(s"LOAD DATA LOCAL INPATH '${kv1File.getAbsolutePath}' INTO TABLE src")
-
- // Queries are expressed in HiveQL
- println("Result of 'SELECT *': ")
- sql("SELECT * FROM src").collect().foreach(println)
-
- // Aggregation queries are also supported.
- val count = sql("SELECT COUNT(*) FROM src").collect().head.getLong(0)
- println(s"COUNT(*): $count")
-
- // The results of SQL queries are themselves RDDs and support all normal RDD functions. The
- // items in the RDD are of type Row, which allows you to access each column by ordinal.
- val rddFromSql = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
-
- println("Result of RDD.map:")
- val rddAsStrings = rddFromSql.rdd.map {
- case Row(key: Int, value: String) => s"Key: $key, Value: $value"
- }
-
- // You can also use RDDs to create temporary views within a HiveContext.
- val rdd = spark.sparkContext.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
- rdd.toDF().createOrReplaceTempView("records")
-
- // Queries can then join RDD data with data stored in Hive.
- println("Result of SELECT *:")
- sql("SELECT * FROM records r JOIN src s ON r.key = s.key").collect().foreach(println)
-
- spark.stop()
- }
-}
-// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
new file mode 100644
index 0000000000..e897c2d066
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql.hive
+
+// $example on:spark_hive$
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SparkSession
+// $example off:spark_hive$
+
+object SparkHiveExample {
+
+ // $example on:spark_hive$
+ case class Record(key: Int, value: String)
+ // $example off:spark_hive$
+
+ def main(args: Array[String]) {
+ // When working with Hive, one must instantiate `SparkSession` with Hive support, including
+ // connectivity to a persistent Hive metastore, support for Hive serdes, and Hive user-defined
+ // functions. Users who do not have an existing Hive deployment can still enable Hive support.
+    // When not configured by hive-site.xml, the context automatically creates `metastore_db`
+    // in the current directory and a warehouse directory configured by `spark.sql.warehouse.dir`,
+    // which defaults to the directory `spark-warehouse` in the current directory from which the
+    // Spark application is started.
+
+ // $example on:spark_hive$
+ // warehouseLocation points to the default location for managed databases and tables
+ val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
+
+ val spark = SparkSession
+ .builder()
+ .appName("Spark Hive Example")
+ .config("spark.sql.warehouse.dir", warehouseLocation)
+ .enableHiveSupport()
+ .getOrCreate()
+
+ import spark.implicits._
+ import spark.sql
+
+ sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+ sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
+
+ // Queries are expressed in HiveQL
+ sql("SELECT * FROM src").show()
+ // +---+-------+
+ // |key| value|
+ // +---+-------+
+ // |238|val_238|
+ // | 86| val_86|
+ // |311|val_311|
+ // ...
+
+ // Aggregation queries are also supported.
+ sql("SELECT COUNT(*) FROM src").show()
+ // +--------+
+ // |count(1)|
+ // +--------+
+    // |     500|
+ // +--------+
+
+ // The results of SQL queries are themselves DataFrames and support all normal functions.
+ val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
+
+    // The items in DataFrames are of type Row, which allows you to access each column by ordinal.
+ val stringsDS = sqlDF.map {
+ case Row(key: Int, value: String) => s"Key: $key, Value: $value"
+ }
+ stringsDS.show()
+ // +--------------------+
+ // | value|
+ // +--------------------+
+ // |Key: 0, Value: val_0|
+ // |Key: 0, Value: val_0|
+ // |Key: 0, Value: val_0|
+ // ...
+
+    // You can also use DataFrames to create temporary views within a SparkSession.
+ val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
+ recordsDF.createOrReplaceTempView("records")
+
+ // Queries can then join DataFrame data with data stored in Hive.
+ sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
+ // +---+------+---+------+
+ // |key| value|key| value|
+ // +---+------+---+------+
+ // | 2| val_2| 2| val_2|
+ // | 2| val_2| 2| val_2|
+ // | 4| val_4| 4| val_4|
+ // ...
+ // $example off:spark_hive$
+
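+    // A hedged sketch beyond the original example: a Hive table can also be loaded directly
+    // as a DataFrame through the session catalog, without writing SQL.
+    val srcDF = spark.table("src")
+    srcDF.filter($"key" < 10).select("key", "value").show()
+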
+ spark.stop()
+ }
+}