author     Cheng Lian <lian@databricks.com>     2016-07-23 11:41:24 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-07-23 11:41:24 -0700
commit     53b2456d1de38b9d4f18509e7b36eb3fbe09e050 (patch)
tree       7a783f09648b4c86ec04b9fd26e9ef6871f2d352 /examples/src/main/python/sql
parent     86c275206605c44e1ebca2f166d62868e44bf029 (diff)
[SPARK-16380][EXAMPLES] Update SQL examples and programming guide for Python language binding
This PR is based on PR #14098 authored by wangmiao1981.

## What changes were proposed in this pull request?

This PR replaces the original Python Spark SQL example file with the following three files:

- `sql/basic.py`: Demonstrates basic Spark SQL features.
- `sql/datasource.py`: Demonstrates various Spark SQL data sources.
- `sql/hive.py`: Demonstrates Spark SQL Hive interaction.

This PR also removes hard-coded Python example snippets in the SQL programming guide by extracting snippets from the above files using the `include_example` Liquid template tag.

## How was this patch tested?

Manually tested.

Author: wm624@hotmail.com <wm624@hotmail.com>
Author: Cheng Lian <lian@databricks.com>

Closes #14317 from liancheng/py-examples-update.
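The `$example on:<label>$` / `$example off:<label>$` markers in the files below delimit the snippets that the guide pulls in. As a rough sketch of how docs/sql-programming-guide.md might reference them, assuming the `include_example` tag takes a snippet label followed by a path relative to `examples/src/main` (the exact invocations here are illustrative, not part of this commit):

    {% include_example init_session python/sql/basic.py %}
    {% include_example create_df python/sql/basic.py %}

At build time each tag is replaced by the code between the matching markers, which keeps the programming guide and the runnable examples in sync.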
Diffstat (limited to 'examples/src/main/python/sql')
-rw-r--r--    examples/src/main/python/sql/basic.py         194
-rw-r--r--    examples/src/main/python/sql/datasource.py    154
-rw-r--r--    examples/src/main/python/sql/hive.py            96
3 files changed, 444 insertions, 0 deletions
diff --git a/examples/src/main/python/sql/basic.py b/examples/src/main/python/sql/basic.py
new file mode 100644
index 0000000000..74f5009581
--- /dev/null
+++ b/examples/src/main/python/sql/basic.py
@@ -0,0 +1,194 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import print_function
+
+# $example on:init_session$
+from pyspark.sql import SparkSession
+# $example off:init_session$
+
+# $example on:schema_inferring$
+from pyspark.sql import Row
+# $example off:schema_inferring$
+
+# $example on:programmatic_schema$
+# Import data types
+from pyspark.sql.types import *
+# $example off:programmatic_schema$
+
+"""
+A simple example demonstrating basic Spark SQL features.
+Run with:
+ ./bin/spark-submit examples/src/main/python/sql/basic.py
+"""
+
+
+def basic_df_example(spark):
+    # $example on:create_df$
+    # spark is an existing SparkSession
+    df = spark.read.json("examples/src/main/resources/people.json")
+    # Displays the content of the DataFrame to stdout
+    df.show()
+    # +----+-------+
+    # | age|   name|
+    # +----+-------+
+    # |null|Michael|
+    # |  30|   Andy|
+    # |  19| Justin|
+    # +----+-------+
+    # $example off:create_df$
+
+    # $example on:untyped_ops$
+    # spark, df are from the previous example
+    # Print the schema in a tree format
+    df.printSchema()
+    # root
+    # |-- age: long (nullable = true)
+    # |-- name: string (nullable = true)
+
+    # Select only the "name" column
+    df.select("name").show()
+    # +-------+
+    # |   name|
+    # +-------+
+    # |Michael|
+    # |   Andy|
+    # | Justin|
+    # +-------+
+
+    # Select everybody, but increment the age by 1
+    df.select(df['name'], df['age'] + 1).show()
+    # +-------+---------+
+    # |   name|(age + 1)|
+    # +-------+---------+
+    # |Michael|     null|
+    # |   Andy|       31|
+    # | Justin|       20|
+    # +-------+---------+
+
+    # Select people older than 21
+    df.filter(df['age'] > 21).show()
+    # +---+----+
+    # |age|name|
+    # +---+----+
+    # | 30|Andy|
+    # +---+----+
+
+    # Count people by age
+    df.groupBy("age").count().show()
+    # +----+-----+
+    # | age|count|
+    # +----+-----+
+    # |  19|    1|
+    # |null|    1|
+    # |  30|    1|
+    # +----+-----+
+    # $example off:untyped_ops$
+
+    # $example on:run_sql$
+    # Register the DataFrame as a SQL temporary view
+    df.createOrReplaceTempView("people")
+
+    sqlDF = spark.sql("SELECT * FROM people")
+    sqlDF.show()
+    # +----+-------+
+    # | age|   name|
+    # +----+-------+
+    # |null|Michael|
+    # |  30|   Andy|
+    # |  19| Justin|
+    # +----+-------+
+    # $example off:run_sql$
+
+
+def schema_inference_example(spark):
+    # $example on:schema_inferring$
+    sc = spark.sparkContext
+
+    # Load a text file and convert each line to a Row.
+    lines = sc.textFile("examples/src/main/resources/people.txt")
+    parts = lines.map(lambda l: l.split(","))
+    people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
+
+    # Infer the schema, and register the DataFrame as a table.
+    schemaPeople = spark.createDataFrame(people)
+    schemaPeople.createOrReplaceTempView("people")
+
+    # SQL can be run over DataFrames that have been registered as a table.
+    teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
+
+    # The results of SQL queries are DataFrame objects.
+    # rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
+    teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
+    for name in teenNames:
+        print(name)
+    # Name: Justin
+    # $example off:schema_inferring$
+
+
+def programmatic_schema_example(spark):
+    # $example on:programmatic_schema$
+    sc = spark.sparkContext
+
+    # Load a text file and convert each line to a Row.
+    lines = sc.textFile("examples/src/main/resources/people.txt")
+    parts = lines.map(lambda l: l.split(","))
+    # Each line is converted to a tuple.
+    people = parts.map(lambda p: (p[0], p[1].strip()))
+
+    # The schema is encoded in a string.
+    schemaString = "name age"
+
+    fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
+    schema = StructType(fields)
+
+    # Apply the schema to the RDD.
+    schemaPeople = spark.createDataFrame(people, schema)
+
+    # Creates a temporary view using the DataFrame
+    schemaPeople.createOrReplaceTempView("people")
+
+    # SQL can be run over DataFrames that have been registered as a table.
+    results = spark.sql("SELECT name FROM people")
+
+    results.show()
+    # +-------+
+    # |   name|
+    # +-------+
+    # |Michael|
+    # |   Andy|
+    # | Justin|
+    # +-------+
+    # $example off:programmatic_schema$
+
+if __name__ == "__main__":
+    # $example on:init_session$
+    spark = SparkSession \
+        .builder \
+        .appName("PythonSQL") \
+        .config("spark.some.config.option", "some-value") \
+        .getOrCreate()
+    # $example off:init_session$
+
+    basic_df_example(spark)
+    schema_inference_example(spark)
+    programmatic_schema_example(spark)
+
+    spark.stop()
diff --git a/examples/src/main/python/sql/datasource.py b/examples/src/main/python/sql/datasource.py
new file mode 100644
index 0000000000..0bdc3d66ff
--- /dev/null
+++ b/examples/src/main/python/sql/datasource.py
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import print_function
+
+from pyspark.sql import SparkSession
+# $example on:schema_merging$
+from pyspark.sql import Row
+# $example off:schema_merging$
+
+"""
+A simple example demonstrating Spark SQL data sources.
+Run with:
+ ./bin/spark-submit examples/src/main/python/sql/datasource.py
+"""
+
+
+def basic_datasource_example(spark):
+    # $example on:generic_load_save_functions$
+    df = spark.read.load("examples/src/main/resources/users.parquet")
+    df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
+    # $example off:generic_load_save_functions$
+
+    # $example on:manual_load_options$
+    df = spark.read.load("examples/src/main/resources/people.json", format="json")
+    df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
+    # $example off:manual_load_options$
+
+    # $example on:direct_sql$
+    df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
+    # $example off:direct_sql$
+
+
+def parquet_example(spark):
+    # $example on:basic_parquet_example$
+    peopleDF = spark.read.json("examples/src/main/resources/people.json")
+
+    # DataFrames can be saved as Parquet files, maintaining the schema information.
+    peopleDF.write.parquet("people.parquet")
+
+    # Read in the Parquet file created above.
+    # Parquet files are self-describing so the schema is preserved.
+    # The result of loading a parquet file is also a DataFrame.
+    parquetFile = spark.read.parquet("people.parquet")
+
+    # Parquet files can also be used to create a temporary view and then used in SQL statements.
+    parquetFile.createOrReplaceTempView("parquetFile")
+    teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
+    teenagers.show()
+    # +------+
+    # |  name|
+    # +------+
+    # |Justin|
+    # +------+
+    # $example off:basic_parquet_example$
+
+
+def parquet_schema_merging_example(spark):
+    # $example on:schema_merging$
+    # spark is from the previous example.
+    # Create a simple DataFrame, stored into a partition directory
+    sc = spark.sparkContext
+
+    squaresDF = spark.createDataFrame(sc.parallelize(range(1, 6))
+                                      .map(lambda i: Row(single=i, double=i ** 2)))
+    squaresDF.write.parquet("data/test_table/key=1")
+
+    # Create another DataFrame in a new partition directory,
+    # adding a new column and dropping an existing column
+    cubesDF = spark.createDataFrame(sc.parallelize(range(6, 11))
+                                    .map(lambda i: Row(single=i, triple=i ** 3)))
+    cubesDF.write.parquet("data/test_table/key=2")
+
+    # Read the partitioned table
+    mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
+    mergedDF.printSchema()
+
+    # The final schema consists of all 3 columns in the Parquet files together
+    # with the partitioning column that appears in the partition directory paths.
+    # root
+    # |-- double: long (nullable = true)
+    # |-- single: long (nullable = true)
+    # |-- triple: long (nullable = true)
+    # |-- key: integer (nullable = true)
+    # $example off:schema_merging$
+
+
+def json_dataset_example(spark):
+    # $example on:json_dataset$
+    # spark is from the previous example.
+    sc = spark.sparkContext
+
+    # A JSON dataset is pointed to by path.
+    # The path can be either a single text file or a directory storing text files
+    path = "examples/src/main/resources/people.json"
+    peopleDF = spark.read.json(path)
+
+    # The inferred schema can be visualized using the printSchema() method
+    peopleDF.printSchema()
+    # root
+    # |-- age: long (nullable = true)
+    # |-- name: string (nullable = true)
+
+    # Creates a temporary view using the DataFrame
+    peopleDF.createOrReplaceTempView("people")
+
+    # SQL statements can be run by using the sql methods provided by spark
+    teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
+    teenagerNamesDF.show()
+    # +------+
+    # |  name|
+    # +------+
+    # |Justin|
+    # +------+
+
+    # Alternatively, a DataFrame can be created for a JSON dataset represented by
+    # an RDD[String] storing one JSON object per string
+    jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
+    otherPeopleRDD = sc.parallelize(jsonStrings)
+    otherPeople = spark.read.json(otherPeopleRDD)
+    otherPeople.show()
+    # +---------------+----+
+    # |        address|name|
+    # +---------------+----+
+    # |[Columbus,Ohio]| Yin|
+    # +---------------+----+
+    # $example off:json_dataset$
+
+if __name__ == "__main__":
+    spark = SparkSession \
+        .builder \
+        .appName("PythonSQL") \
+        .getOrCreate()
+
+    basic_datasource_example(spark)
+    parquet_example(spark)
+    parquet_schema_merging_example(spark)
+    json_dataset_example(spark)
+
+    spark.stop()
diff --git a/examples/src/main/python/sql/hive.py b/examples/src/main/python/sql/hive.py
new file mode 100644
index 0000000000..d9ce5cef1f
--- /dev/null
+++ b/examples/src/main/python/sql/hive.py
@@ -0,0 +1,96 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import print_function
+
+# $example on:spark_hive$
+from os.path import expanduser, join
+
+from pyspark.sql import SparkSession
+from pyspark.sql import Row
+# $example off:spark_hive$
+
+"""
+A simple example demonstrating Spark SQL Hive integration.
+Run with:
+ ./bin/spark-submit examples/src/main/python/sql/hive.py
+"""
+
+
+if __name__ == "__main__":
+    # $example on:spark_hive$
+    # warehouse_location points to the default location for managed databases and tables
+    warehouse_location = 'file:${system:user.dir}/spark-warehouse'
+
+    spark = SparkSession \
+        .builder \
+        .appName("PythonSQL") \
+        .config("spark.sql.warehouse.dir", warehouse_location) \
+        .enableHiveSupport() \
+        .getOrCreate()
+
+    # spark is an existing SparkSession
+    spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+    spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
+
+    # Queries are expressed in HiveQL
+    spark.sql("SELECT * FROM src").show()
+    # +---+-------+
+    # |key|  value|
+    # +---+-------+
+    # |238|val_238|
+    # | 86| val_86|
+    # |311|val_311|
+    # ...
+
+    # Aggregation queries are also supported.
+    spark.sql("SELECT COUNT(*) FROM src").show()
+    # +--------+
+    # |count(1)|
+    # +--------+
+    # |     500|
+    # +--------+
+
+    # The results of SQL queries are themselves DataFrames and support all normal functions.
+    sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
+
+    # The items in DataFrames are of type Row, which allows you to access each column by ordinal.
+    stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
+    for record in stringsDS.collect():
+        print(record)
+    # Key: 0, Value: val_0
+    # Key: 0, Value: val_0
+    # Key: 0, Value: val_0
+    # ...
+
+    # You can also use DataFrames to create temporary views within a SparkSession.
+    Record = Row("key", "value")
+    recordsDF = spark.createDataFrame(map(lambda i: Record(i, "val_" + str(i)), range(1, 101)))
+    recordsDF.createOrReplaceTempView("records")
+
+    # Queries can then join DataFrame data with data stored in Hive.
+    spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
+    # +---+------+---+------+
+    # |key| value|key| value|
+    # +---+------+---+------+
+    # |  2| val_2|  2| val_2|
+    # |  4| val_4|  4| val_4|
+    # |  5| val_5|  5| val_5|
+    # ...
+    # $example off:spark_hive$
+
+    spark.stop()