aboutsummaryrefslogtreecommitdiff
path: root/mllib/src
diff options
context:
space:
mode:
authorLi Pu <lpu@twitter.com>2014-07-09 12:15:08 -0700
committerXiangrui Meng <meng@databricks.com>2014-07-09 12:15:08 -0700
commit1f33e1f2013c508aa86511750f7bd8437154e51a (patch)
treeefb4e069bd7a20b8467bc0aa24fc91be81cc06a8 /mllib/src
parentd35e3db2325931492b64890125a70579bc3b587b (diff)
downloadspark-1f33e1f2013c508aa86511750f7bd8437154e51a.tar.gz
spark-1f33e1f2013c508aa86511750f7bd8437154e51a.tar.bz2
spark-1f33e1f2013c508aa86511750f7bd8437154e51a.zip
SPARK-1782: svd for sparse matrix using ARPACK
copy ARPACK dsaupd/dseupd code from latest breeze change RowMatrix to use sparse SVD change tests for sparse SVD All tests passed. I will run it against some large matrices. Author: Li Pu <lpu@twitter.com> Author: Xiangrui Meng <meng@databricks.com> Author: Li Pu <li.pu@outlook.com> Closes #964 from vrilleup/master and squashes the following commits: 7312ec1 [Li Pu] very minor comment fix 4c618e9 [Li Pu] Merge pull request #1 from mengxr/vrilleup-master a461082 [Xiangrui Meng] make superscript show up correctly in doc 861ec48 [Xiangrui Meng] simplify axpy 62969fa [Xiangrui Meng] use BDV directly in symmetricEigs change the computation mode to local-svd, local-eigs, and dist-eigs update tests and docs c273771 [Li Pu] automatically determine SVD compute mode and parameters 7148426 [Li Pu] improve RowMatrix multiply 5543cce [Li Pu] improve svd api 819824b [Li Pu] add flag for dense svd or sparse svd eb15100 [Li Pu] fix binary compatibility 4c7aec3 [Li Pu] improve comments e7850ed [Li Pu] use aggregate and axpy 827411b [Li Pu] fix EOF new line 9c80515 [Li Pu] use non-sparse implementation when k = n fe983b0 [Li Pu] improve scala style 96d2ecb [Li Pu] improve eigenvalue sorting e1db950 [Li Pu] SPARK-1782: svd for sparse matrix using ARPACK
Diffstat (limited to 'mllib/src')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala157
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala183
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala59
3 files changed, 339 insertions, 60 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala
new file mode 100644
index 0000000000..3515461b52
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}
+import com.github.fommil.netlib.ARPACK
+import org.netlib.util.{intW, doubleW}
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Compute eigen-decomposition.
+ */
+@Experimental
+private[mllib] object EigenValueDecomposition {
+ /**
+ * Compute the leading k eigenvalues and eigenvectors on a symmetric square matrix using ARPACK.
+ * The caller needs to ensure that the input matrix is real symmetric. This function requires
+ * memory for `n*(4*k+4)` doubles.
+ *
+ * @param mul a function that multiplies the symmetric matrix with a DenseVector.
+ * @param n dimension of the square matrix (maximum Int.MaxValue).
+ * @param k number of leading eigenvalues required, 0 < k < n.
+ * @param tol tolerance of the eigs computation.
+ * @param maxIterations the maximum number of Arnoldi update iterations.
+ * @return a dense vector of eigenvalues in descending order and a dense matrix of eigenvectors
+ * (columns of the matrix).
+ * @note The number of computed eigenvalues might be smaller than k when some Ritz values do not
+ * satisfy the convergence criterion specified by tol (see ARPACK Users Guide, Chapter 4.6
+ * for more details). The maximum number of Arnoldi update iterations is set to 300 in this
+ * function.
+ */
+ private[mllib] def symmetricEigs(
+ mul: BDV[Double] => BDV[Double],
+ n: Int,
+ k: Int,
+ tol: Double,
+ maxIterations: Int): (BDV[Double], BDM[Double]) = {
+ // TODO: remove this function and use eigs in breeze when switching breeze version
+ require(n > k, s"Number of required eigenvalues $k must be smaller than matrix dimension $n")
+
+ val arpack = ARPACK.getInstance()
+
+ // tolerance used in stopping criterion
+ val tolW = new doubleW(tol)
+ // number of desired eigenvalues, 0 < nev < n
+ val nev = new intW(k)
+ // nev Lanczos vectors are generated in the first iteration
+ // ncv-nev Lanczos vectors are generated in each subsequent iteration
+ // ncv must be smaller than n
+ val ncv = math.min(2 * k, n)
+
+ // "I" for standard eigenvalue problem, "G" for generalized eigenvalue problem
+ val bmat = "I"
+ // "LM" : compute the NEV largest (in magnitude) eigenvalues
+ val which = "LM"
+
+ var iparam = new Array[Int](11)
+ // use exact shift in each iteration
+ iparam(0) = 1
+ // maximum number of Arnoldi update iterations, or the actual number of iterations on output
+ iparam(2) = maxIterations
+ // Mode 1: A*x = lambda*x, A symmetric
+ iparam(6) = 1
+
+ var ido = new intW(0)
+ var info = new intW(0)
+ var resid = new Array[Double](n)
+ var v = new Array[Double](n * ncv)
+ var workd = new Array[Double](n * 3)
+ var workl = new Array[Double](ncv * (ncv + 8))
+ var ipntr = new Array[Int](11)
+
+ // call ARPACK's reverse communication, first iteration with ido = 0
+ arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr, workd,
+ workl, workl.length, info)
+
+ val w = BDV(workd)
+
+ // ido = 99 : done flag in reverse communication
+ while (ido.`val` != 99) {
+ if (ido.`val` != -1 && ido.`val` != 1) {
+ throw new IllegalStateException("ARPACK returns ido = " + ido.`val` +
+ " This flag is not compatible with Mode 1: A*x = lambda*x, A symmetric.")
+ }
+ // multiply working vector with the matrix
+ val inputOffset = ipntr(0) - 1
+ val outputOffset = ipntr(1) - 1
+ val x = w.slice(inputOffset, inputOffset + n)
+ val y = w.slice(outputOffset, outputOffset + n)
+ y := mul(x)
+ // call ARPACK's reverse communication
+ arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr,
+ workd, workl, workl.length, info)
+ }
+
+ if (info.`val` != 0) {
+ info.`val` match {
+ case 1 => throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val` +
+ " Maximum number of iterations taken. (Refer ARPACK user guide for details)")
+ case 2 => throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val` +
+ " No shifts could be applied. Try to increase NCV. " +
+ "(Refer ARPACK user guide for details)")
+ case _ => throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val` +
+ " Please refer ARPACK user guide for error message.")
+ }
+ }
+
+ val d = new Array[Double](nev.`val`)
+ val select = new Array[Boolean](ncv)
+ // copy the Ritz vectors
+ val z = java.util.Arrays.copyOfRange(v, 0, nev.`val` * n)
+
+ // call ARPACK's post-processing for eigenvectors
+ arpack.dseupd(true, "A", select, d, z, n, 0.0, bmat, n, which, nev, tol, resid, ncv, v, n,
+ iparam, ipntr, workd, workl, workl.length, info)
+
+ // number of computed eigenvalues, might be smaller than k
+ val computed = iparam(4)
+
+ val eigenPairs = java.util.Arrays.copyOfRange(d, 0, computed).zipWithIndex.map { r =>
+ (r._1, java.util.Arrays.copyOfRange(z, r._2 * n, r._2 * n + n))
+ }
+
+ // sort the eigen-pairs in descending order
+ val sortedEigenPairs = eigenPairs.sortBy(- _._1)
+
+ // copy eigenvectors in descending order of eigenvalues
+ val sortedU = BDM.zeros[Double](n, computed)
+ sortedEigenPairs.zipWithIndex.foreach { r =>
+ val b = r._2 * n
+ var i = 0
+ while (i < n) {
+ sortedU.data(b + i) = r._1._2(i)
+ i += 1
+ }
+ }
+
+ (BDV[Double](sortedEigenPairs.map(_._1)), sortedU)
+ }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 695e03b736..99cb6516e0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -17,9 +17,10 @@
package org.apache.spark.mllib.linalg.distributed
-import java.util
+import java.util.Arrays
-import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd}
+import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV}
+import breeze.linalg.{svd => brzSvd, axpy => brzAxpy}
import breeze.numerics.{sqrt => brzSqrt}
import com.github.fommil.netlib.BLAS.{getInstance => blas}
@@ -34,7 +35,7 @@ import org.apache.spark.mllib.stat.MultivariateStatisticalSummary
* [[org.apache.spark.mllib.stat.MultivariateStatisticalSummary]]
* together with add() and merge() function.
* A numerically stable algorithm is implemented to compute sample mean and variance:
- *[[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki]].
+ * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki]].
* Zero elements (including explicit zero values) are skipped when calling add() and merge(),
* to have time complexity O(nnz) instead of O(n) for each column.
*/
@@ -201,6 +202,26 @@ class RowMatrix(
}
/**
+ * Multiplies the Gramian matrix `A^T A` by a dense vector on the right without computing `A^T A`.
+ *
+ * @param v a dense vector whose length must match the number of columns of this matrix
+ * @return a dense vector representing the product
+ */
+ private[mllib] def multiplyGramianMatrixBy(v: BDV[Double]): BDV[Double] = {
+ val n = numCols().toInt
+ val vbr = rows.context.broadcast(v)
+ rows.aggregate(BDV.zeros[Double](n))(
+ seqOp = (U, r) => {
+ val rBrz = r.toBreeze
+ val a = rBrz.dot(vbr.value)
+ brzAxpy(a, rBrz, U.asInstanceOf[BV[Double]])
+ U
+ },
+ combOp = (U1, U2) => U1 += U2
+ )
+ }
+
+ /**
* Computes the Gramian matrix `A^T A`.
*/
def computeGramianMatrix(): Matrix = {
@@ -220,50 +241,135 @@ class RowMatrix(
}
/**
- * Computes the singular value decomposition of this matrix.
- * Denote this matrix by A (m x n), this will compute matrices U, S, V such that A = U * S * V'.
+ * Computes singular value decomposition of this matrix. Denote this matrix by A (m x n). This
+ * will compute matrices U, S, V such that A ~= U * S * V', where S contains the leading k
+ * singular values, U and V contain the corresponding singular vectors.
*
- * There is no restriction on m, but we require `n^2` doubles to fit in memory.
- * Further, n should be less than m.
-
- * The decomposition is computed by first computing A'A = V S^2 V',
- * computing svd locally on that (since n x n is small), from which we recover S and V.
- * Then we compute U via easy matrix multiplication as U = A * (V * S^-1).
- * Note that this approach requires `O(n^3)` time on the master node.
+ * At most k largest non-zero singular values and associated vectors are returned. If there are k
+ * such values, then the dimensions of the return will be:
+ * - U is a RowMatrix of size m x k that satisfies U' * U = eye(k),
+ * - s is a Vector of size k, holding the singular values in descending order,
+ * - V is a Matrix of size n x k that satisfies V' * V = eye(k).
+ *
+ * We assume n is smaller than m. The singular values and the right singular vectors are derived
+ * from the eigenvalues and the eigenvectors of the Gramian matrix A' * A. U, the matrix
+ * storing the right singular vectors, is computed via matrix multiplication as
+ * U = A * (V * S^-1^), if requested by user. The actual method to use is determined
+ * automatically based on the cost:
+ * - If n is small (n < 100) or k is large compared with n (k > n / 2), we compute the Gramian
+ * matrix first and then compute its top eigenvalues and eigenvectors locally on the driver.
+ * This requires a single pass with O(n^2^) storage on each executor and on the driver, and
+ * O(n^2^ k) time on the driver.
+ * - Otherwise, we compute (A' * A) * v in a distributive way and send it to ARPACK's DSAUPD to
+ * compute (A' * A)'s top eigenvalues and eigenvectors on the driver node. This requires O(k)
+ * passes, O(n) storage on each executor, and O(n k) storage on the driver.
*
- * At most k largest non-zero singular values and associated vectors are returned.
- * If there are k such values, then the dimensions of the return will be:
+ * Several internal parameters are set to default values. The reciprocal condition number rCond
+ * is set to 1e-9. All singular values smaller than rCond * sigma(0) are treated as zeros, where
+ * sigma(0) is the largest singular value. The maximum number of Arnoldi update iterations for
+ * ARPACK is set to 300 or k * 3, whichever is larger. The numerical tolerance for ARPACK's
+ * eigen-decomposition is set to 1e-10.
*
- * U is a RowMatrix of size m x k that satisfies U'U = eye(k),
- * s is a Vector of size k, holding the singular values in descending order,
- * and V is a Matrix of size n x k that satisfies V'V = eye(k).
+ * @note The conditions that decide which method to use internally and the default parameters are
+ * subject to change.
*
- * @param k number of singular values to keep. We might return less than k if there are
- * numerically zero singular values. See rCond.
+ * @param k number of leading singular values to keep (0 < k <= n). It might return less than k if
+ * there are numerically zero singular values or there are not enough Ritz values
+ * converged before the maximum number of Arnoldi update iterations is reached (in case
+ * that matrix A is ill-conditioned).
* @param computeU whether to compute U
* @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0)
* are treated as zero, where sigma(0) is the largest singular value.
- * @return SingularValueDecomposition(U, s, V)
+ * @return SingularValueDecomposition(U, s, V). U = null if computeU = false.
*/
def computeSVD(
k: Int,
computeU: Boolean = false,
rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = {
+ // maximum number of Arnoldi update iterations for invoking ARPACK
+ val maxIter = math.max(300, k * 3)
+ // numerical tolerance for invoking ARPACK
+ val tol = 1e-10
+ computeSVD(k, computeU, rCond, maxIter, tol, "auto")
+ }
+
+ /**
+ * The actual SVD implementation, visible for testing.
+ *
+ * @param k number of leading singular values to keep (0 < k <= n)
+ * @param computeU whether to compute U
+ * @param rCond the reciprocal condition number
+ * @param maxIter max number of iterations (if ARPACK is used)
+ * @param tol termination tolerance (if ARPACK is used)
+ * @param mode computation mode (auto: determine automatically which mode to use,
+ * local-svd: compute gram matrix and computes its full SVD locally,
+ * local-eigs: compute gram matrix and computes its top eigenvalues locally,
+ * dist-eigs: compute the top eigenvalues of the gram matrix distributively)
+ * @return SingularValueDecomposition(U, s, V). U = null if computeU = false.
+ */
+ private[mllib] def computeSVD(
+ k: Int,
+ computeU: Boolean,
+ rCond: Double,
+ maxIter: Int,
+ tol: Double,
+ mode: String): SingularValueDecomposition[RowMatrix, Matrix] = {
val n = numCols().toInt
- require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.")
+ require(k > 0 && k <= n, s"Request up to n singular values but got k=$k and n=$n.")
- val G = computeGramianMatrix()
+ object SVDMode extends Enumeration {
+ val LocalARPACK, LocalLAPACK, DistARPACK = Value
+ }
+
+ val computeMode = mode match {
+ case "auto" =>
+ // TODO: The conditions below are not fully tested.
+ if (n < 100 || k > n / 2) {
+ // If n is small or k is large compared with n, we better compute the Gramian matrix first
+ // and then compute its eigenvalues locally, instead of making multiple passes.
+ if (k < n / 3) {
+ SVDMode.LocalARPACK
+ } else {
+ SVDMode.LocalLAPACK
+ }
+ } else {
+ // If k is small compared with n, we use ARPACK with distributed multiplication.
+ SVDMode.DistARPACK
+ }
+ case "local-svd" => SVDMode.LocalLAPACK
+ case "local-eigs" => SVDMode.LocalARPACK
+ case "dist-eigs" => SVDMode.DistARPACK
+ case _ => throw new IllegalArgumentException(s"Do not support mode $mode.")
+ }
+
+ // Compute the eigen-decomposition of A' * A.
+ val (sigmaSquares: BDV[Double], u: BDM[Double]) = computeMode match {
+ case SVDMode.LocalARPACK =>
+ require(k < n, s"k must be smaller than n in local-eigs mode but got k=$k and n=$n.")
+ val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]]
+ EigenValueDecomposition.symmetricEigs(v => G * v, n, k, tol, maxIter)
+ case SVDMode.LocalLAPACK =>
+ val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]]
+ val (uFull: BDM[Double], sigmaSquaresFull: BDV[Double], _) = brzSvd(G)
+ (sigmaSquaresFull, uFull)
+ case SVDMode.DistARPACK =>
+ require(k < n, s"k must be smaller than n in dist-eigs mode but got k=$k and n=$n.")
+ EigenValueDecomposition.symmetricEigs(multiplyGramianMatrixBy, n, k, tol, maxIter)
+ }
- // TODO: Use sparse SVD instead.
- val (u: BDM[Double], sigmaSquares: BDV[Double], v: BDM[Double]) =
- brzSvd(G.toBreeze.asInstanceOf[BDM[Double]])
val sigmas: BDV[Double] = brzSqrt(sigmaSquares)
- // Determine effective rank.
+ // Determine the effective rank.
val sigma0 = sigmas(0)
val threshold = rCond * sigma0
var i = 0
- while (i < k && sigmas(i) >= threshold) {
+ // sigmas might have a length smaller than k, if some Ritz values do not satisfy the convergence
+ // criterion specified by tol after max number of iterations.
+ // Thus use i < min(k, sigmas.length) instead of i < k.
+ if (sigmas.length < k) {
+ logWarning(s"Requested $k singular values but only found ${sigmas.length} converged.")
+ }
+ while (i < math.min(k, sigmas.length) && sigmas(i) >= threshold) {
i += 1
}
val sk = i
@@ -272,12 +378,12 @@ class RowMatrix(
logWarning(s"Requested $k singular values but only found $sk nonzeros.")
}
- val s = Vectors.dense(util.Arrays.copyOfRange(sigmas.data, 0, sk))
- val V = Matrices.dense(n, sk, util.Arrays.copyOfRange(u.data, 0, n * sk))
+ val s = Vectors.dense(Arrays.copyOfRange(sigmas.data, 0, sk))
+ val V = Matrices.dense(n, sk, Arrays.copyOfRange(u.data, 0, n * sk))
if (computeU) {
// N = Vk * Sk^{-1}
- val N = new BDM[Double](n, sk, util.Arrays.copyOfRange(u.data, 0, n * sk))
+ val N = new BDM[Double](n, sk, Arrays.copyOfRange(u.data, 0, n * sk))
var i = 0
var j = 0
while (j < sk) {
@@ -364,7 +470,7 @@ class RowMatrix(
if (k == n) {
Matrices.dense(n, k, u.data)
} else {
- Matrices.dense(n, k, util.Arrays.copyOfRange(u.data, 0, n * k))
+ Matrices.dense(n, k, Arrays.copyOfRange(u.data, 0, n * k))
}
}
@@ -390,15 +496,24 @@ class RowMatrix(
*/
def multiply(B: Matrix): RowMatrix = {
val n = numCols().toInt
+ val k = B.numCols
require(n == B.numRows, s"Dimension mismatch: $n vs ${B.numRows}")
require(B.isInstanceOf[DenseMatrix],
s"Only support dense matrix at this time but found ${B.getClass.getName}.")
- val Bb = rows.context.broadcast(B)
+ val Bb = rows.context.broadcast(B.toBreeze.asInstanceOf[BDM[Double]].toDenseVector.toArray)
val AB = rows.mapPartitions({ iter =>
- val Bi = Bb.value.toBreeze.asInstanceOf[BDM[Double]]
- iter.map(v => Vectors.fromBreeze(Bi.t * v.toBreeze))
+ val Bi = Bb.value
+ iter.map(row => {
+ val v = BDV.zeros[Double](k)
+ var i = 0
+ while (i < k) {
+ v(i) = row.toBreeze.dot(new BDV(Bi, i * n, 1, n))
+ i += 1
+ }
+ Vectors.fromBreeze(v)
+ })
}, preservesPartitioning = true)
new RowMatrix(AB, nRows, B.numCols)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
index c9f9acf4c1..a961f89456 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
@@ -96,37 +96,44 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext {
test("svd of a full-rank matrix") {
for (mat <- Seq(denseMat, sparseMat)) {
- val localMat = mat.toBreeze()
- val (localU, localSigma, localVt) = brzSvd(localMat)
- val localV: BDM[Double] = localVt.t.toDenseMatrix
- for (k <- 1 to n) {
- val svd = mat.computeSVD(k, computeU = true)
- val U = svd.U
- val s = svd.s
- val V = svd.V
- assert(U.numRows() === m)
- assert(U.numCols() === k)
- assert(s.size === k)
- assert(V.numRows === n)
- assert(V.numCols === k)
- assertColumnEqualUpToSign(U.toBreeze(), localU, k)
- assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k)
- assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k)))
+ for (mode <- Seq("auto", "local-svd", "local-eigs", "dist-eigs")) {
+ val localMat = mat.toBreeze()
+ val (localU, localSigma, localVt) = brzSvd(localMat)
+ val localV: BDM[Double] = localVt.t.toDenseMatrix
+ for (k <- 1 to n) {
+ val skip = (mode == "local-eigs" || mode == "dist-eigs") && k == n
+ if (!skip) {
+ val svd = mat.computeSVD(k, computeU = true, 1e-9, 300, 1e-10, mode)
+ val U = svd.U
+ val s = svd.s
+ val V = svd.V
+ assert(U.numRows() === m)
+ assert(U.numCols() === k)
+ assert(s.size === k)
+ assert(V.numRows === n)
+ assert(V.numCols === k)
+ assertColumnEqualUpToSign(U.toBreeze(), localU, k)
+ assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k)
+ assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k)))
+ }
+ }
+ val svdWithoutU = mat.computeSVD(1, computeU = false, 1e-9, 300, 1e-10, mode)
+ assert(svdWithoutU.U === null)
}
- val svdWithoutU = mat.computeSVD(n)
- assert(svdWithoutU.U === null)
}
}
test("svd of a low-rank matrix") {
- val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0)), 2)
- val mat = new RowMatrix(rows, 4, 2)
- val svd = mat.computeSVD(2, computeU = true)
- assert(svd.s.size === 1, "should not return zero singular values")
- assert(svd.U.numRows() === 4)
- assert(svd.U.numCols() === 1)
- assert(svd.V.numRows === 2)
- assert(svd.V.numCols === 1)
+ val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0, 1.0)), 2)
+ val mat = new RowMatrix(rows, 4, 3)
+ for (mode <- Seq("auto", "local-svd", "local-eigs", "dist-eigs")) {
+ val svd = mat.computeSVD(2, computeU = true, 1e-6, 300, 1e-10, mode)
+ assert(svd.s.size === 1, s"should not return zero singular values but got ${svd.s}")
+ assert(svd.U.numRows() === 4)
+ assert(svd.U.numCols() === 1)
+ assert(svd.V.numRows === 3)
+ assert(svd.V.numCols === 1)
+ }
}
def closeToZero(G: BDM[Double]): Boolean = {