diff options
author | Doris Xin <doris.s.xin@gmail.com> | 2014-08-01 15:02:17 -0700 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2014-08-01 15:02:17 -0700 |
commit | d88e69561367d65e1a2b94527b80a1f65a2cba90 (patch) | |
tree | 8c09aa4ccd951e5e15401d8fbe178a4c75334e01 /mllib/src/main | |
parent | 78f2af582286b81e6dc9fa9d455ed2b369d933bd (diff) | |
download | spark-d88e69561367d65e1a2b94527b80a1f65a2cba90.tar.gz spark-d88e69561367d65e1a2b94527b80a1f65a2cba90.tar.bz2 spark-d88e69561367d65e1a2b94527b80a1f65a2cba90.zip |
[SPARK-2786][mllib] Python correlations
Author: Doris Xin <doris.s.xin@gmail.com>
Closes #1713 from dorx/pythonCorrelation and squashes the following commits:
5f1e60c [Doris Xin] reviewer comments.
46ff6eb [Doris Xin] reviewer comments.
ad44085 [Doris Xin] style fix
e69d446 [Doris Xin] fixed missed conflicts.
eb5bf56 [Doris Xin] merge master
cc9f725 [Doris Xin] units passed.
9141a63 [Doris Xin] WIP2
d199f1f [Doris Xin] Moved correlation names into a public object
cd163d6 [Doris Xin] WIP
Diffstat (limited to 'mllib/src/main')
3 files changed, 72 insertions, 26 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index d2e8ccf208..122925d096 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -20,13 +20,15 @@ package org.apache.spark.mllib.api.python import java.nio.{ByteBuffer, ByteOrder} import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.api.java.{JavaSparkContext, JavaRDD} +import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.mllib.classification._ import org.apache.spark.mllib.clustering._ -import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors} +import org.apache.spark.mllib.linalg.{Matrix, SparseVector, Vector, Vectors} import org.apache.spark.mllib.random.{RandomRDDGenerators => RG} import org.apache.spark.mllib.recommendation._ import org.apache.spark.mllib.regression._ +import org.apache.spark.mllib.stat.Statistics +import org.apache.spark.mllib.stat.correlation.CorrelationNames import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD import org.apache.spark.util.Utils @@ -227,7 +229,7 @@ class PythonMLLibAPI extends Serializable { jsc: JavaSparkContext, path: String, minPartitions: Int): JavaRDD[Array[Byte]] = - MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions).map(serializeLabeledPoint).toJavaRDD() + MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions).map(serializeLabeledPoint) private def trainRegressionModel( trainFunc: (RDD[LabeledPoint], Vector) => GeneralizedLinearModel, @@ -456,6 +458,37 @@ class PythonMLLibAPI extends Serializable { ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha) } + /** + * Java stub for mllib Statistics.corr(X: RDD[Vector], method: String). + * Returns the correlation matrix serialized into a byte array understood by deserializers in + * pyspark. + */ + def corr(X: JavaRDD[Array[Byte]], method: String): Array[Byte] = { + val inputMatrix = X.rdd.map(deserializeDoubleVector(_)) + val result = Statistics.corr(inputMatrix, getCorrNameOrDefault(method)) + serializeDoubleMatrix(to2dArray(result)) + } + + /** + * Java stub for mllib Statistics.corr(x: RDD[Double], y: RDD[Double], method: String). + */ + def corr(x: JavaRDD[Array[Byte]], y: JavaRDD[Array[Byte]], method: String): Double = { + val xDeser = x.rdd.map(deserializeDouble(_)) + val yDeser = y.rdd.map(deserializeDouble(_)) + Statistics.corr(xDeser, yDeser, getCorrNameOrDefault(method)) + } + + // used by the corr methods to retrieve the name of the correlation method passed in via pyspark + private def getCorrNameOrDefault(method: String) = { + if (method == null) CorrelationNames.defaultCorrName else method + } + + // Reformat a Matrix into Array[Array[Double]] for serialization + private[python] def to2dArray(matrix: Matrix): Array[Array[Double]] = { + val values = matrix.toArray + Array.tabulate(matrix.numRows, matrix.numCols)((i, j) => values(i + j * matrix.numRows)) + } + // Used by the *RDD methods to get default seed if not passed in from pyspark private def getSeedOrDefault(seed: java.lang.Long): Long = { if (seed == null) Utils.random.nextLong else seed diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala index 9d6de9b6e1..f416a9fbb3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala @@ -23,21 +23,24 @@ import org.apache.spark.mllib.stat.correlation.Correlations import org.apache.spark.rdd.RDD /** - * API for statistical functions in MLlib + * API for statistical functions in MLlib. */ @Experimental object Statistics { /** + * :: Experimental :: * Compute the Pearson correlation matrix for the input RDD of Vectors. * Columns with 0 covariance produce NaN entries in the correlation matrix. * * @param X an RDD[Vector] for which the correlation matrix is to be computed. * @return Pearson correlation matrix comparing columns in X. */ + @Experimental def corr(X: RDD[Vector]): Matrix = Correlations.corrMatrix(X) /** + * :: Experimental :: * Compute the correlation matrix for the input RDD of Vectors using the specified method. * Methods currently supported: `pearson` (default), `spearman`. * @@ -51,9 +54,11 @@ object Statistics { * Supported: `pearson` (default), `spearman` * @return Correlation matrix comparing columns in X. */ + @Experimental def corr(X: RDD[Vector], method: String): Matrix = Correlations.corrMatrix(X, method) /** + * :: Experimental :: * Compute the Pearson correlation for the input RDDs. * Returns NaN if either vector has 0 variance. * @@ -64,9 +69,11 @@ object Statistics { * @param y RDD[Double] of the same cardinality as x. * @return A Double containing the Pearson correlation between the two input RDD[Double]s */ + @Experimental def corr(x: RDD[Double], y: RDD[Double]): Double = Correlations.corr(x, y) /** + * :: Experimental :: * Compute the correlation for the input RDDs using the specified method. * Methods currently supported: `pearson` (default), `spearman`. * @@ -80,5 +87,6 @@ object Statistics { *@return A Double containing the correlation between the two input RDD[Double]s using the * specified method. */ + @Experimental def corr(x: RDD[Double], y: RDD[Double], method: String): Double = Correlations.corr(x, y, method) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala index f23393d3da..1fb8d7b3d4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala @@ -49,43 +49,48 @@ private[stat] trait Correlation { } /** - * Delegates computation to the specific correlation object based on the input method name - * - * Currently supported correlations: pearson, spearman. - * After new correlation algorithms are added, please update the documentation here and in - * Statistics.scala for the correlation APIs. - * - * Maintains the default correlation type, pearson + * Delegates computation to the specific correlation object based on the input method name. */ private[stat] object Correlations { - // Note: after new types of correlations are implemented, please update this map - val nameToObjectMap = Map(("pearson", PearsonCorrelation), ("spearman", SpearmanCorrelation)) - val defaultCorrName: String = "pearson" - val defaultCorr: Correlation = nameToObjectMap(defaultCorrName) - - def corr(x: RDD[Double], y: RDD[Double], method: String = defaultCorrName): Double = { + def corr(x: RDD[Double], + y: RDD[Double], + method: String = CorrelationNames.defaultCorrName): Double = { val correlation = getCorrelationFromName(method) correlation.computeCorrelation(x, y) } - def corrMatrix(X: RDD[Vector], method: String = defaultCorrName): Matrix = { + def corrMatrix(X: RDD[Vector], + method: String = CorrelationNames.defaultCorrName): Matrix = { val correlation = getCorrelationFromName(method) correlation.computeCorrelationMatrix(X) } - /** - * Match input correlation name with a known name via simple string matching - * - * private to stat for ease of unit testing - */ - private[stat] def getCorrelationFromName(method: String): Correlation = { + // Match input correlation name with a known name via simple string matching. + def getCorrelationFromName(method: String): Correlation = { try { - nameToObjectMap(method) + CorrelationNames.nameToObjectMap(method) } catch { case nse: NoSuchElementException => throw new IllegalArgumentException("Unrecognized method name. Supported correlations: " - + nameToObjectMap.keys.mkString(", ")) + + CorrelationNames.nameToObjectMap.keys.mkString(", ")) } } } + +/** + * Maintains supported and default correlation names. + * + * Currently supported correlations: `pearson`, `spearman`. + * Current default correlation: `pearson`. + * + * After new correlation algorithms are added, please update the documentation here and in + * Statistics.scala for the correlation APIs. + */ +private[mllib] object CorrelationNames { + + // Note: after new types of correlations are implemented, please update this map. + val nameToObjectMap = Map(("pearson", PearsonCorrelation), ("spearman", SpearmanCorrelation)) + val defaultCorrName: String = "pearson" + +} |