path: root/mllib
author     Reza Zadeh <rizlar@gmail.com>    2014-01-04 14:28:07 -0800
committer  Reza Zadeh <rizlar@gmail.com>    2014-01-04 14:28:07 -0800
commit     06c0f7628a213a08ef5adeab903160b806680acf (patch)
tree       3343a151864808dabbc15ba481f125e6723ab55e /mllib
parent     cdff9fc858b9b83eb1119ec2a6d1d3c9a66f47a9 (diff)
use SparseMatrix everywhere
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala                 67
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala   8
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala         30
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala             50
4 files changed, 84 insertions, 71 deletions
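
In short, the commit replaces the raw RDD[MatrixEntry] plus explicit (m, n) dimensions with a SparseMatrix wrapper throughout the SVD code path. A minimal sketch of the new call pattern in Scala (assuming a SparkContext named sc and the org.apache.spark.mllib.linalg classes from this diff on the classpath; the entries and dimensions below are invented for illustration):

    import org.apache.spark.mllib.linalg.{MatrixEntry, SparseMatrix, SVD}

    // Before this commit: SVD.sparseSVD(entries, m, n, k) took the pieces separately.
    // After: the entry RDD and its m x n dimensions travel together as a SparseMatrix.
    val entries = sc.makeRDD(Seq(
      MatrixEntry(1, 1, 4.0),             // 1-indexed (i, j, value) entries
      MatrixEntry(2, 1, 1.0),
      MatrixEntry(2, 2, 3.0)))
    val a = SparseMatrix(entries, 3, 2)   // 3 rows, 2 columns (tall and skinny)
    val decomposed = SVD.sparseSVD(a, 1)  // recover k = 1 singular value
    val uEntries = decomposed.U.data      // each factor is itself a SparseMatrix
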
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
index 31990b0223..a8efdc787e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
@@ -26,11 +26,8 @@ import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}
/**
* Class used to obtain singular value decompositions
- * @param data Matrix in sparse matrix format
- * @param m number of rows
- * @param n number of columns
*/
-class SVD(var data: RDD[MatrixEntry], var m: Int, var n: Int) {
+class SVD {
private var k: Int = 1
/**
@@ -41,35 +38,11 @@ class SVD(var data: RDD[MatrixEntry], var m: Int, var n: Int) {
this
}
- /**
- * Set matrix to be used for SVD
- */
- def setDatadata(data: RDD[MatrixEntry]): this.type = {
- this.data = data
- this
- }
-
- /**
- * Set dimensions of matrix: rows
- */
- def setNumRows(m: Int): this.type = {
- this.m = m
- this
- }
-
- /**
- * Set dimensions of matrix: columns
- */
- def setNumCols(n: Int): this.type = {
- this.n = n
- this
- }
-
/**
* Compute SVD using the current set parameters
*/
- def computeSVD() : SVDecomposedMatrix = {
- SVD.sparseSVD(data, m, n, k)
+ def computeSVD(matrix: SparseMatrix) : SVDecomposedMatrix = {
+ SVD.sparseSVD(matrix, k)
}
}
@@ -103,19 +76,19 @@ object SVD {
* All input and output is expected in sparse matrix format, 1-indexed
* as tuples of the form ((i,j),value) all in RDDs
*
- * @param data RDD Matrix in sparse 1-index format ((int, int), value)
- * @param m number of rows
- * @param n number of columns
+ * @param matrix sparse matrix to factorize
* @param k Recover k singular values and vectors
* @return Three sparse matrices: U, S, V such that A = USV^T
*/
def sparseSVD(
- data: RDD[MatrixEntry],
- m: Int,
- n: Int,
+ matrix: SparseMatrix,
k: Int)
: SVDecomposedMatrix =
{
+ val data = matrix.data
+ val m = matrix.m
+ val n = matrix.n
+
if (m < n || m <= 0 || n <= 0) {
throw new IllegalArgumentException("Expecting a tall and skinny matrix")
}
@@ -153,13 +126,16 @@ object SVD {
val sc = data.sparkContext
// prepare V for returning
- val retV = sc.makeRDD(
+ val retVdata = sc.makeRDD(
Array.tabulate(V.rows, sigma.length){ (i,j) =>
MatrixEntry(i + 1, j + 1, V.get(i,j)) }.flatten)
-
- val retS = sc.makeRDD(Array.tabulate(sigma.length){
+ val retV = SparseMatrix(retVdata, V.rows, sigma.length)
+
+ val retSdata = sc.makeRDD(Array.tabulate(sigma.length){
x => MatrixEntry(x + 1, x + 1, sigma(x))})
+ val retS = SparseMatrix(retSdata, sigma.length, sigma.length)
+
// Compute U as U = A V S^-1
// turn V S^-1 into an RDD as a sparse matrix
val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length)
@@ -168,10 +144,11 @@ object SVD {
// Multiply A by VS^-1
val aCols = data.map(entry => (entry.j, (entry.i, entry.mval)))
val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
- val retU = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) )
+ val retUdata = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) )
=> ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_+_)
.map{ case ((row, col), mval) => MatrixEntry(row, col, mval)}
-
+ val retU = SparseMatrix(retUdata, m, sigma.length)
+
SVDecomposedMatrix(retU, retS, retV)
}
@@ -195,10 +172,10 @@ object SVD {
MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
}
- val decomposed = SVD.sparseSVD(data, m, n, k)
- val u = decomposed.U
- val s = decomposed.S
- val v = decomposed.V
+ val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), k)
+ val u = decomposed.U.data
+ val s = decomposed.S.data
+ val v = decomposed.V.data
println("Computed " + s.toArray.length + " singular values and vectors")
u.saveAsTextFile(output_u)
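
The hunk above is the command-line entry point; a sketch of the end-to-end driver flow it now uses (inputPath, m, n, k, and output_u are stand-in names, and the comma delimiter for the input lines is an assumption; sc.textFile and saveAsTextFile are standard Spark RDD calls):

    // Parse 1-indexed "i,j,value" lines into MatrixEntry records (delimiter assumed).
    val data = sc.textFile(inputPath).map { line =>
      val parts = line.split(',')
      MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
    }
    // Wrap the entries with their dimensions and factorize.
    val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), k)
    // Each factor is a SparseMatrix; its entry RDD is what gets written out.
    decomposed.U.data.saveAsTextFile(output_u)
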
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala
index e0bcdab2d2..622003576d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala
@@ -17,8 +17,6 @@
package org.apache.spark.mllib.linalg
-import org.apache.spark.rdd.RDD
-
/**
* Class that represents the SV decomposition of a matrix
*
@@ -26,6 +24,6 @@ import org.apache.spark.rdd.RDD
* @param S such that A = USV^T
* @param V such that A = USV^T
*/
-case class SVDecomposedMatrix(val U: RDD[MatrixEntry],
- val S: RDD[MatrixEntry],
- val V: RDD[MatrixEntry])
+case class SVDecomposedMatrix(val U: SparseMatrix,
+ val S: SparseMatrix,
+ val V: SparseMatrix)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala
new file mode 100644
index 0000000000..cbd1a2a5a4
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import org.apache.spark.rdd.RDD
+
+
+/**
+ * Class that represents a sparse matrix
+ *
+ * @param data RDD of nonzero entries
+ * @param m number of rows
+ * @param n number of columns
+ */
+case class SparseMatrix(val data: RDD[MatrixEntry], val m: Int, val n: Int)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
index 4126e819e3..f239e8505f 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
@@ -45,9 +45,12 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
val EPSILON = 1e-4
// Return jblas matrix from sparse matrix RDD
- def getDenseMatrix(matrix:RDD[MatrixEntry], m:Int, n:Int) : DoubleMatrix = {
+ def getDenseMatrix(matrix:SparseMatrix) : DoubleMatrix = {
+ val data = matrix.data
+ val m = matrix.m
+ val n = matrix.n
val ret = DoubleMatrix.zeros(m, n)
- matrix.toArray.map(x => ret.put(x.i - 1, x.j - 1, x.mval))
+ matrix.data.toArray.map(x => ret.put(x.i - 1, x.j - 1, x.mval))
ret
}
@@ -67,24 +70,26 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) =>
MatrixEntry(a + 1, b + 1, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten )
- val decomposed = SVD.sparseSVD(data, m, n, n)
+ val a = SparseMatrix(data, m, n)
+
+ val decomposed = SVD.sparseSVD(a, n)
val u = decomposed.U
val v = decomposed.V
- val s = decomposed.S
+ val s = decomposed.S
- val densea = getDenseMatrix(data, m, n)
+ val densea = getDenseMatrix(a)
val svd = Singular.sparseSVD(densea)
- val retu = getDenseMatrix(u, m, n)
- val rets = getDenseMatrix(s, n, n)
- val retv = getDenseMatrix(v, n, n)
+ val retu = getDenseMatrix(u)
+ val rets = getDenseMatrix(s)
+ val retv = getDenseMatrix(v)
// check individual decomposition
assertMatrixEquals(retu, svd(0))
assertMatrixEquals(rets, DoubleMatrix.diag(svd(1)))
assertMatrixEquals(retv, svd(2))
- // check multiplication guarantee
+ // check multiplication guarantee
assertMatrixEquals(retu.mmul(rets).mmul(retv.transpose), densea)
}
@@ -95,20 +100,22 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
MatrixEntry(a + 1, b + 1, 1.0) }.flatten )
val k = 1
- val decomposed = SVD.sparseSVD(data, m, n, k)
+ val a = SparseMatrix(data, m, n)
+
+ val decomposed = SVD.sparseSVD(a, k)
val u = decomposed.U
val s = decomposed.S
val v = decomposed.V
- val retrank = s.toArray.length
+ val retrank = s.data.toArray.length
assert(retrank == 1, "rank returned not one")
- val densea = getDenseMatrix(data, m, n)
+ val densea = getDenseMatrix(a)
val svd = Singular.sparseSVD(densea)
- val retu = getDenseMatrix(u, m, retrank)
- val rets = getDenseMatrix(s, retrank, retrank)
- val retv = getDenseMatrix(v, n, retrank)
+ val retu = getDenseMatrix(u)
+ val rets = getDenseMatrix(s)
+ val retv = getDenseMatrix(v)
// check individual decomposition
assertMatrixEquals(retu, svd(0).getColumn(0))
@@ -124,21 +131,22 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
val n = 3
val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) =>
MatrixEntry(a + 1, b + 1, (a + 2).toDouble * (b + 1)/(1 + a + b)) }.flatten )
+ val a = SparseMatrix(data, m, n)
val k = 1 // only one svalue above this
- val decomposed = SVD.sparseSVD(data, m, n, k)
+ val decomposed = SVD.sparseSVD(a, k)
val u = decomposed.U
val s = decomposed.S
val v = decomposed.V
- val retrank = s.toArray.length
+ val retrank = s.data.toArray.length
- val densea = getDenseMatrix(data, m, n)
+ val densea = getDenseMatrix(a)
val svd = Singular.sparseSVD(densea)
- val retu = getDenseMatrix(u, m, retrank)
- val rets = getDenseMatrix(s, retrank, retrank)
- val retv = getDenseMatrix(v, n, retrank)
+ val retu = getDenseMatrix(u)
+ val rets = getDenseMatrix(s)
+ val retv = getDenseMatrix(v)
assert(retrank == 1, "rank returned not one")