author     Reza Zadeh <rizlar@gmail.com>  2013-12-27 00:34:59 -0500
committer  Reza Zadeh <rizlar@gmail.com>  2013-12-27 00:34:59 -0500
commit     fa1e8d8cbf916f963e1ea000683a11d83551f870 (patch)
tree       4a378d2cfa01bcf37b6b3201be2f9325da18079c /mllib
parent     16de5268e3652498b47b0600fe5cf9cf10d0dd83 (diff)
download   spark-fa1e8d8cbf916f963e1ea000683a11d83551f870.tar.gz
           spark-fa1e8d8cbf916f963e1ea000683a11d83551f870.tar.bz2
           spark-fa1e8d8cbf916f963e1ea000683a11d83551f870.zip
test for truncated svd
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala  101
1 file changed, 50 insertions, 51 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala
index 2ce0df1e5d..a799aa3280 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala
@@ -57,65 +57,65 @@ object SVD {
m: Int,
n: Int,
min_svalue: Double)
- : ( RDD[((Int, Int), Double)],
- RDD[((Int, Int), Double)],
- RDD[((Int, Int), Double)]) =
+ : ( RDD[((Int, Int), Double)],
+ RDD[((Int, Int), Double)],
+ RDD[((Int, Int), Double)]) =
{
- if (m < n) {
+ if (m < n || m <= 0 || n <= 0) {
throw new IllegalArgumentException("Expecting a tall and skinny matrix")
}
- if (min_svalue < 1.0e-9) {
+ if (min_svalue < 1.0e-9) {
throw new IllegalArgumentException("Minimum singular value must be greater than 1e-9")
}
- val sc = data.sparkContext
+ val sc = data.sparkContext
- // Compute A^T A, assuming rows are sparse enough to fit in memory
- val rows = data.map(entry =>
- (entry._1._1, (entry._1._2, entry._2))).groupByKey().cache()
- val emits = rows.flatMap{ case (rowind, cols) =>
- cols.flatMap{ case (colind1, mval1) =>
- cols.map{ case (colind2, mval2) =>
- ((colind1, colind2), mval1*mval2) } }
- }.reduceByKey(_+_)
+ // Compute A^T A, assuming rows are sparse enough to fit in memory
+ val rows = data.map(entry =>
+ (entry._1._1, (entry._1._2, entry._2))).groupByKey().cache()
+ val emits = rows.flatMap{ case (rowind, cols) =>
+ cols.flatMap{ case (colind1, mval1) =>
+ cols.map{ case (colind2, mval2) =>
+ ((colind1, colind2), mval1*mval2) } }
+ }.reduceByKey(_+_)
- // Construct jblas A^T A locally
- val ata = DoubleMatrix.zeros(n, n)
- for(entry <- emits.toArray) {
- ata.put(entry._1._1-1, entry._1._2-1, entry._2)
- }
+ // Construct jblas A^T A locally
+ val ata = DoubleMatrix.zeros(n, n)
+ for(entry <- emits.toArray) {
+ ata.put(entry._1._1-1, entry._1._2-1, entry._2)
+ }
- // Since A^T A is small, we can compute its SVD directly
- val svd = Singular.sparseSVD(ata)
- val V = svd(0)
- val sigma = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x >= min_svalue)
+ // Since A^T A is small, we can compute its SVD directly
+ val svd = Singular.sparseSVD(ata)
+ val V = svd(0)
+ val sigma = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x >= min_svalue)
- // threshold s values
- if(sigma.isEmpty) {
+ // threshold s values
+ if(sigma.isEmpty) {
throw new Exception("All singular values are smaller than min_svalue: " + min_svalue)
- }
+ }
- // prepare V for returning
- val retV = sc.makeRDD(
- Array.tabulate(V.rows, sigma.length){ (i,j) =>
- ((i+1, j+1), V.get(i,j)) }.flatten)
+ // prepare V for returning
+ val retV = sc.makeRDD(
+ Array.tabulate(V.rows, sigma.length){ (i,j) =>
+ ((i+1, j+1), V.get(i,j)) }.flatten)
- val retS = sc.makeRDD(Array.tabulate(sigma.length){x=>((x+1,x+1),sigma(x))})
+ val retS = sc.makeRDD(Array.tabulate(sigma.length){x=>((x+1,x+1),sigma(x))})
- // Compute U as U = A V S^-1
- // turn V S^-1 into an RDD as a sparse matrix and cache it
- val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length)
+ // Compute U as U = A V S^-1
+ // turn V S^-1 into an RDD as a sparse matrix and cache it
+ val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length)
{ (i,j) => ((i+1, j+1), V.get(i,j)/sigma(j)) }.flatten).cache()
- // Multiply A by VS^-1
- val aCols = data.map(entry => (entry._1._2, (entry._1._1, entry._2)))
- val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
- val retU = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) )
+ // Multiply A by VS^-1
+ val aCols = data.map(entry => (entry._1._2, (entry._1._1, entry._2)))
+ val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
+ val retU = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) )
=> ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_+_)
-
- (retU, retS, retV)
+
+ (retU, retS, retV)
}
@@ -125,24 +125,23 @@ object SVD {
System.exit(1)
}
val (master, inputFile, m, n, min_svalue, output_u, output_s, output_v) =
- (args(0), args(1), args(2).toInt, args(3).toInt, args(4).toDouble, args(5), args(6), args(7))
+ (args(0), args(1), args(2).toInt, args(3).toInt, args(4).toDouble, args(5), args(6), args(7))
val sc = new SparkContext(master, "SVD")
- val rawdata = sc.textFile(inputFile)
- val data = rawdata.map { line =>
- val parts = line.split(',')
- ((parts(0).toInt, parts(1).toInt), parts(2).toDouble)
- }
+ val rawdata = sc.textFile(inputFile)
+ val data = rawdata.map { line =>
+ val parts = line.split(',')
+ ((parts(0).toInt, parts(1).toInt), parts(2).toDouble)
+ }
- val (u, s, v) = SVD.sparseSVD(data, m, n, min_svalue)
+ val (u, s, v) = SVD.sparseSVD(data, m, n, min_svalue)
println("Computed " + s.toArray.length + " singular values and vectors")
- u.saveAsTextFile(output_u)
- s.saveAsTextFile(output_s)
- v.saveAsTextFile(output_v)
+ u.saveAsTextFile(output_u)
+ s.saveAsTextFile(output_s)
+ v.saveAsTextFile(output_v)
System.exit(0)
}
}
-
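To make the linear algebra in the first hunk easier to follow, here is a minimal local sketch of the same Gramian trick, written against plain jblas with no Spark involved: for a tall and skinny A, take the SVD of the small n x n matrix A^T A, recover A's singular values as the square roots of its singular values, and rebuild U as U = A V S^-1. The GramianSvdSketch name and the 4x2 example matrix below are illustrative only and are not part of the patch.

import org.jblas.{DoubleMatrix, MatrixFunctions, Singular}

object GramianSvdSketch {
  def main(args: Array[String]): Unit = {
    // Tall-skinny 4x2 example matrix; jblas's varargs constructor is column-major.
    val a = new DoubleMatrix(4, 2,
      1.0, 2.0, 3.0, 4.0,   // column 1
      2.0, 1.0, 0.0, 1.0)   // column 2

    // Step 1: the small n x n Gramian A^T A.
    val ata = a.transpose().mmul(a)

    // Step 2: SVD of the symmetric Gramian. Its singular vectors are the right
    // singular vectors V of A, and its singular values are the squares of A's.
    val svdOfAta = Singular.sparseSVD(ata)
    val v = svdOfAta(0)
    val sigma = MatrixFunctions.sqrt(svdOfAta(1))

    // Step 3: recover U column by column as U = A V S^-1.
    val u = a.mmul(v)
    for (j <- 0 until sigma.length) {
      u.putColumn(j, u.getColumn(j).div(sigma.get(j)))
    }

    // Sanity check: U diag(sigma) V^T should reproduce A up to rounding error.
    val reconstructed = u.mmul(DoubleMatrix.diag(sigma)).mmul(v.transpose())
    println("reconstruction error: " + reconstructed.sub(a).norm2())
  }
}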
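The second hunk's driver builds the matrix from "i,j,value" lines in a text file. A hedged usage sketch of the same sparseSVD entry point, fed from a tiny in-memory RDD instead, might look like the following; the "local" master, the 4x2 example entries, and the 1e-4 threshold are illustrative choices, and the import assumes the SVD object sits in package org.apache.spark.mllib.linalg, as the file path suggests.

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.SVD

object SparseSvdExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "SVDExample")

    // Same ((row, col), value) entries the driver parses, with 1-based indices;
    // the zero cell (3,2) is simply omitted, as it would be in a sparse input file.
    val entries = Seq(
      ((1, 1), 1.0), ((1, 2), 2.0),
      ((2, 1), 2.0), ((2, 2), 1.0),
      ((3, 1), 3.0),
      ((4, 1), 4.0), ((4, 2), 1.0))
    val data = sc.parallelize(entries)

    // m = 4 rows, n = 2 columns; keep singular values above 1e-4.
    val (u, s, v) = SVD.sparseSVD(data, 4, 2, 1.0e-4)
    println("Computed " + s.collect().length + " singular values and vectors")

    sc.stop()
  }
}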