author    Dongjoon Hyun <dongjoon@apache.org>    2016-05-03 18:05:40 -0700
committer Andrew Or <andrew@databricks.com>    2016-05-03 18:05:40 -0700
commit    0903a185c7ebc57c75301a27d215b08efd347f99 (patch)
tree      6ded1ba4dce606712f5d946c5c23bd50b40cfa9a /python
parent    c1839c9911e37488230a68dec9041eb5958b6f1c (diff)
[SPARK-15084][PYTHON][SQL] Use builder pattern to create SparkSession in PySpark.
## What changes were proposed in this pull request?

This is a Python port of the corresponding Scala builder-pattern code. `sql.py` is modified as a target example case.

## How was this patch tested?

Manual.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #12860 from dongjoon-hyun/SPARK-15084.
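For readers skimming the patch, the chain below shows how the new API is meant to be used end to end. This is a minimal sketch, not part of the patch: it assumes a local Spark build with this change applied; the `local[2]` master, app name, and sample rows are illustrative choices, and `createDataFrame` comes from the same `session.py` file.

```python
from pyspark.sql import SparkSession

# Each Builder method returns the Builder itself, so calls chain until
# getOrCreate() builds a SparkConf from the collected options and
# creates (or reuses) the SparkContext behind the session.
spark = SparkSession.builder \
    .master("local[2]") \
    .appName("BuilderDemo") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.createDataFrame([(1, "spark"), (2, "session")], ["id", "name"])
df.show()

spark.stop()  # stop() is also added by this patch; it stops the SparkContext
```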
Diffstat (limited to 'python')
-rw-r--r--    python/pyspark/sql/session.py    91
1 file changed, 90 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 35c36b4935..fb3e318163 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -19,6 +19,7 @@ from __future__ import print_function
import sys
import warnings
from functools import reduce
+from threading import RLock
if sys.version >= '3':
basestring = unicode = str
@@ -58,16 +59,98 @@ def _monkey_patch_RDD(sparkSession):
class SparkSession(object):
- """Main entry point for Spark SQL functionality.
+ """The entry point to programming Spark with the Dataset and DataFrame API.
A SparkSession can be used to create :class:`DataFrame`, register :class:`DataFrame` as
tables, execute SQL over tables, cache tables, and read parquet files.
+ To create a SparkSession, use the following builder pattern:
+
+ >>> spark = SparkSession.builder \
+ ...     .master("local") \
+ ...     .appName("Word Count") \
+ ...     .config("spark.some.config.option", "some-value") \
+ ...     .getOrCreate()
:param sparkContext: The :class:`SparkContext` backing this SparkSession.
:param jsparkSession: An optional JVM Scala SparkSession. If set, we do not instantiate a new
SparkSession in the JVM, instead we make all calls to this object.
"""
+ class Builder(object):
+ """Builder for :class:`SparkSession`.
+ """
+
+ _lock = RLock()
+ _options = {}
+
+ @since(2.0)
+ def config(self, key=None, value=None, conf=None):
+ """Sets a config option. Options set using this method are automatically propagated to
+ both :class:`SparkConf` and :class:`SparkSession`'s own configuration.
+
+ For an existing SparkConf, use the `conf` parameter.
+ >>> from pyspark.conf import SparkConf
+ >>> SparkSession.builder.config(conf=SparkConf())
+ <pyspark.sql.session...
+
+ For a (key, value) pair, you can omit parameter names.
+ >>> SparkSession.builder.config("spark.some.config.option", "some-value")
+ <pyspark.sql.session...
+
+ :param key: a key name string for configuration property
+ :param value: a value for configuration property
+ :param conf: an instance of :class:`SparkConf`
+ """
+ with self._lock:
+ if conf is None:
+ self._options[key] = str(value)
+ else:
+ for (k, v) in conf.getAll():
+ self._options[k] = v
+ return self
+
+ @since(2.0)
+ def master(self, master):
+ """Sets the Spark master URL to connect to, such as "local" to run locally, "local[4]"
+ to run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone
+ cluster.
+
+ :param master: a URL for the Spark master
+ """
+ return self.config("spark.master", master)
+
+ @since(2.0)
+ def appName(self, name):
+ """Sets a name for the application, which will be shown in the Spark web UI.
+
+ :param name: an application name
+ """
+ return self.config("spark.app.name", name)
+
+ @since(2.0)
+ def enableHiveSupport(self):
+ """Enables Hive support, including connectivity to a persistent Hive metastore, support
+ for Hive serdes, and Hive user-defined functions.
+ """
+ return self.config("spark.sql.catalogImplementation", "hive")
+
+ @since(2.0)
+ def getOrCreate(self):
+ """Gets an existing :class:`SparkSession` or, if there is no existing one, creates a new
+ one based on the options set in this builder.
+ """
+ with self._lock:
+ from pyspark.conf import SparkConf
+ from pyspark.context import SparkContext
+ from pyspark.sql.context import SQLContext
+ sparkConf = SparkConf()
+ for key, value in self._options.items():
+ sparkConf.set(key, value)
+ sparkContext = SparkContext.getOrCreate(sparkConf)
+ return SQLContext.getOrCreate(sparkContext).sparkSession
+
+ builder = Builder()
+
_instantiatedContext = None
@ignore_unicode_prefix
@@ -445,6 +528,12 @@ class SparkSession(object):
"""
return DataFrameReader(self._wrapped)
+ @since(2.0)
+ def stop(self):
+ """Stop the underlying :class:`SparkContext`.
+ """
+ self._sc.stop()
+
def _test():
import os
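A non-obvious consequence of the `getOrCreate()` implementation above: because it delegates to `SparkContext.getOrCreate()`, two builds in the same process share one underlying context, and options from a second build do not replace a context that is already running. A hedged sketch of that behavior, relying on the private `_sc` attribute that the new `stop()` method uses:

```python
from pyspark.sql import SparkSession

# First build starts a local SparkContext with appName "first".
s1 = SparkSession.builder.master("local").appName("first").getOrCreate()

# Second build finds the running context via SparkContext.getOrCreate(),
# so the "second" appName has no effect on the already-started context.
s2 = SparkSession.builder.appName("second").getOrCreate()

# Both sessions wrap the same context; `_sc` is the private attribute
# referenced by the new stop() method in the hunk above.
assert s1._sc is s2._sc

s1.stop()
```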