aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/mllib/stat/_statistics.py
diff options
context:
space:
mode:
authorXiangrui Meng <meng@databricks.com>2015-01-29 10:11:44 -0800
committerXiangrui Meng <meng@databricks.com>2015-01-29 10:11:44 -0800
commita3dc6184862345c459d1fba475b1c9210038a913 (patch)
tree706a2dcab1d4c0decf5fe6269fcabf56e8d17b2f /python/pyspark/mllib/stat/_statistics.py
parent5ad78f62056f2560cd371ee964111a646806d0ff (diff)
downloadspark-a3dc6184862345c459d1fba475b1c9210038a913.tar.gz
spark-a3dc6184862345c459d1fba475b1c9210038a913.tar.bz2
spark-a3dc6184862345c459d1fba475b1c9210038a913.zip
[SPARK-5477] refactor stat.py
There is only a single `stat.py` file for the `mllib.stat` package. We recently added `MultivariateGaussian` under `mllib.stat.distribution` in Scala/Java. It would be nice to refactor `stat.py` and make it easy to expand. Note that `ChiSqTestResult` is moved from `mllib.stat` to `mllib.stat.test`. The latter is used in Scala/Java. It is only used in the return value of `Statistics.chiSqTest`, so this should be an okay change. davies Author: Xiangrui Meng <meng@databricks.com> Closes #4266 from mengxr/py-stat-refactor and squashes the following commits: 1a5e1db [Xiangrui Meng] refactor stat.py
Diffstat (limited to 'python/pyspark/mllib/stat/_statistics.py')
-rw-r--r--python/pyspark/mllib/stat/_statistics.py247
1 files changed, 247 insertions, 0 deletions
diff --git a/python/pyspark/mllib/stat/_statistics.py b/python/pyspark/mllib/stat/_statistics.py
new file mode 100644
index 0000000000..218ac148ca
--- /dev/null
+++ b/python/pyspark/mllib/stat/_statistics.py
@@ -0,0 +1,247 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark import RDD
+from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
+from pyspark.mllib.linalg import Matrix, _convert_to_vector
+from pyspark.mllib.regression import LabeledPoint
+from pyspark.mllib.stat.test import ChiSqTestResult
+
+
+__all__ = ['MultivariateStatisticalSummary', 'Statistics']
+
+
+class MultivariateStatisticalSummary(JavaModelWrapper):
+
+ """
+ Trait for multivariate statistical summary of a data matrix.
+ """
+
+ def mean(self):
+ return self.call("mean").toArray()
+
+ def variance(self):
+ return self.call("variance").toArray()
+
+ def count(self):
+ return self.call("count")
+
+ def numNonzeros(self):
+ return self.call("numNonzeros").toArray()
+
+ def max(self):
+ return self.call("max").toArray()
+
+ def min(self):
+ return self.call("min").toArray()
+
+
+class Statistics(object):
+
+ @staticmethod
+ def colStats(rdd):
+ """
+ Computes column-wise summary statistics for the input RDD[Vector].
+
+ :param rdd: an RDD[Vector] for which column-wise summary statistics
+ are to be computed.
+ :return: :class:`MultivariateStatisticalSummary` object containing
+ column-wise summary statistics.
+
+ >>> from pyspark.mllib.linalg import Vectors
+ >>> rdd = sc.parallelize([Vectors.dense([2, 0, 0, -2]),
+ ... Vectors.dense([4, 5, 0, 3]),
+ ... Vectors.dense([6, 7, 0, 8])])
+ >>> cStats = Statistics.colStats(rdd)
+ >>> cStats.mean()
+ array([ 4., 4., 0., 3.])
+ >>> cStats.variance()
+ array([ 4., 13., 0., 25.])
+ >>> cStats.count()
+ 3L
+ >>> cStats.numNonzeros()
+ array([ 3., 2., 0., 3.])
+ >>> cStats.max()
+ array([ 6., 7., 0., 8.])
+ >>> cStats.min()
+ array([ 2., 0., 0., -2.])
+ """
+ cStats = callMLlibFunc("colStats", rdd.map(_convert_to_vector))
+ return MultivariateStatisticalSummary(cStats)
+
+ @staticmethod
+ def corr(x, y=None, method=None):
+ """
+ Compute the correlation (matrix) for the input RDD(s) using the
+ specified method.
+ Methods currently supported: I{pearson (default), spearman}.
+
+ If a single RDD of Vectors is passed in, a correlation matrix
+ comparing the columns in the input RDD is returned. Use C{method=}
+ to specify the method to be used for single RDD inout.
+ If two RDDs of floats are passed in, a single float is returned.
+
+ :param x: an RDD of vector for which the correlation matrix is to be computed,
+ or an RDD of float of the same cardinality as y when y is specified.
+ :param y: an RDD of float of the same cardinality as x.
+ :param method: String specifying the method to use for computing correlation.
+ Supported: `pearson` (default), `spearman`
+ :return: Correlation matrix comparing columns in x.
+
+ >>> x = sc.parallelize([1.0, 0.0, -2.0], 2)
+ >>> y = sc.parallelize([4.0, 5.0, 3.0], 2)
+ >>> zeros = sc.parallelize([0.0, 0.0, 0.0], 2)
+ >>> abs(Statistics.corr(x, y) - 0.6546537) < 1e-7
+ True
+ >>> Statistics.corr(x, y) == Statistics.corr(x, y, "pearson")
+ True
+ >>> Statistics.corr(x, y, "spearman")
+ 0.5
+ >>> from math import isnan
+ >>> isnan(Statistics.corr(x, zeros))
+ True
+ >>> from pyspark.mllib.linalg import Vectors
+ >>> rdd = sc.parallelize([Vectors.dense([1, 0, 0, -2]), Vectors.dense([4, 5, 0, 3]),
+ ... Vectors.dense([6, 7, 0, 8]), Vectors.dense([9, 0, 0, 1])])
+ >>> pearsonCorr = Statistics.corr(rdd)
+ >>> print str(pearsonCorr).replace('nan', 'NaN')
+ [[ 1. 0.05564149 NaN 0.40047142]
+ [ 0.05564149 1. NaN 0.91359586]
+ [ NaN NaN 1. NaN]
+ [ 0.40047142 0.91359586 NaN 1. ]]
+ >>> spearmanCorr = Statistics.corr(rdd, method="spearman")
+ >>> print str(spearmanCorr).replace('nan', 'NaN')
+ [[ 1. 0.10540926 NaN 0.4 ]
+ [ 0.10540926 1. NaN 0.9486833 ]
+ [ NaN NaN 1. NaN]
+ [ 0.4 0.9486833 NaN 1. ]]
+ >>> try:
+ ... Statistics.corr(rdd, "spearman")
+ ... print "Method name as second argument without 'method=' shouldn't be allowed."
+ ... except TypeError:
+ ... pass
+ """
+ # Check inputs to determine whether a single value or a matrix is needed for output.
+ # Since it's legal for users to use the method name as the second argument, we need to
+ # check if y is used to specify the method name instead.
+ if type(y) == str:
+ raise TypeError("Use 'method=' to specify method name.")
+
+ if not y:
+ return callMLlibFunc("corr", x.map(_convert_to_vector), method).toArray()
+ else:
+ return callMLlibFunc("corr", x.map(float), y.map(float), method)
+
+ @staticmethod
+ def chiSqTest(observed, expected=None):
+ """
+ .. note:: Experimental
+
+ If `observed` is Vector, conduct Pearson's chi-squared goodness
+ of fit test of the observed data against the expected distribution,
+ or againt the uniform distribution (by default), with each category
+ having an expected frequency of `1 / len(observed)`.
+ (Note: `observed` cannot contain negative values)
+
+ If `observed` is matrix, conduct Pearson's independence test on the
+ input contingency matrix, which cannot contain negative entries or
+ columns or rows that sum up to 0.
+
+ If `observed` is an RDD of LabeledPoint, conduct Pearson's independence
+ test for every feature against the label across the input RDD.
+ For each feature, the (feature, label) pairs are converted into a
+ contingency matrix for which the chi-squared statistic is computed.
+ All label and feature values must be categorical.
+
+ :param observed: it could be a vector containing the observed categorical
+ counts/relative frequencies, or the contingency matrix
+ (containing either counts or relative frequencies),
+ or an RDD of LabeledPoint containing the labeled dataset
+ with categorical features. Real-valued features will be
+ treated as categorical for each distinct value.
+ :param expected: Vector containing the expected categorical counts/relative
+ frequencies. `expected` is rescaled if the `expected` sum
+ differs from the `observed` sum.
+ :return: ChiSquaredTest object containing the test statistic, degrees
+ of freedom, p-value, the method used, and the null hypothesis.
+
+ >>> from pyspark.mllib.linalg import Vectors, Matrices
+ >>> observed = Vectors.dense([4, 6, 5])
+ >>> pearson = Statistics.chiSqTest(observed)
+ >>> print pearson.statistic
+ 0.4
+ >>> pearson.degreesOfFreedom
+ 2
+ >>> print round(pearson.pValue, 4)
+ 0.8187
+ >>> pearson.method
+ u'pearson'
+ >>> pearson.nullHypothesis
+ u'observed follows the same distribution as expected.'
+
+ >>> observed = Vectors.dense([21, 38, 43, 80])
+ >>> expected = Vectors.dense([3, 5, 7, 20])
+ >>> pearson = Statistics.chiSqTest(observed, expected)
+ >>> print round(pearson.pValue, 4)
+ 0.0027
+
+ >>> data = [40.0, 24.0, 29.0, 56.0, 32.0, 42.0, 31.0, 10.0, 0.0, 30.0, 15.0, 12.0]
+ >>> chi = Statistics.chiSqTest(Matrices.dense(3, 4, data))
+ >>> print round(chi.statistic, 4)
+ 21.9958
+
+ >>> data = [LabeledPoint(0.0, Vectors.dense([0.5, 10.0])),
+ ... LabeledPoint(0.0, Vectors.dense([1.5, 20.0])),
+ ... LabeledPoint(1.0, Vectors.dense([1.5, 30.0])),
+ ... LabeledPoint(0.0, Vectors.dense([3.5, 30.0])),
+ ... LabeledPoint(0.0, Vectors.dense([3.5, 40.0])),
+ ... LabeledPoint(1.0, Vectors.dense([3.5, 40.0])),]
+ >>> rdd = sc.parallelize(data, 4)
+ >>> chi = Statistics.chiSqTest(rdd)
+ >>> print chi[0].statistic
+ 0.75
+ >>> print chi[1].statistic
+ 1.5
+ """
+ if isinstance(observed, RDD):
+ if not isinstance(observed.first(), LabeledPoint):
+ raise ValueError("observed should be an RDD of LabeledPoint")
+ jmodels = callMLlibFunc("chiSqTest", observed)
+ return [ChiSqTestResult(m) for m in jmodels]
+
+ if isinstance(observed, Matrix):
+ jmodel = callMLlibFunc("chiSqTest", observed)
+ else:
+ if expected and len(expected) != len(observed):
+ raise ValueError("`expected` should have same length with `observed`")
+ jmodel = callMLlibFunc("chiSqTest", _convert_to_vector(observed), expected)
+ return ChiSqTestResult(jmodel)
+
+
+def _test():
+ import doctest
+ from pyspark import SparkContext
+ globs = globals().copy()
+ globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+ (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
+ globs['sc'].stop()
+ if failure_count:
+ exit(-1)
+
+
+if __name__ == "__main__":
+ _test()