From 4dc8d74491b101a794cf8d386d8c5ebc6019b75f Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Fri, 1 May 2015 13:29:17 -0700 Subject: [SPARK-7240][SQL] Single pass covariance calculation for dataframes Added the calculation of covariance between two columns to DataFrames. cc mengxr rxin Author: Burak Yavuz Closes #5825 from brkyvz/df-cov and squashes the following commits: cb18046 [Burak Yavuz] changed to sample covariance f2e862b [Burak Yavuz] fixed failed test 51e39b8 [Burak Yavuz] moved implementation 0c6a759 [Burak Yavuz] addressed math comments 8456eca [Burak Yavuz] fix pyStyle3 aa2ad29 [Burak Yavuz] fix pyStyle2 4e97a50 [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into df-cov e3b0b85 [Burak Yavuz] addressed comments v0.1 a7115f1 [Burak Yavuz] fix python style 7dc6dbc [Burak Yavuz] reorder imports 408cb77 [Burak Yavuz] initial commit --- python/pyspark/sql/__init__.py | 4 +++- python/pyspark/sql/dataframe.py | 36 +++++++++++++++++++++++++++++++++++- python/pyspark/sql/tests.py | 5 +++++ 3 files changed, 43 insertions(+), 2 deletions(-) (limited to 'python') diff --git a/python/pyspark/sql/__init__.py b/python/pyspark/sql/__init__.py index 6d54b9e49e..b60b991dd4 100644 --- a/python/pyspark/sql/__init__.py +++ b/python/pyspark/sql/__init__.py @@ -54,7 +54,9 @@ del modname, sys from pyspark.sql.types import Row from pyspark.sql.context import SQLContext, HiveContext from pyspark.sql.dataframe import DataFrame, GroupedData, Column, SchemaRDD, DataFrameNaFunctions +from pyspark.sql.dataframe import DataFrameStatFunctions __all__ = [ - 'SQLContext', 'HiveContext', 'DataFrame', 'GroupedData', 'Column', 'Row', 'DataFrameNaFunctions' + 'SQLContext', 'HiveContext', 'DataFrame', 'GroupedData', 'Column', 'Row', + 'DataFrameNaFunctions', 'DataFrameStatFunctions' ] diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 5908ebc990..1f08c2df93 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -34,7 +34,8 @@ from pyspark.sql.types import * from pyspark.sql.types import _create_cls, _parse_datatype_json_string -__all__ = ["DataFrame", "GroupedData", "Column", "SchemaRDD", "DataFrameNaFunctions"] +__all__ = ["DataFrame", "GroupedData", "Column", "SchemaRDD", "DataFrameNaFunctions", + "DataFrameStatFunctions"] class DataFrame(object): @@ -93,6 +94,12 @@ class DataFrame(object): """ return DataFrameNaFunctions(self) + @property + def stat(self): + """Returns a :class:`DataFrameStatFunctions` for statistic functions. + """ + return DataFrameStatFunctions(self) + @ignore_unicode_prefix def toJSON(self, use_unicode=True): """Converts a :class:`DataFrame` into a :class:`RDD` of string. @@ -868,6 +875,20 @@ class DataFrame(object): return DataFrame(self._jdf.na().fill(value, self._jseq(subset)), self.sql_ctx) + def cov(self, col1, col2): + """ + Calculate the sample covariance for the given columns, specified by their names, as a + double value. :func:`DataFrame.cov` and :func:`DataFrameStatFunctions.cov` are aliases. + + :param col1: The name of the first column + :param col2: The name of the second column + """ + if not isinstance(col1, str): + raise ValueError("col1 should be a string.") + if not isinstance(col2, str): + raise ValueError("col2 should be a string.") + return self._jdf.stat().cov(col1, col2) + @ignore_unicode_prefix def withColumn(self, colName, col): """Returns a new :class:`DataFrame` by adding a column. @@ -1311,6 +1332,19 @@ class DataFrameNaFunctions(object): fill.__doc__ = DataFrame.fillna.__doc__ +class DataFrameStatFunctions(object): + """Functionality for statistic functions with :class:`DataFrame`. + """ + + def __init__(self, df): + self.df = df + + def cov(self, col1, col2): + return self.df.cov(col1, col2) + + cov.__doc__ = DataFrame.cov.__doc__ + + def _test(): import doctest from pyspark.context import SparkContext diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 5640bb5ea2..44c8b6a1aa 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -387,6 +387,11 @@ class SQLTests(ReusedPySparkTestCase): self.assertTrue(95 < g.agg(functions.approxCountDistinct(df.key)).first()[0]) self.assertEqual(100, g.agg(functions.countDistinct(df.value)).first()[0]) + def test_cov(self): + df = self.sc.parallelize([Row(a=i, b=2 * i) for i in range(10)]).toDF() + cov = df.stat.cov("a", "b") + self.assertTrue(abs(cov - 55.0 / 3) < 1e-6) + def test_math_functions(self): df = self.sc.parallelize([Row(a=i, b=2 * i) for i in range(10)]).toDF() from pyspark.sql import mathfunctions as functions -- cgit v1.2.3