diff options
author | Reza Zadeh <rizlar@gmail.com> | 2013-12-26 17:39:25 -0500 |
---|---|---|
committer | Reza Zadeh <rizlar@gmail.com> | 2013-12-26 17:39:25 -0500 |
commit | 6c3674cd235558ec09e6b97382bb541b379a3f8f (patch) | |
tree | 69ecdcfc31d081636dbaa7954b697db31aaca852 /mllib | |
parent | 6e740cc90131b29ebc17e32c66ea16727e5dcc9f (diff) | |
download | spark-6c3674cd235558ec09e6b97382bb541b379a3f8f.tar.gz spark-6c3674cd235558ec09e6b97382bb541b379a3f8f.tar.bz2 spark-6c3674cd235558ec09e6b97382bb541b379a3f8f.zip |
Object to hold the svd methods
Diffstat (limited to 'mllib')
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala | 132 |
1 file changed, 74 insertions, 58 deletions
package org.apache.spark.mllib.linalg

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}

/**
 * Top-level object holding the sparse SVD routines.
 *
 * Strategy: for a tall, skinny m x n matrix A (n small enough that the n x n
 * Gramian fits on the driver), compute G = A^T A as an RDD aggregation, take
 * the SVD of G locally with jblas, and recover U = A V S^-1 distributively.
 */
object SVD {

  /**
   * Compute the singular value decomposition A = U S V^T of a sparse matrix.
   *
   * @param data matrix entries as ((row, col), value) with 1-indexed row/col
   * @param m number of rows of A (currently informational; dimensions are not
   *          validated against the entries — TODO confirm intended checking)
   * @param n number of columns of A; an n x n dense Gramian is built on the
   *          driver, so n must be small
   * @param min_svalue smallest singular value to retain; smaller ones are
   *          discarded to truncate the decomposition
   * @return (U, S, V) as RDDs of 1-indexed ((row, col), value) entries, where
   *         S holds the retained singular values on its diagonal
   * @throws IllegalArgumentException if no singular value reaches min_svalue
   */
  def sparseSVD(
      data: RDD[((Int, Int), Double)],
      m: Int,
      n: Int,
      min_svalue: Double)
    : (RDD[((Int, Int), Double)],
       RDD[((Int, Int), Double)],
       RDD[((Int, Int), Double)]) =
  {
    val sc = data.sparkContext

    // Group entries by row: assumes each individual row is sparse enough to
    // fit in a single executor's memory.
    val rows = data.map(entry =>
      (entry._1._1, (entry._1._2, entry._2))).groupByKey().cache()

    // Emit all pairwise column products per row; summing them yields the
    // entries of the Gramian A^T A.
    val emits = rows.flatMap { case (rowind, cols) =>
      cols.flatMap { case (colind1, mval1) =>
        cols.map { case (colind2, mval2) =>
          ((colind1, colind2), mval1 * mval2) } }
    }.reduceByKey(_ + _)

    // Construct jblas A^T A locally on the driver (n x n, assumed small).
    // collect() rather than the deprecated toArray alias.
    val ata = DoubleMatrix.zeros(n, n)
    for (entry <- emits.collect()) {
      ata.put(entry._1._1 - 1, entry._1._2 - 1, entry._2)
    }

    // Since A^T A is small, compute its SVD directly. The singular values of
    // A are the square roots of the eigenvalues of A^T A.
    val svd = Singular.sparseSVD(ata)
    val V = svd(0)
    val sigma = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x >= min_svalue)

    // Fail fast instead of silently returning empty factors (the previous
    // code left this case as a TODO and fell through).
    if (sigma.isEmpty) {
      throw new IllegalArgumentException(
        "All singular values are below min_svalue = " + min_svalue)
    }

    // Prepare V for returning, back in 1-indexed sparse form.
    val retV = sc.makeRDD(
      Array.tabulate(V.rows, sigma.length){ (i, j) =>
        ((i + 1, j + 1), V.get(i, j)) }.flatten)

    // Diagonal matrix of retained singular values.
    val retS = sc.makeRDD(
      Array.tabulate(sigma.length){ x => ((x + 1, x + 1), sigma(x)) })

    // Compute U as U = A V S^-1: turn V S^-1 into a sparse RDD and cache it,
    // since it is joined against every column of A.
    val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length)
      { (i, j) => ((i + 1, j + 1), V.get(i, j) / sigma(j)) }.flatten).cache()

    // Multiply A by V S^-1 via a join on the shared (column of A == row of
    // VS^-1) index, then sum the partial products.
    val aCols = data.map(entry => (entry._1._2, (entry._1._1, entry._2)))
    val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
    val retU = aCols.join(bRows).map( { case (key, ((rowInd, rowVal), (colInd, colVal)))
      => ((rowInd, colInd), rowVal * colVal) }).reduceByKey(_ + _)

    (retU, retS, retV)
  }

  /**
   * Command-line driver. The previous version was a copy-paste of the KMeans
   * driver (wrong usage string, parsed k/iterations it never used, and exited
   * before ever calling sparseSVD); this one actually runs the decomposition.
   *
   * Input file format: one "row col value" triple per line, 1-indexed.
   */
  def main(args: Array[String]) {
    if (args.length < 5) {
      println("Usage: SVD <master> <input_file> <m> <n> <min_svalue>")
      System.exit(1)
    }
    val (master, inputFile) = (args(0), args(1))
    val (m, n, minSvalue) = (args(2).toInt, args(3).toInt, args(4).toDouble)

    val sc = new SparkContext(master, "SVD")

    val data = sc.textFile(inputFile).map { line =>
      val parts = line.split(' ')
      ((parts(0).toInt, parts(1).toInt), parts(2).toDouble)
    }.cache()

    val (u, s, v) = sparseSVD(data, m, n, minSvalue)
    println("Computed SVD with " + s.count() + " singular values retained")
    System.exit(0)
  }
}