From b9cf617a6fa8812b45ff33acd109757a59f91dfa Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Wed, 11 May 2016 15:31:16 -0700 Subject: [SPARK-15256] [SQL] [PySpark] Clarify DataFrameReader.jdbc() docstring This PR: * Corrects the documentation for the `properties` parameter, which is supposed to be a dictionary and not a list. * Generally clarifies the Python docstring for DataFrameReader.jdbc() by pulling from the [Scala docstrings](https://github.com/apache/spark/blob/b28137764716f56fa1a923c4278624a56364a505/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L201-L251) and rephrasing things. * Corrects minor Sphinx typos. Author: Nicholas Chammas Closes #13034 from nchammas/SPARK-15256. --- python/pyspark/sql/readwriter.py | 67 +++++++++++++++++++++------------------- 1 file changed, 35 insertions(+), 32 deletions(-) (limited to 'python') diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index bd728c97c8..7fd7583972 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -402,10 +402,9 @@ class DataFrameReader(object): def orc(self, path): """Loads an ORC file, returning the result as a :class:`DataFrame`. - ::Note: Currently ORC support is only available together with - :class:`HiveContext`. + .. note:: Currently ORC support is only available together with Hive support. - >>> df = hiveContext.read.orc('python/test_support/sql/orc_partitioned') + >>> df = spark.read.orc('python/test_support/sql/orc_partitioned') >>> df.dtypes [('a', 'bigint'), ('b', 'int'), ('c', 'int')] """ @@ -415,28 +414,31 @@ class DataFrameReader(object): def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None, predicates=None, properties=None): """ - Construct a :class:`DataFrame` representing the database table accessible - via JDBC URL `url` named `table` and connection `properties`. + Construct a :class:`DataFrame` representing the database table named ``table`` + accessible via JDBC URL ``url`` and connection ``properties``. - The `column` parameter could be used to partition the table, then it will - be retrieved in parallel based on the parameters passed to this function. + Partitions of the table will be retrieved in parallel if either ``column`` or + ``predicates`` is specified. - The `predicates` parameter gives a list expressions suitable for inclusion - in WHERE clauses; each one defines one partition of the :class:`DataFrame`. + If both ``column`` and ``predicates`` are specified, ``column`` will be used. - ::Note: Don't create too many partitions in parallel on a large cluster; + .. note:: Don't create too many partitions in parallel on a large cluster; \ otherwise Spark might crash your external database systems. - :param url: a JDBC URL - :param table: name of table - :param column: the column used to partition - :param lowerBound: the lower bound of partition column - :param upperBound: the upper bound of the partition column + :param url: a JDBC URL of the form ``jdbc:subprotocol:subname`` + :param table: the name of the table + :param column: the name of an integer column that will be used for partitioning; + if this parameter is specified, then ``numPartitions``, ``lowerBound`` + (inclusive), and ``upperBound`` (exclusive) will form partition strides + for generated WHERE clause expressions used to split the column + ``column`` evenly + :param lowerBound: the minimum value of ``column`` used to decide partition stride + :param upperBound: the maximum value of ``column`` used to decide partition stride :param numPartitions: the number of partitions - :param predicates: a list of expressions - :param properties: JDBC database connection arguments, a list of arbitrary string - tag/value. Normally at least a "user" and "password" property - should be included. + :param predicates: a list of expressions suitable for inclusion in WHERE clauses; + each one defines one partition of the :class:`DataFrame` + :param properties: a dictionary of JDBC database connection arguments; normally, + at least a "user" and "password" property should be included :return: a DataFrame """ if properties is None: @@ -538,7 +540,7 @@ class DataFrameWriter(object): def queryName(self, queryName): """Specifies the name of the :class:`ContinuousQuery` that can be started with :func:`startStream`. This name must be unique among all the currently active queries - in the associated spark + in the associated SparkSession. .. note:: Experimental. @@ -808,8 +810,7 @@ class DataFrameWriter(object): def orc(self, path, mode=None, partitionBy=None, compression=None): """Saves the content of the :class:`DataFrame` in ORC format at the specified path. - ::Note: Currently ORC support is only available together with - :class:`HiveContext`. + .. note:: Currently ORC support is only available together with Hive support. :param path: the path in any Hadoop supported file system :param mode: specifies the behavior of the save operation when data already exists. @@ -823,7 +824,7 @@ class DataFrameWriter(object): known case-insensitive shorten names (none, snappy, zlib, and lzo). This will overwrite ``orc.compress``. - >>> orc_df = hiveContext.read.orc('python/test_support/sql/orc_partitioned') + >>> orc_df = spark.read.orc('python/test_support/sql/orc_partitioned') >>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data')) """ self.mode(mode) @@ -837,7 +838,7 @@ class DataFrameWriter(object): def jdbc(self, url, table, mode=None, properties=None): """Saves the content of the :class:`DataFrame` to a external database table via JDBC. - .. note:: Don't create too many partitions in parallel on a large cluster;\ + .. note:: Don't create too many partitions in parallel on a large cluster; \ otherwise Spark might crash your external database systems. :param url: a JDBC URL of the form ``jdbc:subprotocol:subname`` @@ -864,30 +865,32 @@ def _test(): import doctest import os import tempfile + import py4j from pyspark.context import SparkContext - from pyspark.sql import SparkSession, Row, HiveContext + from pyspark.sql import SparkSession, Row import pyspark.sql.readwriter os.chdir(os.environ["SPARK_HOME"]) globs = pyspark.sql.readwriter.__dict__.copy() sc = SparkContext('local[4]', 'PythonTest') + try: + spark = SparkSession.withHiveSupport(sc) + except py4j.protocol.Py4JError: + spark = SparkSession(sc) globs['tempfile'] = tempfile globs['os'] = os globs['sc'] = sc - globs['spark'] = SparkSession.builder\ - .enableHiveSupport()\ - .getOrCreate() - globs['hiveContext'] = HiveContext._createForTesting(sc) - globs['df'] = globs['spark'].read.parquet('python/test_support/sql/parquet_partitioned') + globs['spark'] = spark + globs['df'] = spark.read.parquet('python/test_support/sql/parquet_partitioned') globs['sdf'] = \ - globs['spark'].read.format('text').stream('python/test_support/sql/streaming') + spark.read.format('text').stream('python/test_support/sql/streaming') (failure_count, test_count) = doctest.testmod( pyspark.sql.readwriter, globs=globs, optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF) - globs['sc'].stop() + sc.stop() if failure_count: exit(-1) -- cgit v1.2.3