author	Kirill A. Korinskiy <catap@catap.ru>	2015-05-10 13:34:00 -0700
committer	Joseph K. Bradley <joseph@databricks.com>	2015-05-10 13:34:00 -0700
commit	8c07c75c9831d6c34f69fe840edb6470d4dfdfef (patch)
tree	620f0cd74c410512d4ee554112fc00cc8f117ec2 /mllib
parent	3038443e58b9320c56f7785d9e36d4f85a563e6b (diff)
[SPARK-5521] PCA wrapper for easy transform vectors
I implemented a simple PCA wrapper that makes it easy to transform vectors with PCA, for example the features of a LabeledPoint or of another composite structure. Example of usage:

```
import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.feature.PCA

val data = sc.textFile("data/mllib/ridge-data/lpsa.data").map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}.cache()

val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)

val pca = new PCA(training.first().features.size / 2).fit(data.map(_.features))
val training_pca = training.map(p => p.copy(features = pca.transform(p.features)))
val test_pca = test.map(p => p.copy(features = pca.transform(p.features)))

val numIterations = 100
val model = LinearRegressionWithSGD.train(training, numIterations)
val model_pca = LinearRegressionWithSGD.train(training_pca, numIterations)

val valuesAndPreds = test.map { point =>
  val score = model.predict(point.features)
  (score, point.label)
}
val valuesAndPreds_pca = test_pca.map { point =>
  val score = model_pca.predict(point.features)
  (score, point.label)
}

val MSE = valuesAndPreds.map { case (v, p) => math.pow(v - p, 2) }.mean()
val MSE_pca = valuesAndPreds_pca.map { case (v, p) => math.pow(v - p, 2) }.mean()

println("Mean Squared Error = " + MSE)
println("PCA Mean Squared Error = " + MSE_pca)
```

Author: Kirill A. Korinskiy <catap@catap.ru>
Author: Joseph K. Bradley <joseph@databricks.com>

Closes #4304 from catap/pca and squashes the following commits:

501bcd9 [Joseph K. Bradley] Small updates: removed k from Java-friendly PCA fit(). In PCASuite, converted results to a set for comparison. Added an error message for bad k in PCA.
9dcc02b [Kirill A. Korinskiy] [SPARK-5521] fix scala style
1892a06 [Kirill A. Korinskiy] [SPARK-5521] PCA wrapper for easy transform vectors
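For callers on the Java API, the patch also exposes a Java-friendly fit() that takes a JavaRDD and delegates to the RDD overload. A minimal sketch of how it could be invoked; the `javaVectors` parameter and the `fitWithJavaRDD` helper are hypothetical, not part of the patch:

```
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.mllib.feature.{PCA, PCAModel}
import org.apache.spark.mllib.linalg.Vector

// Hypothetical helper: `javaVectors` would come from a JavaSparkContext.
// k is fixed in the PCA constructor, so the Java-friendly fit() takes only
// the data and unwraps it to the underlying Scala RDD.
def fitWithJavaRDD(javaVectors: JavaRDD[Vector], k: Int): PCAModel =
  new PCA(k).fit(javaVectors)
```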
Diffstat (limited to 'mllib')
-rw-r--r--	mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala      | 93
-rw-r--r--	mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala | 48
2 files changed, 141 insertions(+), 0 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
new file mode 100644
index 0000000000..4e01e402b4
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.feature
+
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.mllib.linalg._
+import org.apache.spark.mllib.linalg.distributed.RowMatrix
+import org.apache.spark.rdd.RDD
+
+/**
+ * A feature transformer that projects vectors to a low-dimensional space using PCA.
+ *
+ * @param k number of principal components
+ */
+class PCA(val k: Int) {
+ require(k >= 1, s"PCA requires a number of principal components k >= 1 but was given $k")
+
+ /**
+ * Computes a [[PCAModel]] that contains the principal components of the input vectors.
+ *
+ * @param sources source vectors
+ */
+ def fit(sources: RDD[Vector]): PCAModel = {
+    require(k <= sources.first().size,
+      s"source vector size ${sources.first().size} must be at least k=$k")
+
+ val mat = new RowMatrix(sources)
+ val pc = mat.computePrincipalComponents(k) match {
+ case dm: DenseMatrix =>
+ dm
+ case sm: SparseMatrix =>
+ /* Convert a sparse matrix to dense.
+ *
+ * RowMatrix.computePrincipalComponents always returns a dense matrix.
+ * The following code is a safeguard.
+ */
+ sm.toDense
+ case m =>
+ throw new IllegalArgumentException("Unsupported matrix format. Expected " +
+ s"SparseMatrix or DenseMatrix. Instead got: ${m.getClass}")
+    }
+ new PCAModel(k, pc)
+ }
+
+ /** Java-friendly version of [[fit()]] */
+ def fit(sources: JavaRDD[Vector]): PCAModel = fit(sources.rdd)
+}
+
+/**
+ * Model fitted by [[PCA]] that can project vectors to a low-dimensional space using PCA.
+ *
+ * @param k number of principal components.
+ * @param pc a principal components Matrix. Each column is one principal component.
+ */
+class PCAModel private[mllib] (val k: Int, val pc: DenseMatrix) extends VectorTransformer {
+ /**
+   * Transform a vector using the computed principal components.
+ *
+ * @param vector vector to be transformed.
+ * Vector must be the same length as the source vectors given to [[PCA.fit()]].
+ * @return transformed vector. Vector will be of length k.
+ */
+ override def transform(vector: Vector): Vector = {
+ vector match {
+ case dv: DenseVector =>
+ pc.transpose.multiply(dv)
+ case SparseVector(size, indices, values) =>
+ /* SparseVector -> single row SparseMatrix */
+ val sm = Matrices.sparse(size, 1, Array(0, indices.length), indices, values).transpose
+ val projection = sm.multiply(pc)
+ Vectors.dense(projection.values)
+ case _ =>
+ throw new IllegalArgumentException("Unsupported vector format. Expected " +
+ s"SparseVector or DenseVector. Instead got: ${vector.getClass}")
+ }
+ }
+}
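To make the shapes in transform concrete: for n-dimensional inputs and k components, pc is an n x k matrix, so pc.transpose.multiply(v) is a (k x n)(n x 1) product that yields a length-k vector. A minimal standalone sketch of the same operation, using made-up, axis-aligned components rather than anything computed by fit():

```
import org.apache.spark.mllib.linalg.{DenseMatrix, DenseVector}

// Hypothetical 3 x 2 principal-components matrix (column-major):
// column 0 = (1, 0, 0), column 1 = (0, 1, 0).
val pc = new DenseMatrix(3, 2, Array(1.0, 0.0, 0.0, 0.0, 1.0, 0.0))
val v = new DenseVector(Array(2.0, 3.0, 4.0))

// Same product PCAModel.transform computes for a dense input:
// pc^T * v = (2.0, 3.0), i.e. the first two coordinates survive.
val projected: DenseVector = pc.transpose.multiply(v)
```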
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala
new file mode 100644
index 0000000000..758af588f1
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.feature
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.linalg.distributed.RowMatrix
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+
+class PCASuite extends FunSuite with MLlibTestSparkContext {
+
+ private val data = Array(
+ Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
+ Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
+ Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
+ )
+
+ private lazy val dataRDD = sc.parallelize(data, 2)
+
+ test("Correct computing use a PCA wrapper") {
+ val k = dataRDD.count().toInt
+ val pca = new PCA(k).fit(dataRDD)
+
+ val mat = new RowMatrix(dataRDD)
+ val pc = mat.computePrincipalComponents(k)
+
+ val pca_transform = pca.transform(dataRDD).collect()
+ val mat_multiply = mat.multiply(pc).rows.collect()
+
+ assert(pca_transform.toSet === mat_multiply.toSet)
+ }
+}
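Since both the dense and sparse branches of transform compute the same product pc^T * v, one natural extra check, sketched below as a hypothetical addition to the suite above (the test name and tolerance are assumptions, not part of the patch), is that a sparse vector and its dense equivalent project identically:

```
  test("sparse and dense inputs project identically") {
    val model = new PCA(2).fit(dataRDD)  // dataRDD as defined in the suite above
    val sparse = Vectors.sparse(5, Seq((1, 1.0), (3, 7.0)))
    val dense = Vectors.dense(0.0, 1.0, 0.0, 7.0, 0.0)  // same entries, dense form
    val diffs = model.transform(sparse).toArray
      .zip(model.transform(dense).toArray)
      .map { case (a, b) => math.abs(a - b) }
    assert(diffs.forall(_ < 1e-8))  // tolerance is an illustrative assumption
  }
```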