author    Sean Owen <sowen@cloudera.com>    2015-12-10 14:05:45 +0000
committer    Sean Owen <sowen@cloudera.com>    2015-12-10 14:05:45 +0000
commit    21b3d2a75f679b252e293000d706741dca33624a (patch)
tree    d7d204c4c546ea64ca8fdb201f1c8c55625df146 /mllib
parent    e29704f90dfe67d9e276d242699ac0a00f64fb91 (diff)
[SPARK-11530][MLLIB] Return eigenvalues with PCA model
Add `computePrincipalComponentsAndExplainedVariance` so that fitting a PCA model also returns the proportion of variance explained by each principal component.

CC mengxr

Author: Sean Owen <sowen@cloudera.com>

Closes #9736 from srowen/SPARK-11530.
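A minimal usage sketch of the new field (assuming an existing SparkContext `sc`; the data values are illustrative, not taken from the test suites):

    import org.apache.spark.mllib.feature.PCA
    import org.apache.spark.mllib.linalg.Vectors

    val data = sc.parallelize(Seq(
      Vectors.dense(2.0, 0.0, 3.0, 4.0),
      Vectors.dense(4.0, 0.0, 0.0, 6.0),
      Vectors.dense(6.0, 7.0, 0.0, 8.0)))

    val model = new PCA(3).fit(data)
    model.pc                 // principal components as an n-by-k DenseMatrix, as before
    model.explainedVariance  // new: DenseVector of the proportion of variance per component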
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala | 20
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala | 16
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala | 33
-rw-r--r--  mllib/src/test/scala/org/apache/spark/ml/feature/PCASuite.scala | 7
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala | 3
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala | 10
6 files changed, 64 insertions, 25 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
index aa88cb03d2..53d33ea2b8 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
@@ -73,7 +73,7 @@ class PCA (override val uid: String) extends Estimator[PCAModel] with PCAParams
val input = dataset.select($(inputCol)).map { case Row(v: Vector) => v}
val pca = new feature.PCA(k = $(k))
val pcaModel = pca.fit(input)
- copyValues(new PCAModel(uid, pcaModel.pc).setParent(this))
+ copyValues(new PCAModel(uid, pcaModel.pc, pcaModel.explainedVariance).setParent(this))
}
override def transformSchema(schema: StructType): StructType = {
@@ -105,7 +105,8 @@ object PCA extends DefaultParamsReadable[PCA] {
@Experimental
class PCAModel private[ml] (
override val uid: String,
- val pc: DenseMatrix)
+ val pc: DenseMatrix,
+ val explainedVariance: DenseVector)
extends Model[PCAModel] with PCAParams with MLWritable {
import PCAModel._
@@ -123,7 +124,7 @@ class PCAModel private[ml] (
*/
override def transform(dataset: DataFrame): DataFrame = {
transformSchema(dataset.schema, logging = true)
- val pcaModel = new feature.PCAModel($(k), pc)
+ val pcaModel = new feature.PCAModel($(k), pc, explainedVariance)
val pcaOp = udf { pcaModel.transform _ }
dataset.withColumn($(outputCol), pcaOp(col($(inputCol))))
}
@@ -139,7 +140,7 @@ class PCAModel private[ml] (
}
override def copy(extra: ParamMap): PCAModel = {
- val copied = new PCAModel(uid, pc)
+ val copied = new PCAModel(uid, pc, explainedVariance)
copyValues(copied, extra).setParent(parent)
}
@@ -152,11 +153,11 @@ object PCAModel extends MLReadable[PCAModel] {
private[PCAModel] class PCAModelWriter(instance: PCAModel) extends MLWriter {
- private case class Data(pc: DenseMatrix)
+ private case class Data(pc: DenseMatrix, explainedVariance: DenseVector)
override protected def saveImpl(path: String): Unit = {
DefaultParamsWriter.saveMetadata(instance, path, sc)
- val data = Data(instance.pc)
+ val data = Data(instance.pc, instance.explainedVariance)
val dataPath = new Path(path, "data").toString
sqlContext.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
}
@@ -169,10 +170,11 @@ object PCAModel extends MLReadable[PCAModel] {
override def load(path: String): PCAModel = {
val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
val dataPath = new Path(path, "data").toString
- val Row(pc: DenseMatrix) = sqlContext.read.parquet(dataPath)
- .select("pc")
+ val Row(pc: DenseMatrix, explainedVariance: DenseVector) =
+ sqlContext.read.parquet(dataPath)
+ .select("pc", "explainedVariance")
.head()
- val model = new PCAModel(metadata.uid, pc)
+ val model = new PCAModel(metadata.uid, pc, explainedVariance)
DefaultParamsReader.getAndSetParams(model, metadata)
model
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
index ecb3c1e6c1..24e0a98c39 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
@@ -17,7 +17,7 @@
package org.apache.spark.mllib.feature
-import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.annotation.Since
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.distributed.RowMatrix
@@ -43,7 +43,8 @@ class PCA @Since("1.4.0") (@Since("1.4.0") val k: Int) {
s"source vector size is ${sources.first().size} must be greater than k=$k")
val mat = new RowMatrix(sources)
- val pc = mat.computePrincipalComponents(k) match {
+ val (pc, explainedVariance) = mat.computePrincipalComponentsAndExplainedVariance(k)
+ val densePC = pc match {
case dm: DenseMatrix =>
dm
case sm: SparseMatrix =>
@@ -58,7 +59,13 @@ class PCA @Since("1.4.0") (@Since("1.4.0") val k: Int) {
s"SparseMatrix or DenseMatrix. Instead got: ${m.getClass}")
}
- new PCAModel(k, pc)
+ val denseExplainedVariance = explainedVariance match {
+ case dv: DenseVector =>
+ dv
+ case sv: SparseVector =>
+ sv.toDense
+ }
+ new PCAModel(k, densePC, denseExplainedVariance)
}
/**
@@ -77,7 +84,8 @@ class PCA @Since("1.4.0") (@Since("1.4.0") val k: Int) {
@Since("1.4.0")
class PCAModel private[spark] (
@Since("1.4.0") val k: Int,
- @Since("1.4.0") val pc: DenseMatrix) extends VectorTransformer {
+ @Since("1.4.0") val pc: DenseMatrix,
+ @Since("1.6.0") val explainedVariance: DenseVector) extends VectorTransformer {
/**
* Transform a vector by computed Principal Components.
*
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 52c0f19c64..2018a67868 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -368,7 +368,8 @@ class RowMatrix @Since("1.0.0") (
}
/**
- * Computes the top k principal components.
+ * Computes the top k principal components and a vector of proportions of
+ * variance explained by each principal component.
* Rows correspond to observations and columns correspond to variables.
* The principal components are stored in a local matrix of size n-by-k.
* Each column corresponds to one principal component,
@@ -379,25 +380,43 @@ class RowMatrix @Since("1.0.0") (
* Note that this cannot be computed on matrices with more than 65535 columns.
*
* @param k number of top principal components.
- * @return a matrix of size n-by-k, whose columns are principal components
+ * @return a matrix of size n-by-k, whose columns are principal components, and
+ * a vector of values which indicate how much variance each principal component
+ * explains
*/
- @Since("1.0.0")
- def computePrincipalComponents(k: Int): Matrix = {
+ @Since("1.6.0")
+ def computePrincipalComponentsAndExplainedVariance(k: Int): (Matrix, Vector) = {
val n = numCols().toInt
require(k > 0 && k <= n, s"k = $k out of range (0, n = $n]")
val Cov = computeCovariance().toBreeze.asInstanceOf[BDM[Double]]
- val brzSvd.SVD(u: BDM[Double], _, _) = brzSvd(Cov)
+ val brzSvd.SVD(u: BDM[Double], s: BDV[Double], _) = brzSvd(Cov)
+
+ val eigenSum = s.data.sum
+ val explainedVariance = s.data.map(_ / eigenSum)
if (k == n) {
- Matrices.dense(n, k, u.data)
+ (Matrices.dense(n, k, u.data), Vectors.dense(explainedVariance))
} else {
- Matrices.dense(n, k, Arrays.copyOfRange(u.data, 0, n * k))
+ (Matrices.dense(n, k, Arrays.copyOfRange(u.data, 0, n * k)),
+ Vectors.dense(Arrays.copyOfRange(explainedVariance, 0, k)))
}
}
/**
+ * Computes the top k principal components only.
+ *
+ * @param k number of top principal components.
+ * @return a matrix of size n-by-k, whose columns are principal components
+ * @see computePrincipalComponentsAndExplainedVariance
+ */
+ @Since("1.0.0")
+ def computePrincipalComponents(k: Int): Matrix = {
+ computePrincipalComponentsAndExplainedVariance(k)._1
+ }
+
+ /**
* Computes column-wise summary statistics.
*/
@Since("1.0.0")
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/PCASuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/PCASuite.scala
index edab21e6c3..9f6618b929 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/PCASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/PCASuite.scala
@@ -24,7 +24,6 @@ import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.util.TestingUtils._
-import org.apache.spark.mllib.feature.{PCAModel => OldPCAModel}
import org.apache.spark.sql.Row
class PCASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
@@ -32,7 +31,8 @@ class PCASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
test("params") {
ParamsSuite.checkParams(new PCA)
val mat = Matrices.dense(2, 2, Array(0.0, 1.0, 2.0, 3.0)).asInstanceOf[DenseMatrix]
- val model = new PCAModel("pca", mat)
+ val explainedVariance = Vectors.dense(0.5, 0.5).asInstanceOf[DenseVector]
+ val model = new PCAModel("pca", mat, explainedVariance)
ParamsSuite.checkParams(model)
}
@@ -76,7 +76,8 @@ class PCASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
test("PCAModel read/write") {
val instance = new PCAModel("myPCAModel",
- Matrices.dense(2, 2, Array(0.0, 1.0, 2.0, 3.0)).asInstanceOf[DenseMatrix])
+ Matrices.dense(2, 2, Array(0.0, 1.0, 2.0, 3.0)).asInstanceOf[DenseMatrix],
+ Vectors.dense(0.5, 0.5).asInstanceOf[DenseVector])
val newInstance = testDefaultReadWrite(instance)
assert(newInstance.pc === instance.pc)
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala
index e57f491913..a8d82932d3 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala
@@ -37,11 +37,12 @@ class PCASuite extends SparkFunSuite with MLlibTestSparkContext {
val pca = new PCA(k).fit(dataRDD)
val mat = new RowMatrix(dataRDD)
- val pc = mat.computePrincipalComponents(k)
+ val (pc, explainedVariance) = mat.computePrincipalComponentsAndExplainedVariance(k)
val pca_transform = pca.transform(dataRDD).collect()
val mat_multiply = mat.multiply(pc).rows.collect()
assert(pca_transform.toSet === mat_multiply.toSet)
+ assert(pca.explainedVariance === explainedVariance)
}
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
index 4abb98fb6f..0ff901ddc4 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.mllib.linalg.distributed
+import java.util.Arrays
+
import scala.util.Random
import breeze.numerics.abs
@@ -49,6 +51,7 @@ class RowMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
(0.0, 1.0, 0.0),
(math.sqrt(2.0) / 2.0, 0.0, math.sqrt(2.0) / 2.0),
(math.sqrt(2.0) / 2.0, 0.0, - math.sqrt(2.0) / 2.0))
+ val explainedVariance = BDV(4.0 / 7.0, 3.0 / 7.0, 0.0)
var denseMat: RowMatrix = _
var sparseMat: RowMatrix = _
@@ -201,10 +204,15 @@ class RowMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
test("pca") {
for (mat <- Seq(denseMat, sparseMat); k <- 1 to n) {
- val pc = denseMat.computePrincipalComponents(k)
+ val (pc, expVariance) = mat.computePrincipalComponentsAndExplainedVariance(k)
assert(pc.numRows === n)
assert(pc.numCols === k)
assertColumnEqualUpToSign(pc.toBreeze.asInstanceOf[BDM[Double]], principalComponents, k)
+ assert(
+ closeToZero(BDV(expVariance.toArray) -
+ BDV(Arrays.copyOfRange(explainedVariance.data, 0, k))))
+ // Check that this method returns the same answer
+ assert(pc === mat.computePrincipalComponents(k))
}
}
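For reference, the expected proportions asserted above follow the same rule the new method applies: each eigenvalue of the covariance matrix is divided by the sum of all eigenvalues. A small sketch with eigenvalues chosen only to match the 4/7, 3/7, 0 ratios used in the suite (the absolute values are illustrative):

    // Mirrors s.data.map(_ / eigenSum) in computePrincipalComponentsAndExplainedVariance.
    val eigenvalues = Array(4.0, 3.0, 0.0)   // hypothetical covariance spectrum
    val eigenSum = eigenvalues.sum           // 7.0
    val proportions = eigenvalues.map(_ / eigenSum)
    // proportions == Array(4.0 / 7.0, 3.0 / 7.0, 0.0)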