aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDoris Xin <doris.s.xin@gmail.com>2014-08-01 15:02:17 -0700
committerXiangrui Meng <meng@databricks.com>2014-08-01 15:02:17 -0700
commitd88e69561367d65e1a2b94527b80a1f65a2cba90 (patch)
tree8c09aa4ccd951e5e15401d8fbe178a4c75334e01
parent78f2af582286b81e6dc9fa9d455ed2b369d933bd (diff)
downloadspark-d88e69561367d65e1a2b94527b80a1f65a2cba90.tar.gz
spark-d88e69561367d65e1a2b94527b80a1f65a2cba90.tar.bz2
spark-d88e69561367d65e1a2b94527b80a1f65a2cba90.zip
[SPARK-2786][mllib] Python correlations
Author: Doris Xin <doris.s.xin@gmail.com> Closes #1713 from dorx/pythonCorrelation and squashes the following commits: 5f1e60c [Doris Xin] reviewer comments. 46ff6eb [Doris Xin] reviewer comments. ad44085 [Doris Xin] style fix e69d446 [Doris Xin] fixed missed conflicts. eb5bf56 [Doris Xin] merge master cc9f725 [Doris Xin] units passed. 9141a63 [Doris Xin] WIP2 d199f1f [Doris Xin] Moved correlation names into a public object cd163d6 [Doris Xin] WIP
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala39
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala10
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala49
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala21
-rw-r--r--python/pyspark/mllib/_common.py6
-rw-r--r--python/pyspark/mllib/stat.py104
6 files changed, 199 insertions, 30 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index d2e8ccf208..122925d096 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -20,13 +20,15 @@ package org.apache.spark.mllib.api.python
import java.nio.{ByteBuffer, ByteOrder}
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
+import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.clustering._
-import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
+import org.apache.spark.mllib.linalg.{Matrix, SparseVector, Vector, Vectors}
import org.apache.spark.mllib.random.{RandomRDDGenerators => RG}
import org.apache.spark.mllib.recommendation._
import org.apache.spark.mllib.regression._
+import org.apache.spark.mllib.stat.Statistics
+import org.apache.spark.mllib.stat.correlation.CorrelationNames
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils
@@ -227,7 +229,7 @@ class PythonMLLibAPI extends Serializable {
jsc: JavaSparkContext,
path: String,
minPartitions: Int): JavaRDD[Array[Byte]] =
- MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions).map(serializeLabeledPoint).toJavaRDD()
+ MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions).map(serializeLabeledPoint)
private def trainRegressionModel(
trainFunc: (RDD[LabeledPoint], Vector) => GeneralizedLinearModel,
@@ -456,6 +458,37 @@ class PythonMLLibAPI extends Serializable {
ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
}
+ /**
+ * Java stub for mllib Statistics.corr(X: RDD[Vector], method: String).
+ * Returns the correlation matrix serialized into a byte array understood by deserializers in
+ * pyspark.
+ */
+ def corr(X: JavaRDD[Array[Byte]], method: String): Array[Byte] = {
+ val inputMatrix = X.rdd.map(deserializeDoubleVector(_))
+ val result = Statistics.corr(inputMatrix, getCorrNameOrDefault(method))
+ serializeDoubleMatrix(to2dArray(result))
+ }
+
+ /**
+ * Java stub for mllib Statistics.corr(x: RDD[Double], y: RDD[Double], method: String).
+ */
+ def corr(x: JavaRDD[Array[Byte]], y: JavaRDD[Array[Byte]], method: String): Double = {
+ val xDeser = x.rdd.map(deserializeDouble(_))
+ val yDeser = y.rdd.map(deserializeDouble(_))
+ Statistics.corr(xDeser, yDeser, getCorrNameOrDefault(method))
+ }
+
+ // used by the corr methods to retrieve the name of the correlation method passed in via pyspark
+ private def getCorrNameOrDefault(method: String) = {
+ if (method == null) CorrelationNames.defaultCorrName else method
+ }
+
+ // Reformat a Matrix into Array[Array[Double]] for serialization
+ private[python] def to2dArray(matrix: Matrix): Array[Array[Double]] = {
+ val values = matrix.toArray
+ Array.tabulate(matrix.numRows, matrix.numCols)((i, j) => values(i + j * matrix.numRows))
+ }
+
// Used by the *RDD methods to get default seed if not passed in from pyspark
private def getSeedOrDefault(seed: java.lang.Long): Long = {
if (seed == null) Utils.random.nextLong else seed
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
index 9d6de9b6e1..f416a9fbb3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
@@ -23,21 +23,24 @@ import org.apache.spark.mllib.stat.correlation.Correlations
import org.apache.spark.rdd.RDD
/**
- * API for statistical functions in MLlib
+ * API for statistical functions in MLlib.
*/
@Experimental
object Statistics {
/**
+ * :: Experimental ::
* Compute the Pearson correlation matrix for the input RDD of Vectors.
* Columns with 0 covariance produce NaN entries in the correlation matrix.
*
* @param X an RDD[Vector] for which the correlation matrix is to be computed.
* @return Pearson correlation matrix comparing columns in X.
*/
+ @Experimental
def corr(X: RDD[Vector]): Matrix = Correlations.corrMatrix(X)
/**
+ * :: Experimental ::
* Compute the correlation matrix for the input RDD of Vectors using the specified method.
* Methods currently supported: `pearson` (default), `spearman`.
*
@@ -51,9 +54,11 @@ object Statistics {
* Supported: `pearson` (default), `spearman`
* @return Correlation matrix comparing columns in X.
*/
+ @Experimental
def corr(X: RDD[Vector], method: String): Matrix = Correlations.corrMatrix(X, method)
/**
+ * :: Experimental ::
* Compute the Pearson correlation for the input RDDs.
* Returns NaN if either vector has 0 variance.
*
@@ -64,9 +69,11 @@ object Statistics {
* @param y RDD[Double] of the same cardinality as x.
* @return A Double containing the Pearson correlation between the two input RDD[Double]s
*/
+ @Experimental
def corr(x: RDD[Double], y: RDD[Double]): Double = Correlations.corr(x, y)
/**
+ * :: Experimental ::
* Compute the correlation for the input RDDs using the specified method.
* Methods currently supported: `pearson` (default), `spearman`.
*
@@ -80,5 +87,6 @@ object Statistics {
*@return A Double containing the correlation between the two input RDD[Double]s using the
* specified method.
*/
+ @Experimental
def corr(x: RDD[Double], y: RDD[Double], method: String): Double = Correlations.corr(x, y, method)
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala
index f23393d3da..1fb8d7b3d4 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala
@@ -49,43 +49,48 @@ private[stat] trait Correlation {
}
/**
- * Delegates computation to the specific correlation object based on the input method name
- *
- * Currently supported correlations: pearson, spearman.
- * After new correlation algorithms are added, please update the documentation here and in
- * Statistics.scala for the correlation APIs.
- *
- * Maintains the default correlation type, pearson
+ * Delegates computation to the specific correlation object based on the input method name.
*/
private[stat] object Correlations {
- // Note: after new types of correlations are implemented, please update this map
- val nameToObjectMap = Map(("pearson", PearsonCorrelation), ("spearman", SpearmanCorrelation))
- val defaultCorrName: String = "pearson"
- val defaultCorr: Correlation = nameToObjectMap(defaultCorrName)
-
- def corr(x: RDD[Double], y: RDD[Double], method: String = defaultCorrName): Double = {
+ def corr(x: RDD[Double],
+ y: RDD[Double],
+ method: String = CorrelationNames.defaultCorrName): Double = {
val correlation = getCorrelationFromName(method)
correlation.computeCorrelation(x, y)
}
- def corrMatrix(X: RDD[Vector], method: String = defaultCorrName): Matrix = {
+ def corrMatrix(X: RDD[Vector],
+ method: String = CorrelationNames.defaultCorrName): Matrix = {
val correlation = getCorrelationFromName(method)
correlation.computeCorrelationMatrix(X)
}
- /**
- * Match input correlation name with a known name via simple string matching
- *
- * private to stat for ease of unit testing
- */
- private[stat] def getCorrelationFromName(method: String): Correlation = {
+ // Match input correlation name with a known name via simple string matching.
+ def getCorrelationFromName(method: String): Correlation = {
try {
- nameToObjectMap(method)
+ CorrelationNames.nameToObjectMap(method)
} catch {
case nse: NoSuchElementException =>
throw new IllegalArgumentException("Unrecognized method name. Supported correlations: "
- + nameToObjectMap.keys.mkString(", "))
+ + CorrelationNames.nameToObjectMap.keys.mkString(", "))
}
}
}
+
+/**
+ * Maintains supported and default correlation names.
+ *
+ * Currently supported correlations: `pearson`, `spearman`.
+ * Current default correlation: `pearson`.
+ *
+ * After new correlation algorithms are added, please update the documentation here and in
+ * Statistics.scala for the correlation APIs.
+ */
+private[mllib] object CorrelationNames {
+
+ // Note: after new types of correlations are implemented, please update this map.
+ val nameToObjectMap = Map(("pearson", PearsonCorrelation), ("spearman", SpearmanCorrelation))
+ val defaultCorrName: String = "pearson"
+
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala
index d94cfa2fce..bd413a80f5 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.mllib.api.python
import org.scalatest.FunSuite
-import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.linalg.{Matrices, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
class PythonMLLibAPISuite extends FunSuite {
@@ -59,10 +59,25 @@ class PythonMLLibAPISuite extends FunSuite {
}
test("double serialization") {
- for (x <- List(123.0, -10.0, 0.0, Double.MaxValue, Double.MinValue)) {
+ for (x <- List(123.0, -10.0, 0.0, Double.MaxValue, Double.MinValue, Double.NaN)) {
val bytes = py.serializeDouble(x)
val deser = py.deserializeDouble(bytes)
- assert(x === deser)
+ // We use `equals` here for comparison because we cannot use `==` for NaN
+ assert(x.equals(deser))
}
}
+
+ test("matrix to 2D array") {
+ val values = Array[Double](0, 1.2, 3, 4.56, 7, 8)
+ val matrix = Matrices.dense(2, 3, values)
+ val arr = py.to2dArray(matrix)
+ val expected = Array(Array[Double](0, 3, 7), Array[Double](1.2, 4.56, 8))
+ assert(arr === expected)
+
+ // Test conversion for empty matrix
+ val empty = Array[Double]()
+ val emptyMatrix = Matrices.dense(0, 0, empty)
+ val empty2D = py.to2dArray(emptyMatrix)
+ assert(empty2D === Array[Array[Double]]())
+ }
}
diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
index 8e3ad6b783..c6ca6a75df 100644
--- a/python/pyspark/mllib/_common.py
+++ b/python/pyspark/mllib/_common.py
@@ -101,7 +101,7 @@ def _serialize_double(d):
"""
Serialize a double (float or numpy.float64) into a mutually understood format.
"""
- if type(d) == float or type(d) == float64:
+ if type(d) == float or type(d) == float64 or type(d) == int or type(d) == long:
d = float64(d)
ba = bytearray(8)
_copyto(d, buffer=ba, offset=0, shape=[1], dtype=float64)
@@ -176,6 +176,10 @@ def _deserialize_double(ba, offset=0):
True
>>> _deserialize_double(_serialize_double(float64(0.0))) == 0.0
True
+ >>> _deserialize_double(_serialize_double(1)) == 1.0
+ True
+ >>> _deserialize_double(_serialize_double(1L)) == 1.0
+ True
>>> x = sys.float_info.max
>>> _deserialize_double(_serialize_double(sys.float_info.max)) == x
True
diff --git a/python/pyspark/mllib/stat.py b/python/pyspark/mllib/stat.py
new file mode 100644
index 0000000000..0a08a562d1
--- /dev/null
+++ b/python/pyspark/mllib/stat.py
@@ -0,0 +1,104 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Python package for statistical functions in MLlib.
+"""
+
+from pyspark.mllib._common import \
+ _get_unmangled_double_vector_rdd, _get_unmangled_rdd, \
+ _serialize_double, _serialize_double_vector, \
+ _deserialize_double, _deserialize_double_matrix
+
+class Statistics(object):
+
+ @staticmethod
+ def corr(x, y=None, method=None):
+ """
+ Compute the correlation (matrix) for the input RDD(s) using the
+ specified method.
+ Methods currently supported: I{pearson (default), spearman}.
+
+ If a single RDD of Vectors is passed in, a correlation matrix
+ comparing the columns in the input RDD is returned. Use C{method=}
+ to specify the method to be used for single RDD inout.
+ If two RDDs of floats are passed in, a single float is returned.
+
+ >>> x = sc.parallelize([1.0, 0.0, -2.0], 2)
+ >>> y = sc.parallelize([4.0, 5.0, 3.0], 2)
+ >>> zeros = sc.parallelize([0.0, 0.0, 0.0], 2)
+ >>> abs(Statistics.corr(x, y) - 0.6546537) < 1e-7
+ True
+ >>> Statistics.corr(x, y) == Statistics.corr(x, y, "pearson")
+ True
+ >>> Statistics.corr(x, y, "spearman")
+ 0.5
+ >>> from math import isnan
+ >>> isnan(Statistics.corr(x, zeros))
+ True
+ >>> from linalg import Vectors
+ >>> rdd = sc.parallelize([Vectors.dense([1, 0, 0, -2]), Vectors.dense([4, 5, 0, 3]),
+ ... Vectors.dense([6, 7, 0, 8]), Vectors.dense([9, 0, 0, 1])])
+ >>> Statistics.corr(rdd)
+ array([[ 1. , 0.05564149, nan, 0.40047142],
+ [ 0.05564149, 1. , nan, 0.91359586],
+ [ nan, nan, 1. , nan],
+ [ 0.40047142, 0.91359586, nan, 1. ]])
+ >>> Statistics.corr(rdd, method="spearman")
+ array([[ 1. , 0.10540926, nan, 0.4 ],
+ [ 0.10540926, 1. , nan, 0.9486833 ],
+ [ nan, nan, 1. , nan],
+ [ 0.4 , 0.9486833 , nan, 1. ]])
+ >>> try:
+ ... Statistics.corr(rdd, "spearman")
+ ... print "Method name as second argument without 'method=' shouldn't be allowed."
+ ... except TypeError:
+ ... pass
+ """
+ sc = x.ctx
+ # Check inputs to determine whether a single value or a matrix is needed for output.
+ # Since it's legal for users to use the method name as the second argument, we need to
+ # check if y is used to specify the method name instead.
+ if type(y) == str:
+ raise TypeError("Use 'method=' to specify method name.")
+ if not y:
+ try:
+ Xser = _get_unmangled_double_vector_rdd(x)
+ except TypeError:
+ raise TypeError("corr called on a single RDD not consisted of Vectors.")
+ resultMat = sc._jvm.PythonMLLibAPI().corr(Xser._jrdd, method)
+ return _deserialize_double_matrix(resultMat)
+ else:
+ xSer = _get_unmangled_rdd(x, _serialize_double)
+ ySer = _get_unmangled_rdd(y, _serialize_double)
+ result = sc._jvm.PythonMLLibAPI().corr(xSer._jrdd, ySer._jrdd, method)
+ return result
+
+
+def _test():
+ import doctest
+ from pyspark import SparkContext
+ globs = globals().copy()
+ globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+ (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
+ globs['sc'].stop()
+ if failure_count:
+ exit(-1)
+
+
+if __name__ == "__main__":
+ _test()