 docs/sql-programming-guide.md                                                       | 229
 examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java |   2
 examples/src/main/python/sql.py                                                     |  83
 examples/src/main/python/sql/basic.py                                               | 194
 examples/src/main/python/sql/datasource.py                                          | 154
 examples/src/main/python/sql/hive.py                                                |  96
 examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala   |   4
 7 files changed, 460 insertions(+), 302 deletions(-)
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 3af935a952..ad123d7cea 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -79,7 +79,7 @@ The entry point into all functionality in Spark is the [`SparkSession`](api/java
The entry point into all functionality in Spark is the [`SparkSession`](api/python/pyspark.sql.html#pyspark.sql.SparkSession) class. To create a basic `SparkSession`, just use `SparkSession.builder`:
-{% include_example init_session python/sql.py %}
+{% include_example init_session python/sql/basic.py %}
</div>
<div data-lang="r" markdown="1">
@@ -123,14 +123,7 @@ from a Hive table, or from [Spark data sources](#data-sources).
As an example, the following creates a DataFrame based on the content of a JSON file:
-{% highlight python %}
-# spark is an existing SparkSession
-df = spark.read.json("examples/src/main/resources/people.json")
-
-# Displays the content of the DataFrame to stdout
-df.show()
-{% endhighlight %}
-
+{% include_example create_df python/sql/basic.py %}
</div>
<div data-lang="r" markdown="1">
@@ -178,53 +171,7 @@ interactive data exploration, users are highly encouraged to use the
latter form, which is future proof and won't break with column names that
are also attributes on the DataFrame class.
-{% highlight python %}
-# spark is an existing SparkSession
-
-# Create the DataFrame
-df = spark.read.json("examples/src/main/resources/people.json")
-
-# Show the content of the DataFrame
-df.show()
-## age name
-## null Michael
-## 30 Andy
-## 19 Justin
-
-# Print the schema in a tree format
-df.printSchema()
-## root
-## |-- age: long (nullable = true)
-## |-- name: string (nullable = true)
-
-# Select only the "name" column
-df.select("name").show()
-## name
-## Michael
-## Andy
-## Justin
-
-# Select everybody, but increment the age by 1
-df.select(df['name'], df['age'] + 1).show()
-## name (age + 1)
-## Michael null
-## Andy 31
-## Justin 20
-
-# Select people older than 21
-df.filter(df['age'] > 21).show()
-## age name
-## 30 Andy
-
-# Count people by age
-df.groupBy("age").count().show()
-## age count
-## null 1
-## 19 1
-## 30 1
-
-{% endhighlight %}
-
+{% include_example untyped_ops python/sql/basic.py %}
For a complete list of the types of operations that can be performed on a DataFrame, refer to the [API Documentation](api/python/pyspark.sql.html#pyspark.sql.DataFrame).
In addition to simple column references and expressions, DataFrames also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the [DataFrame Function Reference](api/python/pyspark.sql.html#module-pyspark.sql.functions).
@@ -261,10 +208,7 @@ The `sql` function on a `SparkSession` enables applications to run SQL queries p
<div data-lang="python" markdown="1">
The `sql` function on a `SparkSession` enables applications to run SQL queries programmatically and returns the result as a `DataFrame`.
-{% highlight python %}
-# spark is an existing SparkSession
-df = spark.sql("SELECT * FROM table")
-{% endhighlight %}
+{% include_example run_sql python/sql/basic.py %}
</div>
<div data-lang="r" markdown="1">
@@ -339,29 +283,7 @@ Spark SQL can convert an RDD of Row objects to a DataFrame, inferring the dataty
key/value pairs as kwargs to the Row class. The keys of this list define the column names of the table,
and the types are inferred by sampling the whole dataset, similar to the inference that is performed on JSON files.
-{% highlight python %}
-# spark is an existing SparkSession.
-from pyspark.sql import Row
-sc = spark.sparkContext
-
-# Load a text file and convert each line to a Row.
-lines = sc.textFile("examples/src/main/resources/people.txt")
-parts = lines.map(lambda l: l.split(","))
-people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
-
-# Infer the schema, and register the DataFrame as a table.
-schemaPeople = spark.createDataFrame(people)
-schemaPeople.createOrReplaceTempView("people")
-
-# SQL can be run over DataFrames that have been registered as a table.
-teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
-
-# The results of SQL queries are RDDs and support all the normal RDD operations.
-teenNames = teenagers.map(lambda p: "Name: " + p.name)
-for teenName in teenNames.collect():
- print(teenName)
-{% endhighlight %}
-
+{% include_example schema_inferring python/sql/basic.py %}
</div>
</div>
@@ -419,39 +341,8 @@ tuples or lists in the RDD created in the step 1.
3. Apply the schema to the RDD via `createDataFrame` method provided by `SparkSession`.
For example:
-{% highlight python %}
-# Import SparkSession and data types
-from pyspark.sql.types import *
-
-# spark is an existing SparkSession.
-sc = spark.sparkContext
-
-# Load a text file and convert each line to a tuple.
-lines = sc.textFile("examples/src/main/resources/people.txt")
-parts = lines.map(lambda l: l.split(","))
-people = parts.map(lambda p: (p[0], p[1].strip()))
-
-# The schema is encoded in a string.
-schemaString = "name age"
-
-fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
-schema = StructType(fields)
-
-# Apply the schema to the RDD.
-schemaPeople = spark.createDataFrame(people, schema)
-
-# Creates a temporary view using the DataFrame
-schemaPeople.createOrReplaceTempView("people")
-
-# SQL can be run over DataFrames that have been registered as a table.
-results = spark.sql("SELECT name FROM people")
-
-# The results of SQL queries are RDDs and support all the normal RDD operations.
-names = results.map(lambda p: "Name: " + p.name)
-for name in names.collect():
- print(name)
-{% endhighlight %}
+{% include_example programmatic_schema python/sql/basic.py %}
</div>
</div>
@@ -481,13 +372,7 @@ In the simplest form, the default data source (`parquet` unless otherwise config
<div data-lang="python" markdown="1">
-{% highlight python %}
-
-df = spark.read.load("examples/src/main/resources/users.parquet")
-df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
-
-{% endhighlight %}
-
+{% include_example generic_load_save_functions python/sql/datasource.py %}
</div>
<div data-lang="r" markdown="1">
@@ -516,13 +401,7 @@ using this syntax.
<div data-lang="python" markdown="1">
-{% highlight python %}
-
-df = spark.read.load("examples/src/main/resources/people.json", format="json")
-df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
-
-{% endhighlight %}
-
+{% include_example manual_load_options python/sql/datasource.py %}
</div>
<div data-lang="r" markdown="1">
@@ -547,10 +426,7 @@ file directly with SQL.
<div data-lang="python" markdown="1">
-{% highlight python %}
-df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
-{% endhighlight %}
-
+{% include_example direct_sql python/sql/datasource.py %}
</div>
<div data-lang="r" markdown="1">
@@ -642,26 +518,7 @@ Using the data from the above example:
<div data-lang="python" markdown="1">
-{% highlight python %}
-# spark from the previous example is used in this example.
-
-schemaPeople # The DataFrame from the previous example.
-
-# DataFrames can be saved as Parquet files, maintaining the schema information.
-schemaPeople.write.parquet("people.parquet")
-
-# Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
-# The result of loading a parquet file is also a DataFrame.
-parquetFile = spark.read.parquet("people.parquet")
-
-# Parquet files can also be used to create a temporary view and then used in SQL statements.
-parquetFile.createOrReplaceTempView("parquetFile");
-teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
-teenNames = teenagers.map(lambda p: "Name: " + p.name)
-for teenName in teenNames.collect():
- print(teenName)
-{% endhighlight %}
-
+{% include_example basic_parquet_example python/sql/datasource.py %}
</div>
<div data-lang="r" markdown="1">
@@ -775,33 +632,7 @@ turned it off by default starting from 1.5.0. You may enable it by
<div data-lang="python" markdown="1">
-{% highlight python %}
-# spark from the previous example is used in this example.
-
-# Create a simple DataFrame, stored into a partition directory
-df1 = spark.createDataFrame(sc.parallelize(range(1, 6))\
- .map(lambda i: Row(single=i, double=i * 2)))
-df1.write.parquet("data/test_table/key=1")
-
-# Create another DataFrame in a new partition directory,
-# adding a new column and dropping an existing column
-df2 = spark.createDataFrame(sc.parallelize(range(6, 11))
- .map(lambda i: Row(single=i, triple=i * 3)))
-df2.write.parquet("data/test_table/key=2")
-
-# Read the partitioned table
-df3 = spark.read.option("mergeSchema", "true").parquet("data/test_table")
-df3.printSchema()
-
-# The final schema consists of all 3 columns in the Parquet files together
-# with the partitioning column appeared in the partition directory paths.
-# root
-# |-- single: int (nullable = true)
-# |-- double: int (nullable = true)
-# |-- triple: int (nullable = true)
-# |-- key : int (nullable = true)
-{% endhighlight %}
-
+{% include_example schema_merging python/sql/datasource.py %}
</div>
<div data-lang="r" markdown="1">
@@ -996,31 +827,7 @@ Note that the file that is offered as _a json file_ is not a typical JSON file.
line must contain a separate, self-contained valid JSON object. As a consequence,
a regular multi-line JSON file will most often fail.
-{% highlight python %}
-# spark is an existing SparkSession.
-
-# A JSON dataset is pointed to by path.
-# The path can be either a single text file or a directory storing text files.
-people = spark.read.json("examples/src/main/resources/people.json")
-
-# The inferred schema can be visualized using the printSchema() method.
-people.printSchema()
-# root
-# |-- age: long (nullable = true)
-# |-- name: string (nullable = true)
-
-# Creates a temporary view using the DataFrame.
-people.createOrReplaceTempView("people")
-
-# SQL statements can be run by using the sql methods provided by `spark`.
-teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
-
-# Alternatively, a DataFrame can be created for a JSON dataset represented by
-# an RDD[String] storing one JSON object per string.
-anotherPeopleRDD = sc.parallelize([
- '{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}'])
-anotherPeople = spark.jsonRDD(anotherPeopleRDD)
-{% endhighlight %}
+{% include_example json_dataset python/sql/datasource.py %}
</div>
<div data-lang="r" markdown="1">
@@ -1110,17 +917,7 @@ the `hive.metastore.warehouse.dir` property in `hive-site.xml` is deprecated sin
Instead, use `spark.sql.warehouse.dir` to specify the default location of database in warehouse.
You may need to grant write privilege to the user who starts the spark application.
-{% highlight python %}
-# spark is an existing SparkSession
-
-spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
-spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
-
-# Queries can be expressed in HiveQL.
-results = spark.sql("FROM src SELECT key, value").collect()
-
-{% endhighlight %}
-
+{% include_example spark_hive python/sql/hive.py %}
</div>
<div data-lang="r" markdown="1">
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java b/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java
index 493d759a91..76dd160d55 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java
@@ -104,7 +104,7 @@ public class JavaSparkHiveExample {
// |Key: 0, Value: val_0|
// ...
- // You can also use DataFrames to create temporary views within a HiveContext.
+ // You can also use DataFrames to create temporary views within a SparkSession.
List<Record> records = new ArrayList<>();
for (int key = 1; key < 100; key++) {
Record record = new Record();
diff --git a/examples/src/main/python/sql.py b/examples/src/main/python/sql.py
deleted file mode 100644
index ea11d2c4c7..0000000000
--- a/examples/src/main/python/sql.py
+++ /dev/null
@@ -1,83 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import print_function
-
-import os
-import sys
-
-# $example on:init_session$
-from pyspark.sql import SparkSession
-# $example off:init_session$
-from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType
-
-
-if __name__ == "__main__":
- # $example on:init_session$
- spark = SparkSession\
- .builder\
- .appName("PythonSQL")\
- .config("spark.some.config.option", "some-value")\
- .getOrCreate()
- # $example off:init_session$
-
- # A list of Rows. Infer schema from the first row, create a DataFrame and print the schema
- rows = [Row(name="John", age=19), Row(name="Smith", age=23), Row(name="Sarah", age=18)]
- some_df = spark.createDataFrame(rows)
- some_df.printSchema()
-
- # A list of tuples
- tuples = [("John", 19), ("Smith", 23), ("Sarah", 18)]
- # Schema with two fields - person_name and person_age
- schema = StructType([StructField("person_name", StringType(), False),
- StructField("person_age", IntegerType(), False)])
- # Create a DataFrame by applying the schema to the RDD and print the schema
- another_df = spark.createDataFrame(tuples, schema)
- another_df.printSchema()
- # root
- # |-- age: long (nullable = true)
- # |-- name: string (nullable = true)
-
- # A JSON dataset is pointed to by path.
- # The path can be either a single text file or a directory storing text files.
- if len(sys.argv) < 2:
- path = "file://" + \
- os.path.join(os.environ['SPARK_HOME'], "examples/src/main/resources/people.json")
- else:
- path = sys.argv[1]
- # Create a DataFrame from the file(s) pointed to by path
- people = spark.read.json(path)
- # root
- # |-- person_name: string (nullable = false)
- # |-- person_age: integer (nullable = false)
-
- # The inferred schema can be visualized using the printSchema() method.
- people.printSchema()
- # root
- # |-- age: long (nullable = true)
- # |-- name: string (nullable = true)
-
- # Creates a temporary view using the DataFrame.
- people.createOrReplaceTempView("people")
-
- # SQL statements can be run by using the sql methods provided by `spark`
- teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
-
- for each in teenagers.collect():
- print(each[0])
-
- spark.stop()
diff --git a/examples/src/main/python/sql/basic.py b/examples/src/main/python/sql/basic.py
new file mode 100644
index 0000000000..74f5009581
--- /dev/null
+++ b/examples/src/main/python/sql/basic.py
@@ -0,0 +1,194 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import print_function
+
+# $example on:init_session$
+from pyspark.sql import SparkSession
+# $example off:init_session$
+
+# $example on:schema_inferring$
+from pyspark.sql import Row
+# $example off:schema_inferring$
+
+# $example on:programmatic_schema$
+# Import data types
+from pyspark.sql.types import *
+# $example off:programmatic_schema$
+
+"""
+A simple example demonstrating basic Spark SQL features.
+Run with:
+ ./bin/spark-submit examples/src/main/python/sql/basic.py
+"""
+
+
+def basic_df_example(spark):
+ # $example on:create_df$
+ # spark is an existing SparkSession
+ df = spark.read.json("examples/src/main/resources/people.json")
+ # Displays the content of the DataFrame to stdout
+ df.show()
+ # +----+-------+
+ # | age| name|
+ # +----+-------+
+ # |null|Michael|
+ # | 30| Andy|
+ # | 19| Justin|
+ # +----+-------+
+ # $example off:create_df$
+
+ # $example on:untyped_ops$
+ # spark, df are from the previous example
+ # Print the schema in a tree format
+ df.printSchema()
+ # root
+ # |-- age: long (nullable = true)
+ # |-- name: string (nullable = true)
+
+ # Select only the "name" column
+ df.select("name").show()
+ # +-------+
+ # | name|
+ # +-------+
+ # |Michael|
+ # | Andy|
+ # | Justin|
+ # +-------+
+
+ # Select everybody, but increment the age by 1
+ df.select(df['name'], df['age'] + 1).show()
+ # +-------+---------+
+ # | name|(age + 1)|
+ # +-------+---------+
+ # |Michael| null|
+ # | Andy| 31|
+ # | Justin| 20|
+ # +-------+---------+
+
+ # Select people older than 21
+ df.filter(df['age'] > 21).show()
+ # +---+----+
+ # |age|name|
+ # +---+----+
+ # | 30|Andy|
+ # +---+----+
+
+ # Count people by age
+ df.groupBy("age").count().show()
+ # +----+-----+
+ # | age|count|
+ # +----+-----+
+ # | 19| 1|
+ # |null| 1|
+ # | 30| 1|
+ # +----+-----+
+ # $example off:untyped_ops$
+
+ # $example on:run_sql$
+ # Register the DataFrame as a SQL temporary view
+ df.createOrReplaceTempView("people")
+
+ sqlDF = spark.sql("SELECT * FROM people")
+ sqlDF.show()
+ # +----+-------+
+ # | age| name|
+ # +----+-------+
+ # |null|Michael|
+ # | 30| Andy|
+ # | 19| Justin|
+ # +----+-------+
+ # $example off:run_sql$
+
+
+def schema_inference_example(spark):
+ # $example on:schema_inferring$
+ sc = spark.sparkContext
+
+ # Load a text file and convert each line to a Row.
+ lines = sc.textFile("examples/src/main/resources/people.txt")
+ parts = lines.map(lambda l: l.split(","))
+ people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
+
+ # Infer the schema, and register the DataFrame as a table.
+ schemaPeople = spark.createDataFrame(people)
+ schemaPeople.createOrReplaceTempView("people")
+
+ # SQL can be run over DataFrames that have been registered as a table.
+ teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
+
+ # The results of SQL queries are DataFrame objects.
+ # rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
+ teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
+ for name in teenNames:
+ print(name)
+ # Name: Justin
+ # $example off:schema_inferring$
+
+
+def programmatic_schema_example(spark):
+ # $example on:programmatic_schema$
+ sc = spark.sparkContext
+
+ # Load a text file and convert each line to a Row.
+ lines = sc.textFile("examples/src/main/resources/people.txt")
+ parts = lines.map(lambda l: l.split(","))
+ # Each line is converted to a tuple.
+ people = parts.map(lambda p: (p[0], p[1].strip()))
+
+ # The schema is encoded in a string.
+ schemaString = "name age"
+
+ fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
+ schema = StructType(fields)
+
+ # Apply the schema to the RDD.
+ schemaPeople = spark.createDataFrame(people, schema)
+
+ # Creates a temporary view using the DataFrame
+ schemaPeople.createOrReplaceTempView("people")
+
+ # SQL can be run over DataFrames that have been registered as a table.
+ results = spark.sql("SELECT name FROM people")
+
+ results.show()
+ # +-------+
+ # | name|
+ # +-------+
+ # |Michael|
+ # | Andy|
+ # | Justin|
+ # +-------+
+ # $example off:programmatic_schema$
+
+if __name__ == "__main__":
+ # $example on:init_session$
+ spark = SparkSession \
+ .builder \
+ .appName("PythonSQL") \
+ .config("spark.some.config.option", "some-value") \
+ .getOrCreate()
+ # $example off:init_session$
+
+ basic_df_example(spark)
+ schema_inference_example(spark)
+ programmatic_schema_example(spark)
+
+ spark.stop()
diff --git a/examples/src/main/python/sql/datasource.py b/examples/src/main/python/sql/datasource.py
new file mode 100644
index 0000000000..0bdc3d66ff
--- /dev/null
+++ b/examples/src/main/python/sql/datasource.py
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import print_function
+
+from pyspark.sql import SparkSession
+# $example on:schema_merging$
+from pyspark.sql import Row
+# $example off:schema_merging$
+
+"""
+A simple example demonstrating Spark SQL data sources.
+Run with:
+ ./bin/spark-submit examples/src/main/python/sql/datasource.py
+"""
+
+
+def basic_datasource_example(spark):
+ # $example on:generic_load_save_functions$
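+ # load() and save() use the default data source (parquet unless otherwise configured)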
+ df = spark.read.load("examples/src/main/resources/users.parquet")
+ df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
+ # $example off:generic_load_save_functions$
+
+ # $example on:manual_load_options$
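+ # The data source type can be specified manually via the format argument (e.g. json, parquet)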
+ df = spark.read.load("examples/src/main/resources/people.json", format="json")
+ df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
+ # $example off:manual_load_options$
+
+ # $example on:direct_sql$
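+ # Run SQL on the file directly instead of loading it into a DataFrame first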
+ df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
+ # $example off:direct_sql$
+
+
+def parquet_example(spark):
+ # $example on:basic_parquet_example$
+ peopleDF = spark.read.json("examples/src/main/resources/people.json")
+
+ # DataFrames can be saved as Parquet files, maintaining the schema information.
+ peopleDF.write.parquet("people.parquet")
+
+ # Read in the Parquet file created above.
+ # Parquet files are self-describing so the schema is preserved.
+ # The result of loading a parquet file is also a DataFrame.
+ parquetFile = spark.read.parquet("people.parquet")
+
+ # Parquet files can also be used to create a temporary view and then used in SQL statements.
+ parquetFile.createOrReplaceTempView("parquetFile")
+ teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
+ teenagers.show()
+ # +------+
+ # | name|
+ # +------+
+ # |Justin|
+ # +------+
+ # $example off:basic_parquet_example$
+
+
+def parquet_schema_merging_example(spark):
+ # $example on:schema_merging$
+ # spark is from the previous example.
+ # Create a simple DataFrame, stored into a partition directory
+ sc = spark.sparkContext
+
+ squaresDF = spark.createDataFrame(sc.parallelize(range(1, 6))
+ .map(lambda i: Row(single=i, double=i ** 2)))
+ squaresDF.write.parquet("data/test_table/key=1")
+
+ # Create another DataFrame in a new partition directory,
+ # adding a new column and dropping an existing column
+ cubesDF = spark.createDataFrame(sc.parallelize(range(6, 11))
+ .map(lambda i: Row(single=i, triple=i ** 3)))
+ cubesDF.write.parquet("data/test_table/key=2")
+
+ # Read the partitioned table
+ mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
+ mergedDF.printSchema()
+
+ # The final schema consists of all 3 columns in the Parquet files together
+ # with the partitioning column appeared in the partition directory paths.
+ # root
+ # |-- double: long (nullable = true)
+ # |-- single: long (nullable = true)
+ # |-- triple: long (nullable = true)
+ # |-- key: integer (nullable = true)
+ # $example off:schema_merging$
+
+
+def json_dataset_example(spark):
+ # $example on:json_dataset$
+ # spark is from the previous example.
+ sc = spark.sparkContext
+
+ # A JSON dataset is pointed to by path.
+ # The path can be either a single text file or a directory storing text files
+ path = "examples/src/main/resources/people.json"
+ peopleDF = spark.read.json(path)
+
+ # The inferred schema can be visualized using the printSchema() method
+ peopleDF.printSchema()
+ # root
+ # |-- age: long (nullable = true)
+ # |-- name: string (nullable = true)
+
+ # Creates a temporary view using the DataFrame
+ peopleDF.createOrReplaceTempView("people")
+
+ # SQL statements can be run by using the sql methods provided by spark
+ teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
+ teenagerNamesDF.show()
+ # +------+
+ # | name|
+ # +------+
+ # |Justin|
+ # +------+
+
+ # Alternatively, a DataFrame can be created for a JSON dataset represented by
+ # an RDD[String] storing one JSON object per string
+ jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
+ otherPeopleRDD = sc.parallelize(jsonStrings)
+ otherPeople = spark.read.json(otherPeopleRDD)
+ otherPeople.show()
+ # +---------------+----+
+ # | address|name|
+ # +---------------+----+
+ # |[Columbus,Ohio]| Yin|
+ # +---------------+----+
+ # $example off:json_dataset$
+
+if __name__ == "__main__":
+ spark = SparkSession \
+ .builder \
+ .appName("PythonSQL") \
+ .getOrCreate()
+
+ basic_datasource_example(spark)
+ parquet_example(spark)
+ parquet_schema_merging_example(spark)
+ json_dataset_example(spark)
+
+ spark.stop()
diff --git a/examples/src/main/python/sql/hive.py b/examples/src/main/python/sql/hive.py
new file mode 100644
index 0000000000..d9ce5cef1f
--- /dev/null
+++ b/examples/src/main/python/sql/hive.py
@@ -0,0 +1,96 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import print_function
+
+# $example on:spark_hive$
+from os.path import expanduser, join
+
+from pyspark.sql import SparkSession
+from pyspark.sql import Row
+# $example off:spark_hive$
+
+"""
+A simple example demonstrating Spark SQL Hive integration.
+Run with:
+ ./bin/spark-submit examples/src/main/python/sql/hive.py
+"""
+
+
+if __name__ == "__main__":
+ # $example on:spark_hive$
+ # warehouse_location points to the default location for managed databases and tables
+ warehouse_location = 'file:${system:user.dir}/spark-warehouse'
+
+ spark = SparkSession \
+ .builder \
+ .appName("PythonSQL") \
+ .config("spark.sql.warehouse.dir", warehouse_location) \
+ .enableHiveSupport() \
+ .getOrCreate()
+
+ # spark is an existing SparkSession
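+ # Create a Hive table and load data into it from a local file using HiveQL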
+ spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+ spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
+
+ # Queries are expressed in HiveQL
+ spark.sql("SELECT * FROM src").show()
+ # +---+-------+
+ # |key| value|
+ # +---+-------+
+ # |238|val_238|
+ # | 86| val_86|
+ # |311|val_311|
+ # ...
+
+ # Aggregation queries are also supported.
+ spark.sql("SELECT COUNT(*) FROM src").show()
+ # +--------+
+ # |count(1)|
+ # +--------+
+ # | 500 |
+ # +--------+
+
+ # The results of SQL queries are themselves DataFrames and support all normal functions.
+ sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
+
+ # The items in DataFrames are of type Row, which allows you to access each column by ordinal.
+ stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
+ for record in stringsDS.collect():
+ print(record)
+ # Key: 0, Value: val_0
+ # Key: 0, Value: val_0
+ # Key: 0, Value: val_0
+ # ...
+
+ # You can also use DataFrames to create temporary views within a SparkSession.
+ Record = Row("key", "value")
+ recordsDF = spark.createDataFrame(map(lambda i: Record(i, "val_" + str(i)), range(1, 101)))
+ recordsDF.createOrReplaceTempView("records")
+
+ # Queries can then join DataFrame data with data stored in Hive.
+ spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
+ # +---+------+---+------+
+ # |key| value|key| value|
+ # +---+------+---+------+
+ # | 2| val_2| 2| val_2|
+ # | 4| val_4| 4| val_4|
+ # | 5| val_5| 5| val_5|
+ # ...
+ # $example off:spark_hive$
+
+ spark.stop()
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
index e897c2d066..11e84c0e45 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
@@ -87,7 +87,7 @@ object SparkHiveExample {
// |Key: 0, Value: val_0|
// ...
- // You can also use DataFrames to create temporary views within a HiveContext.
+ // You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")
@@ -97,8 +97,8 @@ object SparkHiveExample {
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
- // | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
+ // | 5| val_5| 5| val_5|
// ...
// $example off:spark_hive$