diff options
author | Reza Zadeh <rizlar@gmail.com> | 2014-01-04 14:28:07 -0800 |
---|---|---|
committer | Reza Zadeh <rizlar@gmail.com> | 2014-01-04 14:28:07 -0800 |
commit | 06c0f7628a213a08ef5adeab903160b806680acf (patch) | |
tree | 3343a151864808dabbc15ba481f125e6723ab55e /mllib/src/main | |
parent | cdff9fc858b9b83eb1119ec2a6d1d3c9a66f47a9 (diff) | |
download | spark-06c0f7628a213a08ef5adeab903160b806680acf.tar.gz spark-06c0f7628a213a08ef5adeab903160b806680acf.tar.bz2 spark-06c0f7628a213a08ef5adeab903160b806680acf.zip |
use SparseMatrix everywhere
Diffstat (limited to 'mllib/src/main')
3 files changed, 55 insertions, 50 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala index 31990b0223..a8efdc787e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala @@ -26,11 +26,8 @@ import org.jblas.{DoubleMatrix, Singular, MatrixFunctions} /** * Class used to obtain singular value decompositions - * @param data Matrix in sparse matrix format - * @param m number of rows - * @param n number of columns */ -class SVD(var data: RDD[MatrixEntry], var m: Int, var n: Int) { +class SVD { private var k: Int = 1 /** @@ -41,35 +38,11 @@ class SVD(var data: RDD[MatrixEntry], var m: Int, var n: Int) { this } - /** - * Set matrix to be used for SVD - */ - def setDatadata(data: RDD[MatrixEntry]): this.type = { - this.data = data - this - } - - /** - * Set dimensions of matrix: rows - */ - def setNumRows(m: Int): this.type = { - this.m = m - this - } - - /** - * Set dimensions of matrix: columns - */ - def setNumCols(n: Int): this.type = { - this.n = n - this - } - /** * Compute SVD using the current set parameters */ - def computeSVD() : SVDecomposedMatrix = { - SVD.sparseSVD(data, m, n, k) + def computeSVD(matrix: SparseMatrix) : SVDecomposedMatrix = { + SVD.sparseSVD(matrix, k) } } @@ -103,19 +76,19 @@ object SVD { * All input and output is expected in sparse matrix format, 1-indexed * as tuples of the form ((i,j),value) all in RDDs * - * @param data RDD Matrix in sparse 1-index format ((int, int), value) - * @param m number of rows - * @param n number of columns + * @param matrix sparse matrix to factorize * @param k Recover k singular values and vectors * @return Three sparse matrices: U, S, V such that A = USV^T */ def sparseSVD( - data: RDD[MatrixEntry], - m: Int, - n: Int, + matrix: SparseMatrix, k: Int) : SVDecomposedMatrix = { + val data = matrix.data + val m = matrix.m + val n = matrix.n + if (m < n || m <= 0 || n <= 0) { throw new IllegalArgumentException("Expecting a tall and skinny matrix") } @@ -153,13 +126,16 @@ object SVD { val sc = data.sparkContext // prepare V for returning - val retV = sc.makeRDD( + val retVdata = sc.makeRDD( Array.tabulate(V.rows, sigma.length){ (i,j) => MatrixEntry(i + 1, j + 1, V.get(i,j)) }.flatten) - - val retS = sc.makeRDD(Array.tabulate(sigma.length){ + val retV = SparseMatrix(retVdata, V.rows, sigma.length) + + val retSdata = sc.makeRDD(Array.tabulate(sigma.length){ x => MatrixEntry(x + 1, x + 1, sigma(x))}) + val retS = SparseMatrix(retSdata, sigma.length, sigma.length) + // Compute U as U = A V S^-1 // turn V S^-1 into an RDD as a sparse matrix val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length) @@ -168,10 +144,11 @@ object SVD { // Multiply A by VS^-1 val aCols = data.map(entry => (entry.j, (entry.i, entry.mval))) val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2))) - val retU = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) ) + val retUdata = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) ) => ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_+_) .map{ case ((row, col), mval) => MatrixEntry(row, col, mval)} - + val retU = SparseMatrix(retUdata, m, sigma.length) + SVDecomposedMatrix(retU, retS, retV) } @@ -195,10 +172,10 @@ object SVD { MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble) } - val decomposed = SVD.sparseSVD(data, m, n, k) - val u = decomposed.U - val s = decomposed.S - val v = decomposed.V + val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), k) + val u = decomposed.U.data + val s = decomposed.S.data + val v = decomposed.V.data println("Computed " + s.toArray.length + " singular values and vectors") u.saveAsTextFile(output_u) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala index e0bcdab2d2..622003576d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala @@ -17,8 +17,6 @@ package org.apache.spark.mllib.linalg -import org.apache.spark.rdd.RDD - /** * Class that represents the SV decomposition of a matrix * @@ -26,6 +24,6 @@ import org.apache.spark.rdd.RDD * @param S such that A = USV^T * @param V such that A = USV^T */ -case class SVDecomposedMatrix(val U: RDD[MatrixEntry], - val S: RDD[MatrixEntry], - val V: RDD[MatrixEntry]) +case class SVDecomposedMatrix(val U: SparseMatrix, + val S: SparseMatrix, + val V: SparseMatrix) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala new file mode 100644 index 0000000000..cbd1a2a5a4 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg + +import org.apache.spark.rdd.RDD + + +/** + * Class that represents a sparse matrix + * + * @param data RDD of nonzero entries + * @param m number of rows + * @param n numner of columns + */ +case class SparseMatrix(val data: RDD[MatrixEntry], val m: Int, val n: Int) |