diff options
author | Reza Zadeh <rizlar@gmail.com> | 2014-01-03 22:17:24 -0800 |
---|---|---|
committer | Reza Zadeh <rizlar@gmail.com> | 2014-01-03 22:17:24 -0800 |
commit | 7f631dd2a9e2467871167da1514be9863485a96f (patch) | |
tree | d9afe25bbab903e5f2b1947191d7ecf3ef70c561 /mllib/src | |
parent | 6bcdb762a107c82ef095553ab31284623475cb2c (diff) | |
download | spark-7f631dd2a9e2467871167da1514be9863485a96f.tar.gz spark-7f631dd2a9e2467871167da1514be9863485a96f.tar.bz2 spark-7f631dd2a9e2467871167da1514be9863485a96f.zip |
start using matrixentry
Diffstat (limited to 'mllib/src')
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala | 23 |
1 files changed, 14 insertions, 9 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala index 2198e6a1a2..08af2c855a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala @@ -23,6 +23,8 @@ import org.apache.spark.rdd.RDD import org.jblas.{DoubleMatrix, Singular, MatrixFunctions} +import org.apache.spark.linalg.MatrixEntry + /** * Top-level methods for calling Singular Value Decomposition * NOTE: All matrices are in 1-indexed sparse format RDD[((int, int), value)] @@ -60,13 +62,13 @@ object SVD { * @return Three sparse matrices: U, S, V such that A = USV^T */ def sparseSVD( - data: RDD[((Int, Int), Double)], + data: RDD[MatrixEntry], m: Int, n: Int, min_svalue: Double) - : ( RDD[((Int, Int), Double)], - RDD[((Int, Int), Double)], - RDD[((Int, Int), Double)]) = + : ( RDD[MatrixEntry], + RDD[MatrixEntry], + RDD[MatrixEntry]) = { if (m < n || m <= 0 || n <= 0) { throw new IllegalArgumentException("Expecting a tall and skinny matrix") @@ -78,7 +80,7 @@ object SVD { // Compute A^T A, assuming rows are sparse enough to fit in memory val rows = data.map(entry => - (entry._1._1, (entry._1._2, entry._2))).groupByKey().cache() + (entry.i, (entry.j, entry.mval))).groupByKey().cache() val emits = rows.flatMap{ case (rowind, cols) => cols.flatMap{ case (colind1, mval1) => cols.map{ case (colind2, mval2) => @@ -106,9 +108,9 @@ object SVD { // prepare V for returning val retV = sc.makeRDD( Array.tabulate(V.rows, sigma.length){ (i,j) => - ((i+1, j+1), V.get(i,j)) }.flatten) + MatrixEntry(i+1, j+1, V.get(i,j)) }.flatten) - val retS = sc.makeRDD(Array.tabulate(sigma.length){x=>((x+1,x+1),sigma(x))}) + val retS = sc.makeRDD(Array.tabulate(sigma.length){x=>MatrixEntry(x+1,x+1,sigma(x))}) // Compute U as U = A V S^-1 // turn V S^-1 into an RDD as a sparse matrix and cache it @@ -120,6 +122,7 @@ object SVD { val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2))) val retU = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) ) => ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_+_) + .map( case (row, col, mval) => MatrixEntry(row, col, mval)) (retU, retS, retV) } @@ -127,11 +130,13 @@ object SVD { def main(args: Array[String]) { if (args.length < 8) { - println("Usage: SVD <master> <matrix_file> <m> <n> <minimum_singular_value> <output_U_file> <output_S_file> <output_V_file>") + println("Usage: SVD <master> <matrix_file> <m> <n> + <minimum_singular_value> <output_U_file> <output_S_file> <output_V_file>") System.exit(1) } val (master, inputFile, m, n, min_svalue, output_u, output_s, output_v) = - (args(0), args(1), args(2).toInt, args(3).toInt, args(4).toDouble, args(5), args(6), args(7)) + (args(0), args(1), args(2).toInt, args(3).toInt, + args(4).toDouble, args(5), args(6), args(7)) val sc = new SparkContext(master, "SVD") |