diff options
author | Dongjoon Hyun <dongjoon@apache.org> | 2016-05-03 18:05:40 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2016-05-03 18:05:40 -0700 |
commit | 0903a185c7ebc57c75301a27d215b08efd347f99 (patch) | |
tree | 6ded1ba4dce606712f5d946c5c23bd50b40cfa9a /python | |
parent | c1839c9911e37488230a68dec9041eb5958b6f1c (diff) | |
download | spark-0903a185c7ebc57c75301a27d215b08efd347f99.tar.gz spark-0903a185c7ebc57c75301a27d215b08efd347f99.tar.bz2 spark-0903a185c7ebc57c75301a27d215b08efd347f99.zip |
[SPARK-15084][PYTHON][SQL] Use builder pattern to create SparkSession in PySpark.
## What changes were proposed in this pull request?
This is a python port of corresponding Scala builder pattern code. `sql.py` is modified as a target example case.
## How was this patch tested?
Manual.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #12860 from dongjoon-hyun/SPARK-15084.
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/sql/session.py | 91 |
1 files changed, 90 insertions, 1 deletions
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 35c36b4935..fb3e318163 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -19,6 +19,7 @@ from __future__ import print_function import sys import warnings from functools import reduce +from threading import RLock if sys.version >= '3': basestring = unicode = str @@ -58,16 +59,98 @@ def _monkey_patch_RDD(sparkSession): class SparkSession(object): - """Main entry point for Spark SQL functionality. + """The entry point to programming Spark with the Dataset and DataFrame API. A SparkSession can be used create :class:`DataFrame`, register :class:`DataFrame` as tables, execute SQL over tables, cache tables, and read parquet files. + To create a SparkSession, use the following builder pattern: + + >>> spark = SparkSession.builder \ + .master("local") \ + .appName("Word Count") \ + .config("spark.some.config.option", "some-value") \ + .getOrCreate() :param sparkContext: The :class:`SparkContext` backing this SparkSession. :param jsparkSession: An optional JVM Scala SparkSession. If set, we do not instantiate a new SparkSession in the JVM, instead we make all calls to this object. """ + class Builder(object): + """Builder for :class:`SparkSession`. + """ + + _lock = RLock() + _options = {} + + @since(2.0) + def config(self, key=None, value=None, conf=None): + """Sets a config option. Options set using this method are automatically propagated to + both :class:`SparkConf` and :class:`SparkSession`'s own configuration. + + For an existing SparkConf, use `conf` parameter. + >>> from pyspark.conf import SparkConf + >>> SparkSession.builder.config(conf=SparkConf()) + <pyspark.sql.session... + + For a (key, value) pair, you can omit parameter names. + >>> SparkSession.builder.config("spark.some.config.option", "some-value") + <pyspark.sql.session... + + :param key: a key name string for configuration property + :param value: a value for configuration property + :param conf: an instance of :class:`SparkConf` + """ + with self._lock: + if conf is None: + self._options[key] = str(value) + else: + for (k, v) in conf.getAll(): + self._options[k] = v + return self + + @since(2.0) + def master(self, master): + """Sets the Spark master URL to connect to, such as "local" to run locally, "local[4]" + to run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone + cluster. + + :param master: a url for spark master + """ + return self.config("spark.master", master) + + @since(2.0) + def appName(self, name): + """Sets a name for the application, which will be shown in the Spark web UI. + + :param name: an application name + """ + return self.config("spark.app.name", name) + + @since(2.0) + def enableHiveSupport(self): + """Enables Hive support, including connectivity to a persistent Hive metastore, support + for Hive serdes, and Hive user-defined functions. + """ + return self.config("spark.sql.catalogImplementation", "hive") + + @since(2.0) + def getOrCreate(self): + """Gets an existing :class:`SparkSession` or, if there is no existing one, creates a new + one based on the options set in this builder. + """ + with self._lock: + from pyspark.conf import SparkConf + from pyspark.context import SparkContext + from pyspark.sql.context import SQLContext + sparkConf = SparkConf() + for key, value in self._options.items(): + sparkConf.set(key, value) + sparkContext = SparkContext.getOrCreate(sparkConf) + return SQLContext.getOrCreate(sparkContext).sparkSession + + builder = Builder() + _instantiatedContext = None @ignore_unicode_prefix @@ -445,6 +528,12 @@ class SparkSession(object): """ return DataFrameReader(self._wrapped) + @since(2.0) + def stop(self): + """Stop the underlying :class:`SparkContext`. + """ + self._sc.stop() + def _test(): import os |