author     Doris Xin <doris.s.xin@gmail.com>       2014-08-01 15:02:17 -0700
committer  Xiangrui Meng <meng@databricks.com>     2014-08-01 15:02:17 -0700
commit     d88e69561367d65e1a2b94527b80a1f65a2cba90 (patch)
tree       8c09aa4ccd951e5e15401d8fbe178a4c75334e01 /mllib
parent     78f2af582286b81e6dc9fa9d455ed2b369d933bd (diff)
[SPARK-2786][mllib] Python correlations
Author: Doris Xin <doris.s.xin@gmail.com>

Closes #1713 from dorx/pythonCorrelation and squashes the following commits:

5f1e60c [Doris Xin] reviewer comments.
46ff6eb [Doris Xin] reviewer comments.
ad44085 [Doris Xin] style fix
e69d446 [Doris Xin] fixed missed conflicts.
eb5bf56 [Doris Xin] merge master
cc9f725 [Doris Xin] units passed.
9141a63 [Doris Xin] WIP2
d199f1f [Doris Xin] Moved correlation names into a public object
cd163d6 [Doris Xin] WIP
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala        39
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala                  10
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala     49
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala   21
4 files changed, 90 insertions(+), 29 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index d2e8ccf208..122925d096 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -20,13 +20,15 @@ package org.apache.spark.mllib.api.python
import java.nio.{ByteBuffer, ByteOrder}
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
+import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.clustering._
-import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
+import org.apache.spark.mllib.linalg.{Matrix, SparseVector, Vector, Vectors}
import org.apache.spark.mllib.random.{RandomRDDGenerators => RG}
import org.apache.spark.mllib.recommendation._
import org.apache.spark.mllib.regression._
+import org.apache.spark.mllib.stat.Statistics
+import org.apache.spark.mllib.stat.correlation.CorrelationNames
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils
@@ -227,7 +229,7 @@ class PythonMLLibAPI extends Serializable {
jsc: JavaSparkContext,
path: String,
minPartitions: Int): JavaRDD[Array[Byte]] =
- MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions).map(serializeLabeledPoint).toJavaRDD()
+ MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions).map(serializeLabeledPoint)
private def trainRegressionModel(
trainFunc: (RDD[LabeledPoint], Vector) => GeneralizedLinearModel,
@@ -456,6 +458,37 @@ class PythonMLLibAPI extends Serializable {
ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
}
+ /**
+ * Java stub for mllib Statistics.corr(X: RDD[Vector], method: String).
+ * Returns the correlation matrix serialized into a byte array understood by deserializers in
+ * pyspark.
+ */
+ def corr(X: JavaRDD[Array[Byte]], method: String): Array[Byte] = {
+ val inputMatrix = X.rdd.map(deserializeDoubleVector(_))
+ val result = Statistics.corr(inputMatrix, getCorrNameOrDefault(method))
+ serializeDoubleMatrix(to2dArray(result))
+ }
+
+ /**
+ * Java stub for mllib Statistics.corr(x: RDD[Double], y: RDD[Double], method: String).
+ */
+ def corr(x: JavaRDD[Array[Byte]], y: JavaRDD[Array[Byte]], method: String): Double = {
+ val xDeser = x.rdd.map(deserializeDouble(_))
+ val yDeser = y.rdd.map(deserializeDouble(_))
+ Statistics.corr(xDeser, yDeser, getCorrNameOrDefault(method))
+ }
+
+ // used by the corr methods to retrieve the name of the correlation method passed in via pyspark
+ private def getCorrNameOrDefault(method: String) = {
+ if (method == null) CorrelationNames.defaultCorrName else method
+ }
+
+ // Reformat a Matrix into Array[Array[Double]] for serialization
+ private[python] def to2dArray(matrix: Matrix): Array[Array[Double]] = {
+ val values = matrix.toArray
+ Array.tabulate(matrix.numRows, matrix.numCols)((i, j) => values(i + j * matrix.numRows))
+ }
+
// Used by the *RDD methods to get default seed if not passed in from pyspark
private def getSeedOrDefault(seed: java.lang.Long): Long = {
if (seed == null) Utils.random.nextLong else seed
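A minimal standalone sketch of the column-major reshaping performed by the new to2dArray helper above; the plain arrays and the object/method names here are illustrative stand-ins, not part of the patch:

object To2dArraySketch {
  // Reshape a column-major flat array (numRows x numCols) into row-oriented nested arrays,
  // mirroring Array.tabulate(matrix.numRows, matrix.numCols)((i, j) => values(i + j * numRows)).
  def to2dArray(values: Array[Double], numRows: Int, numCols: Int): Array[Array[Double]] =
    Array.tabulate(numRows, numCols)((i, j) => values(i + j * numRows))

  def main(args: Array[String]): Unit = {
    // Column-major storage of the 2x3 matrix [[0, 3, 7], [1.2, 4.56, 8]]
    val values = Array(0.0, 1.2, 3.0, 4.56, 7.0, 8.0)
    to2dArray(values, numRows = 2, numCols = 3).foreach(row => println(row.mkString(", ")))
    // prints:
    // 0.0, 3.0, 7.0
    // 1.2, 4.56, 8.0
  }
}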
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
index 9d6de9b6e1..f416a9fbb3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
@@ -23,21 +23,24 @@ import org.apache.spark.mllib.stat.correlation.Correlations
import org.apache.spark.rdd.RDD
/**
- * API for statistical functions in MLlib
+ * API for statistical functions in MLlib.
*/
@Experimental
object Statistics {
/**
+ * :: Experimental ::
* Compute the Pearson correlation matrix for the input RDD of Vectors.
* Columns with 0 covariance produce NaN entries in the correlation matrix.
*
* @param X an RDD[Vector] for which the correlation matrix is to be computed.
* @return Pearson correlation matrix comparing columns in X.
*/
+ @Experimental
def corr(X: RDD[Vector]): Matrix = Correlations.corrMatrix(X)
/**
+ * :: Experimental ::
* Compute the correlation matrix for the input RDD of Vectors using the specified method.
* Methods currently supported: `pearson` (default), `spearman`.
*
@@ -51,9 +54,11 @@ object Statistics {
* Supported: `pearson` (default), `spearman`
* @return Correlation matrix comparing columns in X.
*/
+ @Experimental
def corr(X: RDD[Vector], method: String): Matrix = Correlations.corrMatrix(X, method)
/**
+ * :: Experimental ::
* Compute the Pearson correlation for the input RDDs.
* Returns NaN if either vector has 0 variance.
*
@@ -64,9 +69,11 @@ object Statistics {
* @param y RDD[Double] of the same cardinality as x.
* @return A Double containing the Pearson correlation between the two input RDD[Double]s
*/
+ @Experimental
def corr(x: RDD[Double], y: RDD[Double]): Double = Correlations.corr(x, y)
/**
+ * :: Experimental ::
* Compute the correlation for the input RDDs using the specified method.
* Methods currently supported: `pearson` (default), `spearman`.
*
@@ -80,5 +87,6 @@ object Statistics {
*@return A Double containing the correlation between the two input RDD[Double]s using the
* specified method.
*/
+ @Experimental
def corr(x: RDD[Double], y: RDD[Double], method: String): Double = Correlations.corr(x, y, method)
}
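For context, a hedged usage sketch of the Statistics.corr overloads marked @Experimental above, assuming a spark-shell session with an existing SparkContext `sc`; the data values are illustrative only:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics

val x = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0))
val y = sc.parallelize(Seq(2.0, 4.0, 6.0, 8.0))
val pearson  = Statistics.corr(x, y)              // method defaults to "pearson"
val spearman = Statistics.corr(x, y, "spearman")

val data = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0),
  Vectors.dense(2.0, 4.0),
  Vectors.dense(3.0, 7.0)))
val corrMatrix = Statistics.corr(data, "pearson") // Matrix comparing the columns of data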
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala
index f23393d3da..1fb8d7b3d4 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala
@@ -49,43 +49,48 @@ private[stat] trait Correlation {
}
/**
- * Delegates computation to the specific correlation object based on the input method name
- *
- * Currently supported correlations: pearson, spearman.
- * After new correlation algorithms are added, please update the documentation here and in
- * Statistics.scala for the correlation APIs.
- *
- * Maintains the default correlation type, pearson
+ * Delegates computation to the specific correlation object based on the input method name.
*/
private[stat] object Correlations {
- // Note: after new types of correlations are implemented, please update this map
- val nameToObjectMap = Map(("pearson", PearsonCorrelation), ("spearman", SpearmanCorrelation))
- val defaultCorrName: String = "pearson"
- val defaultCorr: Correlation = nameToObjectMap(defaultCorrName)
-
- def corr(x: RDD[Double], y: RDD[Double], method: String = defaultCorrName): Double = {
+ def corr(x: RDD[Double],
+ y: RDD[Double],
+ method: String = CorrelationNames.defaultCorrName): Double = {
val correlation = getCorrelationFromName(method)
correlation.computeCorrelation(x, y)
}
- def corrMatrix(X: RDD[Vector], method: String = defaultCorrName): Matrix = {
+ def corrMatrix(X: RDD[Vector],
+ method: String = CorrelationNames.defaultCorrName): Matrix = {
val correlation = getCorrelationFromName(method)
correlation.computeCorrelationMatrix(X)
}
- /**
- * Match input correlation name with a known name via simple string matching
- *
- * private to stat for ease of unit testing
- */
- private[stat] def getCorrelationFromName(method: String): Correlation = {
+ // Match input correlation name with a known name via simple string matching.
+ def getCorrelationFromName(method: String): Correlation = {
try {
- nameToObjectMap(method)
+ CorrelationNames.nameToObjectMap(method)
} catch {
case nse: NoSuchElementException =>
throw new IllegalArgumentException("Unrecognized method name. Supported correlations: "
- + nameToObjectMap.keys.mkString(", "))
+ + CorrelationNames.nameToObjectMap.keys.mkString(", "))
}
}
}
+
+/**
+ * Maintains supported and default correlation names.
+ *
+ * Currently supported correlations: `pearson`, `spearman`.
+ * Current default correlation: `pearson`.
+ *
+ * After new correlation algorithms are added, please update the documentation here and in
+ * Statistics.scala for the correlation APIs.
+ */
+private[mllib] object CorrelationNames {
+
+ // Note: after new types of correlations are implemented, please update this map.
+ val nameToObjectMap = Map(("pearson", PearsonCorrelation), ("spearman", SpearmanCorrelation))
+ val defaultCorrName: String = "pearson"
+
+}
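A standalone sketch of the name-to-object dispatch that CorrelationNames and getCorrelationFromName implement above; the trait and object names below are simplified stand-ins rather than the MLlib types:

object CorrelationDispatchSketch {
  sealed trait Corr
  case object Pearson extends Corr
  case object Spearman extends Corr

  // Mirrors CorrelationNames: a name -> implementation map plus a default name.
  val nameToObjectMap: Map[String, Corr] = Map("pearson" -> Pearson, "spearman" -> Spearman)
  val defaultCorrName: String = "pearson"

  // Mirrors getCorrelationFromName: unknown names fail fast, listing the supported set.
  def fromName(method: String): Corr =
    nameToObjectMap.getOrElse(method,
      throw new IllegalArgumentException("Unrecognized method name. Supported correlations: " +
        nameToObjectMap.keys.mkString(", ")))

  def main(args: Array[String]): Unit = {
    println(fromName("spearman"))      // Spearman
    println(fromName(defaultCorrName)) // Pearson
    // fromName("kendall")             // would throw IllegalArgumentException
  }
}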
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala
index d94cfa2fce..bd413a80f5 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.mllib.api.python
import org.scalatest.FunSuite
-import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.linalg.{Matrices, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
class PythonMLLibAPISuite extends FunSuite {
@@ -59,10 +59,25 @@ class PythonMLLibAPISuite extends FunSuite {
}
test("double serialization") {
- for (x <- List(123.0, -10.0, 0.0, Double.MaxValue, Double.MinValue)) {
+ for (x <- List(123.0, -10.0, 0.0, Double.MaxValue, Double.MinValue, Double.NaN)) {
val bytes = py.serializeDouble(x)
val deser = py.deserializeDouble(bytes)
- assert(x === deser)
+ // We use `equals` here for comparison because we cannot use `==` for NaN
+ assert(x.equals(deser))
}
}
+
+ test("matrix to 2D array") {
+ val values = Array[Double](0, 1.2, 3, 4.56, 7, 8)
+ val matrix = Matrices.dense(2, 3, values)
+ val arr = py.to2dArray(matrix)
+ val expected = Array(Array[Double](0, 3, 7), Array[Double](1.2, 4.56, 8))
+ assert(arr === expected)
+
+ // Test conversion for empty matrix
+ val empty = Array[Double]()
+ val emptyMatrix = Matrices.dense(0, 0, empty)
+ val empty2D = py.to2dArray(emptyMatrix)
+ assert(empty2D === Array[Array[Double]]())
+ }
}
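A small sketch of why the double-serialization test above compares with `equals` rather than `==` once Double.NaN is in the list; this is standard Scala/Java Double behavior, not something specific to this patch:

object NaNEqualitySketch extends App {
  val x: Double = Double.NaN
  println(x == x)       // false: primitive == follows IEEE 754, where NaN != NaN
  println(x.equals(x))  // true: boxes to java.lang.Double, whose equals treats NaN as equal to itself
}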