aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorDoris Xin <doris.s.xin@gmail.com>2014-08-01 15:02:17 -0700
committerXiangrui Meng <meng@databricks.com>2014-08-01 15:02:17 -0700
commitd88e69561367d65e1a2b94527b80a1f65a2cba90 (patch)
tree8c09aa4ccd951e5e15401d8fbe178a4c75334e01 /python
parent78f2af582286b81e6dc9fa9d455ed2b369d933bd (diff)
downloadspark-d88e69561367d65e1a2b94527b80a1f65a2cba90.tar.gz
spark-d88e69561367d65e1a2b94527b80a1f65a2cba90.tar.bz2
spark-d88e69561367d65e1a2b94527b80a1f65a2cba90.zip
[SPARK-2786][mllib] Python correlations
Author: Doris Xin <doris.s.xin@gmail.com> Closes #1713 from dorx/pythonCorrelation and squashes the following commits: 5f1e60c [Doris Xin] reviewer comments. 46ff6eb [Doris Xin] reviewer comments. ad44085 [Doris Xin] style fix e69d446 [Doris Xin] fixed missed conflicts. eb5bf56 [Doris Xin] merge master cc9f725 [Doris Xin] units passed. 9141a63 [Doris Xin] WIP2 d199f1f [Doris Xin] Moved correlation names into a public object cd163d6 [Doris Xin] WIP
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/mllib/_common.py6
-rw-r--r--python/pyspark/mllib/stat.py104
2 files changed, 109 insertions, 1 deletions
diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
index 8e3ad6b783..c6ca6a75df 100644
--- a/python/pyspark/mllib/_common.py
+++ b/python/pyspark/mllib/_common.py
@@ -101,7 +101,7 @@ def _serialize_double(d):
"""
Serialize a double (float or numpy.float64) into a mutually understood format.
"""
- if type(d) == float or type(d) == float64:
+ if type(d) == float or type(d) == float64 or type(d) == int or type(d) == long:
d = float64(d)
ba = bytearray(8)
_copyto(d, buffer=ba, offset=0, shape=[1], dtype=float64)
@@ -176,6 +176,10 @@ def _deserialize_double(ba, offset=0):
True
>>> _deserialize_double(_serialize_double(float64(0.0))) == 0.0
True
+ >>> _deserialize_double(_serialize_double(1)) == 1.0
+ True
+ >>> _deserialize_double(_serialize_double(1L)) == 1.0
+ True
>>> x = sys.float_info.max
>>> _deserialize_double(_serialize_double(sys.float_info.max)) == x
True
diff --git a/python/pyspark/mllib/stat.py b/python/pyspark/mllib/stat.py
new file mode 100644
index 0000000000..0a08a562d1
--- /dev/null
+++ b/python/pyspark/mllib/stat.py
@@ -0,0 +1,104 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Python package for statistical functions in MLlib.
+"""
+
+from pyspark.mllib._common import \
+ _get_unmangled_double_vector_rdd, _get_unmangled_rdd, \
+ _serialize_double, _serialize_double_vector, \
+ _deserialize_double, _deserialize_double_matrix
+
+class Statistics(object):
+
+ @staticmethod
+ def corr(x, y=None, method=None):
+ """
+ Compute the correlation (matrix) for the input RDD(s) using the
+ specified method.
+ Methods currently supported: I{pearson (default), spearman}.
+
+ If a single RDD of Vectors is passed in, a correlation matrix
+ comparing the columns in the input RDD is returned. Use C{method=}
+        to specify the method to be used for single RDD input.
+ If two RDDs of floats are passed in, a single float is returned.
+
+ >>> x = sc.parallelize([1.0, 0.0, -2.0], 2)
+ >>> y = sc.parallelize([4.0, 5.0, 3.0], 2)
+ >>> zeros = sc.parallelize([0.0, 0.0, 0.0], 2)
+ >>> abs(Statistics.corr(x, y) - 0.6546537) < 1e-7
+ True
+ >>> Statistics.corr(x, y) == Statistics.corr(x, y, "pearson")
+ True
+ >>> Statistics.corr(x, y, "spearman")
+ 0.5
+ >>> from math import isnan
+ >>> isnan(Statistics.corr(x, zeros))
+ True
+ >>> from linalg import Vectors
+ >>> rdd = sc.parallelize([Vectors.dense([1, 0, 0, -2]), Vectors.dense([4, 5, 0, 3]),
+ ... Vectors.dense([6, 7, 0, 8]), Vectors.dense([9, 0, 0, 1])])
+ >>> Statistics.corr(rdd)
+ array([[ 1. , 0.05564149, nan, 0.40047142],
+ [ 0.05564149, 1. , nan, 0.91359586],
+ [ nan, nan, 1. , nan],
+ [ 0.40047142, 0.91359586, nan, 1. ]])
+ >>> Statistics.corr(rdd, method="spearman")
+ array([[ 1. , 0.10540926, nan, 0.4 ],
+ [ 0.10540926, 1. , nan, 0.9486833 ],
+ [ nan, nan, 1. , nan],
+ [ 0.4 , 0.9486833 , nan, 1. ]])
+ >>> try:
+ ... Statistics.corr(rdd, "spearman")
+ ... print "Method name as second argument without 'method=' shouldn't be allowed."
+ ... except TypeError:
+ ... pass
+ """
+ sc = x.ctx
+ # Check inputs to determine whether a single value or a matrix is needed for output.
+ # Since it's legal for users to use the method name as the second argument, we need to
+ # check if y is used to specify the method name instead.
+ if type(y) == str:
+ raise TypeError("Use 'method=' to specify method name.")
+ if not y:
+ try:
+ Xser = _get_unmangled_double_vector_rdd(x)
+ except TypeError:
+ raise TypeError("corr called on a single RDD not consisted of Vectors.")
+ resultMat = sc._jvm.PythonMLLibAPI().corr(Xser._jrdd, method)
+ return _deserialize_double_matrix(resultMat)
+ else:
+ xSer = _get_unmangled_rdd(x, _serialize_double)
+ ySer = _get_unmangled_rdd(y, _serialize_double)
+ result = sc._jvm.PythonMLLibAPI().corr(xSer._jrdd, ySer._jrdd, method)
+ return result
+
+
+def _test():
+ import doctest
+ from pyspark import SparkContext
+ globs = globals().copy()
+ globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+ (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
+ globs['sc'].stop()
+ if failure_count:
+ exit(-1)
+
+
+if __name__ == "__main__":
+ _test()