diff options
author | Sean Owen <sowen@cloudera.com> | 2015-12-10 14:05:45 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2015-12-10 14:05:45 +0000 |
commit | 21b3d2a75f679b252e293000d706741dca33624a (patch) | |
tree | d7d204c4c546ea64ca8fdb201f1c8c55625df146 /mllib/src/main/scala/org | |
parent | e29704f90dfe67d9e276d242699ac0a00f64fb91 (diff) | |
download | spark-21b3d2a75f679b252e293000d706741dca33624a.tar.gz spark-21b3d2a75f679b252e293000d706741dca33624a.tar.bz2 spark-21b3d2a75f679b252e293000d706741dca33624a.zip |
[SPARK-11530][MLLIB] Return eigenvalues with PCA model
Add `computePrincipalComponentsAndVariance` to also compute PCA's explained variance.
CC mengxr
Author: Sean Owen <sowen@cloudera.com>
Closes #9736 from srowen/SPARK-11530.
Diffstat (limited to 'mllib/src/main/scala/org')
3 files changed, 49 insertions, 20 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala index aa88cb03d2..53d33ea2b8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala @@ -73,7 +73,7 @@ class PCA (override val uid: String) extends Estimator[PCAModel] with PCAParams val input = dataset.select($(inputCol)).map { case Row(v: Vector) => v} val pca = new feature.PCA(k = $(k)) val pcaModel = pca.fit(input) - copyValues(new PCAModel(uid, pcaModel.pc).setParent(this)) + copyValues(new PCAModel(uid, pcaModel.pc, pcaModel.explainedVariance).setParent(this)) } override def transformSchema(schema: StructType): StructType = { @@ -105,7 +105,8 @@ object PCA extends DefaultParamsReadable[PCA] { @Experimental class PCAModel private[ml] ( override val uid: String, - val pc: DenseMatrix) + val pc: DenseMatrix, + val explainedVariance: DenseVector) extends Model[PCAModel] with PCAParams with MLWritable { import PCAModel._ @@ -123,7 +124,7 @@ class PCAModel private[ml] ( */ override def transform(dataset: DataFrame): DataFrame = { transformSchema(dataset.schema, logging = true) - val pcaModel = new feature.PCAModel($(k), pc) + val pcaModel = new feature.PCAModel($(k), pc, explainedVariance) val pcaOp = udf { pcaModel.transform _ } dataset.withColumn($(outputCol), pcaOp(col($(inputCol)))) } @@ -139,7 +140,7 @@ class PCAModel private[ml] ( } override def copy(extra: ParamMap): PCAModel = { - val copied = new PCAModel(uid, pc) + val copied = new PCAModel(uid, pc, explainedVariance) copyValues(copied, extra).setParent(parent) } @@ -152,11 +153,11 @@ object PCAModel extends MLReadable[PCAModel] { private[PCAModel] class PCAModelWriter(instance: PCAModel) extends MLWriter { - private case class Data(pc: DenseMatrix) + private case class Data(pc: DenseMatrix, explainedVariance: DenseVector) override protected def saveImpl(path: String): Unit = { DefaultParamsWriter.saveMetadata(instance, path, sc) - val data = Data(instance.pc) + val data = Data(instance.pc, instance.explainedVariance) val dataPath = new Path(path, "data").toString sqlContext.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) } @@ -169,10 +170,11 @@ object PCAModel extends MLReadable[PCAModel] { override def load(path: String): PCAModel = { val metadata = DefaultParamsReader.loadMetadata(path, sc, className) val dataPath = new Path(path, "data").toString - val Row(pc: DenseMatrix) = sqlContext.read.parquet(dataPath) - .select("pc") + val Row(pc: DenseMatrix, explainedVariance: DenseVector) = + sqlContext.read.parquet(dataPath) + .select("pc", "explainedVariance") .head() - val model = new PCAModel(metadata.uid, pc) + val model = new PCAModel(metadata.uid, pc, explainedVariance) DefaultParamsReader.getAndSetParams(model, metadata) model } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala index ecb3c1e6c1..24e0a98c39 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.feature -import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.annotation.Since import org.apache.spark.api.java.JavaRDD import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.linalg.distributed.RowMatrix @@ -43,7 +43,8 @@ class PCA @Since("1.4.0") (@Since("1.4.0") val k: Int) { s"source vector size is ${sources.first().size} must be greater than k=$k") val mat = new RowMatrix(sources) - val pc = mat.computePrincipalComponents(k) match { + val (pc, explainedVariance) = mat.computePrincipalComponentsAndExplainedVariance(k) + val densePC = pc match { case dm: DenseMatrix => dm case sm: SparseMatrix => @@ -58,7 +59,13 @@ class PCA @Since("1.4.0") (@Since("1.4.0") val k: Int) { s"SparseMatrix or DenseMatrix. Instead got: ${m.getClass}") } - new PCAModel(k, pc) + val denseExplainedVariance = explainedVariance match { + case dv: DenseVector => + dv + case sv: SparseVector => + sv.toDense + } + new PCAModel(k, densePC, denseExplainedVariance) } /** @@ -77,7 +84,8 @@ class PCA @Since("1.4.0") (@Since("1.4.0") val k: Int) { @Since("1.4.0") class PCAModel private[spark] ( @Since("1.4.0") val k: Int, - @Since("1.4.0") val pc: DenseMatrix) extends VectorTransformer { + @Since("1.4.0") val pc: DenseMatrix, + @Since("1.6.0") val explainedVariance: DenseVector) extends VectorTransformer { /** * Transform a vector by computed Principal Components. * diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 52c0f19c64..2018a67868 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -368,7 +368,8 @@ class RowMatrix @Since("1.0.0") ( } /** - * Computes the top k principal components. + * Computes the top k principal components and a vector of proportions of + * variance explained by each principal component. * Rows correspond to observations and columns correspond to variables. * The principal components are stored a local matrix of size n-by-k. * Each column corresponds for one principal component, @@ -379,25 +380,43 @@ class RowMatrix @Since("1.0.0") ( * Note that this cannot be computed on matrices with more than 65535 columns. * * @param k number of top principal components. - * @return a matrix of size n-by-k, whose columns are principal components + * @return a matrix of size n-by-k, whose columns are principal components, and + * a vector of values which indicate how much variance each principal component + * explains */ - @Since("1.0.0") - def computePrincipalComponents(k: Int): Matrix = { + @Since("1.6.0") + def computePrincipalComponentsAndExplainedVariance(k: Int): (Matrix, Vector) = { val n = numCols().toInt require(k > 0 && k <= n, s"k = $k out of range (0, n = $n]") val Cov = computeCovariance().toBreeze.asInstanceOf[BDM[Double]] - val brzSvd.SVD(u: BDM[Double], _, _) = brzSvd(Cov) + val brzSvd.SVD(u: BDM[Double], s: BDV[Double], _) = brzSvd(Cov) + + val eigenSum = s.data.sum + val explainedVariance = s.data.map(_ / eigenSum) if (k == n) { - Matrices.dense(n, k, u.data) + (Matrices.dense(n, k, u.data), Vectors.dense(explainedVariance)) } else { - Matrices.dense(n, k, Arrays.copyOfRange(u.data, 0, n * k)) + (Matrices.dense(n, k, Arrays.copyOfRange(u.data, 0, n * k)), + Vectors.dense(Arrays.copyOfRange(explainedVariance, 0, k))) } } /** + * Computes the top k principal components only. + * + * @param k number of top principal components. + * @return a matrix of size n-by-k, whose columns are principal components + * @see computePrincipalComponentsAndExplainedVariance + */ + @Since("1.0.0") + def computePrincipalComponents(k: Int): Matrix = { + computePrincipalComponentsAndExplainedVariance(k)._1 + } + + /** * Computes column-wise summary statistics. */ @Since("1.0.0") |