diff options
author | Matei Zaharia <matei@databricks.com> | 2014-01-22 14:01:30 -0800 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2014-01-22 14:01:30 -0800 |
commit | d009b17d137edf2f1b9da04254e55fb7455faa3d (patch) | |
tree | 9265b72663c85f4a25f40edf00154ac7e3536f2f | |
parent | 749f842827c7e7766a342b6b0a437803044a9f90 (diff) | |
parent | 85b95d039ddfc7a2b2b27f506852859181ed16c1 (diff) | |
download | spark-d009b17d137edf2f1b9da04254e55fb7455faa3d.tar.gz spark-d009b17d137edf2f1b9da04254e55fb7455faa3d.tar.bz2 spark-d009b17d137edf2f1b9da04254e55fb7455faa3d.zip |
Merge pull request #315 from rezazadeh/sparsesvd
Sparse SVD
# Singular Value Decomposition
Given an *m x n* matrix *A*, compute matrices *U, S, V* such that
*A = U * S * V^T*
There is no restriction on m, but we require n^2 doubles to fit in memory.
Further, n should be less than m.
The decomposition is computed by first computing *A^TA = V S^2 V^T*,
computing svd locally on that (since n x n is small),
from which we recover S and V.
Then we compute U via easy matrix multiplication
as *U = A * V * S^-1*
Only singular vectors associated with the largest k singular values
If there are k such values, then the dimensions of the return will be:
* *S* is *k x k* and diagonal, holding the singular values on diagonal.
* *U* is *m x k* and satisfies U^T*U = eye(k).
* *V* is *n x k* and satisfies V^TV = eye(k).
All input and output is expected in sparse matrix format, 0-indexed
as tuples of the form ((i,j),value) all in RDDs.
# Testing
Tests included. They test:
- Decomposition promise (A = USV^T)
- For small matrices, output is compared to that of jblas
- Rank 1 matrix test included
- Full Rank matrix test included
- Middle-rank matrix forced via k included
# Example Usage
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.SVD
import org.apache.spark.mllib.linalg.SparseMatrix
import org.apache.spark.mllib.linalg.MatrixyEntry
// Load and parse the data file
val data = sc.textFile("mllib/data/als/test.data").map { line =>
val parts = line.split(',')
MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
}
val m = 4
val n = 4
// recover top 1 singular vector
val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), 1)
println("singular values = " + decomposed.S.data.toArray.mkString)
# Documentation
Added to docs/mllib-guide.md
7 files changed, 543 insertions, 0 deletions
diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md index a22a22184b..0cc5505b50 100644 --- a/docs/mllib-guide.md +++ b/docs/mllib-guide.md @@ -438,3 +438,54 @@ signals), you can use the trainImplicit method to get better results. # Build the recommendation model using Alternating Least Squares based on implicit ratings model = ALS.trainImplicit(ratings, 1, 20) {% endhighlight %} + + +# Singular Value Decomposition +Singular Value Decomposition for Tall and Skinny matrices. +Given an *m x n* matrix *A*, we can compute matrices *U, S, V* such that + +*A = U * S * V^T* + +There is no restriction on m, but we require n^2 doubles to +fit in memory locally on one machine. +Further, n should be less than m. + +The decomposition is computed by first computing *A^TA = V S^2 V^T*, +computing SVD locally on that (since n x n is small), +from which we recover S and V. +Then we compute U via easy matrix multiplication +as *U = A * V * S^-1* + +Only singular vectors associated with largest k singular values +are recovered. If there are k +such values, then the dimensions of the return will be: + +* *S* is *k x k* and diagonal, holding the singular values on diagonal. +* *U* is *m x k* and satisfies U^T*U = eye(k). +* *V* is *n x k* and satisfies V^TV = eye(k). + +All input and output is expected in sparse matrix format, 0-indexed +as tuples of the form ((i,j),value) all in +SparseMatrix RDDs. Below is example usage. + +{% highlight scala %} + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.linalg.SVD +import org.apache.spark.mllib.linalg.SparseMatrix +import org.apache.spark.mllib.linalg.MatrixEntry + +// Load and parse the data file +val data = sc.textFile("mllib/data/als/test.data").map { line => + val parts = line.split(',') + MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble) +} +val m = 4 +val n = 4 +val k = 1 + +// recover largest singular vector +val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), k) +val = decomposed.S.data + +println("singular values = " + s.toArray.mkString) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala new file mode 100644 index 0000000000..19676fcc1a --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.linalg.SVD +import org.apache.spark.mllib.linalg.MatrixEntry +import org.apache.spark.mllib.linalg.SparseMatrix + +/** + * Compute SVD of an example matrix + * Input file should be comma separated, 1 indexed of the form + * i,j,value + * Where i is the column, j the row, and value is the matrix entry + * + * For example input file, see: + * mllib/data/als/test.data (example is 4 x 4) + */ +object SparkSVD { + def main(args: Array[String]) { + if (args.length != 4) { + System.err.println("Usage: SparkSVD <master> <file> m n") + System.exit(1) + } + val sc = new SparkContext(args(0), "SVD", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) + + // Load and parse the data file + val data = sc.textFile(args(1)).map { line => + val parts = line.split(',') + MatrixEntry(parts(0).toInt - 1, parts(1).toInt - 1, parts(2).toDouble) + } + val m = args(2).toInt + val n = args(3).toInt + + // recover largest singular vector + val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), 1) + val u = decomposed.U.data + val s = decomposed.S.data + val v = decomposed.V.data + + println("singular values = " + s.toArray.mkString) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala new file mode 100644 index 0000000000..416996fcbe --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg + +/** + * Class that represents an entry in a sparse matrix of doubles. + * + * @param i row index (0 indexing used) + * @param j column index (0 indexing used) + * @param mval value of entry in matrix + */ +case class MatrixEntry(val i: Int, val j: Int, val mval: Double) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala new file mode 100644 index 0000000000..319f82b449 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg + +/** + * Class that represents the SV decomposition of a matrix + * + * @param U such that A = USV^T + * @param S such that A = USV^T + * @param V such that A = USV^T + */ +case class MatrixSVD(val U: SparseMatrix, + val S: SparseMatrix, + val V: SparseMatrix) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala new file mode 100644 index 0000000000..e476b53450 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD + +import org.jblas.{DoubleMatrix, Singular, MatrixFunctions} + + +/** + * Class used to obtain singular value decompositions + */ +class SVD { + private var k: Int = 1 + + /** + * Set the number of top-k singular vectors to return + */ + def setK(k: Int): SVD = { + this.k = k + this + } + + /** + * Compute SVD using the current set parameters + */ + def compute(matrix: SparseMatrix) : MatrixSVD = { + SVD.sparseSVD(matrix, k) + } +} + + +/** + * Top-level methods for calling Singular Value Decomposition + * NOTE: All matrices are in 0-indexed sparse format RDD[((int, int), value)] + */ +object SVD { +/** + * Singular Value Decomposition for Tall and Skinny matrices. + * Given an m x n matrix A, this will compute matrices U, S, V such that + * A = U * S * V' + * + * There is no restriction on m, but we require n^2 doubles to fit in memory. + * Further, n should be less than m. + * + * The decomposition is computed by first computing A'A = V S^2 V', + * computing svd locally on that (since n x n is small), + * from which we recover S and V. + * Then we compute U via easy matrix multiplication + * as U = A * V * S^-1 + * + * Only the k largest singular values and associated vectors are found. + * If there are k such values, then the dimensions of the return will be: + * + * S is k x k and diagonal, holding the singular values on diagonal + * U is m x k and satisfies U'U = eye(k) + * V is n x k and satisfies V'V = eye(k) + * + * All input and output is expected in sparse matrix format, 0-indexed + * as tuples of the form ((i,j),value) all in RDDs using the + * SparseMatrix class + * + * @param matrix sparse matrix to factorize + * @param k Recover k singular values and vectors + * @return Three sparse matrices: U, S, V such that A = USV^T + */ + def sparseSVD( + matrix: SparseMatrix, + k: Int) + : MatrixSVD = + { + val data = matrix.data + val m = matrix.m + val n = matrix.n + + if (m < n || m <= 0 || n <= 0) { + throw new IllegalArgumentException("Expecting a tall and skinny matrix") + } + + if (k < 1 || k > n) { + throw new IllegalArgumentException("Must request up to n singular values") + } + + // Compute A^T A, assuming rows are sparse enough to fit in memory + val rows = data.map(entry => + (entry.i, (entry.j, entry.mval))).groupByKey() + val emits = rows.flatMap{ case (rowind, cols) => + cols.flatMap{ case (colind1, mval1) => + cols.map{ case (colind2, mval2) => + ((colind1, colind2), mval1*mval2) } } + }.reduceByKey(_+_) + + // Construct jblas A^T A locally + val ata = DoubleMatrix.zeros(n, n) + for (entry <- emits.toArray) { + ata.put(entry._1._1, entry._1._2, entry._2) + } + + // Since A^T A is small, we can compute its SVD directly + val svd = Singular.sparseSVD(ata) + val V = svd(0) + val sigmas = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x > 1e-9) + + if (sigmas.size < k) { + throw new Exception("Not enough singular values to return") + } + + val sigma = sigmas.take(k) + + val sc = data.sparkContext + + // prepare V for returning + val retVdata = sc.makeRDD( + Array.tabulate(V.rows, sigma.length){ (i,j) => + MatrixEntry(i, j, V.get(i,j)) }.flatten) + val retV = SparseMatrix(retVdata, V.rows, sigma.length) + + val retSdata = sc.makeRDD(Array.tabulate(sigma.length){ + x => MatrixEntry(x, x, sigma(x))}) + + val retS = SparseMatrix(retSdata, sigma.length, sigma.length) + + // Compute U as U = A V S^-1 + // turn V S^-1 into an RDD as a sparse matrix + val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length) + { (i,j) => ((i, j), V.get(i,j) / sigma(j)) }.flatten) + + // Multiply A by VS^-1 + val aCols = data.map(entry => (entry.j, (entry.i, entry.mval))) + val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2))) + val retUdata = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) ) + => ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_+_) + .map{ case ((row, col), mval) => MatrixEntry(row, col, mval)} + val retU = SparseMatrix(retUdata, m, sigma.length) + + MatrixSVD(retU, retS, retV) + } + + + def main(args: Array[String]) { + if (args.length < 8) { + println("Usage: SVD <master> <matrix_file> <m> <n> " + + "<k> <output_U_file> <output_S_file> <output_V_file>") + System.exit(1) + } + + val (master, inputFile, m, n, k, output_u, output_s, output_v) = + (args(0), args(1), args(2).toInt, args(3).toInt, + args(4).toInt, args(5), args(6), args(7)) + + val sc = new SparkContext(master, "SVD") + + val rawdata = sc.textFile(inputFile) + val data = rawdata.map { line => + val parts = line.split(',') + MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble) + } + + val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), k) + val u = decomposed.U.data + val s = decomposed.S.data + val v = decomposed.V.data + + println("Computed " + s.toArray.length + " singular values and vectors") + u.saveAsTextFile(output_u) + s.saveAsTextFile(output_s) + v.saveAsTextFile(output_v) + System.exit(0) + } +} + + diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala new file mode 100644 index 0000000000..cbd1a2a5a4 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg + +import org.apache.spark.rdd.RDD + + +/** + * Class that represents a sparse matrix + * + * @param data RDD of nonzero entries + * @param m number of rows + * @param n numner of columns + */ +case class SparseMatrix(val data: RDD[MatrixEntry], val m: Int, val n: Int) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala new file mode 100644 index 0000000000..32f3f141cd --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg + +import scala.util.Random + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.FunSuite + +import org.jblas.{DoubleMatrix, Singular, MatrixFunctions} + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD + +import org.jblas._ + +class SVDSuite extends FunSuite with BeforeAndAfterAll { + @transient private var sc: SparkContext = _ + + override def beforeAll() { + sc = new SparkContext("local", "test") + } + + override def afterAll() { + sc.stop() + System.clearProperty("spark.driver.port") + } + + val EPSILON = 1e-4 + + // Return jblas matrix from sparse matrix RDD + def getDenseMatrix(matrix: SparseMatrix) : DoubleMatrix = { + val data = matrix.data + val m = matrix.m + val n = matrix.n + val ret = DoubleMatrix.zeros(m, n) + matrix.data.toArray.map(x => ret.put(x.i, x.j, x.mval)) + ret + } + + def assertMatrixEquals(a: DoubleMatrix, b: DoubleMatrix) { + assert(a.rows == b.rows && a.columns == b.columns, "dimension mismatch") + val diff = DoubleMatrix.zeros(a.rows, a.columns) + Array.tabulate(a.rows, a.columns){(i, j) => + diff.put(i, j, + Math.min(Math.abs(a.get(i, j) - b.get(i, j)), + Math.abs(a.get(i, j) + b.get(i, j)))) } + assert(diff.norm1 < EPSILON, "matrix mismatch: " + diff.norm1) + } + + test("full rank matrix svd") { + val m = 10 + val n = 3 + val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) => + MatrixEntry(a, b, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten ) + + val a = SparseMatrix(data, m, n) + + val decomposed = SVD.sparseSVD(a, n) + val u = decomposed.U + val v = decomposed.V + val s = decomposed.S + + val densea = getDenseMatrix(a) + val svd = Singular.sparseSVD(densea) + + val retu = getDenseMatrix(u) + val rets = getDenseMatrix(s) + val retv = getDenseMatrix(v) + + // check individual decomposition + assertMatrixEquals(retu, svd(0)) + assertMatrixEquals(rets, DoubleMatrix.diag(svd(1))) + assertMatrixEquals(retv, svd(2)) + + // check multiplication guarantee + assertMatrixEquals(retu.mmul(rets).mmul(retv.transpose), densea) + } + + test("rank one matrix svd") { + val m = 10 + val n = 3 + val data = sc.makeRDD(Array.tabulate(m, n){ (a,b) => + MatrixEntry(a, b, 1.0) }.flatten ) + val k = 1 + + val a = SparseMatrix(data, m, n) + + val decomposed = SVD.sparseSVD(a, k) + val u = decomposed.U + val s = decomposed.S + val v = decomposed.V + val retrank = s.data.toArray.length + + assert(retrank == 1, "rank returned not one") + + val densea = getDenseMatrix(a) + val svd = Singular.sparseSVD(densea) + + val retu = getDenseMatrix(u) + val rets = getDenseMatrix(s) + val retv = getDenseMatrix(v) + + // check individual decomposition + assertMatrixEquals(retu, svd(0).getColumn(0)) + assertMatrixEquals(rets, DoubleMatrix.diag(svd(1).getRow(0))) + assertMatrixEquals(retv, svd(2).getColumn(0)) + + // check multiplication guarantee + assertMatrixEquals(retu.mmul(rets).mmul(retv.transpose), densea) + } + + test("truncated with k") { + val m = 10 + val n = 3 + val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) => + MatrixEntry(a, b, (a + 2).toDouble * (b + 1)/(1 + a + b)) }.flatten ) + val a = SparseMatrix(data, m, n) + + val k = 1 // only one svalue above this + + val decomposed = SVD.sparseSVD(a, k) + val u = decomposed.U + val s = decomposed.S + val v = decomposed.V + val retrank = s.data.toArray.length + + val densea = getDenseMatrix(a) + val svd = Singular.sparseSVD(densea) + + val retu = getDenseMatrix(u) + val rets = getDenseMatrix(s) + val retv = getDenseMatrix(v) + + assert(retrank == 1, "rank returned not one") + + // check individual decomposition + assertMatrixEquals(retu, svd(0).getColumn(0)) + assertMatrixEquals(rets, DoubleMatrix.diag(svd(1).getRow(0))) + assertMatrixEquals(retv, svd(2).getColumn(0)) + } +} |