author     Reynold Xin <rxin@databricks.com>  2015-06-03 00:23:34 -0700
committer  Reynold Xin <rxin@databricks.com>  2015-06-03 00:23:34 -0700
commit     ce320cb2dbf28825f80795ce569735888f98d6e8 (patch)
tree       020c2f7b8a0868fb0cc3bef50a284c4f06b8777b /python/pyspark/sql/readwriter.py
parent     452eb82dd722e5dfd00ee47bb8b6353933b0016e (diff)
[SPARK-8060] Improve DataFrame Python test coverage and documentation.
Author: Reynold Xin <rxin@databricks.com>

Closes #6601 from rxin/python-read-write-test-and-doc and squashes the following commits:

baa8ad5 [Reynold Xin] Code review feedback.
f081d47 [Reynold Xin] More documentation updates.
c9902fa [Reynold Xin] [SPARK-8060] Improve DataFrame Python reader/writer interface doc and testing.
Diffstat (limited to 'python/pyspark/sql/readwriter.py')
-rw-r--r--  python/pyspark/sql/readwriter.py | 217
1 file changed, 98 insertions(+), 119 deletions(-)
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index d17d87419f..f036644acc 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -45,18 +45,24 @@ class DataFrameReader(object):
@since(1.4)
def format(self, source):
- """
- Specifies the input data source format.
+ """Specifies the input data source format.
+
+ :param source: string, name of the data source, e.g. 'json', 'parquet'.
+
+ >>> df = sqlContext.read.format('json').load('python/test_support/sql/people.json')
+ >>> df.dtypes
+ [('age', 'bigint'), ('name', 'string')]
+
"""
self._jreader = self._jreader.format(source)
return self
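For orientation, a minimal sketch of driving the chained reader API above, assuming the live `sqlContext` that `_test()` below provides and the bundled people.json fixture:

    # Assumes a live SQLContext named sqlContext, as set up in _test().
    df = sqlContext.read.format('json').load('python/test_support/sql/people.json')
    df.dtypes  # [('age', 'bigint'), ('name', 'string')]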
@since(1.4)
def schema(self, schema):
- """
- Specifies the input schema. Some data sources (e.g. JSON) can
- infer the input schema automatically from data. By specifying
- the schema here, the underlying data source can skip the schema
+ """Specifies the input schema.
+
+ Some data sources (e.g. JSON) can infer the input schema automatically from data.
+ By specifying the schema here, the underlying data source can skip the schema
inference step, and thus speed up data loading.
:param schema: a StructType object
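A hedged sketch of the schema-first path this docstring describes, again assuming the `sqlContext` global; the field types match the people.json doctests above:

    from pyspark.sql.types import StructType, StructField, StringType, LongType

    # Supplying the schema up front lets the JSON source skip inference.
    schema = StructType([
        StructField('age', LongType(), True),
        StructField('name', StringType(), True),
    ])
    df = sqlContext.read.schema(schema).json('python/test_support/sql/people.json')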
@@ -69,8 +75,7 @@ class DataFrameReader(object):
@since(1.4)
def options(self, **options):
- """
- Adds input options for the underlying data source.
+ """Adds input options for the underlying data source.
"""
for k in options:
self._jreader = self._jreader.option(k, options[k])
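Option keys and values are forwarded verbatim to the underlying source, so which keys are honored depends on that source. A sketch using the JSON reader's samplingRatio option (the pairing is illustrative):

    # Only a fraction of the input is scanned during schema inference.
    df = (sqlContext.read.format('json')
          .options(samplingRatio='0.5')
          .load('python/test_support/sql/people.json'))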
@@ -84,6 +89,10 @@ class DataFrameReader(object):
:param format: optional string for format of the data source. Default to 'parquet'.
:param schema: optional :class:`StructType` for the input schema.
:param options: all other string options
+
+ >>> df = sqlContext.read.load('python/test_support/sql/parquet_partitioned')
+ >>> df.dtypes
+ [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
"""
if format is not None:
self.format(format)
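Equivalently, as a hedged sketch, the format can be passed straight to load() instead of being chained:

    # Same read as format('json').load(path), expressed via keyword arguments.
    df = sqlContext.read.load('python/test_support/sql/people.json', format='json')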
@@ -107,31 +116,10 @@ class DataFrameReader(object):
:param path: string, path to the JSON dataset.
:param schema: an optional :class:`StructType` for the input schema.
- >>> import tempfile, shutil
- >>> jsonFile = tempfile.mkdtemp()
- >>> shutil.rmtree(jsonFile)
- >>> with open(jsonFile, 'w') as f:
- ... f.writelines(jsonStrings)
- >>> df1 = sqlContext.read.json(jsonFile)
- >>> df1.printSchema()
- root
- |-- field1: long (nullable = true)
- |-- field2: string (nullable = true)
- |-- field3: struct (nullable = true)
- | |-- field4: long (nullable = true)
-
- >>> from pyspark.sql.types import *
- >>> schema = StructType([
- ... StructField("field2", StringType()),
- ... StructField("field3",
- ... StructType([StructField("field5", ArrayType(IntegerType()))]))])
- >>> df2 = sqlContext.read.json(jsonFile, schema)
- >>> df2.printSchema()
- root
- |-- field2: string (nullable = true)
- |-- field3: struct (nullable = true)
- | |-- field5: array (nullable = true)
- | | |-- element: integer (containsNull = true)
+ >>> df = sqlContext.read.json('python/test_support/sql/people.json')
+ >>> df.dtypes
+ [('age', 'bigint'), ('name', 'string')]
+
"""
if schema is not None:
self.schema(schema)
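A sketch of the optional schema argument, in the spirit of the longer doctest removed above (same assumptions; only the listed field is populated):

    from pyspark.sql.types import StructType, StructField, StringType

    schema = StructType([StructField('name', StringType(), True)])
    df = sqlContext.read.json('python/test_support/sql/people.json', schema)
    df.dtypes  # [('name', 'string')]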
@@ -141,10 +129,12 @@ class DataFrameReader(object):
def table(self, tableName):
"""Returns the specified table as a :class:`DataFrame`.
- >>> sqlContext.registerDataFrameAsTable(df, "table1")
- >>> df2 = sqlContext.read.table("table1")
- >>> sorted(df.collect()) == sorted(df2.collect())
- True
+ :param tableName: string, name of the table.
+
+ >>> df = sqlContext.read.parquet('python/test_support/sql/parquet_partitioned')
+ >>> df.registerTempTable('tmpTable')
+ >>> sqlContext.read.table('tmpTable').dtypes
+ [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
"""
return self._df(self._jreader.table(tableName))
@@ -152,13 +142,9 @@ class DataFrameReader(object):
def parquet(self, *path):
"""Loads a Parquet file, returning the result as a :class:`DataFrame`.
- >>> import tempfile, shutil
- >>> parquetFile = tempfile.mkdtemp()
- >>> shutil.rmtree(parquetFile)
- >>> df.saveAsParquetFile(parquetFile)
- >>> df2 = sqlContext.read.parquet(parquetFile)
- >>> sorted(df.collect()) == sorted(df2.collect())
- True
+ >>> df = sqlContext.read.parquet('python/test_support/sql/parquet_partitioned')
+ >>> df.dtypes
+ [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
"""
return self._df(self._jreader.parquet(_to_seq(self._sqlContext._sc, path)))
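Since the signature is parquet(*path), several directories can be read in one call. A sketch, with both arguments pointing at the same bundled fixture purely to illustrate:

    p = 'python/test_support/sql/parquet_partitioned'
    df = sqlContext.read.parquet(p, p)  # rows from every listed path are combined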
@@ -221,30 +207,34 @@ class DataFrameWriter(object):
@since(1.4)
def mode(self, saveMode):
- """
- Specifies the behavior when data or table already exists. Options include:
+ """Specifies the behavior when data or table already exists.
+
+ Options include:
* `append`: Append contents of this :class:`DataFrame` to existing data.
* `overwrite`: Overwrite existing data.
* `error`: Throw an exception if data already exists.
* `ignore`: Silently ignore this operation if data already exists.
+
+ >>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self._jwrite = self._jwrite.mode(saveMode)
return self
@since(1.4)
def format(self, source):
- """
- Specifies the underlying output data source. Built-in options include
- "parquet", "json", etc.
+ """Specifies the underlying output data source.
+
+ :param source: string, name of the data source, e.g. 'json', 'parquet'.
+
+ >>> df.write.format('json').save(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self._jwrite = self._jwrite.format(source)
return self
@since(1.4)
def options(self, **options):
- """
- Adds output options for the underlying data source.
+ """Adds output options for the underlying data source.
"""
for k in options:
self._jwrite = self._jwrite.option(k, options[k])
@@ -252,12 +242,14 @@ class DataFrameWriter(object):
@since(1.4)
def partitionBy(self, *cols):
- """
- Partitions the output by the given columns on the file system.
+ """Partitions the output by the given columns on the file system.
+
If specified, the output is laid out on the file system similar
to Hive's partitioning scheme.
:param cols: name of columns
+
+ >>> df.write.partitionBy('year', 'month').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
"""
if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
cols = cols[0]
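A sketch of the write/read round trip behind the doctest above; os and tempfile sit in the doctest globs that _test() sets up, and the fixture df carries year/month columns:

    out = os.path.join(tempfile.mkdtemp(), 'data')
    df.write.partitionBy('year', 'month').parquet(out)
    sqlContext.read.parquet(out).dtypes  # partition columns reappear on read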
@@ -266,25 +258,23 @@ class DataFrameWriter(object):
@since(1.4)
def save(self, path=None, format=None, mode="error", **options):
- """
- Saves the contents of the :class:`DataFrame` to a data source.
+ """Saves the contents of the :class:`DataFrame` to a data source.
The data source is specified by the ``format`` and a set of ``options``.
If ``format`` is not specified, the default data source configured by
``spark.sql.sources.default`` will be used.
- Additionally, mode is used to specify the behavior of the save operation when
- data already exists in the data source. There are four modes:
-
- * `append`: Append contents of this :class:`DataFrame` to existing data.
- * `overwrite`: Overwrite existing data.
- * `error`: Throw an exception if data already exists.
- * `ignore`: Silently ignore this operation if data already exists.
-
:param path: the path in a Hadoop supported file system
:param format: the format used to save
- :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
+ :param mode: specifies the behavior of the save operation when data already exists.
+
+ * ``append``: Append contents of this :class:`DataFrame` to existing data.
+ * ``overwrite``: Overwrite existing data.
+ * ``ignore``: Silently ignore this operation if data already exists.
+ * ``error`` (default case): Throw an exception if data already exists.
:param options: all other string options
+
+ >>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode).options(**options)
if format is not None:
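A hedged sketch of save() with an explicit format and mode, equivalent to chaining format('json').mode('overwrite') before the final save(path):

    df.write.save(os.path.join(tempfile.mkdtemp(), 'data'),
                  format='json', mode='overwrite')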
@@ -296,8 +286,8 @@ class DataFrameWriter(object):
@since(1.4)
def insertInto(self, tableName, overwrite=False):
- """
- Inserts the content of the :class:`DataFrame` to the specified table.
+ """Inserts the content of the :class:`DataFrame` to the specified table.
+
It requires that the schema of the :class:`DataFrame` is the same as the
schema of the table.
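A sketch only, since insertInto() carries no doctest here: it assumes a table named 'people' already exists (for example, created with saveAsTable() against a metastore-backed context) and that its schema matches df's:

    df.write.insertInto('people')                   # append rows
    df.write.insertInto('people', overwrite=True)   # replace existing rows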
@@ -307,8 +297,7 @@ class DataFrameWriter(object):
@since(1.4)
def saveAsTable(self, name, format=None, mode="error", **options):
- """
- Saves the content of the :class:`DataFrame` as the specified table.
+ """Saves the content of the :class:`DataFrame` as the specified table.
If the table already exists, the behavior of this function depends on the
save mode specified by the `mode` function (it defaults to throwing an exception).
@@ -328,67 +317,58 @@ class DataFrameWriter(object):
self.mode(mode).options(**options)
if format is not None:
self.format(format)
- return self._jwrite.saveAsTable(name)
+ self._jwrite.saveAsTable(name)
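A hedged sketch of the call; the table name is illustrative, and a metastore-backed context (e.g. HiveContext) is assumed for persistent tables:

    df.write.saveAsTable('people_copy', format='parquet', mode='overwrite')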
@since(1.4)
def json(self, path, mode="error"):
- """
- Saves the content of the :class:`DataFrame` in JSON format at the
- specified path.
+ """Saves the content of the :class:`DataFrame` in JSON format at the specified path.
- Additionally, mode is used to specify the behavior of the save operation when
- data already exists in the data source. There are four modes:
+ :param path: the path in any Hadoop supported file system
+ :param mode: specifies the behavior of the save operation when data already exists.
- * `append`: Append contents of this :class:`DataFrame` to existing data.
- * `overwrite`: Overwrite existing data.
- * `error`: Throw an exception if data already exists.
- * `ignore`: Silently ignore this operation if data already exists.
+ * ``append``: Append contents of this :class:`DataFrame` to existing data.
+ * ``overwrite``: Overwrite existing data.
+ * ``ignore``: Silently ignore this operation if data already exists.
+ * ``error`` (default case): Throw an exception if data already exists.
- :param path: the path in any Hadoop supported file system
- :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
+ >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
"""
- return self._jwrite.mode(mode).json(path)
+ self._jwrite.mode(mode).json(path)
@since(1.4)
def parquet(self, path, mode="error"):
- """
- Saves the content of the :class:`DataFrame` in Parquet format at the
- specified path.
+ """Saves the content of the :class:`DataFrame` in Parquet format at the specified path.
- Additionally, mode is used to specify the behavior of the save operation when
- data already exists in the data source. There are four modes:
+ :param path: the path in any Hadoop supported file system
+ :param mode: specifies the behavior of the save operation when data already exists.
- * `append`: Append contents of this :class:`DataFrame` to existing data.
- * `overwrite`: Overwrite existing data.
- * `error`: Throw an exception if data already exists.
- * `ignore`: Silently ignore this operation if data already exists.
+ * ``append``: Append contents of this :class:`DataFrame` to existing data.
+ * ``overwrite``: Overwrite existing data.
+ * ``ignore``: Silently ignore this operation if data already exists.
+ * ``error`` (default case): Throw an exception if data already exists.
- :param path: the path in any Hadoop supported file system
- :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
+ >>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
"""
- return self._jwrite.mode(mode).parquet(path)
+ self._jwrite.mode(mode).parquet(path)
@since(1.4)
def jdbc(self, url, table, mode="error", properties={}):
- """
- Saves the content of the :class:`DataFrame` to a external database table
- via JDBC.
-
- In the case the table already exists in the external database,
- behavior of this function depends on the save mode, specified by the `mode`
- function (default to throwing an exception). There are four modes:
+ """Saves the content of the :class:`DataFrame` to a external database table via JDBC.
- * `append`: Append contents of this :class:`DataFrame` to existing data.
- * `overwrite`: Overwrite existing data.
- * `error`: Throw an exception if data already exists.
- * `ignore`: Silently ignore this operation if data already exists.
+ .. note:: Don't create too many partitions in parallel on a large cluster;\
+ otherwise Spark might crash your external database systems.
- :param url: a JDBC URL of the form `jdbc:subprotocol:subname`
+ :param url: a JDBC URL of the form ``jdbc:subprotocol:subname``
:param table: Name of the table in the external database.
- :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
+ :param mode: specifies the behavior of the save operation when data already exists.
+
+ * ``append``: Append contents of this :class:`DataFrame` to existing data.
+ * ``overwrite``: Overwrite existing data.
+ * ``ignore``: Silently ignore this operation if data already exists.
+ * ``error`` (default case): Throw an exception if data already exists.
:param properties: JDBC database connection arguments, a list of
- arbitrary string tag/value. Normally at least a
- "user" and "password" property should be included.
+ arbitrary string tag/value. Normally at least a
+ "user" and "password" property should be included.
"""
jprop = JavaClass("java.util.Properties", self._sqlContext._sc._gateway._gateway_client)()
for k in properties:
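A sketch only: the URL, table name, and credentials below are placeholders, and a JDBC driver for the target database has to be on Spark's classpath:

    df.write.jdbc('jdbc:postgresql://localhost:5432/testdb', 'people',
                  mode='overwrite',
                  properties={'user': 'spark', 'password': 'secret'})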
@@ -398,24 +378,23 @@ class DataFrameWriter(object):
def _test():
import doctest
+ import os
+ import tempfile
from pyspark.context import SparkContext
from pyspark.sql import Row, SQLContext
import pyspark.sql.readwriter
+
+ os.chdir(os.environ["SPARK_HOME"])
+
globs = pyspark.sql.readwriter.__dict__.copy()
sc = SparkContext('local[4]', 'PythonTest')
+
+ globs['tempfile'] = tempfile
+ globs['os'] = os
globs['sc'] = sc
globs['sqlContext'] = SQLContext(sc)
- globs['df'] = sc.parallelize([(2, 'Alice'), (5, 'Bob')]) \
- .toDF(StructType([StructField('age', IntegerType()),
- StructField('name', StringType())]))
- jsonStrings = [
- '{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
- '{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]},'
- '"field6":[{"field7": "row2"}]}',
- '{"field1" : null, "field2": "row3", '
- '"field3":{"field4":33, "field5": []}}'
- ]
- globs['jsonStrings'] = jsonStrings
+ globs['df'] = globs['sqlContext'].read.parquet('python/test_support/sql/parquet_partitioned')
+
(failure_count, test_count) = doctest.testmod(
pyspark.sql.readwriter, globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
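Presumably the module closes with the usual doctest guard (it sits outside the hunks shown), so the suite runs as a script from a checkout with SPARK_HOME exported:

    if __name__ == "__main__":
        _test()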