From fa1e8d8cbf916f963e1ea000683a11d83551f870 Mon Sep 17 00:00:00 2001
From: Reza Zadeh
Date: Fri, 27 Dec 2013 00:34:59 -0500
Subject: test for truncated svd

---
 .../org/apache/spark/mllib/linalg/sparsesvd.scala | 101 ++++++++++-----------
 1 file changed, 50 insertions(+), 51 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala
index 2ce0df1e5d..a799aa3280 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala
@@ -57,65 +57,65 @@ object SVD {
       m: Int,
       n: Int,
       min_svalue: Double)
-    : ( RDD[((Int, Int), Double)],
-        RDD[((Int, Int), Double)],
-        RDD[((Int, Int), Double)]) =
+    : ( RDD[((Int, Int), Double)],
+        RDD[((Int, Int), Double)],
+        RDD[((Int, Int), Double)]) =
   {
-    if (m < n) {
+    if (m < n || m <= 0 || n <= 0) {
       throw new IllegalArgumentException("Expecting a tall and skinny matrix")
     }
-    if (min_svalue < 1.0e-9) {
+    if (min_svalue < 1.0e-9) {
       throw new IllegalArgumentException("Minimum singular value must be greater than 1e-9")
     }
-    val sc = data.sparkContext
+    val sc = data.sparkContext
 
-    // Compute A^T A, assuming rows are sparse enough to fit in memory
-    val rows = data.map(entry =>
-      (entry._1._1, (entry._1._2, entry._2))).groupByKey().cache()
-    val emits = rows.flatMap{ case (rowind, cols) =>
-      cols.flatMap{ case (colind1, mval1) =>
-        cols.map{ case (colind2, mval2) =>
-          ((colind1, colind2), mval1*mval2) } }
-    }.reduceByKey(_+_)
+    // Compute A^T A, assuming rows are sparse enough to fit in memory
+    val rows = data.map(entry =>
+      (entry._1._1, (entry._1._2, entry._2))).groupByKey().cache()
+    val emits = rows.flatMap{ case (rowind, cols) =>
+      cols.flatMap{ case (colind1, mval1) =>
+        cols.map{ case (colind2, mval2) =>
+          ((colind1, colind2), mval1*mval2) } }
+    }.reduceByKey(_+_)
 
-    // Constructi jblas A^T A locally
-    val ata = DoubleMatrix.zeros(n, n)
-    for(entry <- emits.toArray) {
-      ata.put(entry._1._1-1, entry._1._2-1, entry._2)
-    }
+    // Construct jblas A^T A locally
+    val ata = DoubleMatrix.zeros(n, n)
+    for(entry <- emits.toArray) {
+      ata.put(entry._1._1-1, entry._1._2-1, entry._2)
+    }
 
-    // Since A^T A is small, we can compute its SVD directly
-    val svd = Singular.sparseSVD(ata)
-    val V = svd(0)
-    val sigma = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x >= min_svalue)
+    // Since A^T A is small, we can compute its SVD directly
+    val svd = Singular.sparseSVD(ata)
+    val V = svd(0)
+    val sigma = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x >= min_svalue)
 
-    // threshold s values
-    if(sigma.isEmpty) {
+    // threshold s values
+    if(sigma.isEmpty) {
       throw new Exception("All singular values are smaller than min_svalue: " + min_svalue)
-    }
+    }
 
-    // prepare V for returning
-    val retV = sc.makeRDD(
-      Array.tabulate(V.rows, sigma.length){ (i,j) =>
-        ((i+1, j+1), V.get(i,j)) }.flatten)
+    // prepare V for returning
+    val retV = sc.makeRDD(
+      Array.tabulate(V.rows, sigma.length){ (i,j) =>
+        ((i+1, j+1), V.get(i,j)) }.flatten)
 
-    val retS = sc.makeRDD(Array.tabulate(sigma.length){x=>((x+1,x+1),sigma(x))})
+    val retS = sc.makeRDD(Array.tabulate(sigma.length){x=>((x+1,x+1),sigma(x))})
 
-    // Compute U as U = A V S^-1
-    // turn V S^-1 into an RDD as a sparse matrix and cache it
-    val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length)
+    // Compute U as U = A V S^-1
+    // turn V S^-1 into an RDD as a sparse matrix and cache it
+    val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length)
                     { (i,j) => ((i+1, j+1), V.get(i,j)/sigma(j)) }.flatten).cache()
 
-    // Multiply A by VS^-1
-    val aCols = data.map(entry => (entry._1._2, (entry._1._1, entry._2)))
-    val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
-    val retU = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) )
+    // Multiply A by VS^-1
+    val aCols = data.map(entry => (entry._1._2, (entry._1._1, entry._2)))
+    val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
+    val retU = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) )
                 => ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_+_)
-
-    (retU, retS, retV)
+
+    (retU, retS, retV)
   }
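
The hunk above computes a truncated SVD of a tall-and-skinny m x n matrix
through its Gram matrix: since A = U S V^T implies A^T A = V S^2 V^T, the
right singular vectors V and the singular values S can be read off an
in-memory SVD of the small n x n matrix A^T A, and U = A V S^-1 is then
recovered with one distributed join. Below is a minimal local sketch of that
identity in plain jblas, with no Spark; the 3 x 2 matrix and all names are
illustrative and not part of the patch:

    import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}

    object GramSVDCheck {
      def main(args: Array[String]) {
        // Made-up tall-and-skinny input: m = 3 rows >= n = 2 columns
        val a = new DoubleMatrix(Array(
          Array(1.0, 2.0),
          Array(3.0, 4.0),
          Array(5.0, 6.0)))

        // The small n x n Gram matrix, formed locally instead of by map/reduce
        val ata = a.transpose().mmul(a)

        // SVD of A^T A = V S^2 V^T, the same jblas call the patch uses
        val svd = Singular.sparseSVD(ata)
        val v = svd(0)
        val s = MatrixFunctions.sqrt(svd(1))  // singular values of A

        // Recover U = A V S^-1; rdiv(1.0) takes elementwise reciprocals
        val u = a.mmul(v).mmul(DoubleMatrix.diag(s.rdiv(1.0)))

        // U S V^T should reconstruct A up to rounding error
        val err = u.mmul(DoubleMatrix.diag(s)).mmul(v.transpose()).sub(a).normmax()
        println("max reconstruction error: " + err)
      }
    }
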
@@ -125,24 +125,23 @@ object SVD {
       System.exit(1)
     }
     val (master, inputFile, m, n, min_svalue, output_u, output_s, output_v) =
-      (args(0), args(1), args(2).toInt, args(3).toInt, args(4).toDouble, args(5), args(6), args(7))
+      (args(0), args(1), args(2).toInt, args(3).toInt, args(4).toDouble, args(5), args(6), args(7))
 
     val sc = new SparkContext(master, "SVD")
 
-    val rawdata = sc.textFile(inputFile)
-    val data = rawdata.map { line =>
-      val parts = line.split(',')
-      ((parts(0).toInt, parts(1).toInt), parts(2).toDouble)
-    }
+    val rawdata = sc.textFile(inputFile)
+    val data = rawdata.map { line =>
+      val parts = line.split(',')
+      ((parts(0).toInt, parts(1).toInt), parts(2).toDouble)
+    }
 
-    val (u, s, v) = SVD.sparseSVD(data, m, n, min_svalue)
+    val (u, s, v) = SVD.sparseSVD(data, m, n, min_svalue)
     println("Computed " + s.toArray.length + " singular values and vectors")
-    u.saveAsTextFile(output_u)
-    s.saveAsTextFile(output_s)
-    v.saveAsTextFile(output_v)
+    u.saveAsTextFile(output_u)
+    s.saveAsTextFile(output_s)
+    v.saveAsTextFile(output_v)
     System.exit(0)
   }
 }
-
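
For reference, the main method above expects eight arguments in the order
master, inputFile, m, n, min_svalue, output_u, output_s, output_v, and reads
the matrix as one comma-separated "i,j,value" entry per line with 1-based
indices. Calling sparseSVD directly from a driver program looks roughly like
the sketch below; the master URL, file name, dimensions, and threshold are
made-up placeholders, and the import path assumes the package implied by the
file's location:

    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.linalg.SVD

    object SVDDriver {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "SVDDriver")

        // One "i,j,value" entry per line, 1-based indices, as sparseSVD expects
        val data = sc.textFile("matrix.csv").map { line =>
          val parts = line.split(',')
          ((parts(0).toInt, parts(1).toInt), parts(2).toDouble)
        }

        // Placeholder dimensions m = 1000, n = 10; keep singular values >= 1e-4
        val (u, s, v) = SVD.sparseSVD(data, 1000, 10, 1e-4)
        println("Computed " + s.toArray.length + " singular values and vectors")
        sc.stop()
      }
    }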