Diffstat (limited to 'mllib')
 mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala | 153 ++++++++++
 mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala  | 142 ++++++++++
 2 files changed, 295 insertions(+), 0 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala
new file mode 100644
index 0000000000..2c82c6b958
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}
+
+/**
+ * Top-level methods for computing the Singular Value Decomposition.
+ * NOTE: All matrices are in 1-indexed sparse format RDD[((Int, Int), Double)]
+ */
+object SVD {
+/**
+ * Singular Value Decomposition for Tall and Skinny matrices.
+ * Given an m x n matrix A, this will compute matrices U, S, V such that
+ * A = U * S * V'
+ *
+ * There is no restriction on m, but we require n^2 doubles to fit in memory.
+ * Further, n should be less than m.
+ *
+ * The decomposition is computed by first computing A'A = V S^2 V',
+ * computing svd locally on that (since n x n is small),
+ * from which we recover S and V.
+ * Then we compute U via a straightforward matrix multiplication,
+ * as U = A * V * S^-1
+ *
+ * Only singular vectors associated with singular values
+ * greater than or equal to MIN_SVALUE are recovered. If there are k
+ * such values, then the dimensions of the return will be:
+ *
+ * S is k x k and diagonal, holding the singular values on the diagonal
+ * U is m x k and satisfies U'U = eye(k)
+ * V is n x k and satisfies V'V = eye(k)
+ *
+ * All input and output is expected in sparse matrix format, 1-indexed,
+ * as RDDs of tuples of the form ((i, j), value)
+ *
+ * @param data Matrix in sparse 1-indexed format as an RDD of ((Int, Int), Double)
+ * @param m number of rows
+ * @param n number of columns
+ * @param min_svalue Recover singular values greater than or equal to min_svalue
+ * @return Three sparse matrices: U, S, V such that A = USV^T
+ */
+ def sparseSVD(
+ data: RDD[((Int, Int), Double)],
+ m: Int,
+ n: Int,
+ min_svalue: Double)
+ : ( RDD[((Int, Int), Double)],
+ RDD[((Int, Int), Double)],
+ RDD[((Int, Int), Double)]) =
+ {
+ if (m < n || m <= 0 || n <= 0) {
+ throw new IllegalArgumentException("Expecting a tall and skinny matrix")
+ }
+
+ if (min_svalue < 1.0e-8) {
+ throw new IllegalArgumentException("Minimum singular value requested must be greater than 1e-9")
+ }
+
+ // Compute A^T A, assuming rows are sparse enough to fit in memory
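+    // Each row contributes the outer product of its nonzero entries:
+    // A^T A = sum_i a_i a_i^T, so for every pair of nonzero columns
+    // (j, k) in a row we emit ((j, k), a_ij * a_ik) and sum by key.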
+ val rows = data.map(entry =>
+ (entry._1._1, (entry._1._2, entry._2))).groupByKey().cache()
+    val emits = rows.flatMap { case (rowind, cols) =>
+      cols.flatMap { case (colind1, mval1) =>
+        cols.map { case (colind2, mval2) =>
+          ((colind1, colind2), mval1 * mval2) } }
+    }.reduceByKey(_ + _)
+
+ // Construct jblas A^T A locally
+ val ata = DoubleMatrix.zeros(n, n)
+    for (entry <- emits.toArray) {
+ ata.put(entry._1._1-1, entry._1._2-1, entry._2)
+ }
+
+ // Since A^T A is small, we can compute its SVD directly
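+    // A^T A = V S^2 V', so its singular vectors are A's right singular
+    // vectors V, and the square roots of its singular values are A's
+    // singular values.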
+ val svd = Singular.sparseSVD(ata)
+ val V = svd(0)
+ val sigma = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x >= min_svalue)
+
+    // fail if thresholding removed all singular values
+    if (sigma.isEmpty) {
+      throw new Exception("All singular values are smaller than min_svalue: " + min_svalue)
+ }
+
+ val sc = data.sparkContext
+
+ // prepare V for returning
+ val retV = sc.makeRDD(
+ Array.tabulate(V.rows, sigma.length){ (i,j) =>
+ ((i+1, j+1), V.get(i,j)) }.flatten)
+
+    val retS = sc.makeRDD(Array.tabulate(sigma.length){ x => ((x + 1, x + 1), sigma(x)) })
+
+ // Compute U as U = A V S^-1
+ // turn V S^-1 into an RDD as a sparse matrix and cache it
+    val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length)
+      { (i, j) => ((i + 1, j + 1), V.get(i, j) / sigma(j)) }.flatten).cache()
+
+ // Multiply A by VS^-1
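+    // Sparse matrix multiply via join: key A's entries by column index,
+    // key (V S^-1)'s entries by row index, join on that shared index,
+    // multiply the matched values, and sum partial products per (i, j).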
+ val aCols = data.map(entry => (entry._1._2, (entry._1._1, entry._2)))
+ val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
+    val retU = aCols.join(bRows).map { case (key, ((rowInd, rowVal), (colInd, colVal))) =>
+      ((rowInd, colInd), rowVal * colVal)
+    }.reduceByKey(_ + _)
+
+ (retU, retS, retV)
+ }
+
+
+ def main(args: Array[String]) {
+ if (args.length < 8) {
+ println("Usage: SVD <master> <matrix_file> <m> <n> <minimum_singular_value> <output_U_file> <output_S_file> <output_V_file>")
+ System.exit(1)
+ }
+ val (master, inputFile, m, n, min_svalue, output_u, output_s, output_v) =
+ (args(0), args(1), args(2).toInt, args(3).toInt, args(4).toDouble, args(5), args(6), args(7))
+
+ val sc = new SparkContext(master, "SVD")
+
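+    // Input file: one matrix entry per line, in the form "i,j,value" (1-indexed).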
+ val rawdata = sc.textFile(inputFile)
+ val data = rawdata.map { line =>
+ val parts = line.split(',')
+ ((parts(0).toInt, parts(1).toInt), parts(2).toDouble)
+ }
+
+ val (u, s, v) = SVD.sparseSVD(data, m, n, min_svalue)
+ println("Computed " + s.toArray.length + " singular values and vectors")
+ u.saveAsTextFile(output_u)
+ s.saveAsTextFile(output_s)
+ v.saveAsTextFile(output_v)
+ System.exit(0)
+ }
+}
+
+
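For reference, a minimal usage sketch of the new API (not part of this patch; the matrix values are illustrative, and it assumes a local SparkContext):

    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.linalg.SVD

    val sc = new SparkContext("local", "SVDExample")

    // A 4 x 2 matrix in 1-indexed sparse format ((row, col), value).
    val data = sc.makeRDD(Seq(
      ((1, 1), 1.0), ((1, 2), 2.0),
      ((2, 1), 3.0), ((2, 2), 4.0),
      ((3, 1), 5.0), ((3, 2), 6.0),
      ((4, 1), 7.0), ((4, 2), 8.0)))

    // Recover singular values >= 1e-4 together with their singular vectors.
    val (u, s, v) = SVD.sparseSVD(data, 4, 2, 1.0e-4)

    // s holds the k retained singular values as diagonal entries ((i, i), value).
    println("Singular values: " + s.toArray.map(_._2).mkString(", "))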
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
new file mode 100644
index 0000000000..726650af0a
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+
+import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+class SVDSuite extends FunSuite with BeforeAndAfterAll {
+ @transient private var sc: SparkContext = _
+
+ override def beforeAll() {
+ sc = new SparkContext("local", "test")
+ }
+
+ override def afterAll() {
+ sc.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ val EPSILON = 1e-4
+
+ // Return jblas matrix from sparse matrix RDD
+  def getDenseMatrix(matrix: RDD[((Int, Int), Double)], m: Int, n: Int): DoubleMatrix = {
+ val ret = DoubleMatrix.zeros(m, n)
+    matrix.toArray.foreach(x => ret.put(x._1._1 - 1, x._1._2 - 1, x._2))
+ ret
+ }
+
+  def assertMatrixEquals(a: DoubleMatrix, b: DoubleMatrix) {
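+    // Singular vectors are only determined up to sign, so compare each
+    // entry against both b and -b and take the smaller difference.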
+ assert(a.rows == b.rows && a.columns == b.columns, "dimension mismatch")
+ val diff = DoubleMatrix.zeros(a.rows, a.columns)
+ Array.tabulate(a.rows, a.columns){(i,j) =>
+ diff.put(i,j,
+ Math.min(Math.abs(a.get(i,j)-b.get(i,j)),
+ Math.abs(a.get(i,j)+b.get(i,j)))) }
+ assert(diff.norm1 < EPSILON, "matrix mismatch: " + diff.norm1)
+ }
+
+ test("full rank matrix svd") {
+ val m = 10
+ val n = 3
+ val data = sc.makeRDD(Array.tabulate(m,n){ (a,b)=>
+ ((a+1,b+1), (a+2).toDouble*(b+1)/(1+a+b)) }.flatten )
+ val min_svalue = 1.0e-8
+
+ val (u, s, v) = SVD.sparseSVD(data, m, n, min_svalue)
+
+ val densea = getDenseMatrix(data, m, n)
+ val svd = Singular.sparseSVD(densea)
+
+ val retu = getDenseMatrix(u,m,n)
+ val rets = getDenseMatrix(s,n,n)
+ val retv = getDenseMatrix(v,n,n)
+
+ // check individual decomposition
+ assertMatrixEquals(retu, svd(0))
+ assertMatrixEquals(rets, DoubleMatrix.diag(svd(1)))
+ assertMatrixEquals(retv, svd(2))
+
+ // check multiplication guarantee
+ assertMatrixEquals(retu.mmul(rets).mmul(retv.transpose), densea)
+ }
+
+ test("rank one matrix svd") {
+ val m = 10
+ val n = 3
+ val data = sc.makeRDD(Array.tabulate(m,n){ (a,b)=>
+ ((a+1,b+1), 1.0) }.flatten )
+ val min_svalue = 1.0e-4
+
+ val (u, s, v) = SVD.sparseSVD(data, m, n, min_svalue)
+ val retrank = s.toArray.length
+
+    assert(retrank == 1, "returned rank is not one")
+
+ val densea = getDenseMatrix(data, m, n)
+ val svd = Singular.sparseSVD(densea)
+
+ val retu = getDenseMatrix(u,m,retrank)
+ val rets = getDenseMatrix(s,retrank,retrank)
+ val retv = getDenseMatrix(v,n,retrank)
+
+ // check individual decomposition
+ assertMatrixEquals(retu, svd(0).getColumn(0))
+ assertMatrixEquals(rets, DoubleMatrix.diag(svd(1).getRow(0)))
+ assertMatrixEquals(retv, svd(2).getColumn(0))
+
+ // check multiplication guarantee
+ assertMatrixEquals(retu.mmul(rets).mmul(retv.transpose), densea)
+ }
+
+ test("truncated with min singular value") {
+ val m = 10
+ val n = 3
+ val data = sc.makeRDD(Array.tabulate(m,n){ (a,b)=>
+ ((a+1,b+1), (a+2).toDouble*(b+1)/(1+a+b)) }.flatten )
+
+ val min_svalue = 5.0 // only one svalue above this
+
+ val (u, s, v) = SVD.sparseSVD(data, m, n, min_svalue)
+ val retrank = s.toArray.length
+
+ val densea = getDenseMatrix(data, m, n)
+ val svd = Singular.sparseSVD(densea)
+
+ val retu = getDenseMatrix(u,m,retrank)
+ val rets = getDenseMatrix(s,retrank,retrank)
+ val retv = getDenseMatrix(v,n,retrank)
+
+    assert(retrank == 1, "returned rank is not one")
+
+ // check individual decomposition
+ assertMatrixEquals(retu, svd(0).getColumn(0))
+ assertMatrixEquals(rets, DoubleMatrix.diag(svd(1).getRow(0)))
+ assertMatrixEquals(retv, svd(2).getColumn(0))
+ }
+}