author    Reynold Xin <rxin@databricks.com>    2015-03-30 20:47:10 -0700
committer Reynold Xin <rxin@databricks.com>    2015-03-30 20:47:10 -0700
commit    b8ff2bc61c9835867f56afa1860ab5eb727c4a58 (patch)
tree      e29f737f32f9c21e22ff6fd7778549ec907c6015 /python/pyspark/sql/dataframe.py
parent    fde6945417355ae57500b67d034c9cad4f20d240 (diff)
[SPARK-6119][SQL] DataFrame support for missing data handling
This pull request adds variants of DataFrame.na.drop and DataFrame.na.fill to the Scala/Java API, and DataFrame.fillna and DataFrame.dropna to the Python API.

Author: Reynold Xin <rxin@databricks.com>

Closes #5274 from rxin/df-missing-value and squashes the following commits:

4ee1b98 [Reynold Xin] Improve error reporting in Python.
33a330c [Reynold Xin] Remove replace for now.
bc4fdbb [Reynold Xin] Added documentation for replace.
d56f5a5 [Reynold Xin] Added replace for Scala/Java.
2385d00 [Reynold Xin] Feedback from Xiangrui on "how".
914a374 [Reynold Xin] fill with map.
185c67e [Reynold Xin] Allow specifying column subsets in fill.
749eb47 [Reynold Xin] fillna
249b94e [Reynold Xin] Removing undefined functions.
6a73c68 [Reynold Xin] Missing file.
67d7003 [Reynold Xin] [SPARK-6119][SQL] DataFrame.na.drop (Scala/Java) and DataFrame.dropna (Python)
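In Python terms the new API reads as follows; a minimal usage sketch based on the signatures in the diff below, assuming `df` is an existing DataFrame with nullable columns:

    df.dropna()                                # drop rows containing any null value
    df.dropna(how='all')                       # drop only rows whose values are all null
    df.dropna(thresh=2)                        # keep rows with at least 2 non-null values
    df.dropna(subset=['age', 'height'])        # check for nulls only in these columns

    df.fillna(50)                              # ints are widened to float; fills numeric columns
    df.fillna('unknown', subset=['name'])      # fill nulls only in the named string column
    df.fillna({'age': 50, 'name': 'unknown'})  # per-column fill values; subset is ignored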
Diffstat (limited to 'python/pyspark/sql/dataframe.py')
-rw-r--r--  python/pyspark/sql/dataframe.py | 86
1 file changed, 86 insertions(+), 0 deletions(-)
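Both new methods marshal the Python column list to the JVM through Py4J before delegating to the JVM-side na() functions. Annotated, the pattern looks like this (a sketch of the two `cols = ...` lines that appear twice in the diff below; `ListConverter` is Py4J's standard collection converter and `PythonUtils.toSeq` is a Spark-side helper):

    from py4j.java_collections import ListConverter

    # Python list -> java.util.List, via the gateway's client connection
    cols = ListConverter().convert(subset, self.sql_ctx._sc._gateway._gateway_client)
    # java.util.List -> scala.Seq, as expected on the JVM side
    cols = self.sql_ctx._sc._jvm.PythonUtils.toSeq(cols)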
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 23c0e63e77..4f174de811 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -690,6 +690,86 @@ class DataFrame(object):
"""
return DataFrame(getattr(self._jdf, "except")(other._jdf), self.sql_ctx)
+    def dropna(self, how='any', thresh=None, subset=None):
+        """Returns a new :class:`DataFrame` omitting rows with null values.
+
+        :param how: 'any' or 'all'.
+            If 'any', drop a row if it contains any nulls.
+            If 'all', drop a row only if all its values are null.
+        :param thresh: int, default None
+            If specified, drop rows that have fewer than `thresh` non-null values.
+            This overrides the `how` parameter.
+        :param subset: optional list of column names to consider.
+
+        >>> df4.dropna().show()
+        age height name
+        10  80     Alice
+        """
+        if how is not None and how not in ['any', 'all']:
+            raise ValueError("how ('" + how + "') should be 'any' or 'all'")
+
+        if subset is None:
+            subset = self.columns
+        elif isinstance(subset, basestring):
+            subset = [subset]
+        elif not isinstance(subset, (list, tuple)):
+            raise ValueError("subset should be a list or tuple of column names")
+
+        if thresh is None:
+            thresh = len(subset) if how == 'any' else 1
+
+        cols = ListConverter().convert(subset, self.sql_ctx._sc._gateway._gateway_client)
+        cols = self.sql_ctx._sc._jvm.PythonUtils.toSeq(cols)
+        return DataFrame(self._jdf.na().drop(thresh, cols), self.sql_ctx)
+
+    def fillna(self, value, subset=None):
+        """Replace null values.
+
+        :param value: int, long, float, string, or dict.
+            Value to replace null values with.
+            If the value is a dict, then `subset` is ignored and `value` must be a mapping
+            from column name (string) to replacement value. The replacement value must be
+            an int, long, float, or string.
+        :param subset: optional list of column names to consider.
+            Columns specified in `subset` that do not have a matching data type are ignored.
+            For example, if `value` is a string and `subset` contains a non-string column,
+            then the non-string column is simply ignored.
+
+        >>> df4.fillna(50).show()
+        age height name
+        10  80     Alice
+        5   50     Bob
+        50  50     Tom
+        50  50     null
+
+        >>> df4.fillna({'age': 50, 'name': 'unknown'}).show()
+        age height name
+        10  80     Alice
+        5   null   Bob
+        50  null   Tom
+        50  null   unknown
+        """
+        if not isinstance(value, (float, int, long, basestring, dict)):
+            raise ValueError("value should be a float, int, long, string, or dict")
+
+        if isinstance(value, (int, long)):
+            value = float(value)
+
+        if isinstance(value, dict):
+            value = MapConverter().convert(value, self.sql_ctx._sc._gateway._gateway_client)
+            return DataFrame(self._jdf.na().fill(value), self.sql_ctx)
+        elif subset is None:
+            return DataFrame(self._jdf.na().fill(value), self.sql_ctx)
+        else:
+            if isinstance(subset, basestring):
+                subset = [subset]
+            elif not isinstance(subset, (list, tuple)):
+                raise ValueError("subset should be a list or tuple of column names")
+
+            cols = ListConverter().convert(subset, self.sql_ctx._sc._gateway._gateway_client)
+            cols = self.sql_ctx._sc._jvm.PythonUtils.toSeq(cols)
+            return DataFrame(self._jdf.na().fill(value, cols), self.sql_ctx)
+
     def withColumn(self, colName, col):
         """ Return a new :class:`DataFrame` by adding a column.
@@ -1069,6 +1149,12 @@ def _test():
     globs['df2'] = sc.parallelize([Row(name='Tom', height=80), Row(name='Bob', height=85)]).toDF()
     globs['df3'] = sc.parallelize([Row(name='Alice', age=2, height=80),
                                    Row(name='Bob', age=5, height=85)]).toDF()
+
+    globs['df4'] = sc.parallelize([Row(name='Alice', age=10, height=80),
+                                   Row(name='Bob', age=5, height=None),
+                                   Row(name='Tom', age=None, height=None),
+                                   Row(name=None, age=None, height=None)]).toDF()
+
     (failure_count, test_count) = doctest.testmod(
         pyspark.sql.dataframe, globs=globs,
         optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
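For reference, dropna reduces `how` to a `thresh` before calling into the JVM (`thresh = len(subset) if how == 'any' else 1`). Against the df4 fixture defined above, the row counts work out as follows (an illustration, not part of the patch):

    df4.dropna(how='any').count()  # 1: only the Alice row has all 3 fields non-null (thresh=3)
    df4.dropna(how='all').count()  # 3: only the all-null row is dropped (thresh=1)
    df4.dropna(thresh=2).count()   # 2: Alice and Bob each have at least 2 non-null values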