From 9689b663a2a4947ad60795321c770052f3c637f1 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 8 Apr 2014 23:01:15 -0700 Subject: [SPARK-1390] Refactoring of matrices backed by RDDs This is to refactor interfaces for matrices backed by RDDs. It would be better if we have a clear separation of local matrices and those backed by RDDs. Right now, we have 1. `org.apache.spark.mllib.linalg.SparseMatrix`, which is a wrapper over an RDD of matrix entries, i.e., coordinate list format. 2. `org.apache.spark.mllib.linalg.TallSkinnyDenseMatrix`, which is a wrapper over RDD[Array[Double]], i.e. row-oriented format. We will see naming collision when we introduce local `SparseMatrix`, and the name `TallSkinnyDenseMatrix` is not exact if we switch to `RDD[Vector]` from `RDD[Array[Double]]`. It would be better to have "RDD" in the class name to suggest that operations may trigger jobs. The proposed names are (all under `org.apache.spark.mllib.linalg.rdd`): 1. `RDDMatrix`: trait for matrices backed by one or more RDDs 2. `CoordinateRDDMatrix`: wrapper of `RDD[(Long, Long, Double)]` 3. `RowRDDMatrix`: wrapper of `RDD[Vector]` whose rows do not have special ordering 4. `IndexedRowRDDMatrix`: wrapper of `RDD[(Long, Vector)]` whose rows are associated with indices The current code also introduces local matrices. Author: Xiangrui Meng Closes #296 from mengxr/mat and squashes the following commits: 24d8294 [Xiangrui Meng] fix for groupBy returning Iterable bfc2b26 [Xiangrui Meng] merge master 8e4f1f5 [Xiangrui Meng] Merge branch 'master' into mat 0135193 [Xiangrui Meng] address Reza's comments 03cd7e1 [Xiangrui Meng] add pca/gram to IndexedRowMatrix add toBreeze to DistributedMatrix for test simplify tests b177ff1 [Xiangrui Meng] address Matei's comments be119fe [Xiangrui Meng] rename m/n to numRows/numCols for local matrix add tests for matrices b881506 [Xiangrui Meng] rename SparkPCA/SVD to TallSkinnyPCA/SVD e7d0d4a [Xiangrui Meng] move IndexedRDDMatrixRow to IndexedRowRDDMatrix 0d1491c [Xiangrui Meng] fix test errors a85262a [Xiangrui Meng] rename RDDMatrixRow to IndexedRDDMatrixRow b8b6ac3 [Xiangrui Meng] Remove old code 4cf679c [Xiangrui Meng] port pca to RowRDDMatrix, and add multiply and covariance 7836e2f [Xiangrui Meng] initial refactoring of matrices backed by RDDs --- .../org/apache/spark/mllib/linalg/Matrices.scala | 101 ++++++ .../apache/spark/mllib/linalg/MatrixEntry.scala | 27 -- .../org/apache/spark/mllib/linalg/MatrixRow.scala | 26 -- .../org/apache/spark/mllib/linalg/MatrixSVD.scala | 29 -- .../scala/org/apache/spark/mllib/linalg/PCA.scala | 120 ------- .../scala/org/apache/spark/mllib/linalg/SVD.scala | 395 --------------------- .../mllib/linalg/SingularValueDecomposition.scala | 21 ++ .../apache/spark/mllib/linalg/SparseMatrix.scala | 30 -- .../spark/mllib/linalg/TallSkinnyDenseMatrix.scala | 30 -- .../spark/mllib/linalg/TallSkinnyMatrixSVD.scala | 31 -- .../linalg/distributed/CoordinateMatrix.scala | 112 ++++++ .../linalg/distributed/DistributedMatrix.scala | 37 ++ .../linalg/distributed/IndexedRowMatrix.scala | 148 ++++++++ .../spark/mllib/linalg/distributed/RowMatrix.scala | 344 ++++++++++++++++++ .../org/apache/spark/mllib/util/LAUtils.scala | 67 ---- .../mllib/linalg/BreezeMatrixConversionSuite.scala | 40 +++ .../apache/spark/mllib/linalg/MatricesSuite.scala | 39 ++ .../org/apache/spark/mllib/linalg/PCASuite.scala | 124 ------- .../org/apache/spark/mllib/linalg/SVDSuite.scala | 194 ---------- .../linalg/distributed/CoordinateMatrixSuite.scala | 98 +++++ .../linalg/distributed/IndexedRowMatrixSuite.scala | 120 +++++++ .../mllib/linalg/distributed/RowMatrixSuite.scala | 173 +++++++++ 22 files changed, 1233 insertions(+), 1073 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixRow.scala delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/linalg/SingularValueDecomposition.scala delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyDenseMatrix.scala delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyMatrixSVD.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrix.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeMatrixConversionSuite.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala delete mode 100644 mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala delete mode 100644 mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala new file mode 100644 index 0000000000..b11ba5d30f --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg + +import breeze.linalg.{Matrix => BM, DenseMatrix => BDM} + +/** + * Trait for a local matrix. + */ +trait Matrix extends Serializable { + + /** Number of rows. */ + def numRows: Int + + /** Number of columns. */ + def numCols: Int + + /** Converts to a dense array in column major. */ + def toArray: Array[Double] + + /** Converts to a breeze matrix. */ + private[mllib] def toBreeze: BM[Double] + + /** Gets the (i, j)-th element. */ + private[mllib] def apply(i: Int, j: Int): Double = toBreeze(i, j) + + override def toString: String = toBreeze.toString() +} + +/** + * Column-majored dense matrix. + * The entry values are stored in a single array of doubles with columns listed in sequence. + * For example, the following matrix + * {{{ + * 1.0 2.0 + * 3.0 4.0 + * 5.0 6.0 + * }}} + * is stored as `[1.0, 3.0, 5.0, 2.0, 4.0, 6.0]`. + * + * @param numRows number of rows + * @param numCols number of columns + * @param values matrix entries in column major + */ +class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double]) extends Matrix { + + require(values.length == numRows * numCols) + + override def toArray: Array[Double] = values + + private[mllib] override def toBreeze: BM[Double] = new BDM[Double](numRows, numCols, values) +} + +/** + * Factory methods for [[org.apache.spark.mllib.linalg.Matrix]]. + */ +object Matrices { + + /** + * Creates a column-majored dense matrix. + * + * @param numRows number of rows + * @param numCols number of columns + * @param values matrix entries in column major + */ + def dense(numRows: Int, numCols: Int, values: Array[Double]): Matrix = { + new DenseMatrix(numRows, numCols, values) + } + + /** + * Creates a Matrix instance from a breeze matrix. + * @param breeze a breeze matrix + * @return a Matrix instance + */ + private[mllib] def fromBreeze(breeze: BM[Double]): Matrix = { + breeze match { + case dm: BDM[Double] => + require(dm.majorStride == dm.rows, + "Do not support stride size different from the number of rows.") + new DenseMatrix(dm.rows, dm.cols, dm.data) + case _ => + throw new UnsupportedOperationException( + s"Do not support conversion from type ${breeze.getClass.getName}.") + } + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala deleted file mode 100644 index 416996fcbe..0000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.linalg - -/** - * Class that represents an entry in a sparse matrix of doubles. - * - * @param i row index (0 indexing used) - * @param j column index (0 indexing used) - * @param mval value of entry in matrix - */ -case class MatrixEntry(val i: Int, val j: Int, val mval: Double) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixRow.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixRow.scala deleted file mode 100644 index 2608a67bfe..0000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixRow.scala +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.linalg - -/** - * Class that represents a row of a dense matrix - * - * @param i row index (0 indexing used) - * @param data entries of the row - */ -case class MatrixRow(val i: Int, val data: Array[Double]) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala deleted file mode 100644 index 319f82b449..0000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.linalg - -/** - * Class that represents the SV decomposition of a matrix - * - * @param U such that A = USV^T - * @param S such that A = USV^T - * @param V such that A = USV^T - */ -case class MatrixSVD(val U: SparseMatrix, - val S: SparseMatrix, - val V: SparseMatrix) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala deleted file mode 100644 index fe5b3f6c7e..0000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.linalg - -import org.apache.spark.rdd.RDD - - -import org.jblas.DoubleMatrix - - -/** - * Class used to obtain principal components - */ -class PCA { - private var k = 1 - - /** - * Set the number of top-k principle components to return - */ - def setK(k: Int): PCA = { - this.k = k - this - } - - /** - * Compute PCA using the current set parameters - */ - def compute(matrix: TallSkinnyDenseMatrix): Array[Array[Double]] = { - computePCA(matrix) - } - - /** - * Compute PCA using the parameters currently set - * See computePCA() for more details - */ - def compute(matrix: RDD[Array[Double]]): Array[Array[Double]] = { - computePCA(matrix) - } - - /** - * Computes the top k principal component coefficients for the m-by-n data matrix X. - * Rows of X correspond to observations and columns correspond to variables. - * The coefficient matrix is n-by-k. Each column of coeff contains coefficients - * for one principal component, and the columns are in descending - * order of component variance. - * This function centers the data and uses the - * singular value decomposition (SVD) algorithm. - * - * @param matrix dense matrix to perform PCA on - * @return An nxk matrix with principal components in columns. Columns are inner arrays - */ - private def computePCA(matrix: TallSkinnyDenseMatrix): Array[Array[Double]] = { - val m = matrix.m - val n = matrix.n - - if (m <= 0 || n <= 0) { - throw new IllegalArgumentException("Expecting a well-formed matrix: m=$m n=$n") - } - - computePCA(matrix.rows.map(_.data)) - } - - /** - * Computes the top k principal component coefficients for the m-by-n data matrix X. - * Rows of X correspond to observations and columns correspond to variables. - * The coefficient matrix is n-by-k. Each column of coeff contains coefficients - * for one principal component, and the columns are in descending - * order of component variance. - * This function centers the data and uses the - * singular value decomposition (SVD) algorithm. - * - * @param matrix dense matrix to perform pca on - * @return An nxk matrix of principal components - */ - private def computePCA(matrix: RDD[Array[Double]]): Array[Array[Double]] = { - val n = matrix.first.size - - // compute column sums and normalize matrix - val colSumsTemp = matrix.map((_, 1)).fold((Array.ofDim[Double](n), 0)) { - (a, b) => - val am = new DoubleMatrix(a._1) - val bm = new DoubleMatrix(b._1) - am.addi(bm) - (a._1, a._2 + b._2) - } - - val m = colSumsTemp._2 - val colSums = colSumsTemp._1.map(x => x / m) - - val data = matrix.map { - x => - val row = Array.ofDim[Double](n) - var i = 0 - while (i < n) { - row(i) = x(i) - colSums(i) - i += 1 - } - row - } - - val (u, s, v) = new SVD().setK(k).compute(data) - v - } -} - diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala deleted file mode 100644 index 0d97b7d92f..0000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala +++ /dev/null @@ -1,395 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.linalg - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -import org.jblas.{DoubleMatrix, Singular, MatrixFunctions} - -/** - * Class used to obtain singular value decompositions - */ -class SVD { - private var k = 1 - private var computeU = true - - // All singular values smaller than rCond * sigma(0) - // are treated as zero, where sigma(0) is the largest singular value. - private var rCond = 1e-9 - - /** - * Set the number of top-k singular vectors to return - */ - def setK(k: Int): SVD = { - this.k = k - this - } - - /** - * Sets the reciprocal condition number (rCond). All singular values - * smaller than rCond * sigma(0) are treated as zero, - * where sigma(0) is the largest singular value. - */ - def setReciprocalConditionNumber(smallS: Double): SVD = { - this.rCond = smallS - this - } - - /** - * Should U be computed? - */ - def setComputeU(compU: Boolean): SVD = { - this.computeU = compU - this - } - - /** - * Compute SVD using the current set parameters - */ - def compute(matrix: TallSkinnyDenseMatrix): TallSkinnyMatrixSVD = { - denseSVD(matrix) - } - - /** - * Compute SVD using the current set parameters - * Returns (U, S, V) such that A = USV^T - * U is a row-by-row dense matrix - * S is a simple double array of singular values - * V is a 2d array matrix - * See [[denseSVD]] for more documentation - */ - def compute(matrix: RDD[Array[Double]]): - (RDD[Array[Double]], Array[Double], Array[Array[Double]]) = { - denseSVD(matrix) - } - - /** - * See full paramter definition of sparseSVD for more description. - * - * @param matrix sparse matrix to factorize - * @return Three sparse matrices: U, S, V such that A = USV^T - */ - def compute(matrix: SparseMatrix): MatrixSVD = { - sparseSVD(matrix) - } - - /** - * Singular Value Decomposition for Tall and Skinny matrices. - * Given an m x n matrix A, this will compute matrices U, S, V such that - * A = U * S * V' - * - * There is no restriction on m, but we require n^2 doubles to fit in memory. - * Further, n should be less than m. - * - * The decomposition is computed by first computing A'A = V S^2 V', - * computing svd locally on that (since n x n is small), - * from which we recover S and V. - * Then we compute U via easy matrix multiplication - * as U = A * V * S^-1 - * - * Only the k largest singular values and associated vectors are found. - * If there are k such values, then the dimensions of the return will be: - * - * S is k x k and diagonal, holding the singular values on diagonal - * U is m x k and satisfies U'U = eye(k) - * V is n x k and satisfies V'V = eye(k) - * - * @param matrix dense matrix to factorize - * @return See [[TallSkinnyMatrixSVD]] for the output matrices and arrays - */ - private def denseSVD(matrix: TallSkinnyDenseMatrix): TallSkinnyMatrixSVD = { - val m = matrix.m - val n = matrix.n - - if (m < n || m <= 0 || n <= 0) { - throw new IllegalArgumentException("Expecting a tall and skinny matrix m=$m n=$n") - } - - if (k < 1 || k > n) { - throw new IllegalArgumentException("Request up to n singular values n=$n k=$k") - } - - val rowIndices = matrix.rows.map(_.i) - - // compute SVD - val (u, sigma, v) = denseSVD(matrix.rows.map(_.data)) - - if (computeU) { - // prep u for returning - val retU = TallSkinnyDenseMatrix( - u.zip(rowIndices).map { - case (row, i) => MatrixRow(i, row) - }, - m, - k) - - TallSkinnyMatrixSVD(retU, sigma, v) - } else { - TallSkinnyMatrixSVD(null, sigma, v) - } - } - - /** - * Singular Value Decomposition for Tall and Skinny matrices. - * Given an m x n matrix A, this will compute matrices U, S, V such that - * A = U * S * V' - * - * There is no restriction on m, but we require n^2 doubles to fit in memory. - * Further, n should be less than m. - * - * The decomposition is computed by first computing A'A = V S^2 V', - * computing svd locally on that (since n x n is small), - * from which we recover S and V. - * Then we compute U via easy matrix multiplication - * as U = A * V * S^-1 - * - * Only the k largest singular values and associated vectors are found. - * If there are k such values, then the dimensions of the return will be: - * - * S is k x k and diagonal, holding the singular values on diagonal - * U is m x k and satisfies U'U = eye(k) - * V is n x k and satisfies V'V = eye(k) - * - * The return values are as lean as possible: an RDD of rows for U, - * a simple array for sigma, and a dense 2d matrix array for V - * - * @param matrix dense matrix to factorize - * @return Three matrices: U, S, V such that A = USV^T - */ - private def denseSVD(matrix: RDD[Array[Double]]): - (RDD[Array[Double]], Array[Double], Array[Array[Double]]) = { - val n = matrix.first.size - - if (k < 1 || k > n) { - throw new IllegalArgumentException( - "Request up to n singular values k=$k n=$n") - } - - // Compute A^T A - val fullata = matrix.mapPartitions { - iter => - val localATA = Array.ofDim[Double](n, n) - while (iter.hasNext) { - val row = iter.next() - var i = 0 - while (i < n) { - var j = 0 - while (j < n) { - localATA(i)(j) += row(i) * row(j) - j += 1 - } - i += 1 - } - } - Iterator(localATA) - }.fold(Array.ofDim[Double](n, n)) { - (a, b) => - var i = 0 - while (i < n) { - var j = 0 - while (j < n) { - a(i)(j) += b(i)(j) - j += 1 - } - i += 1 - } - a - } - - // Construct jblas A^T A locally - val ata = new DoubleMatrix(fullata) - - // Since A^T A is small, we can compute its SVD directly - val svd = Singular.sparseSVD(ata) - val V = svd(0) - val sigmas = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x / svd(1).get(0) > rCond) - - val sk = Math.min(k, sigmas.size) - val sigma = sigmas.take(sk) - - // prepare V for returning - val retV = Array.tabulate(n, sk)((i, j) => V.get(i, j)) - - if (computeU) { - // Compute U as U = A V S^-1 - // Compute VS^-1 - val vsinv = new DoubleMatrix(Array.tabulate(n, sk)((i, j) => V.get(i, j) / sigma(j))) - val retU = matrix.map { - x => - val v = new DoubleMatrix(Array(x)) - v.mmul(vsinv).data - } - (retU, sigma, retV) - } else { - (null, sigma, retV) - } - } - - /** - * Singular Value Decomposition for Tall and Skinny sparse matrices. - * Given an m x n matrix A, this will compute matrices U, S, V such that - * A = U * S * V' - * - * There is no restriction on m, but we require O(n^2) doubles to fit in memory. - * Further, n should be less than m. - * - * The decomposition is computed by first computing A'A = V S^2 V', - * computing svd locally on that (since n x n is small), - * from which we recover S and V. - * Then we compute U via easy matrix multiplication - * as U = A * V * S^-1 - * - * Only the k largest singular values and associated vectors are found. - * If there are k such values, then the dimensions of the return will be: - * - * S is k x k and diagonal, holding the singular values on diagonal - * U is m x k and satisfies U'U = eye(k) - * V is n x k and satisfies V'V = eye(k) - * - * All input and output is expected in sparse matrix format, 0-indexed - * as tuples of the form ((i,j),value) all in RDDs using the - * SparseMatrix class - * - * @param matrix sparse matrix to factorize - * @return Three sparse matrices: U, S, V such that A = USV^T - */ - private def sparseSVD(matrix: SparseMatrix): MatrixSVD = { - val data = matrix.data - val m = matrix.m - val n = matrix.n - - if (m < n || m <= 0 || n <= 0) { - throw new IllegalArgumentException("Expecting a tall and skinny matrix") - } - - if (k < 1 || k > n) { - throw new IllegalArgumentException("Must request up to n singular values") - } - - // Compute A^T A, assuming rows are sparse enough to fit in memory - val rows = data.map(entry => - (entry.i, (entry.j, entry.mval))).groupByKey() - val emits = rows.flatMap { - case (rowind, cols) => - cols.flatMap { - case (colind1, mval1) => - cols.map { - case (colind2, mval2) => - ((colind1, colind2), mval1 * mval2) - } - } - }.reduceByKey(_ + _) - - // Construct jblas A^T A locally - val ata = DoubleMatrix.zeros(n, n) - for (entry <- emits.collect()) { - ata.put(entry._1._1, entry._1._2, entry._2) - } - - // Since A^T A is small, we can compute its SVD directly - val svd = Singular.sparseSVD(ata) - val V = svd(0) - // This will be updated to rcond - val sigmas = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x > 1e-9) - - if (sigmas.size < k) { - throw new Exception("Not enough singular values to return k=" + k + " s=" + sigmas.size) - } - - val sigma = sigmas.take(k) - - val sc = data.sparkContext - - // prepare V for returning - val retVdata = sc.makeRDD( - Array.tabulate(V.rows, sigma.length) { - (i, j) => - MatrixEntry(i, j, V.get(i, j)) - }.flatten) - val retV = SparseMatrix(retVdata, V.rows, sigma.length) - - val retSdata = sc.makeRDD(Array.tabulate(sigma.length) { - x => MatrixEntry(x, x, sigma(x)) - }) - - val retS = SparseMatrix(retSdata, sigma.length, sigma.length) - - // Compute U as U = A V S^-1 - // turn V S^-1 into an RDD as a sparse matrix - val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length) { - (i, j) => ((i, j), V.get(i, j) / sigma(j)) - }.flatten) - - if (computeU) { - // Multiply A by VS^-1 - val aCols = data.map(entry => (entry.j, (entry.i, entry.mval))) - val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2))) - val retUdata = aCols.join(bRows).map { - case (key, ((rowInd, rowVal), (colInd, colVal))) => - ((rowInd, colInd), rowVal * colVal) - }.reduceByKey(_ + _).map { - case ((row, col), mval) => MatrixEntry(row, col, mval) - } - - val retU = SparseMatrix(retUdata, m, sigma.length) - MatrixSVD(retU, retS, retV) - } else { - MatrixSVD(null, retS, retV) - } - } -} - -/** - * Top-level methods for calling sparse Singular Value Decomposition - * NOTE: All matrices are 0-indexed - */ -object SVD { - def main(args: Array[String]) { - if (args.length < 8) { - println("Usage: SVD " + - " ") - System.exit(1) - } - - val (master, inputFile, m, n, k, output_u, output_s, output_v) = - (args(0), args(1), args(2).toInt, args(3).toInt, - args(4).toInt, args(5), args(6), args(7)) - - val sc = new SparkContext(master, "SVD") - - val rawData = sc.textFile(inputFile) - val data = rawData.map { - line => - val parts = line.split(',') - MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble) - } - - val decomposed = new SVD().setK(k).compute(SparseMatrix(data, m, n)) - val u = decomposed.U.data - val s = decomposed.S.data - val v = decomposed.V.data - - println("Computed " + s.collect().length + " singular values and vectors") - u.saveAsTextFile(output_u) - s.saveAsTextFile(output_s) - v.saveAsTextFile(output_v) - System.exit(0) - } -} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SingularValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SingularValueDecomposition.scala new file mode 100644 index 0000000000..46b1054574 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SingularValueDecomposition.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg + +/** Represents singular value decomposition (SVD) factors. */ +case class SingularValueDecomposition[UType, VType](U: UType, s: Vector, V: VType) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala deleted file mode 100644 index cbd1a2a5a4..0000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.linalg - -import org.apache.spark.rdd.RDD - - -/** - * Class that represents a sparse matrix - * - * @param data RDD of nonzero entries - * @param m number of rows - * @param n numner of columns - */ -case class SparseMatrix(val data: RDD[MatrixEntry], val m: Int, val n: Int) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyDenseMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyDenseMatrix.scala deleted file mode 100644 index e4ef3c58e8..0000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyDenseMatrix.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.linalg - -import org.apache.spark.rdd.RDD - - -/** - * Class that represents a dense matrix - * - * @param rows RDD of rows - * @param m number of rows - * @param n number of columns - */ -case class TallSkinnyDenseMatrix(val rows: RDD[MatrixRow], val m: Int, val n: Int) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyMatrixSVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyMatrixSVD.scala deleted file mode 100644 index b3a450e923..0000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyMatrixSVD.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.linalg - -/** - * Class that represents the singular value decomposition of a matrix - * - * @param U such that A = USV^T is a TallSkinnyDenseMatrix - * @param S such that A = USV^T is a simple double array - * @param V such that A = USV^T, V is a 2d array matrix that holds - * singular vectors in columns. Columns are inner arrays - * i.e. V(i)(j) is standard math notation V_{ij} - */ -case class TallSkinnyMatrixSVD(val U: TallSkinnyDenseMatrix, - val S: Array[Double], - val V: Array[Array[Double]]) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala new file mode 100644 index 0000000000..9194f65749 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg.distributed + +import breeze.linalg.{DenseMatrix => BDM} + +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.linalg.Vectors + +/** + * Represents an entry in an distributed matrix. + * @param i row index + * @param j column index + * @param value value of the entry + */ +case class MatrixEntry(i: Long, j: Long, value: Double) + +/** + * Represents a matrix in coordinate format. + * + * @param entries matrix entries + * @param nRows number of rows. A non-positive value means unknown, and then the number of rows will + * be determined by the max row index plus one. + * @param nCols number of columns. A non-positive value means unknown, and then the number of + * columns will be determined by the max column index plus one. + */ +class CoordinateMatrix( + val entries: RDD[MatrixEntry], + private var nRows: Long, + private var nCols: Long) extends DistributedMatrix { + + /** Alternative constructor leaving matrix dimensions to be determined automatically. */ + def this(entries: RDD[MatrixEntry]) = this(entries, 0L, 0L) + + /** Gets or computes the number of columns. */ + override def numCols(): Long = { + if (nCols <= 0L) { + computeSize() + } + nCols + } + + /** Gets or computes the number of rows. */ + override def numRows(): Long = { + if (nRows <= 0L) { + computeSize() + } + nRows + } + + /** Converts to IndexedRowMatrix. The number of columns must be within the integer range. */ + def toIndexedRowMatrix(): IndexedRowMatrix = { + val nl = numCols() + if (nl > Int.MaxValue) { + sys.error(s"Cannot convert to a row-oriented format because the number of columns $nl is " + + "too large.") + } + val n = nl.toInt + val indexedRows = entries.map(entry => (entry.i, (entry.j.toInt, entry.value))) + .groupByKey() + .map { case (i, vectorEntries) => + IndexedRow(i, Vectors.sparse(n, vectorEntries.toSeq)) + } + new IndexedRowMatrix(indexedRows, numRows(), n) + } + + /** + * Converts to RowMatrix, dropping row indices after grouping by row index. + * The number of columns must be within the integer range. + */ + def toRowMatrix(): RowMatrix = { + toIndexedRowMatrix().toRowMatrix() + } + + /** Determines the size by computing the max row/column index. */ + private def computeSize() { + // Reduce will throw an exception if `entries` is empty. + val (m1, n1) = entries.map(entry => (entry.i, entry.j)).reduce { case ((i1, j1), (i2, j2)) => + (math.max(i1, i2), math.max(j1, j2)) + } + // There may be empty columns at the very right and empty rows at the very bottom. + nRows = math.max(nRows, m1 + 1L) + nCols = math.max(nCols, n1 + 1L) + } + + /** Collects data and assembles a local matrix. */ + private[mllib] override def toBreeze(): BDM[Double] = { + val m = numRows().toInt + val n = numCols().toInt + val mat = BDM.zeros[Double](m, n) + entries.collect().foreach { case MatrixEntry(i, j, value) => + mat(i.toInt, j.toInt) = value + } + mat + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrix.scala new file mode 100644 index 0000000000..13f72a3c72 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrix.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg.distributed + +import breeze.linalg.{DenseMatrix => BDM} + +import org.apache.spark.mllib.linalg.Matrix + +/** + * Represents a distributively stored matrix backed by one or more RDDs. + */ +trait DistributedMatrix extends Serializable { + + /** Gets or computes the number of rows. */ + def numRows(): Long + + /** Gets or computes the number of columns. */ + def numCols(): Long + + /** Collects data and assembles a local dense breeze matrix (for test only). */ + private[mllib] def toBreeze(): BDM[Double] +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala new file mode 100644 index 0000000000..e110f070bd --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg.distributed + +import breeze.linalg.{DenseMatrix => BDM} + +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg._ +import org.apache.spark.mllib.linalg.SingularValueDecomposition + +/** Represents a row of [[org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix]]. */ +case class IndexedRow(index: Long, vector: Vector) + +/** + * Represents a row-oriented [[org.apache.spark.mllib.linalg.distributed.DistributedMatrix]] with + * indexed rows. + * + * @param rows indexed rows of this matrix + * @param nRows number of rows. A non-positive value means unknown, and then the number of rows will + * be determined by the max row index plus one. + * @param nCols number of columns. A non-positive value means unknown, and then the number of + * columns will be determined by the size of the first row. + */ +class IndexedRowMatrix( + val rows: RDD[IndexedRow], + private var nRows: Long, + private var nCols: Int) extends DistributedMatrix { + + /** Alternative constructor leaving matrix dimensions to be determined automatically. */ + def this(rows: RDD[IndexedRow]) = this(rows, 0L, 0) + + override def numCols(): Long = { + if (nCols <= 0) { + // Calling `first` will throw an exception if `rows` is empty. + nCols = rows.first().vector.size + } + nCols + } + + override def numRows(): Long = { + if (nRows <= 0L) { + // Reduce will throw an exception if `rows` is empty. + nRows = rows.map(_.index).reduce(math.max) + 1L + } + nRows + } + + /** + * Drops row indices and converts this matrix to a + * [[org.apache.spark.mllib.linalg.distributed.RowMatrix]]. + */ + def toRowMatrix(): RowMatrix = { + new RowMatrix(rows.map(_.vector), 0L, nCols) + } + + /** + * Computes the singular value decomposition of this matrix. + * Denote this matrix by A (m x n), this will compute matrices U, S, V such that A = U * S * V'. + * + * There is no restriction on m, but we require `n^2` doubles to fit in memory. + * Further, n should be less than m. + + * The decomposition is computed by first computing A'A = V S^2 V', + * computing svd locally on that (since n x n is small), from which we recover S and V. + * Then we compute U via easy matrix multiplication as U = A * (V * S^-1). + * Note that this approach requires `O(n^3)` time on the master node. + * + * At most k largest non-zero singular values and associated vectors are returned. + * If there are k such values, then the dimensions of the return will be: + * + * U is an [[org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix]] of size m x k that + * satisfies U'U = eye(k), + * s is a Vector of size k, holding the singular values in descending order, + * and V is a local Matrix of size n x k that satisfies V'V = eye(k). + * + * @param k number of singular values to keep. We might return less than k if there are + * numerically zero singular values. See rCond. + * @param computeU whether to compute U + * @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0) + * are treated as zero, where sigma(0) is the largest singular value. + * @return SingularValueDecomposition(U, s, V) + */ + def computeSVD( + k: Int, + computeU: Boolean = false, + rCond: Double = 1e-9): SingularValueDecomposition[IndexedRowMatrix, Matrix] = { + val indices = rows.map(_.index) + val svd = toRowMatrix().computeSVD(k, computeU, rCond) + val U = if (computeU) { + val indexedRows = indices.zip(svd.U.rows).map { case (i, v) => + IndexedRow(i, v) + } + new IndexedRowMatrix(indexedRows, nRows, nCols) + } else { + null + } + SingularValueDecomposition(U, svd.s, svd.V) + } + + /** + * Multiply this matrix by a local matrix on the right. + * + * @param B a local matrix whose number of rows must match the number of columns of this matrix + * @return an IndexedRowMatrix representing the product, which preserves partitioning + */ + def multiply(B: Matrix): IndexedRowMatrix = { + val mat = toRowMatrix().multiply(B) + val indexedRows = rows.map(_.index).zip(mat.rows).map { case (i, v) => + IndexedRow(i, v) + } + new IndexedRowMatrix(indexedRows, nRows, nCols) + } + + /** + * Computes the Gramian matrix `A^T A`. + */ + def computeGramianMatrix(): Matrix = { + toRowMatrix().computeGramianMatrix() + } + + private[mllib] override def toBreeze(): BDM[Double] = { + val m = numRows().toInt + val n = numCols().toInt + val mat = BDM.zeros[Double](m, n) + rows.collect().foreach { case IndexedRow(rowIndex, vector) => + val i = rowIndex.toInt + vector.toBreeze.activeIterator.foreach { case (j, v) => + mat(i, j) = v + } + } + mat + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala new file mode 100644 index 0000000000..f59811f18a --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg.distributed + +import java.util + +import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd} +import breeze.numerics.{sqrt => brzSqrt} +import com.github.fommil.netlib.BLAS.{getInstance => blas} + +import org.apache.spark.mllib.linalg._ +import org.apache.spark.rdd.RDD +import org.apache.spark.Logging + +/** + * Represents a row-oriented distributed Matrix with no meaningful row indices. + * + * @param rows rows stored as an RDD[Vector] + * @param nRows number of rows. A non-positive value means unknown, and then the number of rows will + * be determined by the number of records in the RDD `rows`. + * @param nCols number of columns. A non-positive value means unknown, and then the number of + * columns will be determined by the size of the first row. + */ +class RowMatrix( + val rows: RDD[Vector], + private var nRows: Long, + private var nCols: Int) extends DistributedMatrix with Logging { + + /** Alternative constructor leaving matrix dimensions to be determined automatically. */ + def this(rows: RDD[Vector]) = this(rows, 0L, 0) + + /** Gets or computes the number of columns. */ + override def numCols(): Long = { + if (nCols <= 0) { + // Calling `first` will throw an exception if `rows` is empty. + nCols = rows.first().size + } + nCols + } + + /** Gets or computes the number of rows. */ + override def numRows(): Long = { + if (nRows <= 0L) { + nRows = rows.count() + if (nRows == 0L) { + sys.error("Cannot determine the number of rows because it is not specified in the " + + "constructor and the rows RDD is empty.") + } + } + nRows + } + + /** + * Computes the Gramian matrix `A^T A`. + */ + def computeGramianMatrix(): Matrix = { + val n = numCols().toInt + val nt: Int = n * (n + 1) / 2 + + // Compute the upper triangular part of the gram matrix. + val GU = rows.aggregate(new BDV[Double](new Array[Double](nt)))( + seqOp = (U, v) => { + RowMatrix.dspr(1.0, v, U.data) + U + }, + combOp = (U1, U2) => U1 += U2 + ) + + RowMatrix.triuToFull(n, GU.data) + } + + /** + * Computes the singular value decomposition of this matrix. + * Denote this matrix by A (m x n), this will compute matrices U, S, V such that A = U * S * V'. + * + * There is no restriction on m, but we require `n^2` doubles to fit in memory. + * Further, n should be less than m. + + * The decomposition is computed by first computing A'A = V S^2 V', + * computing svd locally on that (since n x n is small), from which we recover S and V. + * Then we compute U via easy matrix multiplication as U = A * (V * S^-1). + * Note that this approach requires `O(n^3)` time on the master node. + * + * At most k largest non-zero singular values and associated vectors are returned. + * If there are k such values, then the dimensions of the return will be: + * + * U is a RowMatrix of size m x k that satisfies U'U = eye(k), + * s is a Vector of size k, holding the singular values in descending order, + * and V is a Matrix of size n x k that satisfies V'V = eye(k). + * + * @param k number of singular values to keep. We might return less than k if there are + * numerically zero singular values. See rCond. + * @param computeU whether to compute U + * @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0) + * are treated as zero, where sigma(0) is the largest singular value. + * @return SingularValueDecomposition(U, s, V) + */ + def computeSVD( + k: Int, + computeU: Boolean = false, + rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = { + val n = numCols().toInt + require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.") + + val G = computeGramianMatrix() + + // TODO: Use sparse SVD instead. + val (u: BDM[Double], sigmaSquares: BDV[Double], v: BDM[Double]) = + brzSvd(G.toBreeze.asInstanceOf[BDM[Double]]) + val sigmas: BDV[Double] = brzSqrt(sigmaSquares) + + // Determine effective rank. + val sigma0 = sigmas(0) + val threshold = rCond * sigma0 + var i = 0 + while (i < k && sigmas(i) >= threshold) { + i += 1 + } + val sk = i + + if (sk < k) { + logWarning(s"Requested $k singular values but only found $sk nonzeros.") + } + + val s = Vectors.dense(util.Arrays.copyOfRange(sigmas.data, 0, sk)) + val V = Matrices.dense(n, sk, util.Arrays.copyOfRange(u.data, 0, n * sk)) + + if (computeU) { + // N = Vk * Sk^{-1} + val N = new BDM[Double](n, sk, util.Arrays.copyOfRange(u.data, 0, n * sk)) + var i = 0 + var j = 0 + while (j < sk) { + i = 0 + val sigma = sigmas(j) + while (i < n) { + N(i, j) /= sigma + i += 1 + } + j += 1 + } + val U = this.multiply(Matrices.fromBreeze(N)) + SingularValueDecomposition(U, s, V) + } else { + SingularValueDecomposition(null, s, V) + } + } + + /** + * Computes the covariance matrix, treating each row as an observation. + * @return a local dense matrix of size n x n + */ + def computeCovariance(): Matrix = { + val n = numCols().toInt + + if (n > 10000) { + val mem = n * n * java.lang.Double.SIZE / java.lang.Byte.SIZE + logWarning(s"The number of columns $n is greater than 10000! " + + s"We need at least $mem bytes of memory.") + } + + val (m, mean) = rows.aggregate[(Long, BDV[Double])]((0L, BDV.zeros[Double](n)))( + seqOp = (s: (Long, BDV[Double]), v: Vector) => (s._1 + 1L, s._2 += v.toBreeze), + combOp = (s1: (Long, BDV[Double]), s2: (Long, BDV[Double])) => (s1._1 + s2._1, s1._2 += s2._2) + ) + + // Update _m if it is not set, or verify its value. + if (nRows <= 0L) { + nRows = m + } else { + require(nRows == m, + s"The number of rows $m is different from what specified or previously computed: ${nRows}.") + } + + mean :/= m.toDouble + + // We use the formula Cov(X, Y) = E[X * Y] - E[X] E[Y], which is not accurate if E[X * Y] is + // large but Cov(X, Y) is small, but it is good for sparse computation. + // TODO: find a fast and stable way for sparse data. + + val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]] + + var i = 0 + var j = 0 + val m1 = m - 1.0 + var alpha = 0.0 + while (i < n) { + alpha = m / m1 * mean(i) + j = 0 + while (j < n) { + G(i, j) = G(i, j) / m1 - alpha * mean(j) + j += 1 + } + i += 1 + } + + Matrices.fromBreeze(G) + } + + /** + * Computes the top k principal components. + * Rows correspond to observations and columns correspond to variables. + * The principal components are stored a local matrix of size n-by-k. + * Each column corresponds for one principal component, + * and the columns are in descending order of component variance. + * + * @param k number of top principal components. + * @return a matrix of size n-by-k, whose columns are principal components + */ + def computePrincipalComponents(k: Int): Matrix = { + val n = numCols().toInt + require(k > 0 && k <= n, s"k = $k out of range (0, n = $n]") + + val Cov = computeCovariance().toBreeze.asInstanceOf[BDM[Double]] + + val (u: BDM[Double], _, _) = brzSvd(Cov) + + if (k == n) { + Matrices.dense(n, k, u.data) + } else { + Matrices.dense(n, k, util.Arrays.copyOfRange(u.data, 0, n * k)) + } + } + + /** + * Multiply this matrix by a local matrix on the right. + * + * @param B a local matrix whose number of rows must match the number of columns of this matrix + * @return a [[org.apache.spark.mllib.linalg.distributed.RowMatrix]] representing the product, + * which preserves partitioning + */ + def multiply(B: Matrix): RowMatrix = { + val n = numCols().toInt + require(n == B.numRows, s"Dimension mismatch: $n vs ${B.numRows}") + + require(B.isInstanceOf[DenseMatrix], + s"Only support dense matrix at this time but found ${B.getClass.getName}.") + + val Bb = rows.context.broadcast(B) + val AB = rows.mapPartitions({ iter => + val Bi = Bb.value.toBreeze.asInstanceOf[BDM[Double]] + iter.map(v => Vectors.fromBreeze(Bi.t * v.toBreeze)) + }, preservesPartitioning = true) + + new RowMatrix(AB, nRows, B.numCols) + } + + private[mllib] override def toBreeze(): BDM[Double] = { + val m = numRows().toInt + val n = numCols().toInt + val mat = BDM.zeros[Double](m, n) + var i = 0 + rows.collect().foreach { v => + v.toBreeze.activeIterator.foreach { case (j, v) => + mat(i, j) = v + } + i += 1 + } + mat + } +} + +object RowMatrix { + + /** + * Adds alpha * x * x.t to a matrix in-place. This is the same as BLAS's DSPR. + * + * @param U the upper triangular part of the matrix packed in an array (column major) + */ + private def dspr(alpha: Double, v: Vector, U: Array[Double]): Unit = { + // TODO: Find a better home (breeze?) for this method. + val n = v.size + v match { + case dv: DenseVector => + blas.dspr("U", n, 1.0, dv.values, 1, U) + case sv: SparseVector => + val indices = sv.indices + val values = sv.values + val nnz = indices.length + var colStartIdx = 0 + var prevCol = 0 + var col = 0 + var j = 0 + var i = 0 + var av = 0.0 + while (j < nnz) { + col = indices(j) + // Skip empty columns. + colStartIdx += (col - prevCol) * (col + prevCol + 1) / 2 + col = indices(j) + av = alpha * values(j) + i = 0 + while (i <= j) { + U(colStartIdx + indices(i)) += av * values(i) + i += 1 + } + j += 1 + prevCol = col + } + } + } + + /** + * Fills a full square matrix from its upper triangular part. + */ + private def triuToFull(n: Int, U: Array[Double]): Matrix = { + val G = new BDM[Double](n, n) + + var row = 0 + var col = 0 + var idx = 0 + var value = 0.0 + while (col < n) { + row = 0 + while (row < col) { + value = U(idx) + G(row, col) = value + G(col, row) = value + idx += 1 + row += 1 + } + G(col, col) = U(idx) + idx += 1 + col +=1 + } + + Matrices.dense(n, n, G.data) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala deleted file mode 100644 index 87aac34757..0000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.util - -import org.apache.spark.SparkContext._ - -import org.apache.spark.mllib.linalg._ - -/** - * Helper methods for linear algebra - */ -object LAUtils { - /** - * Convert a SparseMatrix into a TallSkinnyDenseMatrix - * - * @param sp Sparse matrix to be converted - * @return dense version of the input - */ - def sparseToTallSkinnyDense(sp: SparseMatrix): TallSkinnyDenseMatrix = { - val m = sp.m - val n = sp.n - val rows = sp.data.map(x => (x.i, (x.j, x.mval))).groupByKey().map { - case (i, cols) => - val rowArray = Array.ofDim[Double](n) - var j = 0 - val colsItr = cols.iterator - while (colsItr.hasNext) { - val element = colsItr.next - rowArray(element._1) = element._2 - j += 1 - } - MatrixRow(i, rowArray) - } - TallSkinnyDenseMatrix(rows, m, n) - } - - /** - * Convert a TallSkinnyDenseMatrix to a SparseMatrix - * - * @param a matrix to be converted - * @return sparse version of the input - */ - def denseToSparse(a: TallSkinnyDenseMatrix): SparseMatrix = { - val m = a.m - val n = a.n - val data = a.rows.flatMap { - mrow => Array.tabulate(n)(j => MatrixEntry(mrow.i, j, mrow.data(j))) - .filter(x => x.mval != 0) - } - SparseMatrix(data, m, n) - } -} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeMatrixConversionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeMatrixConversionSuite.scala new file mode 100644 index 0000000000..82d49c76ed --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeMatrixConversionSuite.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg + +import org.scalatest.FunSuite + +import breeze.linalg.{DenseMatrix => BDM} + +class BreezeMatrixConversionSuite extends FunSuite { + test("dense matrix to breeze") { + val mat = Matrices.dense(3, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0)) + val breeze = mat.toBreeze.asInstanceOf[BDM[Double]] + assert(breeze.rows === mat.numRows) + assert(breeze.cols === mat.numCols) + assert(breeze.data.eq(mat.asInstanceOf[DenseMatrix].values), "should not copy data") + } + + test("dense breeze matrix to matrix") { + val breeze = new BDM[Double](3, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0)) + val mat = Matrices.fromBreeze(breeze).asInstanceOf[DenseMatrix] + assert(mat.numRows === breeze.rows) + assert(mat.numCols === breeze.cols) + assert(mat.values.eq(breeze.data), "should not copy data") + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala new file mode 100644 index 0000000000..9c66b4db9f --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg + +import org.scalatest.FunSuite + +class MatricesSuite extends FunSuite { + test("dense matrix construction") { + val m = 3 + val n = 2 + val values = Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0) + val mat = Matrices.dense(m, n, values).asInstanceOf[DenseMatrix] + assert(mat.numRows === m) + assert(mat.numCols === n) + assert(mat.values.eq(values), "should not copy data") + assert(mat.toArray.eq(values), "toArray should not copy data") + } + + test("dense matrix construction with wrong dimension") { + intercept[RuntimeException] { + Matrices.dense(3, 2, Array(0.0, 1.0, 2.0)) + } + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala deleted file mode 100644 index 5e5086b1bf..0000000000 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.linalg - -import scala.util.Random - -import org.scalatest.BeforeAndAfterAll -import org.scalatest.FunSuite - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -import org.apache.spark.mllib.util._ - -import org.jblas._ - -class PCASuite extends FunSuite with BeforeAndAfterAll { - @transient private var sc: SparkContext = _ - - override def beforeAll() { - sc = new SparkContext("local", "test") - } - - override def afterAll() { - sc.stop() - System.clearProperty("spark.driver.port") - } - - val EPSILON = 1e-3 - - // Return jblas matrix from sparse matrix RDD - def getDenseMatrix(matrix: SparseMatrix) : DoubleMatrix = { - val data = matrix.data - val ret = DoubleMatrix.zeros(matrix.m, matrix.n) - matrix.data.collect().map(x => ret.put(x.i, x.j, x.mval)) - ret - } - - def assertMatrixApproximatelyEquals(a: DoubleMatrix, b: DoubleMatrix) { - assert(a.rows == b.rows && a.columns == b.columns, - "dimension mismatch: $a.rows vs $b.rows and $a.columns vs $b.columns") - for (i <- 0 until a.columns) { - val aCol = a.getColumn(i) - val bCol = b.getColumn(i) - val diff = Math.min(aCol.sub(bCol).norm1, aCol.add(bCol).norm1) - assert(diff < EPSILON, "matrix mismatch: " + diff) - } - } - - test("full rank matrix pca") { - val m = 5 - val n = 3 - val dataArr = Array.tabulate(m,n){ (a, b) => - MatrixEntry(a, b, Math.sin(a + b + a * b)) }.flatten - val data = sc.makeRDD(dataArr, 3) - val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n)) - - val realPCAArray = Array((0,0,-0.2579), (0,1,-0.6602), (0,2,0.7054), - (1,0,-0.1448), (1,1,0.7483), (1,2,0.6474), - (2,0,0.9553), (2,1,-0.0649), (2,2,0.2886)) - val realPCA = sc.makeRDD(realPCAArray.map(x => MatrixEntry(x._1, x._2, x._3)), 3) - - val coeffs = new DoubleMatrix(new PCA().setK(n).compute(a)) - - assertMatrixApproximatelyEquals(getDenseMatrix(SparseMatrix(realPCA,n,n)), coeffs) - } - - test("sparse matrix full rank matrix pca") { - val m = 5 - val n = 3 - // the entry that gets dropped is zero to test sparse support - val dataArr = Array.tabulate(m,n){ (a, b) => - MatrixEntry(a, b, Math.sin(a + b + a * b)) }.flatten.drop(1) - val data = sc.makeRDD(dataArr, 3) - val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n)) - - val realPCAArray = Array((0,0,-0.2579), (0,1,-0.6602), (0,2,0.7054), - (1,0,-0.1448), (1,1,0.7483), (1,2,0.6474), - (2,0,0.9553), (2,1,-0.0649), (2,2,0.2886)) - val realPCA = sc.makeRDD(realPCAArray.map(x => MatrixEntry(x._1, x._2, x._3))) - - val coeffs = new DoubleMatrix(new PCA().setK(n).compute(a)) - - assertMatrixApproximatelyEquals(getDenseMatrix(SparseMatrix(realPCA,n,n)), coeffs) - } - - test("truncated matrix pca") { - val m = 5 - val n = 3 - val dataArr = Array.tabulate(m,n){ (a, b) => - MatrixEntry(a, b, Math.sin(a + b + a * b)) }.flatten - - val data = sc.makeRDD(dataArr, 3) - val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n)) - - val realPCAArray = Array((0,0,-0.2579), (0,1,-0.6602), - (1,0,-0.1448), (1,1,0.7483), - (2,0,0.9553), (2,1,-0.0649)) - val realPCA = sc.makeRDD(realPCAArray.map(x => MatrixEntry(x._1, x._2, x._3))) - - val k = 2 - val coeffs = new DoubleMatrix(new PCA().setK(k).compute(a)) - - assertMatrixApproximatelyEquals(getDenseMatrix(SparseMatrix(realPCA,n,k)), coeffs) - } -} - - diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala deleted file mode 100644 index 20e2b0f84b..0000000000 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.linalg - -import scala.util.Random - -import org.scalatest.BeforeAndAfterAll -import org.scalatest.FunSuite - -import org.jblas.{DoubleMatrix, Singular, MatrixFunctions} - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -import org.apache.spark.mllib.util._ - -import org.jblas._ - -class SVDSuite extends FunSuite with BeforeAndAfterAll { - @transient private var sc: SparkContext = _ - - override def beforeAll() { - sc = new SparkContext("local", "test") - } - - override def afterAll() { - sc.stop() - System.clearProperty("spark.driver.port") - } - - val EPSILON = 1e-4 - - // Return jblas matrix from sparse matrix RDD - def getDenseMatrix(matrix: SparseMatrix) : DoubleMatrix = { - val data = matrix.data - val m = matrix.m - val n = matrix.n - val ret = DoubleMatrix.zeros(m, n) - matrix.data.collect().map(x => ret.put(x.i, x.j, x.mval)) - ret - } - - def assertMatrixApproximatelyEquals(a: DoubleMatrix, b: DoubleMatrix) { - assert(a.rows == b.rows && a.columns == b.columns, - "dimension mismatch: $a.rows vs $b.rows and $a.columns vs $b.columns") - for (i <- 0 until a.columns) { - val aCol = a.getColumn(i) - val bCol = b.getColumn(i) - val diff = Math.min(aCol.sub(bCol).norm1, aCol.add(bCol).norm1) - assert(diff < EPSILON, "matrix mismatch: " + diff) - } - } - - test("full rank matrix svd") { - val m = 10 - val n = 3 - val datarr = Array.tabulate(m,n){ (a, b) => - MatrixEntry(a, b, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten - val data = sc.makeRDD(datarr, 3) - - val a = SparseMatrix(data, m, n) - - val decomposed = new SVD().setK(n).compute(a) - val u = decomposed.U - val v = decomposed.V - val s = decomposed.S - - val denseA = getDenseMatrix(a) - val svd = Singular.sparseSVD(denseA) - - val retu = getDenseMatrix(u) - val rets = getDenseMatrix(s) - val retv = getDenseMatrix(v) - - - // check individual decomposition - assertMatrixApproximatelyEquals(retu, svd(0)) - assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1))) - assertMatrixApproximatelyEquals(retv, svd(2)) - - // check multiplication guarantee - assertMatrixApproximatelyEquals(retu.mmul(rets).mmul(retv.transpose), denseA) - } - - test("dense full rank matrix svd") { - val m = 10 - val n = 3 - val datarr = Array.tabulate(m,n){ (a, b) => - MatrixEntry(a, b, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten - val data = sc.makeRDD(datarr, 3) - - val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n)) - - val decomposed = new SVD().setK(n).setComputeU(true).compute(a) - val u = LAUtils.denseToSparse(decomposed.U) - val v = decomposed.V - val s = decomposed.S - - val denseA = getDenseMatrix(LAUtils.denseToSparse(a)) - val svd = Singular.sparseSVD(denseA) - - val retu = getDenseMatrix(u) - val rets = DoubleMatrix.diag(new DoubleMatrix(s)) - val retv = new DoubleMatrix(v) - - - // check individual decomposition - assertMatrixApproximatelyEquals(retu, svd(0)) - assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1))) - assertMatrixApproximatelyEquals(retv, svd(2)) - - // check multiplication guarantee - assertMatrixApproximatelyEquals(retu.mmul(rets).mmul(retv.transpose), denseA) - } - - test("rank one matrix svd") { - val m = 10 - val n = 3 - val data = sc.makeRDD(Array.tabulate(m, n){ (a,b) => - MatrixEntry(a, b, 1.0) }.flatten ) - val k = 1 - - val a = SparseMatrix(data, m, n) - - val decomposed = new SVD().setK(k).compute(a) - val u = decomposed.U - val s = decomposed.S - val v = decomposed.V - val retrank = s.data.collect().length - - assert(retrank == 1, "rank returned not one") - - val denseA = getDenseMatrix(a) - val svd = Singular.sparseSVD(denseA) - - val retu = getDenseMatrix(u) - val rets = getDenseMatrix(s) - val retv = getDenseMatrix(v) - - // check individual decomposition - assertMatrixApproximatelyEquals(retu, svd(0).getColumn(0)) - assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1).getRow(0))) - assertMatrixApproximatelyEquals(retv, svd(2).getColumn(0)) - - // check multiplication guarantee - assertMatrixApproximatelyEquals(retu.mmul(rets).mmul(retv.transpose), denseA) - } - - test("truncated with k") { - val m = 10 - val n = 3 - val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) => - MatrixEntry(a, b, (a + 2).toDouble * (b + 1)/(1 + a + b)) }.flatten ) - val a = SparseMatrix(data, m, n) - - val k = 1 // only one svalue above this - - val decomposed = new SVD().setK(k).compute(a) - val u = decomposed.U - val s = decomposed.S - val v = decomposed.V - val retrank = s.data.collect().length - - val denseA = getDenseMatrix(a) - val svd = Singular.sparseSVD(denseA) - - val retu = getDenseMatrix(u) - val rets = getDenseMatrix(s) - val retv = getDenseMatrix(v) - - assert(retrank == 1, "rank returned not one") - - // check individual decomposition - assertMatrixApproximatelyEquals(retu, svd(0).getColumn(0)) - assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1).getRow(0))) - assertMatrixApproximatelyEquals(retv, svd(2).getColumn(0)) - } -} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala new file mode 100644 index 0000000000..cd45438fb6 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg.distributed + +import org.scalatest.FunSuite + +import breeze.linalg.{DenseMatrix => BDM} + +import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.mllib.linalg.Vectors + +class CoordinateMatrixSuite extends FunSuite with LocalSparkContext { + + val m = 5 + val n = 4 + var mat: CoordinateMatrix = _ + + override def beforeAll() { + super.beforeAll() + val entries = sc.parallelize(Seq( + (0, 0, 1.0), + (0, 1, 2.0), + (1, 1, 3.0), + (1, 2, 4.0), + (2, 2, 5.0), + (2, 3, 6.0), + (3, 0, 7.0), + (3, 3, 8.0), + (4, 1, 9.0)), 3).map { case (i, j, value) => + MatrixEntry(i, j, value) + } + mat = new CoordinateMatrix(entries) + } + + test("size") { + assert(mat.numRows() === m) + assert(mat.numCols() === n) + } + + test("empty entries") { + val entries = sc.parallelize(Seq[MatrixEntry](), 1) + val emptyMat = new CoordinateMatrix(entries) + intercept[RuntimeException] { + emptyMat.numCols() + } + intercept[RuntimeException] { + emptyMat.numRows() + } + } + + test("toBreeze") { + val expected = BDM( + (1.0, 2.0, 0.0, 0.0), + (0.0, 3.0, 4.0, 0.0), + (0.0, 0.0, 5.0, 6.0), + (7.0, 0.0, 0.0, 8.0), + (0.0, 9.0, 0.0, 0.0)) + assert(mat.toBreeze() === expected) + } + + test("toIndexedRowMatrix") { + val indexedRowMatrix = mat.toIndexedRowMatrix() + val expected = BDM( + (1.0, 2.0, 0.0, 0.0), + (0.0, 3.0, 4.0, 0.0), + (0.0, 0.0, 5.0, 6.0), + (7.0, 0.0, 0.0, 8.0), + (0.0, 9.0, 0.0, 0.0)) + assert(indexedRowMatrix.toBreeze() === expected) + } + + test("toRowMatrix") { + val rowMatrix = mat.toRowMatrix() + val rows = rowMatrix.rows.collect().toSet + val expected = Set( + Vectors.dense(1.0, 2.0, 0.0, 0.0), + Vectors.dense(0.0, 3.0, 4.0, 0.0), + Vectors.dense(0.0, 0.0, 5.0, 6.0), + Vectors.dense(7.0, 0.0, 0.0, 8.0), + Vectors.dense(0.0, 9.0, 0.0, 0.0)) + assert(rows === expected) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala new file mode 100644 index 0000000000..f7c46f23b7 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg.distributed + +import org.scalatest.FunSuite + +import breeze.linalg.{diag => brzDiag, DenseMatrix => BDM, DenseVector => BDV} + +import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.{Matrices, Vectors} + +class IndexedRowMatrixSuite extends FunSuite with LocalSparkContext { + + val m = 4 + val n = 3 + val data = Seq( + (0L, Vectors.dense(0.0, 1.0, 2.0)), + (1L, Vectors.dense(3.0, 4.0, 5.0)), + (3L, Vectors.dense(9.0, 0.0, 1.0)) + ).map(x => IndexedRow(x._1, x._2)) + var indexedRows: RDD[IndexedRow] = _ + + override def beforeAll() { + super.beforeAll() + indexedRows = sc.parallelize(data, 2) + } + + test("size") { + val mat1 = new IndexedRowMatrix(indexedRows) + assert(mat1.numRows() === m) + assert(mat1.numCols() === n) + + val mat2 = new IndexedRowMatrix(indexedRows, 5, 0) + assert(mat2.numRows() === 5) + assert(mat2.numCols() === n) + } + + test("empty rows") { + val rows = sc.parallelize(Seq[IndexedRow](), 1) + val mat = new IndexedRowMatrix(rows) + intercept[RuntimeException] { + mat.numRows() + } + intercept[RuntimeException] { + mat.numCols() + } + } + + test("toBreeze") { + val mat = new IndexedRowMatrix(indexedRows) + val expected = BDM( + (0.0, 1.0, 2.0), + (3.0, 4.0, 5.0), + (0.0, 0.0, 0.0), + (9.0, 0.0, 1.0)) + assert(mat.toBreeze() === expected) + } + + test("toRowMatrix") { + val idxRowMat = new IndexedRowMatrix(indexedRows) + val rowMat = idxRowMat.toRowMatrix() + assert(rowMat.numCols() === n) + assert(rowMat.numRows() === 3, "should drop empty rows") + assert(rowMat.rows.collect().toSeq === data.map(_.vector).toSeq) + } + + test("multiply a local matrix") { + val A = new IndexedRowMatrix(indexedRows) + val B = Matrices.dense(3, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0)) + val C = A.multiply(B) + val localA = A.toBreeze() + val localC = C.toBreeze() + val expected = localA * B.toBreeze.asInstanceOf[BDM[Double]] + assert(localC === expected) + } + + test("gram") { + val A = new IndexedRowMatrix(indexedRows) + val G = A.computeGramianMatrix() + val expected = BDM( + (90.0, 12.0, 24.0), + (12.0, 17.0, 22.0), + (24.0, 22.0, 30.0)) + assert(G.toBreeze === expected) + } + + test("svd") { + val A = new IndexedRowMatrix(indexedRows) + val svd = A.computeSVD(n, computeU = true) + assert(svd.U.isInstanceOf[IndexedRowMatrix]) + val localA = A.toBreeze() + val U = svd.U.toBreeze() + val s = svd.s.toBreeze.asInstanceOf[BDV[Double]] + val V = svd.V.toBreeze.asInstanceOf[BDM[Double]] + assert(closeToZero(U.t * U - BDM.eye[Double](n))) + assert(closeToZero(V.t * V - BDM.eye[Double](n))) + assert(closeToZero(U * brzDiag(s) * V.t - localA)) + } + + def closeToZero(G: BDM[Double]): Boolean = { + G.valuesIterator.map(math.abs).sum < 1e-6 + } +} + diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala new file mode 100644 index 0000000000..71ee8e8a4f --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg.distributed + +import org.scalatest.FunSuite + +import breeze.linalg.{DenseVector => BDV, DenseMatrix => BDM, norm => brzNorm, svd => brzSvd} + +import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.mllib.linalg.{Matrices, Vectors, Vector} + +class RowMatrixSuite extends FunSuite with LocalSparkContext { + + val m = 4 + val n = 3 + val arr = Array(0.0, 3.0, 6.0, 9.0, 1.0, 4.0, 7.0, 0.0, 2.0, 5.0, 8.0, 1.0) + val denseData = Seq( + Vectors.dense(0.0, 1.0, 2.0), + Vectors.dense(3.0, 4.0, 5.0), + Vectors.dense(6.0, 7.0, 8.0), + Vectors.dense(9.0, 0.0, 1.0) + ) + val sparseData = Seq( + Vectors.sparse(3, Seq((1, 1.0), (2, 2.0))), + Vectors.sparse(3, Seq((0, 3.0), (1, 4.0), (2, 5.0))), + Vectors.sparse(3, Seq((0, 6.0), (1, 7.0), (2, 8.0))), + Vectors.sparse(3, Seq((0, 9.0), (2, 1.0))) + ) + + val principalComponents = BDM( + (0.0, 1.0, 0.0), + (math.sqrt(2.0) / 2.0, 0.0, math.sqrt(2.0) / 2.0), + (math.sqrt(2.0) / 2.0, 0.0, - math.sqrt(2.0) / 2.0)) + + var denseMat: RowMatrix = _ + var sparseMat: RowMatrix = _ + + override def beforeAll() { + super.beforeAll() + denseMat = new RowMatrix(sc.parallelize(denseData, 2)) + sparseMat = new RowMatrix(sc.parallelize(sparseData, 2)) + } + + test("size") { + assert(denseMat.numRows() === m) + assert(denseMat.numCols() === n) + assert(sparseMat.numRows() === m) + assert(sparseMat.numCols() === n) + } + + test("empty rows") { + val rows = sc.parallelize(Seq[Vector](), 1) + val emptyMat = new RowMatrix(rows) + intercept[RuntimeException] { + emptyMat.numCols() + } + intercept[RuntimeException] { + emptyMat.numRows() + } + } + + test("toBreeze") { + val expected = BDM( + (0.0, 1.0, 2.0), + (3.0, 4.0, 5.0), + (6.0, 7.0, 8.0), + (9.0, 0.0, 1.0)) + for (mat <- Seq(denseMat, sparseMat)) { + assert(mat.toBreeze() === expected) + } + } + + test("gram") { + val expected = + Matrices.dense(n, n, Array(126.0, 54.0, 72.0, 54.0, 66.0, 78.0, 72.0, 78.0, 94.0)) + for (mat <- Seq(denseMat, sparseMat)) { + val G = mat.computeGramianMatrix() + assert(G.toBreeze === expected.toBreeze) + } + } + + test("svd of a full-rank matrix") { + for (mat <- Seq(denseMat, sparseMat)) { + val localMat = mat.toBreeze() + val (localU, localSigma, localVt) = brzSvd(localMat) + val localV: BDM[Double] = localVt.t.toDenseMatrix + for (k <- 1 to n) { + val svd = mat.computeSVD(k, computeU = true) + val U = svd.U + val s = svd.s + val V = svd.V + assert(U.numRows() === m) + assert(U.numCols() === k) + assert(s.size === k) + assert(V.numRows === n) + assert(V.numCols === k) + assertColumnEqualUpToSign(U.toBreeze(), localU, k) + assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k) + assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k))) + } + val svdWithoutU = mat.computeSVD(n) + assert(svdWithoutU.U === null) + } + } + + test("svd of a low-rank matrix") { + val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0)), 2) + val mat = new RowMatrix(rows, 4, 2) + val svd = mat.computeSVD(2, computeU = true) + assert(svd.s.size === 1, "should not return zero singular values") + assert(svd.U.numRows() === 4) + assert(svd.U.numCols() === 1) + assert(svd.V.numRows === 2) + assert(svd.V.numCols === 1) + } + + def closeToZero(G: BDM[Double]): Boolean = { + G.valuesIterator.map(math.abs).sum < 1e-6 + } + + def closeToZero(v: BDV[Double]): Boolean = { + brzNorm(v, 1.0) < 1e-6 + } + + def assertColumnEqualUpToSign(A: BDM[Double], B: BDM[Double], k: Int) { + assert(A.rows === B.rows) + for (j <- 0 until k) { + val aj = A(::, j) + val bj = B(::, j) + assert(closeToZero(aj - bj) || closeToZero(aj + bj), + s"The $j-th columns mismatch: $aj and $bj") + } + } + + test("pca") { + for (mat <- Seq(denseMat, sparseMat); k <- 1 to n) { + val pc = denseMat.computePrincipalComponents(k) + assert(pc.numRows === n) + assert(pc.numCols === k) + assertColumnEqualUpToSign(pc.toBreeze.asInstanceOf[BDM[Double]], principalComponents, k) + } + } + + test("multiply a local matrix") { + val B = Matrices.dense(n, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0)) + for (mat <- Seq(denseMat, sparseMat)) { + val AB = mat.multiply(B) + assert(AB.numRows() === m) + assert(AB.numCols() === 2) + assert(AB.rows.collect().toSeq === Seq( + Vectors.dense(5.0, 14.0), + Vectors.dense(14.0, 50.0), + Vectors.dense(23.0, 86.0), + Vectors.dense(2.0, 32.0) + )) + } + } +} -- cgit v1.2.3