author    Reza Zadeh <rizlar@gmail.com>    2014-01-03 22:17:24 -0800
committer Reza Zadeh <rizlar@gmail.com>    2014-01-03 22:17:24 -0800
commit 7f631dd2a9e2467871167da1514be9863485a96f (patch)
tree   d9afe25bbab903e5f2b1947191d7ecf3ef70c561 /mllib
parent 6bcdb762a107c82ef095553ab31284623475cb2c (diff)
start using matrixentry
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala | 23
1 file changed, 14 insertions(+), 9 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
index 2198e6a1a2..08af2c855a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
@@ -23,6 +23,8 @@ import org.apache.spark.rdd.RDD
import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}
+import org.apache.spark.mllib.linalg.MatrixEntry
+
/**
* Top-level methods for calling Singular Value Decomposition
* NOTE: All matrices are in 1-indexed sparse format RDD[((int, int), value)]
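The MatrixEntry type itself does not appear in this diff, and the doc comment above still describes the old RDD[((int, int), value)] encoding. Judging from the constructor calls and the entry.i / entry.j / entry.mval accesses below, MatrixEntry is presumably a small case class along these lines (an inferred sketch, not the committed definition):

    // Hypothetical sketch of MatrixEntry, inferred from its usage in this diff:
    // a single 1-indexed cell (i, j) of a sparse matrix with value mval.
    case class MatrixEntry(i: Int, j: Int, mval: Double)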
@@ -60,13 +62,13 @@ object SVD {
* @return Three sparse matrices: U, S, V such that A = USV^T
*/
  def sparseSVD(
-      data: RDD[((Int, Int), Double)],
+      data: RDD[MatrixEntry],
      m: Int,
      n: Int,
      min_svalue: Double)
-    : ( RDD[((Int, Int), Double)],
-        RDD[((Int, Int), Double)],
-        RDD[((Int, Int), Double)]) =
+    : ( RDD[MatrixEntry],
+        RDD[MatrixEntry],
+        RDD[MatrixEntry]) =
  {
    if (m < n || m <= 0 || n <= 0) {
      throw new IllegalArgumentException("Expecting a tall and skinny matrix")
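With this change, callers construct an RDD[MatrixEntry] rather than an RDD of nested tuples. A minimal usage sketch, assuming a local SparkContext and an illustrative 3x2 matrix (all names and values hypothetical):

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    val sc = new SparkContext("local", "SVDExample")
    // Entries are 1-indexed, per the NOTE in the doc comment above.
    val data: RDD[MatrixEntry] = sc.parallelize(Seq(
      MatrixEntry(1, 1, 1.0),
      MatrixEntry(2, 2, 2.0),
      MatrixEntry(3, 1, 3.0)))
    // m = 3 >= n = 2, so the tall-and-skinny check passes.
    val (u, s, v) = SVD.sparseSVD(data, 3, 2, 1e-9)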
@@ -78,7 +80,7 @@ object SVD {
    // Compute A^T A, assuming rows are sparse enough to fit in memory
    val rows = data.map(entry =>
-      (entry._1._1, (entry._1._2, entry._2))).groupByKey().cache()
+      (entry.i, (entry.j, entry.mval))).groupByKey().cache()
    val emits = rows.flatMap{ case (rowind, cols) =>
      cols.flatMap{ case (colind1, mval1) =>
        cols.map{ case (colind2, mval2) =>
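The emit pattern computes A^T A one row at a time: each row contributes the outer product of its non-zero entries, and the reduceByKey that follows in the file sums those contributions, giving (A^T A)(j1, j2) = sum over i of A(i, j1) * A(i, j2). A plain-collections sketch of the same computation, with illustrative data:

    val entries = Seq(MatrixEntry(1, 1, 2.0), MatrixEntry(1, 2, 3.0), MatrixEntry(2, 1, 4.0))
    // Group by row index, keeping (column, value) pairs, as `rows` does above.
    val byRow = entries.groupBy(_.i).mapValues(_.map(e => (e.j, e.mval)))
    // Emit every within-row product, then sum per (j1, j2) cell.
    val ata = byRow.values.flatMap { cols =>
      for ((j1, v1) <- cols; (j2, v2) <- cols) yield ((j1, j2), v1 * v2)
    }.groupBy(_._1).mapValues(_.map(_._2).sum)
    // ata((1, 1)) == 2.0*2.0 + 4.0*4.0 == 20.0, i.e. (A^T A)(1, 1)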
@@ -106,9 +108,9 @@ object SVD {
    // prepare V for returning
    val retV = sc.makeRDD(
      Array.tabulate(V.rows, sigma.length){ (i,j) =>
-        ((i+1, j+1), V.get(i,j)) }.flatten)
+        MatrixEntry(i+1, j+1, V.get(i,j)) }.flatten)

-    val retS = sc.makeRDD(Array.tabulate(sigma.length){x=>((x+1,x+1),sigma(x))})
+    val retS = sc.makeRDD(Array.tabulate(sigma.length){x=>MatrixEntry(x+1,x+1,sigma(x))})

    // Compute U as U = A V S^-1
    // turn V S^-1 into an RDD as a sparse matrix and cache it
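Why A V S^{-1} recovers U: from the decomposition A = U S V^T, with V orthonormal (V^T V = I) and S invertible on the retained singular values,

    A V S^{-1} = U S (V^T V) S^{-1} = U S S^{-1} = U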
@@ -120,6 +122,7 @@ object SVD {
    val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
    val retU = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) )
      => ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_+_)
+      .map{ case ((row, col), mval) => MatrixEntry(row, col, mval)}

    (retU, retS, retV)
  }
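The join implements the multiplication A * (V S^{-1}): aCols keys each entry of A by its column index k as (k, (i, A(i, k))), bRows keys each entry of V S^{-1} by its row index k, so every joined pair contributes A(i, k) * (V S^{-1})(k, j) to cell (i, j), and reduceByKey sums over k. A plain-collections sketch with a hypothetical one-column example:

    val aCols = Seq((1, (1, 2.0)), (1, (2, 4.0)))  // k -> (i, A(i, k))
    val bRows = Seq((1, (1, 0.5)))                 // k -> (j, (V S^-1)(k, j))
    // Join on k, multiply, and sum per (i, j) cell, mirroring reduceByKey.
    val u = (for ((k1, (i, av)) <- aCols; (k2, (j, bv)) <- bRows if k1 == k2)
      yield ((i, j), av * bv)).groupBy(_._1).mapValues(_.map(_._2).sum)
    // u((1, 1)) == 2.0 * 0.5 == 1.0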
@@ -127,11 +130,13 @@ object SVD {
  def main(args: Array[String]) {
    if (args.length < 8) {
-      println("Usage: SVD <master> <matrix_file> <m> <n> <minimum_singular_value> <output_U_file> <output_S_file> <output_V_file>")
+      println("Usage: SVD <master> <matrix_file> <m> <n> " +
+        "<minimum_singular_value> <output_U_file> <output_S_file> <output_V_file>")
      System.exit(1)
    }

    val (master, inputFile, m, n, min_svalue, output_u, output_s, output_v) =
-      (args(0), args(1), args(2).toInt, args(3).toInt, args(4).toDouble, args(5), args(6), args(7))
+      (args(0), args(1), args(2).toInt, args(3).toInt,
+        args(4).toDouble, args(5), args(6), args(7))
    val sc = new SparkContext(master, "SVD")
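For reference, a hypothetical invocation matching the usage string above (master, file names, and dimensions are illustrative only):

    SVD local matrix.txt 10000 50 0.01 u.out s.out v.out

Here matrix.txt holds the 1-indexed entries, 10000 x 50 is the tall-and-skinny shape, and singular values below 0.01 are discarded.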