-rw-r--r--  docs/mllib-guide.md                                                          1
-rw-r--r--  docs/mllib-linear-algebra.md                                                13
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/mllib/SparkPCA.scala      51
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala       4
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixRow.scala          26
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala               120
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala               370
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyDenseMatrix.scala  30
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyMatrixSVD.scala    31
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala              65
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala          124
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala           98
12 files changed, 819 insertions(+), 114 deletions(-)
diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md
index 76308ec9c0..203d235bf9 100644
--- a/docs/mllib-guide.md
+++ b/docs/mllib-guide.md
@@ -29,6 +29,7 @@ The following links provide a detailed explanation of the methods and usage examples
* Gradient Descent and Stochastic Gradient Descent
* <a href="mllib-linear-algebra.html">Linear Algebra</a>
* Singular Value Decomposition
+ * Principal Component Analysis
# Dependencies
MLlib uses the [jblas](https://github.com/mikiobraun/jblas) linear algebra library, which itself
diff --git a/docs/mllib-linear-algebra.md b/docs/mllib-linear-algebra.md
index cc203d833d..09598be790 100644
--- a/docs/mllib-linear-algebra.md
+++ b/docs/mllib-linear-algebra.md
@@ -59,3 +59,16 @@ val s = decomposed.S.data
println("singular values = " + s.toArray.mkString)
{% endhighlight %}
+
+
+# Principal Component Analysis
+
+Computes the top k principal component coefficients for the m-by-n data matrix X.
+Rows of X correspond to observations and columns correspond to variables.
+The coefficient matrix is n-by-k: each column of the returned matrix contains the
+coefficients for one principal component, and the columns are in descending
+order of component variance. This function centers the data and uses the
+singular value decomposition (SVD) algorithm.
+
+Input may be given either as a TallSkinnyDenseMatrix or as an RDD of dense rows;
+the coefficients are returned as a local n-by-k array. See "SparkPCA.scala" in the
+examples directory for example usage.
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SparkPCA.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SparkPCA.scala
new file mode 100644
index 0000000000..d4e08c5e12
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/SparkPCA.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.linalg.PCA
+
+/**
+ * Compute PCA of an example matrix.
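+ *
+ * Example invocation (a sketch; the exact launcher script is an assumption,
+ * not part of this patch):
+ *   ./bin/run-example org.apache.spark.examples.mllib.SparkPCA local 10 3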
+ */
+object SparkPCA {
+ def main(args: Array[String]) {
+ if (args.length != 3) {
+ System.err.println("Usage: SparkPCA <master> m n")
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "PCA",
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
+
+    val m = args(1).toInt
+    val n = args(2).toInt
+
+ // Make example matrix
+ val data = Array.tabulate(m, n) { (a, b) =>
+ (a + 2).toDouble * (b + 1) / (1 + a + b) }
+
+ // recover top principal component
+ val coeffs = new PCA().setK(1).compute(sc.makeRDD(data))
+
+ println("top principal component = " + coeffs.mkString(", "))
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala
index ce2b133368..2933cec497 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala
@@ -38,7 +38,7 @@ object SparkSVD {
System.exit(1)
}
val sc = new SparkContext(args(0), "SVD",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
// Load and parse the data file
val data = sc.textFile(args(1)).map { line =>
@@ -49,7 +49,7 @@ object SparkSVD {
val n = args(3).toInt
// recover largest singular vector
- val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), 1)
+ val decomposed = new SVD().setK(1).compute(SparseMatrix(data, m, n))
val u = decomposed.U.data
val s = decomposed.S.data
val v = decomposed.V.data
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixRow.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixRow.scala
new file mode 100644
index 0000000000..2608a67bfe
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixRow.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+/**
+ * Class that represents a row of a dense matrix
+ *
+ * @param i row index (0 indexing used)
+ * @param data entries of the row
+ */
+case class MatrixRow(i: Int, data: Array[Double])
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala
new file mode 100644
index 0000000000..fe5b3f6c7e
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import org.apache.spark.rdd.RDD
+
+import org.jblas.DoubleMatrix
+
+/**
+ * Class used to obtain principal components
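+ *
+ * A sketch of intended usage (assuming `rows` is an RDD[Array[Double]] of
+ * dense matrix rows):
+ * {{{
+ *   val coeffs = new PCA().setK(2).compute(rows)
+ * }}}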
+ */
+class PCA {
+ private var k = 1
+
+ /**
+   * Set the number k of top principal components to return
+ */
+ def setK(k: Int): PCA = {
+ this.k = k
+ this
+ }
+
+ /**
+   * Compute PCA using the currently set parameters
+ */
+ def compute(matrix: TallSkinnyDenseMatrix): Array[Array[Double]] = {
+ computePCA(matrix)
+ }
+
+ /**
+ * Compute PCA using the parameters currently set
+ * See computePCA() for more details
+ */
+ def compute(matrix: RDD[Array[Double]]): Array[Array[Double]] = {
+ computePCA(matrix)
+ }
+
+ /**
+ * Computes the top k principal component coefficients for the m-by-n data matrix X.
+ * Rows of X correspond to observations and columns correspond to variables.
+ * The coefficient matrix is n-by-k. Each column of coeff contains coefficients
+ * for one principal component, and the columns are in descending
+ * order of component variance.
+ * This function centers the data and uses the
+ * singular value decomposition (SVD) algorithm.
+ *
+ * @param matrix dense matrix to perform PCA on
+   * @return An n-by-k matrix with principal components in columns. The inner
+   *         arrays are rows, i.e. the value at (i)(j) is the coefficient of
+   *         variable i for principal component j
+ */
+ private def computePCA(matrix: TallSkinnyDenseMatrix): Array[Array[Double]] = {
+ val m = matrix.m
+ val n = matrix.n
+
+ if (m <= 0 || n <= 0) {
+ throw new IllegalArgumentException("Expecting a well-formed matrix: m=$m n=$n")
+ }
+
+ computePCA(matrix.rows.map(_.data))
+ }
+
+ /**
+ * Computes the top k principal component coefficients for the m-by-n data matrix X.
+ * Rows of X correspond to observations and columns correspond to variables.
+ * The coefficient matrix is n-by-k. Each column of coeff contains coefficients
+ * for one principal component, and the columns are in descending
+ * order of component variance.
+ * This function centers the data and uses the
+ * singular value decomposition (SVD) algorithm.
+ *
+ * @param matrix dense matrix to perform pca on
+ * @return An nxk matrix of principal components
+ */
+ private def computePCA(matrix: RDD[Array[Double]]): Array[Array[Double]] = {
+ val n = matrix.first.size
+
+    // compute column sums and the row count, in order to center the matrix
+ val colSumsTemp = matrix.map((_, 1)).fold((Array.ofDim[Double](n), 0)) {
+ (a, b) =>
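+        // jblas wraps the backing array, so addi accumulates b's column sums
+        // directly into a._1 in place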
+ val am = new DoubleMatrix(a._1)
+ val bm = new DoubleMatrix(b._1)
+ am.addi(bm)
+ (a._1, a._2 + b._2)
+ }
+
+ val m = colSumsTemp._2
+ val colSums = colSumsTemp._1.map(x => x / m)
+
+ val data = matrix.map {
+ x =>
+ val row = Array.ofDim[Double](n)
+ var i = 0
+ while (i < n) {
+ row(i) = x(i) - colSums(i)
+ i += 1
+ }
+ row
+ }
+
+    // only V (the principal component coefficients) is needed, so skip computing U
+    val (_, _, v) = new SVD().setK(k).setComputeU(false).compute(data)
+    v
+ }
+}
+
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
index e4a26eeb07..3e7cc648d1 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
@@ -23,12 +23,16 @@ import org.apache.spark.rdd.RDD
import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}
-
/**
* Class used to obtain singular value decompositions
*/
class SVD {
- private var k: Int = 1
+ private var k = 1
+ private var computeU = true
+
+ // All singular values smaller than rCond * sigma(0)
+ // are treated as zero, where sigma(0) is the largest singular value.
+ private var rCond = 1e-9
/**
* Set the number of top-k singular vectors to return
@@ -38,54 +42,235 @@ class SVD {
this
}
- /**
+ /**
+ * Sets the reciprocal condition number (rCond). All singular values
+ * smaller than rCond * sigma(0) are treated as zero,
+ * where sigma(0) is the largest singular value.
+ */
+  def setReciprocalConditionNumber(rCond: Double): SVD = {
+    this.rCond = rCond
+    this
+  }
+
+  /**
+   * Sets whether the matrix U should be computed
+   */
+ def setComputeU(compU: Boolean): SVD = {
+ this.computeU = compU
+ this
+ }
+
+ /**
+   * Compute SVD using the currently set parameters
*/
- def compute(matrix: SparseMatrix) : MatrixSVD = {
- SVD.sparseSVD(matrix, k)
+ def compute(matrix: TallSkinnyDenseMatrix): TallSkinnyMatrixSVD = {
+ denseSVD(matrix)
}
-}
+ /**
+   * Compute SVD using the currently set parameters.
+   * Returns (U, S, V) such that A = USV^T.
+   * U is returned as an RDD of dense rows,
+   * S as a plain array of singular values,
+   * and V as a local 2d double array.
+   * See [[denseSVD]] for more documentation
+ */
+ def compute(matrix: RDD[Array[Double]]):
+ (RDD[Array[Double]], Array[Double], Array[Array[Double]]) = {
+ denseSVD(matrix)
+ }
-/**
- * Top-level methods for calling Singular Value Decomposition
- * NOTE: All matrices are in 0-indexed sparse format RDD[((int, int), value)]
- */
-object SVD {
-/**
- * Singular Value Decomposition for Tall and Skinny matrices.
- * Given an m x n matrix A, this will compute matrices U, S, V such that
- * A = U * S * V'
- *
- * There is no restriction on m, but we require n^2 doubles to fit in memory.
- * Further, n should be less than m.
- *
- * The decomposition is computed by first computing A'A = V S^2 V',
- * computing svd locally on that (since n x n is small),
- * from which we recover S and V.
- * Then we compute U via easy matrix multiplication
- * as U = A * V * S^-1
- *
- * Only the k largest singular values and associated vectors are found.
- * If there are k such values, then the dimensions of the return will be:
- *
- * S is k x k and diagonal, holding the singular values on diagonal
- * U is m x k and satisfies U'U = eye(k)
- * V is n x k and satisfies V'V = eye(k)
- *
- * All input and output is expected in sparse matrix format, 0-indexed
- * as tuples of the form ((i,j),value) all in RDDs using the
- * SparseMatrix class
- *
- * @param matrix sparse matrix to factorize
- * @param k Recover k singular values and vectors
- * @return Three sparse matrices: U, S, V such that A = USV^T
- */
- def sparseSVD(
- matrix: SparseMatrix,
- k: Int)
- : MatrixSVD =
- {
+ /**
+   * See [[sparseSVD]] for the full parameter definitions and description.
+ *
+ * @param matrix sparse matrix to factorize
+ * @return Three sparse matrices: U, S, V such that A = USV^T
+ */
+ def compute(matrix: SparseMatrix): MatrixSVD = {
+ sparseSVD(matrix)
+ }
+
+ /**
+ * Singular Value Decomposition for Tall and Skinny matrices.
+ * Given an m x n matrix A, this will compute matrices U, S, V such that
+ * A = U * S * V'
+ *
+ * There is no restriction on m, but we require n^2 doubles to fit in memory.
+ * Further, n should be less than m.
+ *
+ * The decomposition is computed by first computing A'A = V S^2 V',
+ * computing svd locally on that (since n x n is small),
+ * from which we recover S and V.
+ * Then we compute U via easy matrix multiplication
+ * as U = A * V * S^-1
+ *
+ * Only the k largest singular values and associated vectors are found.
+ * If there are k such values, then the dimensions of the return will be:
+ *
+ * S is k x k and diagonal, holding the singular values on diagonal
+ * U is m x k and satisfies U'U = eye(k)
+ * V is n x k and satisfies V'V = eye(k)
+ *
+ * @param matrix dense matrix to factorize
+ * @return See [[TallSkinnyMatrixSVD]] for the output matrices and arrays
+ */
+ private def denseSVD(matrix: TallSkinnyDenseMatrix): TallSkinnyMatrixSVD = {
+ val m = matrix.m
+ val n = matrix.n
+
+ if (m < n || m <= 0 || n <= 0) {
+ throw new IllegalArgumentException("Expecting a tall and skinny matrix m=$m n=$n")
+ }
+
+ if (k < 1 || k > n) {
+ throw new IllegalArgumentException("Request up to n singular values n=$n k=$k")
+ }
+
+ val rowIndices = matrix.rows.map(_.i)
+
+ // compute SVD
+ val (u, sigma, v) = denseSVD(matrix.rows.map(_.data))
+
+ if (computeU) {
+ // prep u for returning
+ val retU = TallSkinnyDenseMatrix(
+ u.zip(rowIndices).map {
+ case (row, i) => MatrixRow(i, row)
+ },
+ m,
+        sigma.length)
+
+ TallSkinnyMatrixSVD(retU, sigma, v)
+ } else {
+ TallSkinnyMatrixSVD(null, sigma, v)
+ }
+ }
+
+ /**
+ * Singular Value Decomposition for Tall and Skinny matrices.
+ * Given an m x n matrix A, this will compute matrices U, S, V such that
+ * A = U * S * V'
+ *
+ * There is no restriction on m, but we require n^2 doubles to fit in memory.
+ * Further, n should be less than m.
+ *
+ * The decomposition is computed by first computing A'A = V S^2 V',
+ * computing svd locally on that (since n x n is small),
+ * from which we recover S and V.
+ * Then we compute U via easy matrix multiplication
+ * as U = A * V * S^-1
+ *
+ * Only the k largest singular values and associated vectors are found.
+ * If there are k such values, then the dimensions of the return will be:
+ *
+ * S is k x k and diagonal, holding the singular values on diagonal
+ * U is m x k and satisfies U'U = eye(k)
+ * V is n x k and satisfies V'V = eye(k)
+ *
+ * The return values are as lean as possible: an RDD of rows for U,
+ * a simple array for sigma, and a dense 2d matrix array for V
+ *
+ * @param matrix dense matrix to factorize
+ * @return Three matrices: U, S, V such that A = USV^T
+ */
+ private def denseSVD(matrix: RDD[Array[Double]]):
+ (RDD[Array[Double]], Array[Double], Array[Array[Double]]) = {
+ val n = matrix.first.size
+
+ if (k < 1 || k > n) {
+      throw new IllegalArgumentException(
+        s"Request up to n singular values k=$k n=$n")
+ }
+
+ // Compute A^T A
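+    // Each partition accumulates a local n x n Gram matrix over its rows;
+    // fold then sums the per-partition matrices entrywise on the driver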
+ val fullata = matrix.mapPartitions {
+ iter =>
+ val localATA = Array.ofDim[Double](n, n)
+ while (iter.hasNext) {
+ val row = iter.next()
+ var i = 0
+ while (i < n) {
+ var j = 0
+ while (j < n) {
+ localATA(i)(j) += row(i) * row(j)
+ j += 1
+ }
+ i += 1
+ }
+ }
+ Iterator(localATA)
+ }.fold(Array.ofDim[Double](n, n)) {
+ (a, b) =>
+ var i = 0
+ while (i < n) {
+ var j = 0
+ while (j < n) {
+ a(i)(j) += b(i)(j)
+ j += 1
+ }
+ i += 1
+ }
+ a
+ }
+
+ // Construct jblas A^T A locally
+ val ata = new DoubleMatrix(fullata)
+
+ // Since A^T A is small, we can compute its SVD directly
+ val svd = Singular.sparseSVD(ata)
+ val V = svd(0)
+    val sigmasAll = MatrixFunctions.sqrt(svd(1)).toArray
+    // keep only singular values above rCond * sigma(0), per the rCond contract
+    val sigmas = sigmasAll.filter(x => x / sigmasAll(0) > rCond)
+
+ val sk = Math.min(k, sigmas.size)
+ val sigma = sigmas.take(sk)
+
+ // prepare V for returning
+ val retV = Array.tabulate(n, sk)((i, j) => V.get(i, j))
+
+ if (computeU) {
+ // Compute U as U = A V S^-1
+ // Compute VS^-1
+ val vsinv = new DoubleMatrix(Array.tabulate(n, sk)((i, j) => V.get(i, j) / sigma(j)))
+ val retU = matrix.map {
+ x =>
+ val v = new DoubleMatrix(Array(x))
+ v.mmul(vsinv).data
+ }
+ (retU, sigma, retV)
+ } else {
+ (null, sigma, retV)
+ }
+ }
+
+ /**
+ * Singular Value Decomposition for Tall and Skinny sparse matrices.
+ * Given an m x n matrix A, this will compute matrices U, S, V such that
+ * A = U * S * V'
+ *
+ * There is no restriction on m, but we require O(n^2) doubles to fit in memory.
+ * Further, n should be less than m.
+ *
+ * The decomposition is computed by first computing A'A = V S^2 V',
+ * computing svd locally on that (since n x n is small),
+ * from which we recover S and V.
+ * Then we compute U via easy matrix multiplication
+ * as U = A * V * S^-1
+ *
+ * Only the k largest singular values and associated vectors are found.
+ * If there are k such values, then the dimensions of the return will be:
+ *
+ * S is k x k and diagonal, holding the singular values on diagonal
+ * U is m x k and satisfies U'U = eye(k)
+ * V is n x k and satisfies V'V = eye(k)
+ *
+ * All input and output is expected in sparse matrix format, 0-indexed
+ * as tuples of the form ((i,j),value) all in RDDs using the
+ * SparseMatrix class
+ *
+ * @param matrix sparse matrix to factorize
+ * @return Three sparse matrices: U, S, V such that A = USV^T
+ */
+ private def sparseSVD(matrix: SparseMatrix): MatrixSVD = {
val data = matrix.data
val m = matrix.m
val n = matrix.n
@@ -100,11 +285,16 @@ object SVD {
// Compute A^T A, assuming rows are sparse enough to fit in memory
val rows = data.map(entry =>
- (entry.i, (entry.j, entry.mval))).groupByKey()
- val emits = rows.flatMap{ case (rowind, cols) =>
- cols.flatMap{ case (colind1, mval1) =>
- cols.map{ case (colind2, mval2) =>
- ((colind1, colind2), mval1*mval2) } }
+ (entry.i, (entry.j, entry.mval))).groupByKey()
+ val emits = rows.flatMap {
+ case (rowind, cols) =>
+ cols.flatMap {
+ case (colind1, mval1) =>
+ cols.map {
+ case (colind2, mval2) =>
+ ((colind1, colind2), mval1 * mval2)
+ }
+ }
}.reduceByKey(_ + _)
// Construct jblas A^T A locally
@@ -116,11 +306,12 @@ object SVD {
// Since A^T A is small, we can compute its SVD directly
val svd = Singular.sparseSVD(ata)
val V = svd(0)
+    // TODO: use rCond here as well, instead of the hard-coded 1e-9 threshold
val sigmas = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x > 1e-9)
if (sigmas.size < k) {
- throw new Exception("Not enough singular values to return")
- }
+ throw new Exception("Not enough singular values to return k=" + k + " s=" + sigmas.size)
+ }
val sigma = sigmas.take(k)
@@ -128,56 +319,73 @@ object SVD {
// prepare V for returning
val retVdata = sc.makeRDD(
- Array.tabulate(V.rows, sigma.length){ (i,j) =>
- MatrixEntry(i, j, V.get(i,j)) }.flatten)
+ Array.tabulate(V.rows, sigma.length) {
+ (i, j) =>
+ MatrixEntry(i, j, V.get(i, j))
+ }.flatten)
val retV = SparseMatrix(retVdata, V.rows, sigma.length)
-
- val retSdata = sc.makeRDD(Array.tabulate(sigma.length){
- x => MatrixEntry(x, x, sigma(x))})
+
+ val retSdata = sc.makeRDD(Array.tabulate(sigma.length) {
+ x => MatrixEntry(x, x, sigma(x))
+ })
val retS = SparseMatrix(retSdata, sigma.length, sigma.length)
// Compute U as U = A V S^-1
// turn V S^-1 into an RDD as a sparse matrix
- val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length)
- { (i,j) => ((i, j), V.get(i,j) / sigma(j)) }.flatten)
-
- // Multiply A by VS^-1
- val aCols = data.map(entry => (entry.j, (entry.i, entry.mval)))
- val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
- val retUdata = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)))
- => ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_ + _)
- .map{ case ((row, col), mval) => MatrixEntry(row, col, mval)}
- val retU = SparseMatrix(retUdata, m, sigma.length)
-
- MatrixSVD(retU, retS, retV)
- }
+ val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length) {
+ (i, j) => ((i, j), V.get(i, j) / sigma(j))
+ }.flatten)
+ if (computeU) {
+ // Multiply A by VS^-1
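+      // A is keyed by column index and VS^-1 by row index, so the join pairs
+      // each A(i,j) with each (VS^-1)(j,l); products are summed over j by (i,l)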
+ val aCols = data.map(entry => (entry.j, (entry.i, entry.mval)))
+ val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
+ val retUdata = aCols.join(bRows).map {
+ case (key, ((rowInd, rowVal), (colInd, colVal))) =>
+ ((rowInd, colInd), rowVal * colVal)
+ }.reduceByKey(_ + _).map {
+ case ((row, col), mval) => MatrixEntry(row, col, mval)
+ }
+ val retU = SparseMatrix(retUdata, m, sigma.length)
+ MatrixSVD(retU, retS, retV)
+ } else {
+ MatrixSVD(null, retS, retV)
+ }
+ }
+}
+
+/**
+ * Top-level methods for calling sparse Singular Value Decomposition
+ * NOTE: All matrices are 0-indexed
+ */
+object SVD {
def main(args: Array[String]) {
if (args.length < 8) {
println("Usage: SVD <master> <matrix_file> <m> <n> " +
- "<k> <output_U_file> <output_S_file> <output_V_file>")
+ "<k> <output_U_file> <output_S_file> <output_V_file>")
System.exit(1)
}
- val (master, inputFile, m, n, k, output_u, output_s, output_v) =
+ val (master, inputFile, m, n, k, output_u, output_s, output_v) =
(args(0), args(1), args(2).toInt, args(3).toInt,
- args(4).toInt, args(5), args(6), args(7))
-
+ args(4).toInt, args(5), args(6), args(7))
+
val sc = new SparkContext(master, "SVD")
-
- val rawdata = sc.textFile(inputFile)
- val data = rawdata.map { line =>
- val parts = line.split(',')
- MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
+
+ val rawData = sc.textFile(inputFile)
+ val data = rawData.map {
+ line =>
+ val parts = line.split(',')
+ MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
}
- val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), k)
+ val decomposed = new SVD().setK(k).compute(SparseMatrix(data, m, n))
val u = decomposed.U.data
val s = decomposed.S.data
val v = decomposed.V.data
-
+
println("Computed " + s.collect().length + " singular values and vectors")
u.saveAsTextFile(output_u)
s.saveAsTextFile(output_s)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyDenseMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyDenseMatrix.scala
new file mode 100644
index 0000000000..e4ef3c58e8
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyDenseMatrix.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import org.apache.spark.rdd.RDD
+
+
+/**
+ * Class that represents a dense matrix
+ *
+ * @param rows RDD of rows
+ * @param m number of rows
+ * @param n number of columns
+ */
+case class TallSkinnyDenseMatrix(rows: RDD[MatrixRow], m: Int, n: Int)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyMatrixSVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyMatrixSVD.scala
new file mode 100644
index 0000000000..b3a450e923
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyMatrixSVD.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+/**
+ * Class that represents the singular value decomposition of a matrix
+ *
+ * @param U such that A = USV^T is a TallSkinnyDenseMatrix
+ * @param S such that A = USV^T is a simple double array
+ * @param V such that A = USV^T, V is a 2d array matrix that holds
+ *          singular vectors in columns. The inner arrays are rows,
+ *          i.e. V(i)(j) is standard math notation V_{ij}
+ */
+case class TallSkinnyMatrixSVD(U: TallSkinnyDenseMatrix,
+    S: Array[Double],
+    V: Array[Array[Double]])
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala
new file mode 100644
index 0000000000..afe081295b
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.util
+
+import org.apache.spark.SparkContext._
+
+import org.apache.spark.mllib.linalg._
+
+/**
+ * Helper methods for linear algebra
+ */
+object LAUtils {
+ /**
+ * Convert a SparseMatrix into a TallSkinnyDenseMatrix
+ *
+ * @param sp Sparse matrix to be converted
+ * @return dense version of the input
+ */
+ def sparseToTallSkinnyDense(sp: SparseMatrix): TallSkinnyDenseMatrix = {
+ val m = sp.m
+ val n = sp.n
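+    // Note: a row with no nonzero entries has no MatrixEntry in sp.data, so it
+    // is absent from the groupByKey result and thus missing from the output RDD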
+ val rows = sp.data.map(x => (x.i, (x.j, x.mval))).groupByKey().map {
+ case (i, cols) =>
+ val rowArray = Array.ofDim[Double](n)
+ var j = 0
+ while (j < cols.size) {
+ rowArray(cols(j)._1) = cols(j)._2
+ j += 1
+ }
+ MatrixRow(i, rowArray)
+ }
+ TallSkinnyDenseMatrix(rows, m, n)
+ }
+
+ /**
+ * Convert a TallSkinnyDenseMatrix to a SparseMatrix
+ *
+ * @param a matrix to be converted
+ * @return sparse version of the input
+ */
+ def denseToSparse(a: TallSkinnyDenseMatrix): SparseMatrix = {
+ val m = a.m
+ val n = a.n
+ val data = a.rows.flatMap {
+ mrow => Array.tabulate(n)(j => MatrixEntry(mrow.i, j, mrow.data(j)))
+ .filter(x => x.mval != 0)
+ }
+ SparseMatrix(data, m, n)
+ }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala
new file mode 100644
index 0000000000..5e5086b1bf
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import org.apache.spark.mllib.util._
+
+import org.jblas._
+
+class PCASuite extends FunSuite with BeforeAndAfterAll {
+ @transient private var sc: SparkContext = _
+
+ override def beforeAll() {
+ sc = new SparkContext("local", "test")
+ }
+
+ override def afterAll() {
+ sc.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ val EPSILON = 1e-3
+
+ // Return jblas matrix from sparse matrix RDD
+ def getDenseMatrix(matrix: SparseMatrix) : DoubleMatrix = {
+    val ret = DoubleMatrix.zeros(matrix.m, matrix.n)
+    matrix.data.collect().foreach(x => ret.put(x.i, x.j, x.mval))
+ ret
+ }
+
+ def assertMatrixApproximatelyEquals(a: DoubleMatrix, b: DoubleMatrix) {
+    assert(a.rows == b.rows && a.columns == b.columns,
+      s"dimension mismatch: ${a.rows} vs ${b.rows} and ${a.columns} vs ${b.columns}")
+ for (i <- 0 until a.columns) {
+ val aCol = a.getColumn(i)
+ val bCol = b.getColumn(i)
+ val diff = Math.min(aCol.sub(bCol).norm1, aCol.add(bCol).norm1)
+ assert(diff < EPSILON, "matrix mismatch: " + diff)
+ }
+ }
+
+ test("full rank matrix pca") {
+ val m = 5
+ val n = 3
+ val dataArr = Array.tabulate(m,n){ (a, b) =>
+ MatrixEntry(a, b, Math.sin(a + b + a * b)) }.flatten
+ val data = sc.makeRDD(dataArr, 3)
+ val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n))
+
+ val realPCAArray = Array((0,0,-0.2579), (0,1,-0.6602), (0,2,0.7054),
+ (1,0,-0.1448), (1,1,0.7483), (1,2,0.6474),
+ (2,0,0.9553), (2,1,-0.0649), (2,2,0.2886))
+ val realPCA = sc.makeRDD(realPCAArray.map(x => MatrixEntry(x._1, x._2, x._3)), 3)
+
+ val coeffs = new DoubleMatrix(new PCA().setK(n).compute(a))
+
+ assertMatrixApproximatelyEquals(getDenseMatrix(SparseMatrix(realPCA,n,n)), coeffs)
+ }
+
+ test("sparse matrix full rank matrix pca") {
+ val m = 5
+ val n = 3
+ // the entry that gets dropped is zero to test sparse support
+ val dataArr = Array.tabulate(m,n){ (a, b) =>
+ MatrixEntry(a, b, Math.sin(a + b + a * b)) }.flatten.drop(1)
+ val data = sc.makeRDD(dataArr, 3)
+ val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n))
+
+ val realPCAArray = Array((0,0,-0.2579), (0,1,-0.6602), (0,2,0.7054),
+ (1,0,-0.1448), (1,1,0.7483), (1,2,0.6474),
+ (2,0,0.9553), (2,1,-0.0649), (2,2,0.2886))
+ val realPCA = sc.makeRDD(realPCAArray.map(x => MatrixEntry(x._1, x._2, x._3)))
+
+ val coeffs = new DoubleMatrix(new PCA().setK(n).compute(a))
+
+ assertMatrixApproximatelyEquals(getDenseMatrix(SparseMatrix(realPCA,n,n)), coeffs)
+ }
+
+ test("truncated matrix pca") {
+ val m = 5
+ val n = 3
+ val dataArr = Array.tabulate(m,n){ (a, b) =>
+ MatrixEntry(a, b, Math.sin(a + b + a * b)) }.flatten
+
+ val data = sc.makeRDD(dataArr, 3)
+ val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n))
+
+ val realPCAArray = Array((0,0,-0.2579), (0,1,-0.6602),
+ (1,0,-0.1448), (1,1,0.7483),
+ (2,0,0.9553), (2,1,-0.0649))
+ val realPCA = sc.makeRDD(realPCAArray.map(x => MatrixEntry(x._1, x._2, x._3)))
+
+ val k = 2
+ val coeffs = new DoubleMatrix(new PCA().setK(k).compute(a))
+
+ assertMatrixApproximatelyEquals(getDenseMatrix(SparseMatrix(realPCA,n,k)), coeffs)
+ }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
index a92386865a..20e2b0f84b 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
@@ -28,6 +28,8 @@ import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.util._
+
import org.jblas._
class SVDSuite extends FunSuite with BeforeAndAfterAll {
@@ -54,43 +56,77 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
ret
}
- def assertMatrixEquals(a: DoubleMatrix, b: DoubleMatrix) {
- assert(a.rows == b.rows && a.columns == b.columns, "dimension mismatch")
- val diff = DoubleMatrix.zeros(a.rows, a.columns)
- Array.tabulate(a.rows, a.columns){(i, j) =>
- diff.put(i, j,
- Math.min(Math.abs(a.get(i, j) - b.get(i, j)),
- Math.abs(a.get(i, j) + b.get(i, j)))) }
- assert(diff.norm1 < EPSILON, "matrix mismatch: " + diff.norm1)
+ def assertMatrixApproximatelyEquals(a: DoubleMatrix, b: DoubleMatrix) {
+    assert(a.rows == b.rows && a.columns == b.columns,
+      s"dimension mismatch: ${a.rows} vs ${b.rows} and ${a.columns} vs ${b.columns}")
+ for (i <- 0 until a.columns) {
+ val aCol = a.getColumn(i)
+ val bCol = b.getColumn(i)
+ val diff = Math.min(aCol.sub(bCol).norm1, aCol.add(bCol).norm1)
+ assert(diff < EPSILON, "matrix mismatch: " + diff)
+ }
}
test("full rank matrix svd") {
val m = 10
val n = 3
- val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) =>
- MatrixEntry(a, b, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten )
+ val datarr = Array.tabulate(m,n){ (a, b) =>
+ MatrixEntry(a, b, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten
+ val data = sc.makeRDD(datarr, 3)
val a = SparseMatrix(data, m, n)
- val decomposed = SVD.sparseSVD(a, n)
+ val decomposed = new SVD().setK(n).compute(a)
val u = decomposed.U
val v = decomposed.V
val s = decomposed.S
- val densea = getDenseMatrix(a)
- val svd = Singular.sparseSVD(densea)
+ val denseA = getDenseMatrix(a)
+ val svd = Singular.sparseSVD(denseA)
val retu = getDenseMatrix(u)
val rets = getDenseMatrix(s)
val retv = getDenseMatrix(v)
-
+
+
+ // check individual decomposition
+ assertMatrixApproximatelyEquals(retu, svd(0))
+ assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1)))
+ assertMatrixApproximatelyEquals(retv, svd(2))
+
+ // check multiplication guarantee
+ assertMatrixApproximatelyEquals(retu.mmul(rets).mmul(retv.transpose), denseA)
+ }
+
+ test("dense full rank matrix svd") {
+ val m = 10
+ val n = 3
+ val datarr = Array.tabulate(m,n){ (a, b) =>
+ MatrixEntry(a, b, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten
+ val data = sc.makeRDD(datarr, 3)
+
+ val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n))
+
+ val decomposed = new SVD().setK(n).setComputeU(true).compute(a)
+ val u = LAUtils.denseToSparse(decomposed.U)
+ val v = decomposed.V
+ val s = decomposed.S
+
+ val denseA = getDenseMatrix(LAUtils.denseToSparse(a))
+ val svd = Singular.sparseSVD(denseA)
+
+ val retu = getDenseMatrix(u)
+ val rets = DoubleMatrix.diag(new DoubleMatrix(s))
+ val retv = new DoubleMatrix(v)
+
+
// check individual decomposition
- assertMatrixEquals(retu, svd(0))
- assertMatrixEquals(rets, DoubleMatrix.diag(svd(1)))
- assertMatrixEquals(retv, svd(2))
+ assertMatrixApproximatelyEquals(retu, svd(0))
+ assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1)))
+ assertMatrixApproximatelyEquals(retv, svd(2))
// check multiplication guarantee
- assertMatrixEquals(retu.mmul(rets).mmul(retv.transpose), densea)
+ assertMatrixApproximatelyEquals(retu.mmul(rets).mmul(retv.transpose), denseA)
}
test("rank one matrix svd") {
@@ -102,7 +138,7 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
val a = SparseMatrix(data, m, n)
- val decomposed = SVD.sparseSVD(a, k)
+ val decomposed = new SVD().setK(k).compute(a)
val u = decomposed.U
val s = decomposed.S
val v = decomposed.V
@@ -110,20 +146,20 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
assert(retrank == 1, "rank returned not one")
- val densea = getDenseMatrix(a)
- val svd = Singular.sparseSVD(densea)
+ val denseA = getDenseMatrix(a)
+ val svd = Singular.sparseSVD(denseA)
val retu = getDenseMatrix(u)
val rets = getDenseMatrix(s)
val retv = getDenseMatrix(v)
// check individual decomposition
- assertMatrixEquals(retu, svd(0).getColumn(0))
- assertMatrixEquals(rets, DoubleMatrix.diag(svd(1).getRow(0)))
- assertMatrixEquals(retv, svd(2).getColumn(0))
+ assertMatrixApproximatelyEquals(retu, svd(0).getColumn(0))
+ assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1).getRow(0)))
+ assertMatrixApproximatelyEquals(retv, svd(2).getColumn(0))
// check multiplication guarantee
- assertMatrixEquals(retu.mmul(rets).mmul(retv.transpose), densea)
+ assertMatrixApproximatelyEquals(retu.mmul(rets).mmul(retv.transpose), denseA)
}
test("truncated with k") {
@@ -135,14 +171,14 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
val k = 1 // only one svalue above this
- val decomposed = SVD.sparseSVD(a, k)
+ val decomposed = new SVD().setK(k).compute(a)
val u = decomposed.U
val s = decomposed.S
val v = decomposed.V
val retrank = s.data.collect().length
- val densea = getDenseMatrix(a)
- val svd = Singular.sparseSVD(densea)
+ val denseA = getDenseMatrix(a)
+ val svd = Singular.sparseSVD(denseA)
val retu = getDenseMatrix(u)
val rets = getDenseMatrix(s)
@@ -151,8 +187,8 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
assert(retrank == 1, "rank returned not one")
// check individual decomposition
- assertMatrixEquals(retu, svd(0).getColumn(0))
- assertMatrixEquals(rets, DoubleMatrix.diag(svd(1).getRow(0)))
- assertMatrixEquals(retv, svd(2).getColumn(0))
+ assertMatrixApproximatelyEquals(retu, svd(0).getColumn(0))
+ assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1).getRow(0)))
+ assertMatrixApproximatelyEquals(retv, svd(2).getColumn(0))
}
}