author    Reza Zadeh <rizlar@gmail.com>  2013-12-26 17:39:25 -0500
committer Reza Zadeh <rizlar@gmail.com>  2013-12-26 17:39:25 -0500
commit    6c3674cd235558ec09e6b97382bb541b379a3f8f (patch)
tree      69ecdcfc31d081636dbaa7954b697db31aaca852 /mllib
parent    6e740cc90131b29ebc17e32c66ea16727e5dcc9f (diff)
Object to hold the svd methods
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala | 132
1 file changed, 74 insertions(+), 58 deletions(-)
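
For context: this commit wraps what was previously script-style code into an SVD object with a single entry point, sparseSVD(data, m, n, min_svalue), returning the (U, S, V) factors as sparse-entry RDDs. Below is a minimal sketch of a driver against that API; the local master, dimensions, and 0.01 threshold are illustrative values only (0.01 matches the old script's MIN_SVALUE), and the synthetic data mirrors the old script's test matrix.

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.SVD

object SparseSVDExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "SparseSVDExample")

    // Synthetic 1-indexed sparse matrix: ((row, col), value) entries of an m x n grid
    val m = 100
    val n = 10
    val data = sc.makeRDD(
      Array.tabulate(m, n){ (a, b) => ((a + 1, b + 1), (a * b % 37).toDouble) }.flatten)

    // Keep only singular values >= 0.01
    val (u, s, v) = SVD.sparseSVD(data, m, n, 0.01)
    println("Retained " + s.count() + " singular values")
  }
}
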
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala
index 99a1785074..f9b9a04f19 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala
@@ -18,6 +18,8 @@
package org.apache.spark.mllib.linalg
import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}
@@ -49,67 +51,81 @@ import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}
*/
-// arguments
-val MIN_SVALUE = 0.01 // minimum singular value to recover
-val m = 100000
-val n = 10
-// and a 1-indexed spase matrix.
-
-// TODO: check min svalue
-// TODO: check dimensions
-
-// Load and parse the data file
-/*val rawdata = sc.textFile("mllib/data/als/test.data")
-val data = rawdata.map { line =>
- val parts = line.split(',')
- ((parts(0).toInt, parts(1).toInt), parts(2).toDouble)
-}*/
-
-val data = sc.makeRDD(Array.tabulate(m,n){ (a,b)=> ((a+1,b+1),a*b%37) }.flatten )
-
-
-// Compute A^T A, assuming rows are sparse enough to fit in memory
-val rows = data.map(entry =>
- (entry._1._1, (entry._1._2, entry._2))).groupByKey().cache()
-val emits = rows.flatMap{ case (rowind, cols) =>
- cols.flatMap{ case (colind1, mval1) =>
- cols.map{ case (colind2, mval2) =>
- ((colind1, colind2), mval1*mval2) } }
-}.reduceByKey(_+_)
-
-
-// Constructi jblas A^T A locally
-val ata = DoubleMatrix.zeros(n, n)
-for(entry <- emits.toArray) {
- ata.put(entry._1._1-1, entry._1._2-1, entry._2)
+object SVD {
+ def sparseSVD(
+ data: RDD[((Int, Int), Double)],
+ m: Int,
+ n: Int,
+ min_svalue: Double)
+ : ( RDD[((Int, Int), Double)],
+ RDD[((Int, Int), Double)],
+ RDD[((Int, Int), Double)]) =
+ {
+ val sc = data.sparkContext
+
+ // Compute A^T A, assuming rows are sparse enough to fit in memory
+ val rows = data.map(entry =>
+ (entry._1._1, (entry._1._2, entry._2))).groupByKey().cache()
+ val emits = rows.flatMap{ case (rowind, cols) =>
+ cols.flatMap{ case (colind1, mval1) =>
+ cols.map{ case (colind2, mval2) =>
+ ((colind1, colind2), mval1*mval2) } }
+ }.reduceByKey(_+_)
+
+
+    // Construct A^T A as a local jblas matrix
+ val ata = DoubleMatrix.zeros(n, n)
+ for(entry <- emits.toArray) {
+ ata.put(entry._1._1-1, entry._1._2-1, entry._2)
+ }
+
+ // Since A^T A is small, we can compute its SVD directly
+ val svd = Singular.sparseSVD(ata)
+ val V = svd(0)
+ val sigma = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x >= min_svalue)
+
+    // threshold s values: if none clear min_svalue, every factor
+    // returned below will be empty
+    if(sigma.isEmpty) {
+      // TODO: return empty factor matrices explicitly
+    }
+
+ // prepare V for returning
+ val retV = sc.makeRDD(
+ Array.tabulate(V.rows, sigma.length){ (i,j) =>
+ ((i+1, j+1), V.get(i,j)) }.flatten)
+
+ val retS = sc.makeRDD(Array.tabulate(sigma.length){x=>((x+1,x+1),sigma(x))})
+
+ // Compute U as U = A V S^-1
+ // turn V S^-1 into an RDD as a sparse matrix and cache it
+ val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length)
+ { (i,j) => ((i+1, j+1), V.get(i,j)/sigma(j)) }.flatten).cache()
+
+ // Multiply A by VS^-1
+ val aCols = data.map(entry => (entry._1._2, (entry._1._1, entry._2)))
+ val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
+    val retU = aCols.join(bRows).map {
+      case (key, ((rowInd, rowVal), (colInd, colVal))) =>
+        ((rowInd, colInd), rowVal*colVal)
+    }.reduceByKey(_+_)
+
+ (retU, retS, retV)
+ }
+
+  def main(args: Array[String]) {
+    if (args.length < 5) {
+      println("Usage: SVD <master> <input_file> <m> <n> <min_svalue>")
+      System.exit(1)
+    }
+    val (master, inputFile) = (args(0), args(1))
+    val (m, n, minSvalue) = (args(2).toInt, args(3).toInt, args(4).toDouble)
+    val sc = new SparkContext(master, "SVD")
+
+    // Load a 1-indexed sparse matrix of "row,col,value" lines
+    val data = sc.textFile(inputFile).map { line =>
+      val parts = line.split(',')
+      ((parts(0).toInt, parts(1).toInt), parts(2).toDouble)
+    }
+
+    val (u, s, v) = sparseSVD(data, m, n, minSvalue)
+    println("Singular values found: " + s.count())
+    System.exit(0)
+  }
}
-// Since A^T A is small, we can compute its SVD directly
-val svd = Singular.sparseSVD(ata)
-val V = svd(0)
-val sigma = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x >= MIN_SVALUE)
-
-// threshold s values
-if(sigma.isEmpty) {
- // TODO: return empty
-}
-
-// prepare V for returning
-val retV = sc.makeRDD(
- Array.tabulate(V.rows, sigma.length){ (i,j) =>
- ((i+1, j+1), V.get(i,j)) }.flatten)
-
-val retS = sc.makeRDD(sigma)
-// Compute U as U = A V S^-1
-// turn V S^-1 into an RDD as a sparse matrix and cache it
-val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length)
- { (i,j) => ((i+1, j+1), V.get(i,j)/sigma(j)) }.flatten).cache()
-// Multiply A by VS^-1
-val aCols = data.map(entry => (entry._1._2, (entry._1._1, entry._2)))
-val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
-val retU = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) )
- => ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_+_)
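
A note on the A^T A step above: each row emits all pairwise products of its entries, and summing those products over rows gives (A^T A)(i, j) = sum over k of A(k, i) * A(k, j), which is exactly what the flatMap over cached rows followed by reduceByKey computes. A plain-Scala sketch of the same trick on local collections (the GramCheck object and the tiny 2 x 2 input are made up for illustration):

object GramCheck {
  def main(args: Array[String]) {
    // 1-indexed sparse entries of A = [[1, 2], [3, 4]]
    val entries = Seq(((1, 1), 1.0), ((1, 2), 2.0), ((2, 1), 3.0), ((2, 2), 4.0))
    // Group entries by row index, keeping (col, value) pairs, like rows/groupByKey
    val rows = entries.groupBy(_._1._1).mapValues(_.map { case ((_, j), v) => (j, v) })
    // Per row, emit all pairwise column products, then sum, like flatMap/reduceByKey
    val ata = rows.values.flatMap { cols =>
      for ((j1, v1) <- cols; (j2, v2) <- cols) yield ((j1, j2), v1 * v2)
    }.groupBy(_._1).mapValues(_.map(_._2).sum)
    println(ata.toSeq.sortBy(_._1))  // expect A^T A = [[10, 14], [14, 20]]
  }
}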
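
Likewise for the U = A V S^-1 step: since A = U S V^T with orthonormal V, right-multiplying A by V S^-1 recovers U. The join on the shared inner index k performs one step of sparse matrix multiplication; every matching pair (A(r, k), (V S^-1)(k, c)) contributes a partial product keyed by the output cell (r, c), and reduceByKey sums the partials. The same pattern on local collections (the JoinMultiply object and the 2 x 2 by 2 x 1 inputs are made up for illustration):

object JoinMultiply {
  def main(args: Array[String]) {
    // Sparse entries ((row, col), value): A = [[1, 2], [3, 0]], B = [[5], [7]]
    val A = Seq(((1, 1), 1.0), ((1, 2), 2.0), ((2, 1), 3.0))
    val B = Seq(((1, 1), 5.0), ((2, 1), 7.0))
    // Key A by its column and B by its row, mirroring aCols and bRows above
    val aByK = A.map { case ((r, k), v) => (k, (r, v)) }.groupBy(_._1)
    val bByK = B.map { case ((k, c), v) => (k, (c, v)) }.groupBy(_._1)
    // "Join" on k, emit partial products, then sum per output cell
    val partials = for {
      k <- (aByK.keySet intersect bByK.keySet).toSeq
      (_, (r, av)) <- aByK(k)
      (_, (c, bv)) <- bByK(k)
    } yield ((r, c), av * bv)
    val result = partials.groupBy(_._1).mapValues(_.map(_._2).sum)
    println(result.toSeq.sortBy(_._1))  // expect A * B = [[19], [15]]
  }
}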