path: root/python/pyspark/sql/context.py
author     Reynold Xin <rxin@databricks.com>    2015-06-03 00:23:34 -0700
committer  Reynold Xin <rxin@databricks.com>    2015-06-03 00:23:34 -0700
commit     ce320cb2dbf28825f80795ce569735888f98d6e8 (patch)
tree       020c2f7b8a0868fb0cc3bef50a284c4f06b8777b /python/pyspark/sql/context.py
parent     452eb82dd722e5dfd00ee47bb8b6353933b0016e (diff)
download   spark-ce320cb2dbf28825f80795ce569735888f98d6e8.tar.gz
           spark-ce320cb2dbf28825f80795ce569735888f98d6e8.tar.bz2
           spark-ce320cb2dbf28825f80795ce569735888f98d6e8.zip
[SPARK-8060] Improve DataFrame Python test coverage and documentation.
Author: Reynold Xin <rxin@databricks.com>

Closes #6601 from rxin/python-read-write-test-and-doc and squashes the following commits:

baa8ad5 [Reynold Xin] Code review feedback.
f081d47 [Reynold Xin] More documentation updates.
c9902fa [Reynold Xin] [SPARK-8060] Improve DataFrame Python reader/writer interface doc and testing.
Diffstat (limited to 'python/pyspark/sql/context.py')
-rw-r--r--  python/pyspark/sql/context.py  89
1 file changed, 37 insertions, 52 deletions
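
For orientation, the reader/writer interface this patch documents and tests is reached through SQLContext.read and DataFrame.write. A minimal round-trip sketch, not taken from the patch itself; the local SparkContext setup, app name, and temp path are illustrative:

    import tempfile, shutil
    from pyspark import SparkContext
    from pyspark.sql import SQLContext, Row

    sc = SparkContext("local[2]", "reader-writer-sketch")  # hypothetical app name
    sqlContext = SQLContext(sc)

    df = sqlContext.createDataFrame([Row(name="Alice", age=1)])

    out = tempfile.mkdtemp()
    shutil.rmtree(out)            # the Parquet writer expects the target path not to exist
    df.write.parquet(out)         # DataFrameWriter, reached through df.write

    df2 = sqlContext.read.parquet(out)   # DataFrameReader, reached through sqlContext.read
    assert sorted(df.collect()) == sorted(df2.collect())

    sc.stop()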
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 22f6257dfe..9fdf43c3e6 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -124,7 +124,10 @@ class SQLContext(object):
@property
@since("1.3.1")
def udf(self):
- """Returns a :class:`UDFRegistration` for UDF registration."""
+ """Returns a :class:`UDFRegistration` for UDF registration.
+
+ :return: :class:`UDFRegistration`
+ """
return UDFRegistration(self)
@since(1.4)
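
The UDFRegistration returned above exposes register(); a small hedged example of registering and calling a UDF from SQL (the function name "strLen" is illustrative, and the result column name varies across Spark versions):

    from pyspark.sql.types import IntegerType

    # Register a Python function as a SQL UDF through the UDFRegistration returned by `udf`.
    sqlContext.udf.register("strLen", lambda s: len(s), IntegerType())

    # The UDF is then callable from SQL.
    sqlContext.sql("SELECT strLen('test')").collect()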
@@ -138,7 +141,7 @@ class SQLContext(object):
:param end: the end value (exclusive)
:param step: the incremental step (default: 1)
:param numPartitions: the number of partitions of the DataFrame
- :return: A new DataFrame
+ :return: :class:`DataFrame`
>>> sqlContext.range(1, 7, 2).collect()
[Row(id=1), Row(id=3), Row(id=5)]
@@ -195,8 +198,8 @@ class SQLContext(object):
raise ValueError("The first row in RDD is empty, "
"can not infer schema")
if type(first) is dict:
- warnings.warn("Using RDD of dict to inferSchema is deprecated,"
- "please use pyspark.sql.Row instead")
+ warnings.warn("Using RDD of dict to inferSchema is deprecated. "
+ "Use pyspark.sql.Row instead")
if samplingRatio is None:
schema = _infer_schema(first)
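
A brief illustration of the two construction styles the warning above contrasts (data values are illustrative):

    from pyspark.sql import Row

    # Deprecated style: schema inferred from an RDD of dicts (triggers the warning above).
    df_old = sqlContext.createDataFrame(sc.parallelize([{"name": "Alice", "age": 1}]))

    # Preferred style suggested by the warning: an RDD of Row objects.
    df_new = sqlContext.createDataFrame(sc.parallelize([Row(name="Alice", age=1)]))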
@@ -219,7 +222,7 @@ class SQLContext(object):
"""
.. note:: Deprecated in 1.3, use :func:`createDataFrame` instead.
"""
- warnings.warn("inferSchema is deprecated, please use createDataFrame instead")
+ warnings.warn("inferSchema is deprecated, please use createDataFrame instead.")
if isinstance(rdd, DataFrame):
raise TypeError("Cannot apply schema to DataFrame")
@@ -262,6 +265,7 @@ class SQLContext(object):
:class:`list`, or :class:`pandas.DataFrame`.
:param schema: a :class:`StructType` or list of column names. default None.
:param samplingRatio: the sample ratio of rows used for inferring
+ :return: :class:`DataFrame`
>>> l = [('Alice', 1)]
>>> sqlContext.createDataFrame(l).collect()
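
The schema argument described above also accepts an explicit StructType; a hedged sketch (field names are illustrative):

    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    schema = StructType([
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True),
    ])
    df = sqlContext.createDataFrame([('Alice', 1)], schema)
    df.dtypes  # [('name', 'string'), ('age', 'int')]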
@@ -359,18 +363,15 @@ class SQLContext(object):
else:
raise ValueError("Can only register DataFrame as table")
- @since(1.0)
def parquetFile(self, *paths):
"""Loads a Parquet file, returning the result as a :class:`DataFrame`.
- >>> import tempfile, shutil
- >>> parquetFile = tempfile.mkdtemp()
- >>> shutil.rmtree(parquetFile)
- >>> df.saveAsParquetFile(parquetFile)
- >>> df2 = sqlContext.parquetFile(parquetFile)
- >>> sorted(df.collect()) == sorted(df2.collect())
- True
+ .. note:: Deprecated in 1.4, use :func:`DataFrameReader.parquet` instead.
+
+ >>> sqlContext.parquetFile('python/test_support/sql/parquet_partitioned').dtypes
+ [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
"""
+ warnings.warn("parquetFile is deprecated. Use read.parquet() instead.")
gateway = self._sc._gateway
jpaths = gateway.new_array(gateway.jvm.java.lang.String, len(paths))
for i in range(0, len(paths)):
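
The reader-based replacement for the deprecated call above looks roughly like this; it uses the same test_support path as the new doctest, which resolves only after the chdir to SPARK_HOME added in _test() at the end of this diff:

    # Equivalent of the deprecated parquetFile call, through the DataFrameReader.
    df = sqlContext.read.parquet('python/test_support/sql/parquet_partitioned')
    df.dtypes  # [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]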
@@ -378,39 +379,15 @@ class SQLContext(object):
jdf = self._ssql_ctx.parquetFile(jpaths)
return DataFrame(jdf, self)
- @since(1.0)
def jsonFile(self, path, schema=None, samplingRatio=1.0):
"""Loads a text file storing one JSON object per line as a :class:`DataFrame`.
- If the schema is provided, applies the given schema to this JSON dataset.
- Otherwise, it samples the dataset with ratio ``samplingRatio`` to determine the schema.
-
- >>> import tempfile, shutil
- >>> jsonFile = tempfile.mkdtemp()
- >>> shutil.rmtree(jsonFile)
- >>> with open(jsonFile, 'w') as f:
- ... f.writelines(jsonStrings)
- >>> df1 = sqlContext.jsonFile(jsonFile)
- >>> df1.printSchema()
- root
- |-- field1: long (nullable = true)
- |-- field2: string (nullable = true)
- |-- field3: struct (nullable = true)
- | |-- field4: long (nullable = true)
+ .. note:: Deprecated in 1.4, use :func:`DataFrameReader.json` instead.
- >>> from pyspark.sql.types import *
- >>> schema = StructType([
- ... StructField("field2", StringType()),
- ... StructField("field3",
- ... StructType([StructField("field5", ArrayType(IntegerType()))]))])
- >>> df2 = sqlContext.jsonFile(jsonFile, schema)
- >>> df2.printSchema()
- root
- |-- field2: string (nullable = true)
- |-- field3: struct (nullable = true)
- | |-- field5: array (nullable = true)
- | | |-- element: integer (containsNull = true)
+ >>> sqlContext.jsonFile('python/test_support/sql/people.json').dtypes
+ [('age', 'bigint'), ('name', 'string')]
"""
+ warnings.warn("jsonFile is deprecated. Use read.json() instead.")
if schema is None:
df = self._ssql_ctx.jsonFile(path, samplingRatio)
else:
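
The reader-based equivalents of the removed jsonFile examples, sketched with an illustrative one-field schema:

    from pyspark.sql.types import StructType, StructField, StringType

    # Schema inference, replacing the plain jsonFile(path) form:
    df1 = sqlContext.read.json('python/test_support/sql/people.json')

    # Explicit schema, replacing jsonFile(path, schema); the single field kept here is illustrative.
    schema = StructType([StructField("name", StringType(), True)])
    df2 = sqlContext.read.json('python/test_support/sql/people.json', schema)
    df2.dtypes  # [('name', 'string')]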
@@ -462,21 +439,16 @@ class SQLContext(object):
df = self._ssql_ctx.jsonRDD(jrdd.rdd(), scala_datatype)
return DataFrame(df, self)
- @since(1.3)
def load(self, path=None, source=None, schema=None, **options):
"""Returns the dataset in a data source as a :class:`DataFrame`.
- The data source is specified by the ``source`` and a set of ``options``.
- If ``source`` is not specified, the default data source configured by
- ``spark.sql.sources.default`` will be used.
-
- Optionally, a schema can be provided as the schema of the returned DataFrame.
+ .. note:: Deprecated in 1.4, use :func:`DataFrameReader.load` instead.
"""
+ warnings.warn("load is deprecated. Use read.load() instead.")
return self.read.load(path, source, schema, **options)
@since(1.3)
- def createExternalTable(self, tableName, path=None, source=None,
- schema=None, **options):
+ def createExternalTable(self, tableName, path=None, source=None, schema=None, **options):
"""Creates an external table based on the dataset in a data source.
It returns the DataFrame associated with the external table.
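
A hedged sketch of the read.load() replacement recommended by the deprecation note above (format and path are illustrative):

    # Equivalent of the deprecated SQLContext.load, through the reader.
    df = sqlContext.read.load('python/test_support/sql/parquet_partitioned', format='parquet')

    # The same call in builder style:
    df = sqlContext.read.format('parquet').load('python/test_support/sql/parquet_partitioned')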
@@ -487,6 +459,8 @@ class SQLContext(object):
Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
created external table.
+
+ :return: :class:`DataFrame`
"""
if path is not None:
options["path"] = path
@@ -508,6 +482,8 @@ class SQLContext(object):
def sql(self, sqlQuery):
"""Returns a :class:`DataFrame` representing the result of the given query.
+ :return: :class:`DataFrame`
+
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlContext.sql("SELECT field1 AS f1, field2 as f2 from table1")
>>> df2.collect()
@@ -519,6 +495,8 @@ class SQLContext(object):
def table(self, tableName):
"""Returns the specified table as a :class:`DataFrame`.
+ :return: :class:`DataFrame`
+
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlContext.table("table1")
>>> sorted(df.collect()) == sorted(df2.collect())
@@ -536,6 +514,9 @@ class SQLContext(object):
The returned DataFrame has two columns: ``tableName`` and ``isTemporary``
(a column with :class:`BooleanType` indicating if a table is a temporary one or not).
+ :param dbName: string, name of the database to use.
+ :return: :class:`DataFrame`
+
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlContext.tables()
>>> df2.filter("tableName = 'table1'").first()
@@ -550,7 +531,8 @@ class SQLContext(object):
def tableNames(self, dbName=None):
"""Returns a list of names of tables in the database ``dbName``.
- If ``dbName`` is not specified, the current database will be used.
+ :param dbName: string, name of the database to use. Default to the current database.
+ :return: list of table names, in string
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> "table1" in sqlContext.tableNames()
@@ -585,8 +567,7 @@ class SQLContext(object):
Returns a :class:`DataFrameReader` that can be used to read data
in as a :class:`DataFrame`.
- >>> sqlContext.read
- <pyspark.sql.readwriter.DataFrameReader object at ...>
+ :return: :class:`DataFrameReader`
"""
return DataFrameReader(self)
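
The reader documented here has a writer counterpart on DataFrame (defined in readwriter.py, not shown in this diff); a short hedged sketch of that side, with illustrative paths and save mode:

    import tempfile, shutil

    out = tempfile.mkdtemp()
    shutil.rmtree(out)

    df.write.format('json').save(out)                     # generic save through the writer
    df.write.mode('overwrite').parquet(out + '_parquet')  # shorthand with an explicit save mode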
@@ -644,10 +625,14 @@ class UDFRegistration(object):
def _test():
+ import os
import doctest
from pyspark.context import SparkContext
from pyspark.sql import Row, SQLContext
import pyspark.sql.context
+
+ os.chdir(os.environ["SPARK_HOME"])
+
globs = pyspark.sql.context.__dict__.copy()
sc = SparkContext('local[4]', 'PythonTest')
globs['sc'] = sc
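
The hunk ends mid-function; the remainder of _test() is not part of this diff. As a hedged sketch only, the usual tail of PySpark's _test helpers looks roughly like the following (the exact globals and DataFrame contents in the real file may differ):

    # Reconstructed for orientation only; not taken from this patch.
    globs['sqlContext'] = SQLContext(sc)
    globs['df'] = globs['sqlContext'].createDataFrame(
        [Row(field1=1, field2="row1"), Row(field1=2, field2="row2")])
    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.context, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
    globs['sc'].stop()
    if failure_count:
        exit(-1)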