diff options
-rw-r--r-- python/pyspark/sql/dataframe.py | 54
-rw-r--r-- python/pyspark/sql/tests.py | 7
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala | 10
3 files changed, 71 insertions, 0 deletions
@since(2.0)
def approxQuantile(self, col, probabilities, relativeError):
    """
    Calculates the approximate quantiles of a numerical column of a
    DataFrame.

    The result of this algorithm has the following deterministic bound:
    if the DataFrame has N elements and a quantile at probability `p` is
    requested with error `err`, the algorithm returns a sample `x` whose
    *exact* rank in the DataFrame is close to (p * N); more precisely,

        floor((p - err) * N) <= rank(x) <= ceil((p + err) * N).

    This is a variation of the Greenwald-Khanna algorithm (with some
    speed optimizations), first presented in "Space-efficient Online
    Computation of Quantile Summaries" by Greenwald and Khanna
    (http://dx.doi.org/10.1145/375663.375670).

    :param col: the name of the numerical column
    :param probabilities: a list or tuple of quantile probabilities.
        Each number must belong to [0, 1]. For example 0 is the minimum,
        0.5 is the median, 1 is the maximum.
    :param relativeError: the relative target precision to achieve
        (>= 0). If set to zero, the exact quantiles are computed, which
        could be very expensive. Note that values greater than 1 are
        accepted but give the same result as 1.
    :return: the approximate quantiles at the given probabilities, as a
        list of floats
    """
    # Validate the column name first; this guard runs before any JVM access.
    if not isinstance(col, str):
        raise ValueError("col should be a string.")

    if not isinstance(probabilities, (list, tuple)):
        raise ValueError("probabilities should be a list or tuple")
    # Normalize to a fresh list so a caller-supplied tuple is accepted.
    probs = list(probabilities)
    for prob in probs:
        # NOTE(review): `long` is presumably aliased to `int` on Python 3 by a
        # module-level compatibility shim — confirm against the top of the file.
        if not isinstance(prob, (float, int, long)) or prob < 0 or prob > 1:
            raise ValueError("probabilities should be numerical (float, int, long) in [0,1].")
    jprobs = _to_list(self._sc, probs)

    if not isinstance(relativeError, (float, int, long)) or relativeError < 0:
        raise ValueError("relativeError should be numerical (float, int, long) >= 0.")
    err = float(relativeError)

    # Delegate to the JVM-side DataFrameStatFunctions and convert the
    # returned java.util.List back into a plain Python list.
    jaq = self._jdf.stat().approxQuantile(col, jprobs, err)
    return list(jaq)
/**
 * Python-friendly version of [[approxQuantile()]]: accepts a Scala `List`
 * (as produced by Py4J) and returns a `java.util.List` for the Python side.
 */
private[spark] def approxQuantile(
    col: String,
    probabilities: List[Double],
    relativeError: Double): java.util.List[Double] = {
  // Bridge to the Array-based public overload, then hand back a Java list.
  val quantiles = approxQuantile(col, probabilities.toArray, relativeError)
  quantiles.toList.asJava
}