author     Reynold Xin <rxin@databricks.com>    2015-03-31 18:31:36 -0700
committer  Reynold Xin <rxin@databricks.com>    2015-03-31 18:31:43 -0700
commit     e527b359037ecc7206b70d2c51f6926986c69f2c (patch)
tree       3e36691d4598bcae46b7a8728e7fb55fbbf6d07b
parent     c4c982a65041ce13a55c1f2bd59c9a85cf3edfc5 (diff)
[Doc] Improve Python DataFrame documentation
Author: Reynold Xin <rxin@databricks.com>

Closes #5287 from rxin/pyspark-df-doc-cleanup-context and squashes the following commits:

1841b60 [Reynold Xin] Lint.
f2007f1 [Reynold Xin] functions and types.
bc3b72b [Reynold Xin] More improvements to DataFrame Python doc.
ac1d4c0 [Reynold Xin] Bug fix.
b163365 [Reynold Xin] Python fix. Added Experimental flag to DataFrameNaFunctions.
608422d [Reynold Xin] [Doc] Cleanup context.py Python docs.

(cherry picked from commit 305abe1e57450f49e3ec4dffb073c5adf17cadef)
Signed-off-by: Reynold Xin <rxin@databricks.com>
-rw-r--r--  python/pyspark/sql/__init__.py                                              4
-rw-r--r--  python/pyspark/sql/context.py                                             227
-rw-r--r--  python/pyspark/sql/dataframe.py                                           249
-rw-r--r--  python/pyspark/sql/functions.py                                             6
-rw-r--r--  python/pyspark/sql/types.py                                               154
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala     3
6 files changed, 253 insertions(+), 390 deletions(-)
diff --git a/python/pyspark/sql/__init__.py b/python/pyspark/sql/__init__.py
index 9d39e5d9c2..65abb24eed 100644
--- a/python/pyspark/sql/__init__.py
+++ b/python/pyspark/sql/__init__.py
@@ -16,7 +16,7 @@
#
"""
-public classes of Spark SQL:
+Important classes of Spark SQL and DataFrames:
- L{SQLContext}
Main entry point for :class:`DataFrame` and SQL functionality.
@@ -34,6 +34,8 @@ public classes of Spark SQL:
Methods for handling missing data (null values).
- L{functions}
List of built-in functions available for :class:`DataFrame`.
+ - L{types}
+ List of data types available.
"""
from pyspark.sql.context import SQLContext, HiveContext
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 80939a1f8a..c2d81ba804 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -40,9 +40,9 @@ __all__ = ["SQLContext", "HiveContext", "UDFRegistration"]
def _monkey_patch_RDD(sqlCtx):
def toDF(self, schema=None, sampleRatio=None):
"""
- Convert current :class:`RDD` into a :class:`DataFrame`
+ Converts the current :class:`RDD` into a :class:`DataFrame`
- This is a shorthand for `sqlCtx.createDataFrame(rdd, schema, sampleRatio)`
+ This is a shorthand for ``sqlCtx.createDataFrame(rdd, schema, sampleRatio)``
:param schema: a StructType or list of names of columns
:param samplingRatio: the sample ratio of rows used for inferring
@@ -56,49 +56,23 @@ def _monkey_patch_RDD(sqlCtx):
RDD.toDF = toDF
-class UDFRegistration(object):
- """Wrapper for register UDF"""
-
- def __init__(self, sqlCtx):
- self.sqlCtx = sqlCtx
-
- def register(self, name, f, returnType=StringType()):
- """Registers a lambda function as a UDF so it can be used in SQL statements.
-
- In addition to a name and the function itself, the return type can be optionally specified.
- When the return type is not given it default to a string and conversion will automatically
- be done. For any other return type, the produced object must match the specified type.
-
- >>> sqlCtx.udf.register("stringLengthString", lambda x: len(x))
- >>> sqlCtx.sql("SELECT stringLengthString('test')").collect()
- [Row(c0=u'4')]
-
- >>> from pyspark.sql.types import IntegerType
- >>> sqlCtx.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
- >>> sqlCtx.sql("SELECT stringLengthInt('test')").collect()
- [Row(c0=4)]
- """
- return self.sqlCtx.registerFunction(name, f, returnType)
-
-
class SQLContext(object):
-
"""Main entry point for Spark SQL functionality.
- A SQLContext can be used create L{DataFrame}, register L{DataFrame} as
+ A SQLContext can be used to create :class:`DataFrame`, register :class:`DataFrame` as
tables, execute SQL over tables, cache tables, and read parquet files.
- """
- def __init__(self, sparkContext, sqlContext=None):
- """Create a new SQLContext.
-
- It will add a method called `toDF` to :class:`RDD`, which could be
- used to convert an RDD into a DataFrame, it's a shorthand for
- :func:`SQLContext.createDataFrame`.
+ When created, :class:`SQLContext` adds a method called ``toDF`` to :class:`RDD`,
+ which can be used to convert an RDD into a DataFrame; it is a shorthand for
+ :func:`SQLContext.createDataFrame`.
- :param sparkContext: The SparkContext to wrap.
- :param sqlContext: An optional JVM Scala SQLContext. If set, we do not instatiate a new
+ :param sparkContext: The :class:`SparkContext` backing this SQLContext.
+ :param sqlContext: An optional JVM Scala SQLContext. If set, we do not instantiate a new
SQLContext in the JVM, instead we make all calls to this object.
+ """
+
+ def __init__(self, sparkContext, sqlContext=None):
+ """Creates a new SQLContext.
>>> from datetime import datetime
>>> sqlCtx = SQLContext(sc)
@@ -145,7 +119,7 @@ class SQLContext(object):
@property
def udf(self):
- """Wrapper for register Python function as UDF """
+ """Returns a :class:`UDFRegistration` for UDF registration."""
return UDFRegistration(self)
def registerFunction(self, name, f, returnType=StringType()):
@@ -155,6 +129,10 @@ class SQLContext(object):
When the return type is not given it default to a string and conversion will automatically
be done. For any other return type, the produced object must match the specified type.
+ :param name: name of the UDF
+ :param f: python function
+ :param returnType: a :class:`DataType` object
+
>>> sqlCtx.registerFunction("stringLengthString", lambda x: len(x))
>>> sqlCtx.sql("SELECT stringLengthString('test')").collect()
[Row(c0=u'4')]
@@ -163,6 +141,11 @@ class SQLContext(object):
>>> sqlCtx.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
>>> sqlCtx.sql("SELECT stringLengthInt('test')").collect()
[Row(c0=4)]
+
+ >>> from pyspark.sql.types import IntegerType
+ >>> sqlCtx.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
+ >>> sqlCtx.sql("SELECT stringLengthInt('test')").collect()
+ [Row(c0=4)]
"""
func = lambda _, it: imap(lambda x: f(*x), it)
ser = AutoBatchedSerializer(PickleSerializer())
@@ -203,30 +186,7 @@ class SQLContext(object):
return schema
def inferSchema(self, rdd, samplingRatio=None):
- """Infer and apply a schema to an RDD of L{Row}.
-
- ::note:
- Deprecated in 1.3, use :func:`createDataFrame` instead
-
- When samplingRatio is specified, the schema is inferred by looking
- at the types of each row in the sampled dataset. Otherwise, the
- first 100 rows of the RDD are inspected. Nested collections are
- supported, which can include array, dict, list, Row, tuple,
- namedtuple, or object.
-
- Each row could be L{pyspark.sql.Row} object or namedtuple or objects.
- Using top level dicts is deprecated, as dict is used to represent Maps.
-
- If a single column has multiple distinct inferred types, it may cause
- runtime exceptions.
-
- >>> rdd = sc.parallelize(
- ... [Row(field1=1, field2="row1"),
- ... Row(field1=2, field2="row2"),
- ... Row(field1=3, field2="row3")])
- >>> df = sqlCtx.inferSchema(rdd)
- >>> df.collect()[0]
- Row(field1=1, field2=u'row1')
+ """::note: Deprecated in 1.3, use :func:`createDataFrame` instead.
"""
warnings.warn("inferSchema is deprecated, please use createDataFrame instead")
@@ -236,27 +196,7 @@ class SQLContext(object):
return self.createDataFrame(rdd, None, samplingRatio)
def applySchema(self, rdd, schema):
- """
- Applies the given schema to the given RDD of L{tuple} or L{list}.
-
- ::note:
- Deprecated in 1.3, use :func:`createDataFrame` instead
-
- These tuples or lists can contain complex nested structures like
- lists, maps or nested rows.
-
- The schema should be a StructType.
-
- It is important that the schema matches the types of the objects
- in each row or exceptions could be thrown at runtime.
-
- >>> from pyspark.sql.types import *
- >>> rdd2 = sc.parallelize([(1, "row1"), (2, "row2"), (3, "row3")])
- >>> schema = StructType([StructField("field1", IntegerType(), False),
- ... StructField("field2", StringType(), False)])
- >>> df = sqlCtx.applySchema(rdd2, schema)
- >>> df.collect()
- [Row(field1=1, field2=u'row1'),..., Row(field1=3, field2=u'row3')]
+ """::note: Deprecated in 1.3, use :func:`createDataFrame` instead.
"""
warnings.warn("applySchema is deprecated, please use createDataFrame instead")
@@ -270,25 +210,23 @@ class SQLContext(object):
def createDataFrame(self, data, schema=None, samplingRatio=None):
"""
- Create a DataFrame from an RDD of tuple/list, list or pandas.DataFrame.
+ Creates a :class:`DataFrame` from an :class:`RDD` of :class:`tuple`/:class:`list`,
+ a :class:`list`, or a :class:`pandas.DataFrame`.
- `schema` could be :class:`StructType` or a list of column names.
+ When ``schema`` is a list of column names, the type of each column
+ will be inferred from ``data``.
- When `schema` is a list of column names, the type of each column
- will be inferred from `rdd`.
+ When ``schema`` is ``None``, it will try to infer the schema (column names and types)
+ from ``data``, which should be an RDD of :class:`Row`,
+ or :class:`namedtuple`, or :class:`dict`.
- When `schema` is None, it will try to infer the column name and type
- from `rdd`, which should be an RDD of :class:`Row`, or namedtuple,
- or dict.
+ If schema inference is needed, ``samplingRatio`` is used to determine the ratio of
+ rows used for schema inference. The first row will be used if ``samplingRatio`` is ``None``.
- If referring needed, `samplingRatio` is used to determined how many
- rows will be used to do referring. The first row will be used if
- `samplingRatio` is None.
-
- :param data: an RDD of Row/tuple/list/dict, list, or pandas.DataFrame
- :param schema: a StructType or list of names of columns
+ :param data: an RDD of :class:`Row`/:class:`tuple`/:class:`list`/:class:`dict`,
+ :class:`list`, or :class:`pandas.DataFrame`.
+ :param schema: a :class:`StructType` or list of column names. default None.
:param samplingRatio: the sample ratio of rows used for inferring
- :return: a DataFrame
>>> l = [('Alice', 1)]
>>> sqlCtx.createDataFrame(l).collect()
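The ``schema`` variants described above can be sketched as follows (illustrative only; ``sc`` and ``sqlCtx`` are assumed to exist as in the doctests):

    from pyspark.sql import Row
    # 1. schema as a list of column names; types inferred from the data
    df1 = sqlCtx.createDataFrame([('Alice', 1)], ['name', 'age'])
    # 2. no schema: names and types inferred from an RDD of Row objects
    rdd = sc.parallelize([Row(name='Alice', age=1), Row(name='Bob', age=5)])
    df2 = sqlCtx.createDataFrame(rdd)
    # 3. samplingRatio controls how much of the data is inspected during inference
    df3 = sqlCtx.createDataFrame(rdd, samplingRatio=1.0)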
@@ -373,22 +311,20 @@ class SQLContext(object):
df = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
return DataFrame(df, self)
- def registerDataFrameAsTable(self, rdd, tableName):
- """Registers the given RDD as a temporary table in the catalog.
+ def registerDataFrameAsTable(self, df, tableName):
+ """Registers the given :class:`DataFrame` as a temporary table in the catalog.
- Temporary tables exist only during the lifetime of this instance of
- SQLContext.
+ Temporary tables exist only during the lifetime of this instance of :class:`SQLContext`.
>>> sqlCtx.registerDataFrameAsTable(df, "table1")
"""
- if (rdd.__class__ is DataFrame):
- df = rdd._jdf
- self._ssql_ctx.registerDataFrameAsTable(df, tableName)
+ if (df.__class__ is DataFrame):
+ self._ssql_ctx.registerDataFrameAsTable(df._jdf, tableName)
else:
raise ValueError("Can only register DataFrame as table")
def parquetFile(self, *paths):
- """Loads a Parquet file, returning the result as a L{DataFrame}.
+ """Loads a Parquet file, returning the result as a :class:`DataFrame`.
>>> import tempfile, shutil
>>> parquetFile = tempfile.mkdtemp()
@@ -406,15 +342,10 @@ class SQLContext(object):
return DataFrame(jdf, self)
def jsonFile(self, path, schema=None, samplingRatio=1.0):
- """
- Loads a text file storing one JSON object per line as a
- L{DataFrame}.
+ """Loads a text file storing one JSON object per line as a :class:`DataFrame`.
- If the schema is provided, applies the given schema to this
- JSON dataset.
-
- Otherwise, it samples the dataset with ratio `samplingRatio` to
- determine the schema.
+ If the schema is provided, applies the given schema to this JSON dataset.
+ Otherwise, it samples the dataset with ratio ``samplingRatio`` to determine the schema.
>>> import tempfile, shutil
>>> jsonFile = tempfile.mkdtemp()
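Passing an explicit schema, as described above, can be sketched like this (the file path is illustrative; ``sqlCtx`` is assumed):

    from pyspark.sql.types import StructType, StructField, StringType
    schema = StructType([StructField("name", StringType(), True)])
    # apply the given schema instead of sampling the JSON records
    people = sqlCtx.jsonFile("/tmp/people.json", schema)
    # or let the schema be inferred by sampling (samplingRatio defaults to 1.0)
    people2 = sqlCtx.jsonFile("/tmp/people.json")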
@@ -450,13 +381,10 @@ class SQLContext(object):
return DataFrame(df, self)
def jsonRDD(self, rdd, schema=None, samplingRatio=1.0):
- """Loads an RDD storing one JSON object per string as a L{DataFrame}.
-
- If the schema is provided, applies the given schema to this
- JSON dataset.
+ """Loads an RDD storing one JSON object per string as a :class:`DataFrame`.
- Otherwise, it samples the dataset with ratio `samplingRatio` to
- determine the schema.
+ If the schema is provided, applies the given schema to this JSON dataset.
+ Otherwise, it samples the dataset with ratio ``samplingRatio`` to determine the schema.
>>> df1 = sqlCtx.jsonRDD(json)
>>> df1.first()
@@ -475,7 +403,6 @@ class SQLContext(object):
>>> df3 = sqlCtx.jsonRDD(json, schema)
>>> df3.first()
Row(field2=u'row1', field3=Row(field5=None))
-
"""
def func(iterator):
@@ -496,11 +423,11 @@ class SQLContext(object):
return DataFrame(df, self)
def load(self, path=None, source=None, schema=None, **options):
- """Returns the dataset in a data source as a DataFrame.
+ """Returns the dataset in a data source as a :class:`DataFrame`.
- The data source is specified by the `source` and a set of `options`.
- If `source` is not specified, the default data source configured by
- spark.sql.sources.default will be used.
+ The data source is specified by the ``source`` and a set of ``options``.
+ If ``source`` is not specified, the default data source configured by
+ ``spark.sql.sources.default`` will be used.
Optionally, a schema can be provided as the schema of the returned DataFrame.
"""
@@ -526,11 +453,11 @@ class SQLContext(object):
It returns the DataFrame associated with the external table.
- The data source is specified by the `source` and a set of `options`.
- If `source` is not specified, the default data source configured by
- spark.sql.sources.default will be used.
+ The data source is specified by the ``source`` and a set of ``options``.
+ If ``source`` is not specified, the default data source configured by
+ ``spark.sql.sources.default`` will be used.
- Optionally, a schema can be provided as the schema of the returned DataFrame and
+ Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
created external table.
"""
if path is not None:
@@ -551,7 +478,7 @@ class SQLContext(object):
return DataFrame(df, self)
def sql(self, sqlQuery):
- """Return a L{DataFrame} representing the result of the given query.
+ """Returns a :class:`DataFrame` representing the result of the given query.
>>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1")
@@ -561,7 +488,7 @@ class SQLContext(object):
return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
def table(self, tableName):
- """Returns the specified table as a L{DataFrame}.
+ """Returns the specified table as a :class:`DataFrame`.
>>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlCtx.table("table1")
@@ -571,12 +498,12 @@ class SQLContext(object):
return DataFrame(self._ssql_ctx.table(tableName), self)
def tables(self, dbName=None):
- """Returns a DataFrame containing names of tables in the given database.
+ """Returns a :class:`DataFrame` containing names of tables in the given database.
- If `dbName` is not specified, the current database will be used.
+ If ``dbName`` is not specified, the current database will be used.
- The returned DataFrame has two columns, tableName and isTemporary
- (a column with BooleanType indicating if a table is a temporary one or not).
+ The returned DataFrame has two columns: ``tableName`` and ``isTemporary``
+ (a column with :class:`BooleanType` indicating if a table is a temporary one or not).
>>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlCtx.tables()
@@ -589,9 +516,9 @@ class SQLContext(object):
return DataFrame(self._ssql_ctx.tables(dbName), self)
def tableNames(self, dbName=None):
- """Returns a list of names of tables in the database `dbName`.
+ """Returns a list of names of tables in the database ``dbName``.
- If `dbName` is not specified, the current database will be used.
+ If ``dbName`` is not specified, the current database will be used.
>>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> "table1" in sqlCtx.tableNames()
@@ -618,22 +545,18 @@ class SQLContext(object):
class HiveContext(SQLContext):
-
"""A variant of Spark SQL that integrates with data stored in Hive.
- Configuration for Hive is read from hive-site.xml on the classpath.
+ Configuration for Hive is read from ``hive-site.xml`` on the classpath.
It supports running both SQL and HiveQL commands.
+
+ :param sparkContext: The SparkContext to wrap.
+ :param hiveContext: An optional JVM Scala HiveContext. If set, we do not instantiate a new
+ :class:`HiveContext` in the JVM, instead we make all calls to this object.
"""
def __init__(self, sparkContext, hiveContext=None):
- """Create a new HiveContext.
-
- :param sparkContext: The SparkContext to wrap.
- :param hiveContext: An optional JVM Scala HiveContext. If set, we do not instatiate a new
- HiveContext in the JVM, instead we make all calls to this object.
- """
SQLContext.__init__(self, sparkContext)
-
if hiveContext:
self._scala_HiveContext = hiveContext
@@ -652,6 +575,18 @@ class HiveContext(SQLContext):
return self._jvm.HiveContext(self._jsc.sc())
+class UDFRegistration(object):
+ """Wrapper for user-defined function registration."""
+
+ def __init__(self, sqlCtx):
+ self.sqlCtx = sqlCtx
+
+ def register(self, name, f, returnType=StringType()):
+ return self.sqlCtx.registerFunction(name, f, returnType)
+
+ register.__doc__ = SQLContext.registerFunction.__doc__
+
+
def _test():
import doctest
from pyspark.context import SparkContext
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 1550802332..c30326ebd1 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -35,8 +35,7 @@ __all__ = ["DataFrame", "GroupedData", "Column", "SchemaRDD", "DataFrameNaFuncti
class DataFrame(object):
-
- """A collection of rows that have the same columns.
+ """A distributed collection of data grouped into named columns.
A :class:`DataFrame` is equivalent to a relational table in Spark SQL,
and can be created using various functions in :class:`SQLContext`::
@@ -69,9 +68,7 @@ class DataFrame(object):
@property
def rdd(self):
- """
- Return the content of the :class:`DataFrame` as an :class:`pyspark.RDD`
- of :class:`Row` s.
+ """Returns the content as an :class:`pyspark.RDD` of :class:`Row`.
"""
if not hasattr(self, '_lazy_rdd'):
jrdd = self._jdf.javaToPython()
@@ -93,7 +90,9 @@ class DataFrame(object):
return DataFrameNaFunctions(self)
def toJSON(self, use_unicode=False):
- """Convert a :class:`DataFrame` into a MappedRDD of JSON documents; one document per row.
+ """Converts a :class:`DataFrame` into a :class:`RDD` of string.
+
+ Each row is turned into a JSON document as one element in the returned RDD.
>>> df.toJSON().first()
'{"age":2,"name":"Alice"}'
@@ -102,10 +101,10 @@ class DataFrame(object):
return RDD(rdd.toJavaRDD(), self._sc, UTF8Deserializer(use_unicode))
def saveAsParquetFile(self, path):
- """Save the contents as a Parquet file, preserving the schema.
+ """Saves the contents as a Parquet file, preserving the schema.
Files that are written out using this method can be read back in as
- a :class:`DataFrame` using the L{SQLContext.parquetFile} method.
+ a :class:`DataFrame` using :func:`SQLContext.parquetFile`.
>>> import tempfile, shutil
>>> parquetFile = tempfile.mkdtemp()
@@ -120,8 +119,8 @@ class DataFrame(object):
def registerTempTable(self, name):
"""Registers this RDD as a temporary table using the given name.
- The lifetime of this temporary table is tied to the L{SQLContext}
- that was used to create this DataFrame.
+ The lifetime of this temporary table is tied to the :class:`SQLContext`
+ that was used to create this :class:`DataFrame`.
>>> df.registerTempTable("people")
>>> df2 = sqlCtx.sql("select * from people")
@@ -131,7 +130,7 @@ class DataFrame(object):
self._jdf.registerTempTable(name)
def registerAsTable(self, name):
- """DEPRECATED: use registerTempTable() instead"""
+ """DEPRECATED: use :func:`registerTempTable` instead"""
warnings.warn("Use registerTempTable instead of registerAsTable.", DeprecationWarning)
self.registerTempTable(name)
@@ -162,22 +161,19 @@ class DataFrame(object):
return jmode
def saveAsTable(self, tableName, source=None, mode="error", **options):
- """Saves the contents of the :class:`DataFrame` to a data source as a table.
+ """Saves the contents of this :class:`DataFrame` to a data source as a table.
- The data source is specified by the `source` and a set of `options`.
- If `source` is not specified, the default data source configured by
- spark.sql.sources.default will be used.
+ The data source is specified by the ``source`` and a set of ``options``.
+ If ``source`` is not specified, the default data source configured by
+ ``spark.sql.sources.default`` will be used.
Additionally, mode is used to specify the behavior of the saveAsTable operation when
table already exists in the data source. There are four modes:
- * append: Contents of this :class:`DataFrame` are expected to be appended \
- to existing table.
- * overwrite: Data in the existing table is expected to be overwritten by \
- the contents of this DataFrame.
- * error: An exception is expected to be thrown.
- * ignore: The save operation is expected to not save the contents of the \
- :class:`DataFrame` and to not change the existing table.
+ * `append`: Append contents of this :class:`DataFrame` to existing data.
+ * `overwrite`: Overwrite existing data.
+ * `error`: Throw an exception if data already exists.
+ * `ignore`: Silently ignore this operation if data already exists.
"""
if source is None:
source = self.sql_ctx.getConf("spark.sql.sources.default",
@@ -190,18 +186,17 @@ class DataFrame(object):
def save(self, path=None, source=None, mode="error", **options):
"""Saves the contents of the :class:`DataFrame` to a data source.
- The data source is specified by the `source` and a set of `options`.
- If `source` is not specified, the default data source configured by
- spark.sql.sources.default will be used.
+ The data source is specified by the ``source`` and a set of ``options``.
+ If ``source`` is not specified, the default data source configured by
+ ``spark.sql.sources.default`` will be used.
Additionally, mode is used to specify the behavior of the save operation when
data already exists in the data source. There are four modes:
- * append: Contents of this :class:`DataFrame` are expected to be appended to existing data.
- * overwrite: Existing data is expected to be overwritten by the contents of this DataFrame.
- * error: An exception is expected to be thrown.
- * ignore: The save operation is expected to not save the contents of \
- the :class:`DataFrame` and to not change the existing data.
+ * `append`: Append contents of this :class:`DataFrame` to existing data.
+ * `overwrite`: Overwrite existing data.
+ * `error`: Throw an exception if data already exists.
+ * `ignore`: Silently ignore this operation if data already exists.
"""
if path is not None:
options["path"] = path
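The modes listed above can be sketched as follows (output locations are hypothetical; ``df`` is any DataFrame):

    # default mode="error": raise if /tmp/people already exists
    df.save("/tmp/people", source="parquet")
    # replace whatever is already there
    df.save("/tmp/people", source="parquet", mode="overwrite")
    # append the contents of df to an existing table
    df.saveAsTable("people", source="parquet", mode="append")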
@@ -215,8 +210,7 @@ class DataFrame(object):
@property
def schema(self):
- """Returns the schema of this :class:`DataFrame` (represented by
- a L{StructType}).
+ """Returns the schema of this :class:`DataFrame` as a :class:`types.StructType`.
>>> df.schema
StructType(List(StructField(age,IntegerType,true),StructField(name,StringType,true)))
@@ -237,11 +231,9 @@ class DataFrame(object):
print (self._jdf.schema().treeString())
def explain(self, extended=False):
- """
- Prints the plans (logical and physical) to the console for
- debugging purpose.
+ """Prints the (logical and physical) plans to the console for debugging purpose.
- If extended is False, only prints the physical plan.
+ :param extended: boolean, default ``False``. If ``False``, prints only the physical plan.
>>> df.explain()
PhysicalRDD [age#0,name#1], MapPartitionsRDD[...] at mapPartitions at SQLContext.scala:...
@@ -263,15 +255,13 @@ class DataFrame(object):
print self._jdf.queryExecution().executedPlan().toString()
def isLocal(self):
- """
- Returns True if the `collect` and `take` methods can be run locally
+ """Returns ``True`` if the :func:`collect` and :func:`take` methods can be run locally
(without any Spark executors).
"""
return self._jdf.isLocal()
def show(self, n=20):
- """
- Print the first n rows.
+ """Prints the first ``n`` rows to the console.
>>> df
DataFrame[age: int, name: string]
@@ -286,11 +276,7 @@ class DataFrame(object):
return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes))
def count(self):
- """Return the number of elements in this RDD.
-
- Unlike the base RDD implementation of count, this implementation
- leverages the query optimizer to compute the count on the DataFrame,
- which supports features such as filter pushdown.
+ """Returns the number of rows in this :class:`DataFrame`.
>>> df.count()
2L
@@ -298,10 +284,7 @@ class DataFrame(object):
return self._jdf.count()
def collect(self):
- """Return a list that contains all of the rows.
-
- Each object in the list is a Row, the fields can be accessed as
- attributes.
+ """Returns all the records as a list of :class:`Row`.
>>> df.collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
@@ -313,7 +296,7 @@ class DataFrame(object):
return [cls(r) for r in rs]
def limit(self, num):
- """Limit the result count to the number specified.
+ """Limits the result count to the number specified.
>>> df.limit(1).collect()
[Row(age=2, name=u'Alice')]
@@ -324,10 +307,7 @@ class DataFrame(object):
return DataFrame(jdf, self.sql_ctx)
def take(self, num):
- """Take the first num rows of the RDD.
-
- Each object in the list is a Row, the fields can be accessed as
- attributes.
+ """Returns the first ``num`` rows as a :class:`list` of :class:`Row`.
>>> df.take(2)
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
@@ -335,9 +315,9 @@ class DataFrame(object):
return self.limit(num).collect()
def map(self, f):
- """ Return a new RDD by applying a function to each Row
+ """ Returns a new :class:`RDD` by applying a the ``f`` function to each :class:`Row`.
- It's a shorthand for df.rdd.map()
+ This is a shorthand for ``df.rdd.map()``.
>>> df.map(lambda p: p.name).collect()
[u'Alice', u'Bob']
@@ -345,10 +325,10 @@ class DataFrame(object):
return self.rdd.map(f)
def flatMap(self, f):
- """ Return a new RDD by first applying a function to all elements of this,
+ """ Returns a new :class:`RDD` by first applying the ``f`` function to each :class:`Row`,
and then flattening the results.
- It's a shorthand for df.rdd.flatMap()
+ This is a shorthand for ``df.rdd.flatMap()``.
>>> df.flatMap(lambda p: p.name).collect()
[u'A', u'l', u'i', u'c', u'e', u'B', u'o', u'b']
@@ -356,10 +336,9 @@ class DataFrame(object):
return self.rdd.flatMap(f)
def mapPartitions(self, f, preservesPartitioning=False):
- """
- Return a new RDD by applying a function to each partition.
+ """Returns a new :class:`RDD` by applying the ``f`` function to each partition.
- It's a shorthand for df.rdd.mapPartitions()
+ This is a shorthand for ``df.rdd.mapPartitions()``.
>>> rdd = sc.parallelize([1, 2, 3, 4], 4)
>>> def f(iterator): yield 1
@@ -369,10 +348,9 @@ class DataFrame(object):
return self.rdd.mapPartitions(f, preservesPartitioning)
def foreach(self, f):
- """
- Applies a function to all rows of this DataFrame.
+ """Applies the ``f`` function to all :class:`Row` of this :class:`DataFrame`.
- It's a shorthand for df.rdd.foreach()
+ This is a shorthand for ``df.rdd.foreach()``.
>>> def f(person):
... print person.name
@@ -381,10 +359,9 @@ class DataFrame(object):
return self.rdd.foreach(f)
def foreachPartition(self, f):
- """
- Applies a function to each partition of this DataFrame.
+ """Applies the ``f`` function to each partition of this :class:`DataFrame`.
- It's a shorthand for df.rdd.foreachPartition()
+ This is a shorthand for ``df.rdd.foreachPartition()``.
>>> def f(people):
... for person in people:
@@ -394,14 +371,14 @@ class DataFrame(object):
return self.rdd.foreachPartition(f)
def cache(self):
- """ Persist with the default storage level (C{MEMORY_ONLY_SER}).
+ """ Persists with the default storage level (C{MEMORY_ONLY_SER}).
"""
self.is_cached = True
self._jdf.cache()
return self
def persist(self, storageLevel=StorageLevel.MEMORY_ONLY_SER):
- """ Set the storage level to persist its values across operations
+ """Sets the storage level to persist its values across operations
after the first time it is computed. This can only be used to assign
a new storage level if the RDD does not have a storage level set yet.
If no storage level is specified defaults to (C{MEMORY_ONLY_SER}).
@@ -412,7 +389,7 @@ class DataFrame(object):
return self
def unpersist(self, blocking=True):
- """ Mark it as non-persistent, and remove all blocks for it from
+ """Marks the :class:`DataFrame` as non-persistent, and remove all blocks for it from
memory and disk.
"""
self.is_cached = False
@@ -424,8 +401,7 @@ class DataFrame(object):
# return DataFrame(rdd, self.sql_ctx)
def repartition(self, numPartitions):
- """ Return a new :class:`DataFrame` that has exactly `numPartitions`
- partitions.
+ """Returns a new :class:`DataFrame` that has exactly ``numPartitions`` partitions.
>>> df.repartition(10).rdd.getNumPartitions()
10
@@ -433,8 +409,7 @@ class DataFrame(object):
return DataFrame(self._jdf.repartition(numPartitions), self.sql_ctx)
def distinct(self):
- """
- Return a new :class:`DataFrame` containing the distinct rows in this DataFrame.
+ """Returns a new :class:`DataFrame` containing the distinct rows in this :class:`DataFrame`.
>>> df.distinct().count()
2L
@@ -442,8 +417,7 @@ class DataFrame(object):
return DataFrame(self._jdf.distinct(), self.sql_ctx)
def sample(self, withReplacement, fraction, seed=None):
- """
- Return a sampled subset of this DataFrame.
+ """Returns a sampled subset of this :class:`DataFrame`.
>>> df.sample(False, 0.5, 97).count()
1L
@@ -455,7 +429,7 @@ class DataFrame(object):
@property
def dtypes(self):
- """Return all column names and their data types as a list.
+ """Returns all column names and their data types as a list.
>>> df.dtypes
[('age', 'int'), ('name', 'string')]
@@ -464,7 +438,7 @@ class DataFrame(object):
@property
def columns(self):
- """ Return all column names as a list.
+ """Returns all column names as a list.
>>> df.columns
[u'age', u'name']
@@ -472,13 +446,14 @@ class DataFrame(object):
return [f.name for f in self.schema.fields]
def join(self, other, joinExprs=None, joinType=None):
- """
- Join with another :class:`DataFrame`, using the given join expression.
- The following performs a full outer join between `df1` and `df2`.
+ """Joins with another :class:`DataFrame`, using the given join expression.
+
+ The following performs a full outer join between ``df1`` and ``df2``.
:param other: Right side of the join
:param joinExprs: Join expression
- :param joinType: One of `inner`, `outer`, `left_outer`, `right_outer`, `semijoin`.
+ :param joinType: str, default 'inner'.
+ One of `inner`, `outer`, `left_outer`, `right_outer`, `semijoin`.
>>> df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect()
[Row(name=None, height=80), Row(name=u'Bob', height=85), Row(name=u'Alice', height=None)]
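A short sketch of the join types (using the ``df``/``df2`` data assumed by the doctests):

    # inner join (the default) on an equality expression
    df.join(df2, df.name == df2.name).collect()
    # keep unmatched rows from the left side
    df.join(df2, df.name == df2.name, 'left_outer').collect()
    # with no join expression, every pairing of rows is produced
    df.join(df2).count()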
@@ -496,9 +471,9 @@ class DataFrame(object):
return DataFrame(jdf, self.sql_ctx)
def sort(self, *cols):
- """ Return a new :class:`DataFrame` sorted by the specified column(s).
+ """Returns a new :class:`DataFrame` sorted by the specified column(s).
- :param cols: The columns or expressions used for sorting
+ :param cols: list of :class:`Column` to sort by.
>>> df.sort(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
@@ -539,7 +514,9 @@ class DataFrame(object):
return DataFrame(jdf, self.sql_ctx)
def head(self, n=None):
- """ Return the first `n` rows or the first row if n is None.
+ """
+ Returns the first ``n`` rows as a list of :class:`Row`,
+ or the first :class:`Row` if ``n`` is ``None``.
>>> df.head()
Row(age=2, name=u'Alice')
@@ -552,7 +529,7 @@ class DataFrame(object):
return self.take(n)
def first(self):
- """ Return the first row.
+ """Returns the first row as a :class:`Row`.
>>> df.first()
Row(age=2, name=u'Alice')
@@ -560,7 +537,7 @@ class DataFrame(object):
return self.head()
def __getitem__(self, item):
- """ Return the column by given name
+ """Returns the column as a :class:`Column`.
>>> df.select(df['age']).collect()
[Row(age=2), Row(age=5)]
@@ -580,7 +557,7 @@ class DataFrame(object):
raise IndexError("unexpected index: %s" % item)
def __getattr__(self, name):
- """ Return the column by given name
+ """Returns the :class:`Column` denoted by ``name``.
>>> df.select(df.age).collect()
[Row(age=2), Row(age=5)]
@@ -591,7 +568,11 @@ class DataFrame(object):
return Column(jc)
def select(self, *cols):
- """ Selecting a set of expressions.
+ """Projects a set of expressions and returns a new :class:`DataFrame`.
+
+ :param cols: list of column names (string) or expressions (:class:`Column`).
+ If one of the column names is '*', that column is expanded to include all columns
+ in the current DataFrame.
>>> df.select('*').collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
@@ -606,9 +587,9 @@ class DataFrame(object):
return DataFrame(jdf, self.sql_ctx)
def selectExpr(self, *expr):
- """
- Selects a set of SQL expressions. This is a variant of
- `select` that accepts SQL expressions.
+ """Projects a set of SQL expressions and returns a new :class:`DataFrame`.
+
+ This is a variant of :func:`select` that accepts SQL expressions.
>>> df.selectExpr("age * 2", "abs(age)").collect()
[Row((age * 2)=4, Abs(age)=2), Row((age * 2)=10, Abs(age)=5)]
@@ -618,10 +599,12 @@ class DataFrame(object):
return DataFrame(jdf, self.sql_ctx)
def filter(self, condition):
- """ Filtering rows using the given condition, which could be
- :class:`Column` expression or string of SQL expression.
+ """Filters rows using the given condition.
+
+ :func:`where` is an alias for :func:`filter`.
- where() is an alias for filter().
+ :param condition: a :class:`Column` of :class:`types.BooleanType`
+ or a string of SQL expression.
>>> df.filter(df.age > 3).collect()
[Row(age=5, name=u'Bob')]
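The condition may also be a SQL expression string, for example (same ``df`` as in the doctests):

    df.filter("age > 3").collect()       # SQL expression string
    # [Row(age=5, name=u'Bob')]
    df.where(df.age == 2).collect()      # where() is the same operation
    # [Row(age=2, name=u'Alice')]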
@@ -644,10 +627,13 @@ class DataFrame(object):
where = filter
def groupBy(self, *cols):
- """ Group the :class:`DataFrame` using the specified columns,
+ """Groups the :class:`DataFrame` using the specified columns,
so we can run aggregation on them. See :class:`GroupedData`
for all the available aggregate functions.
+ :param cols: list of columns to group by.
+ Each element should be a column name (string) or an expression (:class:`Column`).
+
>>> df.groupBy().avg().collect()
[Row(AVG(age)=3.5)]
>>> df.groupBy('name').agg({'age': 'mean'}).collect()
@@ -662,7 +648,7 @@ class DataFrame(object):
def agg(self, *exprs):
""" Aggregate on the entire :class:`DataFrame` without groups
- (shorthand for df.groupBy.agg()).
+ (shorthand for ``df.groupBy.agg()``).
>>> df.agg({"age": "max"}).collect()
[Row(MAX(age)=5)]
@@ -699,7 +685,7 @@ class DataFrame(object):
def dropna(self, how='any', thresh=None, subset=None):
"""Returns a new :class:`DataFrame` omitting rows with null values.
- This is an alias for `na.drop`.
+ This is an alias for ``na.drop()``.
:param how: 'any' or 'all'.
If 'any', drop a row if it contains any nulls.
@@ -735,7 +721,7 @@ class DataFrame(object):
return DataFrame(self._jdf.na().drop(thresh, cols), self.sql_ctx)
def fillna(self, value, subset=None):
- """Replace null values, alias for `na.fill`.
+ """Replace null values, alias for ``na.fill()``.
:param value: int, long, float, string, or dict.
Value to replace null values with.
@@ -790,7 +776,10 @@ class DataFrame(object):
return DataFrame(self._jdf.na().fill(value, cols), self.sql_ctx)
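A brief sketch of the two spellings of null handling (values are illustrative; ``df`` is assumed):

    # drop rows containing any null value; same as df.na.drop()
    df.dropna(how='any').count()
    # replace nulls: 0 for numeric columns, 'unknown' for the 'name' column
    df.fillna(0).fillna('unknown', subset=['name']).collect()
    # the equivalent calls through the DataFrameNaFunctions accessor
    df.na.drop().count()
    df.na.fill(0).collect()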
def withColumn(self, colName, col):
- """ Return a new :class:`DataFrame` by adding a column.
+ """Returns a new :class:`DataFrame` by adding a column.
+
+ :param colName: string, name of the new column.
+ :param col: a :class:`Column` expression for the new column.
>>> df.withColumn('age2', df.age + 2).collect()
[Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
@@ -798,7 +787,10 @@ class DataFrame(object):
return self.select('*', col.alias(colName))
def withColumnRenamed(self, existing, new):
- """ Rename an existing column to a new name
+ """REturns a new :class:`DataFrame` by renaming an existing column.
+
+ :param existing: string, name of the existing column to rename.
+ :param new: string, new name of the column.
>>> df.withColumnRenamed('age', 'age2').collect()
[Row(age2=2, name=u'Alice'), Row(age2=5, name=u'Bob')]
@@ -809,8 +801,9 @@ class DataFrame(object):
return self.select(*cols)
def toPandas(self):
- """
- Collect all the rows and return a `pandas.DataFrame`.
+ """Returns the contents of this :class:`DataFrame` as Pandas ``pandas.DataFrame``.
+
+ This is only available if Pandas is installed and available.
>>> df.toPandas() # doctest: +SKIP
age name
@@ -823,8 +816,7 @@ class DataFrame(object):
# Having SchemaRDD for backward compatibility (for docs)
class SchemaRDD(DataFrame):
- """
- SchemaRDD is deprecated, please use DataFrame
+ """SchemaRDD is deprecated, please use :class:`DataFrame`.
"""
@@ -851,10 +843,9 @@ def df_varargs_api(f):
class GroupedData(object):
-
"""
A set of methods for aggregations on a :class:`DataFrame`,
- created by DataFrame.groupBy().
+ created by :func:`DataFrame.groupBy`.
"""
def __init__(self, jdf, sql_ctx):
@@ -862,14 +853,17 @@ class GroupedData(object):
self.sql_ctx = sql_ctx
def agg(self, *exprs):
- """ Compute aggregates by specifying a map from column name
- to aggregate methods.
+ """Compute aggregates and returns the result as a :class:`DataFrame`.
+
+ The available aggregate functions are `avg`, `max`, `min`, `sum`, `count`.
+
+ If ``exprs`` is a single :class:`dict` mapping from string to string, then the key
+ is the column to perform aggregation on, and the value is the aggregate function.
- The available aggregate methods are `avg`, `max`, `min`,
- `sum`, `count`.
+ Alternatively, ``exprs`` can also be a list of aggregate :class:`Column` expressions.
- :param exprs: list or aggregate columns or a map from column
- name to aggregate methods.
+ :param exprs: a dict mapping from column name (string) to aggregate functions (string),
+ or a list of :class:`Column`.
>>> gdf = df.groupBy(df.name)
>>> gdf.agg({"*": "count"}).collect()
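Both forms of ``exprs`` can be sketched as follows (``F`` is just a local alias for the functions module; the doctest ``df`` is assumed):

    from pyspark.sql import functions as F
    gdf = df.groupBy(df.name)
    # dict form: column name -> aggregate function name
    gdf.agg({'age': 'max'}).collect()
    # list of Column expressions
    gdf.agg(F.min(df.age), F.countDistinct(df.age).alias('distinct_ages')).collect()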
@@ -894,7 +888,7 @@ class GroupedData(object):
@dfapi
def count(self):
- """ Count the number of rows for each group.
+ """Counts the number of records for each group.
>>> df.groupBy(df.age).count().collect()
[Row(age=2, count=1), Row(age=5, count=1)]
@@ -902,8 +896,11 @@ class GroupedData(object):
@df_varargs_api
def mean(self, *cols):
- """Compute the average value for each numeric columns
- for each group. This is an alias for `avg`.
+ """Computes average values for each numeric columns for each group.
+
+ :func:`mean` is an alias for :func:`avg`.
+
+ :param cols: list of column names (string). Non-numeric columns are ignored.
>>> df.groupBy().mean('age').collect()
[Row(AVG(age)=3.5)]
@@ -913,8 +910,11 @@ class GroupedData(object):
@df_varargs_api
def avg(self, *cols):
- """Compute the average value for each numeric columns
- for each group.
+ """Computes average values for each numeric columns for each group.
+
+ :func:`mean` is an alias for :func:`avg`.
+
+ :param cols: list of column names (string). Non-numeric columns are ignored.
>>> df.groupBy().avg('age').collect()
[Row(AVG(age)=3.5)]
@@ -924,8 +924,7 @@ class GroupedData(object):
@df_varargs_api
def max(self, *cols):
- """Compute the max value for each numeric columns for
- each group.
+ """Computes the max value for each numeric columns for each group.
>>> df.groupBy().max('age').collect()
[Row(MAX(age)=5)]
@@ -935,8 +934,9 @@ class GroupedData(object):
@df_varargs_api
def min(self, *cols):
- """Compute the min value for each numeric column for
- each group.
+ """Computes the min value for each numeric column for each group.
+
+ :param cols: list of column names (string). Non-numeric columns are ignored.
>>> df.groupBy().min('age').collect()
[Row(MIN(age)=2)]
@@ -946,8 +946,9 @@ class GroupedData(object):
@df_varargs_api
def sum(self, *cols):
- """Compute the sum for each numeric columns for each
- group.
+ """Compute the sum for each numeric columns for each group.
+
+ :param cols: list of column names (string). Non-numeric columns are ignored.
>>> df.groupBy().sum('age').collect()
[Row(SUM(age)=7)]
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 5873f09ae3..8a478fddf0 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -76,7 +76,7 @@ __all__.sort()
def countDistinct(col, *cols):
- """ Return a new Column for distinct count of `col` or `cols`
+ """Returns a new :class:`Column` for distinct count of ``col`` or ``cols``.
>>> df.agg(countDistinct(df.age, df.name).alias('c')).collect()
[Row(c=2)]
@@ -91,7 +91,7 @@ def countDistinct(col, *cols):
def approxCountDistinct(col, rsd=None):
- """ Return a new Column for approximate distinct count of `col`
+ """Returns a new :class:`Column` for approximate distinct count of ``col``.
>>> df.agg(approxCountDistinct(df.age).alias('c')).collect()
[Row(c=2)]
@@ -142,7 +142,7 @@ class UserDefinedFunction(object):
def udf(f, returnType=StringType()):
- """Create a user defined function (UDF)
+ """Creates a :class:`Column` expression representing a user defined function (UDF).
>>> from pyspark.sql.types import IntegerType
>>> slen = udf(lambda s: len(s), IntegerType())
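Continuing that doctest, the resulting Column expression is used like any other (a sketch against the ``df`` used throughout these docs):

    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType
    slen = udf(lambda s: len(s), IntegerType())
    df.select(slen(df.name).alias('slen')).collect()
    # [Row(slen=5), Row(slen=3)]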
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 0169028ccc..45eb8b945d 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -33,8 +33,7 @@ __all__ = [
class DataType(object):
-
- """Spark SQL DataType"""
+ """Base class for data types."""
def __repr__(self):
return self.__class__.__name__
@@ -67,7 +66,6 @@ class DataType(object):
# This singleton pattern does not work with pickle, you will get
# another object after pickle and unpickle
class PrimitiveTypeSingleton(type):
-
"""Metaclass for PrimitiveType"""
_instances = {}
@@ -79,66 +77,45 @@ class PrimitiveTypeSingleton(type):
class PrimitiveType(DataType):
-
"""Spark SQL PrimitiveType"""
__metaclass__ = PrimitiveTypeSingleton
class NullType(PrimitiveType):
+ """Null type.
- """Spark SQL NullType
-
- The data type representing None, used for the types which has not
- been inferred.
+ The data type representing None, used for the types that cannot be inferred.
"""
class StringType(PrimitiveType):
-
- """Spark SQL StringType
-
- The data type representing string values.
+ """String data type.
"""
class BinaryType(PrimitiveType):
-
- """Spark SQL BinaryType
-
- The data type representing bytearray values.
+ """Binary (byte array) data type.
"""
class BooleanType(PrimitiveType):
-
- """Spark SQL BooleanType
-
- The data type representing bool values.
+ """Boolean data type.
"""
class DateType(PrimitiveType):
-
- """Spark SQL DateType
-
- The data type representing datetime.date values.
+ """Date (datetime.date) data type.
"""
class TimestampType(PrimitiveType):
-
- """Spark SQL TimestampType
-
- The data type representing datetime.datetime values.
+ """Timestamp (datetime.datetime) data type.
"""
class DecimalType(DataType):
-
- """Spark SQL DecimalType
-
- The data type representing decimal.Decimal values.
+ """Decimal (decimal.Decimal) data type.
"""
def __init__(self, precision=None, scale=None):
@@ -166,80 +143,55 @@ class DecimalType(DataType):
class DoubleType(PrimitiveType):
-
- """Spark SQL DoubleType
-
- The data type representing float values.
+ """Double data type, representing double precision floats.
"""
class FloatType(PrimitiveType):
-
- """Spark SQL FloatType
-
- The data type representing single precision floating-point values.
+ """Float data type, representing single precision floats.
"""
class ByteType(PrimitiveType):
-
- """Spark SQL ByteType
-
- The data type representing int values with 1 singed byte.
+ """Byte data type, i.e. a signed integer in a single byte.
"""
def simpleString(self):
return 'tinyint'
class IntegerType(PrimitiveType):
-
- """Spark SQL IntegerType
-
- The data type representing int values.
+ """Int data type, i.e. a signed 32-bit integer.
"""
def simpleString(self):
return 'int'
class LongType(PrimitiveType):
+ """Long data type, i.e. a signed 64-bit integer.
- """Spark SQL LongType
-
- The data type representing long values. If the any value is
- beyond the range of [-9223372036854775808, 9223372036854775807],
- please use DecimalType.
+ If the values are beyond the range of [-9223372036854775808, 9223372036854775807],
+ please use :class:`DecimalType`.
"""
def simpleString(self):
return 'bigint'
class ShortType(PrimitiveType):
-
- """Spark SQL ShortType
-
- The data type representing int values with 2 signed bytes.
+ """Short data type, i.e. a signed 16-bit integer.
"""
def simpleString(self):
return 'smallint'
class ArrayType(DataType):
+ """Array data type.
- """Spark SQL ArrayType
-
- The data type representing list values. An ArrayType object
- comprises two fields, elementType (a DataType) and containsNull (a bool).
- The field of elementType is used to specify the type of array elements.
- The field of containsNull is used to specify if the array has None values.
-
+ :param elementType: :class:`DataType` of each element in the array.
+ :param containsNull: boolean, whether the array can contain null (None) values.
"""
def __init__(self, elementType, containsNull=True):
- """Creates an ArrayType
-
- :param elementType: the data type of elements.
- :param containsNull: indicates whether the list contains None values.
-
+ """
>>> ArrayType(StringType()) == ArrayType(StringType(), True)
True
>>> ArrayType(StringType(), False) == ArrayType(StringType())
@@ -268,29 +220,17 @@ class ArrayType(DataType):
class MapType(DataType):
+ """Map data type.
- """Spark SQL MapType
-
- The data type representing dict values. A MapType object comprises
- three fields, keyType (a DataType), valueType (a DataType) and
- valueContainsNull (a bool).
-
- The field of keyType is used to specify the type of keys in the map.
- The field of valueType is used to specify the type of values in the map.
- The field of valueContainsNull is used to specify if values of this
- map has None values.
-
- For values of a MapType column, keys are not allowed to have None values.
+ :param keyType: :class:`DataType` of the keys in the map.
+ :param valueType: :class:`DataType` of the values in the map.
+ :param valueContainsNull: indicates whether values can contain null (None) values.
+ Keys in a map data type are not allowed to be null (None).
"""
def __init__(self, keyType, valueType, valueContainsNull=True):
- """Creates a MapType
- :param keyType: the data type of keys.
- :param valueType: the data type of values.
- :param valueContainsNull: indicates whether values contains
- null values.
-
+ """
>>> (MapType(StringType(), IntegerType())
... == MapType(StringType(), IntegerType(), True))
True
@@ -325,30 +265,16 @@ class MapType(DataType):
class StructField(DataType):
+ """A field in :class:`StructType`.
- """Spark SQL StructField
-
- Represents a field in a StructType.
- A StructField object comprises three fields, name (a string),
- dataType (a DataType) and nullable (a bool). The field of name
- is the name of a StructField. The field of dataType specifies
- the data type of a StructField.
-
- The field of nullable specifies if values of a StructField can
- contain None values.
-
+ :param name: string, name of the field.
+ :param dataType: :class:`DataType` of the field.
+ :param nullable: boolean, whether the field can be null (None) or not.
+ :param metadata: a dict from string to simple type that can be serialized to JSON automatically
"""
def __init__(self, name, dataType, nullable=True, metadata=None):
- """Creates a StructField
- :param name: the name of this field.
- :param dataType: the data type of this field.
- :param nullable: indicates whether values of this field
- can be null.
- :param metadata: metadata of this field, which is a map from string
- to simple type that can be serialized to JSON
- automatically
-
+ """
>>> (StructField("f1", StringType(), True)
... == StructField("f1", StringType(), True))
True
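To show how these types compose, a hedged sketch of building a schema by hand and applying it (``sc`` and ``sqlCtx`` assumed; the data is made up):

    from pyspark.sql.types import (StructType, StructField, StringType,
                                   IntegerType, ArrayType, MapType)
    schema = StructType([
        StructField("name", StringType(), False),
        StructField("age", IntegerType(), True),
        StructField("tags", ArrayType(StringType()), True),
        StructField("props", MapType(StringType(), StringType()), True),
    ])
    rdd = sc.parallelize([("Alice", 2, ["a", "b"], {"k": "v"})])
    df = sqlCtx.createDataFrame(rdd, schema)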
@@ -384,17 +310,13 @@ class StructField(DataType):
class StructType(DataType):
+ """Struct type, consisting of a list of :class:`StructField`.
- """Spark SQL StructType
-
- The data type representing rows.
- A StructType object comprises a list of L{StructField}.
-
+ This is the data type representing a :class:`Row`.
"""
def __init__(self, fields):
- """Creates a StructType
-
+ """
>>> struct1 = StructType([StructField("f1", StringType(), True)])
>>> struct2 = StructType([StructField("f1", StringType(), True)])
>>> struct1 == struct2
@@ -425,9 +347,9 @@ class StructType(DataType):
class UserDefinedType(DataType):
- """
+ """User-defined type (UDT).
+
.. note:: WARN: Spark Internal Use Only
- SQL User-Defined Type (UDT).
"""
@classmethod
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
index 3a3dc70f72..bf3c3fe876 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
@@ -21,14 +21,17 @@ import java.{lang => jl}
import scala.collection.JavaConversions._
+import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
/**
+ * :: Experimental ::
* Functionality for working with missing data in [[DataFrame]]s.
*/
+@Experimental
final class DataFrameNaFunctions private[sql](df: DataFrame) {
/**