Diffstat (limited to 'python/pyspark/sql/context.py')
-rw-r--r--  python/pyspark/sql/context.py  227
1 file changed, 81 insertions, 146 deletions
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 80939a1f8a..c2d81ba804 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -40,9 +40,9 @@ __all__ = ["SQLContext", "HiveContext", "UDFRegistration"]
def _monkey_patch_RDD(sqlCtx):
def toDF(self, schema=None, sampleRatio=None):
"""
- Convert current :class:`RDD` into a :class:`DataFrame`
+ Converts the current :class:`RDD` into a :class:`DataFrame`
- This is a shorthand for `sqlCtx.createDataFrame(rdd, schema, sampleRatio)`
+ This is a shorthand for ``sqlCtx.createDataFrame(rdd, schema, sampleRatio)``
:param schema: a StructType or list of names of columns
:param samplingRatio: the sample ratio of rows used for inferring
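
A minimal usage sketch of the ``toDF`` shorthand added above (assuming a live ``sc`` and an already-constructed ``sqlCtx``, as in the doctests; the data and column names are illustrative):

    rdd = sc.parallelize([("Alice", 1), ("Bob", 2)])
    df = rdd.toDF(["name", "age"])   # shorthand for sqlCtx.createDataFrame(rdd, ["name", "age"])
    df.collect()
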
@@ -56,49 +56,23 @@ def _monkey_patch_RDD(sqlCtx):
RDD.toDF = toDF
-class UDFRegistration(object):
- """Wrapper for register UDF"""
-
- def __init__(self, sqlCtx):
- self.sqlCtx = sqlCtx
-
- def register(self, name, f, returnType=StringType()):
- """Registers a lambda function as a UDF so it can be used in SQL statements.
-
- In addition to a name and the function itself, the return type can be optionally specified.
- When the return type is not given it default to a string and conversion will automatically
- be done. For any other return type, the produced object must match the specified type.
-
- >>> sqlCtx.udf.register("stringLengthString", lambda x: len(x))
- >>> sqlCtx.sql("SELECT stringLengthString('test')").collect()
- [Row(c0=u'4')]
-
- >>> from pyspark.sql.types import IntegerType
- >>> sqlCtx.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
- >>> sqlCtx.sql("SELECT stringLengthInt('test')").collect()
- [Row(c0=4)]
- """
- return self.sqlCtx.registerFunction(name, f, returnType)
-
-
class SQLContext(object):
-
"""Main entry point for Spark SQL functionality.
- A SQLContext can be used create L{DataFrame}, register L{DataFrame} as
+ A SQLContext can be used to create :class:`DataFrame`, register :class:`DataFrame` as
tables, execute SQL over tables, cache tables, and read parquet files.
- """
- def __init__(self, sparkContext, sqlContext=None):
- """Create a new SQLContext.
-
- It will add a method called `toDF` to :class:`RDD`, which could be
- used to convert an RDD into a DataFrame, it's a shorthand for
- :func:`SQLContext.createDataFrame`.
+ When created, :class:`SQLContext` adds a method called ``toDF`` to :class:`RDD`,
+ which can be used to convert an RDD into a :class:`DataFrame`; it is a shorthand for
+ :func:`SQLContext.createDataFrame`.
- :param sparkContext: The SparkContext to wrap.
- :param sqlContext: An optional JVM Scala SQLContext. If set, we do not instatiate a new
+ :param sparkContext: The :class:`SparkContext` backing this SQLContext.
+ :param sqlContext: An optional JVM Scala SQLContext. If set, we do not instantiate a new
SQLContext in the JVM, instead we make all calls to this object.
+ """
+
+ def __init__(self, sparkContext, sqlContext=None):
+ """Creates a new SQLContext.
>>> from datetime import datetime
>>> sqlCtx = SQLContext(sc)
@@ -145,7 +119,7 @@ class SQLContext(object):
@property
def udf(self):
- """Wrapper for register Python function as UDF """
+ """Returns a :class:`UDFRegistration` for UDF registration."""
return UDFRegistration(self)
def registerFunction(self, name, f, returnType=StringType()):
@@ -155,6 +129,10 @@ class SQLContext(object):
When the return type is not given it default to a string and conversion will automatically
be done. For any other return type, the produced object must match the specified type.
+ :param name: name of the UDF
+ :param f: a Python function (for example, a lambda)
+ :param returnType: a :class:`DataType` object
+
>>> sqlCtx.registerFunction("stringLengthString", lambda x: len(x))
>>> sqlCtx.sql("SELECT stringLengthString('test')").collect()
[Row(c0=u'4')]
@@ -163,6 +141,11 @@ class SQLContext(object):
>>> sqlCtx.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
>>> sqlCtx.sql("SELECT stringLengthInt('test')").collect()
[Row(c0=4)]
+
+ >>> from pyspark.sql.types import IntegerType
+ >>> sqlCtx.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
+ >>> sqlCtx.sql("SELECT stringLengthInt('test')").collect()
+ [Row(c0=4)]
"""
func = lambda _, it: imap(lambda x: f(*x), it)
ser = AutoBatchedSerializer(PickleSerializer())
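
Beyond the string and integer doctests above, a hedged sketch of registering a UDF with a non-primitive return type (the UDF name ``splitWords`` is illustrative and not part of the patch):

    from pyspark.sql.types import ArrayType, StringType
    # The returned Python list is converted to a SQL array<string> because of the declared type.
    sqlCtx.registerFunction("splitWords", lambda s: s.split(" "), ArrayType(StringType()))
    sqlCtx.sql("SELECT splitWords('hello world')").collect()
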
@@ -203,30 +186,7 @@ class SQLContext(object):
return schema
def inferSchema(self, rdd, samplingRatio=None):
- """Infer and apply a schema to an RDD of L{Row}.
-
- ::note:
- Deprecated in 1.3, use :func:`createDataFrame` instead
-
- When samplingRatio is specified, the schema is inferred by looking
- at the types of each row in the sampled dataset. Otherwise, the
- first 100 rows of the RDD are inspected. Nested collections are
- supported, which can include array, dict, list, Row, tuple,
- namedtuple, or object.
-
- Each row could be L{pyspark.sql.Row} object or namedtuple or objects.
- Using top level dicts is deprecated, as dict is used to represent Maps.
-
- If a single column has multiple distinct inferred types, it may cause
- runtime exceptions.
-
- >>> rdd = sc.parallelize(
- ... [Row(field1=1, field2="row1"),
- ... Row(field1=2, field2="row2"),
- ... Row(field1=3, field2="row3")])
- >>> df = sqlCtx.inferSchema(rdd)
- >>> df.collect()[0]
- Row(field1=1, field2=u'row1')
+ """::note: Deprecated in 1.3, use :func:`createDataFrame` instead.
"""
warnings.warn("inferSchema is deprecated, please use createDataFrame instead")
@@ -236,27 +196,7 @@ class SQLContext(object):
return self.createDataFrame(rdd, None, samplingRatio)
def applySchema(self, rdd, schema):
- """
- Applies the given schema to the given RDD of L{tuple} or L{list}.
-
- ::note:
- Deprecated in 1.3, use :func:`createDataFrame` instead
-
- These tuples or lists can contain complex nested structures like
- lists, maps or nested rows.
-
- The schema should be a StructType.
-
- It is important that the schema matches the types of the objects
- in each row or exceptions could be thrown at runtime.
-
- >>> from pyspark.sql.types import *
- >>> rdd2 = sc.parallelize([(1, "row1"), (2, "row2"), (3, "row3")])
- >>> schema = StructType([StructField("field1", IntegerType(), False),
- ... StructField("field2", StringType(), False)])
- >>> df = sqlCtx.applySchema(rdd2, schema)
- >>> df.collect()
- [Row(field1=1, field2=u'row1'),..., Row(field1=3, field2=u'row3')]
+ """::note: Deprecated in 1.3, use :func:`createDataFrame` instead.
"""
warnings.warn("applySchema is deprecated, please use createDataFrame instead")
@@ -270,25 +210,23 @@ class SQLContext(object):
def createDataFrame(self, data, schema=None, samplingRatio=None):
"""
- Create a DataFrame from an RDD of tuple/list, list or pandas.DataFrame.
+ Creates a :class:`DataFrame` from an :class:`RDD` of :class:`tuple`/:class:`list`,
+ from a :class:`list`, or from a :class:`pandas.DataFrame`.
- `schema` could be :class:`StructType` or a list of column names.
+ When ``schema`` is a list of column names, the type of each column
+ will be inferred from ``data``.
- When `schema` is a list of column names, the type of each column
- will be inferred from `rdd`.
+ When ``schema`` is ``None``, it will try to infer the schema (column names and types)
+ from ``data``, which should be an RDD of :class:`Row`,
+ or :class:`namedtuple`, or :class:`dict`.
- When `schema` is None, it will try to infer the column name and type
- from `rdd`, which should be an RDD of :class:`Row`, or namedtuple,
- or dict.
+ If schema inference is needed, ``samplingRatio`` is used to determine the ratio of
+ rows used for schema inference. The first row will be used if ``samplingRatio`` is ``None``.
- If referring needed, `samplingRatio` is used to determined how many
- rows will be used to do referring. The first row will be used if
- `samplingRatio` is None.
-
- :param data: an RDD of Row/tuple/list/dict, list, or pandas.DataFrame
- :param schema: a StructType or list of names of columns
+ :param data: an RDD of :class:`Row`/:class:`tuple`/:class:`list`/:class:`dict`,
+ :class:`list`, or :class:`pandas.DataFrame`.
+ :param schema: a :class:`StructType` or list of column names (default ``None``).
:param samplingRatio: the sample ratio of rows used for inferring
- :return: a DataFrame
>>> l = [('Alice', 1)]
>>> sqlCtx.createDataFrame(l).collect()
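
To illustrate the ``schema`` modes described above, a hedged sketch (``sc`` and ``sqlCtx`` assumed from the doctests):

    from pyspark.sql import Row
    rows = sc.parallelize([Row(name="Alice", age=1), Row(name="Bob", age=2)])
    sqlCtx.createDataFrame([("Alice", 1)], ["name", "age"])  # schema given as a list of column names
    sqlCtx.createDataFrame(rows)                             # schema=None: inferred from the first Row
    sqlCtx.createDataFrame(rows, samplingRatio=1.0)          # schema inferred from a sample of rows
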
@@ -373,22 +311,20 @@ class SQLContext(object):
df = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
return DataFrame(df, self)
- def registerDataFrameAsTable(self, rdd, tableName):
- """Registers the given RDD as a temporary table in the catalog.
+ def registerDataFrameAsTable(self, df, tableName):
+ """Registers the given :class:`DataFrame` as a temporary table in the catalog.
- Temporary tables exist only during the lifetime of this instance of
- SQLContext.
+ Temporary tables exist only during the lifetime of this instance of :class:`SQLContext`.
>>> sqlCtx.registerDataFrameAsTable(df, "table1")
"""
- if (rdd.__class__ is DataFrame):
- df = rdd._jdf
- self._ssql_ctx.registerDataFrameAsTable(df, tableName)
+ if (df.__class__ is DataFrame):
+ self._ssql_ctx.registerDataFrameAsTable(df._jdf, tableName)
else:
raise ValueError("Can only register DataFrame as table")
def parquetFile(self, *paths):
- """Loads a Parquet file, returning the result as a L{DataFrame}.
+ """Loads a Parquet file, returning the result as a :class:`DataFrame`.
>>> import tempfile, shutil
>>> parquetFile = tempfile.mkdtemp()
@@ -406,15 +342,10 @@ class SQLContext(object):
return DataFrame(jdf, self)
def jsonFile(self, path, schema=None, samplingRatio=1.0):
- """
- Loads a text file storing one JSON object per line as a
- L{DataFrame}.
+ """Loads a text file storing one JSON object per line as a :class:`DataFrame`.
- If the schema is provided, applies the given schema to this
- JSON dataset.
-
- Otherwise, it samples the dataset with ratio `samplingRatio` to
- determine the schema.
+ If the schema is provided, applies the given schema to this JSON dataset.
+ Otherwise, it samples the dataset with ratio ``samplingRatio`` to determine the schema.
>>> import tempfile, shutil
>>> jsonFile = tempfile.mkdtemp()
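
A sketch of the two modes described above, using an illustrative temporary file rather than the doctest fixture:

    import os, tempfile
    path = os.path.join(tempfile.mkdtemp(), "people.json")
    with open(path, "w") as f:
        f.write('{"name": "Alice", "age": 1}\n{"name": "Bob", "age": 2}\n')
    people = sqlCtx.jsonFile(path, samplingRatio=1.0)  # no schema given: sampled from the data
    people.printSchema()
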
@@ -450,13 +381,10 @@ class SQLContext(object):
return DataFrame(df, self)
def jsonRDD(self, rdd, schema=None, samplingRatio=1.0):
- """Loads an RDD storing one JSON object per string as a L{DataFrame}.
-
- If the schema is provided, applies the given schema to this
- JSON dataset.
+ """Loads an RDD storing one JSON object per string as a :class:`DataFrame`.
- Otherwise, it samples the dataset with ratio `samplingRatio` to
- determine the schema.
+ If the schema is provided, applies the given schema to this JSON dataset.
+ Otherwise, it samples the dataset with ratio ``samplingRatio`` to determine the schema.
>>> df1 = sqlCtx.jsonRDD(json)
>>> df1.first()
@@ -475,7 +403,6 @@ class SQLContext(object):
>>> df3 = sqlCtx.jsonRDD(json, schema)
>>> df3.first()
Row(field2=u'row1', field3=Row(field5=None))
-
"""
def func(iterator):
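
The schema argument shown in the ``df3`` doctest can also be an explicitly constructed :class:`StructType`, which skips the sampling pass entirely (field names here are illustrative):

    from pyspark.sql.types import StructType, StructField, StringType, IntegerType
    strings = sc.parallelize(['{"name": "Alice", "age": 1}', '{"name": "Bob", "age": 2}'])
    schema = StructType([StructField("name", StringType(), True),
                         StructField("age", IntegerType(), True)])
    people = sqlCtx.jsonRDD(strings, schema)  # the given schema is applied as-is
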
@@ -496,11 +423,11 @@ class SQLContext(object):
return DataFrame(df, self)
def load(self, path=None, source=None, schema=None, **options):
- """Returns the dataset in a data source as a DataFrame.
+ """Returns the dataset in a data source as a :class:`DataFrame`.
- The data source is specified by the `source` and a set of `options`.
- If `source` is not specified, the default data source configured by
- spark.sql.sources.default will be used.
+ The data source is specified by the ``source`` and a set of ``options``.
+ If ``source`` is not specified, the default data source configured by
+ ``spark.sql.sources.default`` will be used.
Optionally, a schema can be provided as the schema of the returned DataFrame.
"""
@@ -526,11 +453,11 @@ class SQLContext(object):
It returns the DataFrame associated with the external table.
- The data source is specified by the `source` and a set of `options`.
- If `source` is not specified, the default data source configured by
- spark.sql.sources.default will be used.
+ The data source is specified by the ``source`` and a set of ``options``.
+ If ``source`` is not specified, the default data source configured by
+ ``spark.sql.sources.default`` will be used.
- Optionally, a schema can be provided as the schema of the returned DataFrame and
+ Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
created external table.
"""
if path is not None:
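
A hedged sketch of ``createExternalTable`` (placeholder path; a :class:`HiveContext` is used here because persisting a table in the catalog generally requires the Hive-backed metastore):

    hc = HiveContext(sc)
    people = hc.createExternalTable("people", path="/path/to/people.parquet")
    hc.sql("SELECT * FROM people")
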
@@ -551,7 +478,7 @@ class SQLContext(object):
return DataFrame(df, self)
def sql(self, sqlQuery):
- """Return a L{DataFrame} representing the result of the given query.
+ """Returns a :class:`DataFrame` representing the result of the given query.
>>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1")
@@ -561,7 +488,7 @@ class SQLContext(object):
return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
def table(self, tableName):
- """Returns the specified table as a L{DataFrame}.
+ """Returns the specified table as a :class:`DataFrame`.
>>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlCtx.table("table1")
@@ -571,12 +498,12 @@ class SQLContext(object):
return DataFrame(self._ssql_ctx.table(tableName), self)
def tables(self, dbName=None):
- """Returns a DataFrame containing names of tables in the given database.
+ """Returns a :class:`DataFrame` containing names of tables in the given database.
- If `dbName` is not specified, the current database will be used.
+ If ``dbName`` is not specified, the current database will be used.
- The returned DataFrame has two columns, tableName and isTemporary
- (a column with BooleanType indicating if a table is a temporary one or not).
+ The returned DataFrame has two columns: ``tableName`` and ``isTemporary``
+ (a column with :class:`BooleanType` indicating if a table is a temporary one or not).
>>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlCtx.tables()
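
A small sketch of consuming the two columns described above (``filter`` with a SQL expression string is assumed to be available on :class:`DataFrame`):

    tbls = sqlCtx.tables()
    tbls.filter("isTemporary = true").select("tableName").collect()
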
@@ -589,9 +516,9 @@ class SQLContext(object):
return DataFrame(self._ssql_ctx.tables(dbName), self)
def tableNames(self, dbName=None):
- """Returns a list of names of tables in the database `dbName`.
+ """Returns a list of names of tables in the database ``dbName``.
- If `dbName` is not specified, the current database will be used.
+ If ``dbName`` is not specified, the current database will be used.
>>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> "table1" in sqlCtx.tableNames()
@@ -618,22 +545,18 @@ class SQLContext(object):
class HiveContext(SQLContext):
-
"""A variant of Spark SQL that integrates with data stored in Hive.
- Configuration for Hive is read from hive-site.xml on the classpath.
+ Configuration for Hive is read from ``hive-site.xml`` on the classpath.
It supports running both SQL and HiveQL commands.
+
+ :param sparkContext: The :class:`SparkContext` to wrap.
+ :param hiveContext: An optional JVM Scala HiveContext. If set, we do not instantiate a new
+ :class:`HiveContext` in the JVM; instead we make all calls to this object.
"""
def __init__(self, sparkContext, hiveContext=None):
- """Create a new HiveContext.
-
- :param sparkContext: The SparkContext to wrap.
- :param hiveContext: An optional JVM Scala HiveContext. If set, we do not instatiate a new
- HiveContext in the JVM, instead we make all calls to this object.
- """
SQLContext.__init__(self, sparkContext)
-
if hiveContext:
self._scala_HiveContext = hiveContext
@@ -652,6 +575,18 @@ class HiveContext(SQLContext):
return self._jvm.HiveContext(self._jsc.sc())
+class UDFRegistration(object):
+ """Wrapper for user-defined function registration."""
+
+ def __init__(self, sqlCtx):
+ self.sqlCtx = sqlCtx
+
+ def register(self, name, f, returnType=StringType()):
+ return self.sqlCtx.registerFunction(name, f, returnType)
+
+ register.__doc__ = SQLContext.registerFunction.__doc__
+
+
def _test():
import doctest
from pyspark.context import SparkContext