diff options
author | Kirill A. Korinskiy <catap@catap.ru> | 2015-05-10 13:34:00 -0700 |
---|---|---|
committer | Joseph K. Bradley <joseph@databricks.com> | 2015-05-10 13:34:00 -0700 |
commit | 8c07c75c9831d6c34f69fe840edb6470d4dfdfef (patch) | |
tree | 620f0cd74c410512d4ee554112fc00cc8f117ec2 /mllib | |
parent | 3038443e58b9320c56f7785d9e36d4f85a563e6b (diff) | |
download | spark-8c07c75c9831d6c34f69fe840edb6470d4dfdfef.tar.gz spark-8c07c75c9831d6c34f69fe840edb6470d4dfdfef.tar.bz2 spark-8c07c75c9831d6c34f69fe840edb6470d4dfdfef.zip |
[SPARK-5521] PCA wrapper for easy transform vectors
I implement a simple PCA wrapper for easy transform of vectors by PCA for example LabeledPoint or another complicated structure.
Example of usage:
```
import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.feature.PCA
val data = sc.textFile("data/mllib/ridge-data/lpsa.data").map { line =>
val parts = line.split(',')
LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}.cache()
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)
val pca = PCA.create(training.first().features.size/2, data.map(_.features))
val training_pca = training.map(p => p.copy(features = pca.transform(p.features)))
val test_pca = test.map(p => p.copy(features = pca.transform(p.features)))
val numIterations = 100
val model = LinearRegressionWithSGD.train(training, numIterations)
val model_pca = LinearRegressionWithSGD.train(training_pca, numIterations)
val valuesAndPreds = test.map { point =>
val score = model.predict(point.features)
(score, point.label)
}
val valuesAndPreds_pca = test_pca.map { point =>
val score = model_pca.predict(point.features)
(score, point.label)
}
val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.mean()
val MSE_pca = valuesAndPreds_pca.map{case(v, p) => math.pow((v - p), 2)}.mean()
println("Mean Squared Error = " + MSE)
println("PCA Mean Squared Error = " + MSE_pca)
```
Author: Kirill A. Korinskiy <catap@catap.ru>
Author: Joseph K. Bradley <joseph@databricks.com>
Closes #4304 from catap/pca and squashes the following commits:
501bcd9 [Joseph K. Bradley] Small updates: removed k from Java-friendly PCA fit(). In PCASuite, converted results to set for comparison. Added an error message for bad k in PCA.
9dcc02b [Kirill A. Korinskiy] [SPARK-5521] fix scala style
1892a06 [Kirill A. Korinskiy] [SPARK-5521] PCA wrapper for easy transform vectors
Diffstat (limited to 'mllib')
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala | 93 | ||||
-rw-r--r-- | mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala | 48 |
2 files changed, 141 insertions, 0 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala new file mode 100644 index 0000000000..4e01e402b4 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.feature + +import org.apache.spark.api.java.JavaRDD +import org.apache.spark.mllib.linalg._ +import org.apache.spark.mllib.linalg.distributed.RowMatrix +import org.apache.spark.rdd.RDD + +/** + * A feature transformer that projects vectors to a low-dimensional space using PCA. + * + * @param k number of principal components + */ +class PCA(val k: Int) { + require(k >= 1, s"PCA requires a number of principal components k >= 1 but was given $k") + + /** + * Computes a [[PCAModel]] that contains the principal components of the input vectors. + * + * @param sources source vectors + */ + def fit(sources: RDD[Vector]): PCAModel = { + require(k <= sources.first().size, + s"source vector size is ${sources.first().size} must be greater than k=$k") + + val mat = new RowMatrix(sources) + val pc = mat.computePrincipalComponents(k) match { + case dm: DenseMatrix => + dm + case sm: SparseMatrix => + /* Convert a sparse matrix to dense. + * + * RowMatrix.computePrincipalComponents always returns a dense matrix. + * The following code is a safeguard. + */ + sm.toDense + case m => + throw new IllegalArgumentException("Unsupported matrix format. Expected " + + s"SparseMatrix or DenseMatrix. Instead got: ${m.getClass}") + + } + new PCAModel(k, pc) + } + + /** Java-friendly version of [[fit()]] */ + def fit(sources: JavaRDD[Vector]): PCAModel = fit(sources.rdd) +} + +/** + * Model fitted by [[PCA]] that can project vectors to a low-dimensional space using PCA. + * + * @param k number of principal components. + * @param pc a principal components Matrix. Each column is one principal component. + */ +class PCAModel private[mllib] (val k: Int, val pc: DenseMatrix) extends VectorTransformer { + /** + * Transform a vector by computed Principal Components. + * + * @param vector vector to be transformed. + * Vector must be the same length as the source vectors given to [[PCA.fit()]]. + * @return transformed vector. Vector will be of length k. + */ + override def transform(vector: Vector): Vector = { + vector match { + case dv: DenseVector => + pc.transpose.multiply(dv) + case SparseVector(size, indices, values) => + /* SparseVector -> single row SparseMatrix */ + val sm = Matrices.sparse(size, 1, Array(0, indices.length), indices, values).transpose + val projection = sm.multiply(pc) + Vectors.dense(projection.values) + case _ => + throw new IllegalArgumentException("Unsupported vector format. Expected " + + s"SparseVector or DenseVector. Instead got: ${vector.getClass}") + } + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala new file mode 100644 index 0000000000..758af588f1 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.feature + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.linalg.distributed.RowMatrix +import org.apache.spark.mllib.util.MLlibTestSparkContext + +class PCASuite extends FunSuite with MLlibTestSparkContext { + + private val data = Array( + Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))), + Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0), + Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0) + ) + + private lazy val dataRDD = sc.parallelize(data, 2) + + test("Correct computing use a PCA wrapper") { + val k = dataRDD.count().toInt + val pca = new PCA(k).fit(dataRDD) + + val mat = new RowMatrix(dataRDD) + val pc = mat.computePrincipalComponents(k) + + val pca_transform = pca.transform(dataRDD).collect() + val mat_multiply = mat.multiply(pc).rows.collect() + + assert(pca_transform.toSet === mat_multiply.toSet) + } +} |