aboutsummaryrefslogtreecommitdiff
path: root/examples/src/main/python/sql/hive.py
blob: ad83fe1cf14b5e7d4ee2ff3567e82194af1f1939 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import print_function

# $example on:spark_hive$
from os.path import expanduser, join

from pyspark.sql import SparkSession
from pyspark.sql import Row
# $example off:spark_hive$

"""
A simple example demonstrating Spark SQL Hive integration.
Run with:
  ./bin/spark-submit examples/src/main/python/sql/hive.py
"""


if __name__ == "__main__":
    # $example on:spark_hive$
    # warehouse_location points to the default location for managed databases and tables
    warehouse_location = 'spark-warehouse'

    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL Hive integration example") \
        .config("spark.sql.warehouse.dir", warehouse_location) \
        .enableHiveSupport() \
        .getOrCreate()

    # spark is an existing SparkSession
    spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
    spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

    # Queries are expressed in HiveQL
    spark.sql("SELECT * FROM src").show()
    # +---+-------+
    # |key|  value|
    # +---+-------+
    # |238|val_238|
    # | 86| val_86|
    # |311|val_311|
    # ...

    # Aggregation queries are also supported.
    spark.sql("SELECT COUNT(*) FROM src").show()
    # +--------+
    # |count(1)|
    # +--------+
    # |    500 |
    # +--------+

    # The results of SQL queries are themselves DataFrames and support all normal functions.
    sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

    # The items in DaraFrames are of type Row, which allows you to access each column by ordinal.
    stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
    for record in stringsDS.collect():
        print(record)
    # Key: 0, Value: val_0
    # Key: 0, Value: val_0
    # Key: 0, Value: val_0
    # ...

    # You can also use DataFrames to create temporary views within a SparkSession.
    Record = Row("key", "value")
    recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
    recordsDF.createOrReplaceTempView("records")

    # Queries can then join DataFrame data with data stored in Hive.
    spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
    # +---+------+---+------+
    # |key| value|key| value|
    # +---+------+---+------+
    # |  2| val_2|  2| val_2|
    # |  4| val_4|  4| val_4|
    # |  5| val_5|  5| val_5|
    # ...
    # $example off:spark_hive$

    spark.stop()