author     Xiangrui Meng <meng@databricks.com>    2014-04-08 23:01:15 -0700
committer  Patrick Wendell <pwendell@gmail.com>   2014-04-08 23:01:15 -0700
commit     9689b663a2a4947ad60795321c770052f3c637f1 (patch)
tree       f2647f7b1ae3a3d11d3ecb29e764214b7cb589ca
parent     fa0524fd02eedd0bbf1edc750dc3997a86ea25f5 (diff)
download   spark-9689b663a2a4947ad60795321c770052f3c637f1.tar.gz
           spark-9689b663a2a4947ad60795321c770052f3c637f1.tar.bz2
           spark-9689b663a2a4947ad60795321c770052f3c637f1.zip
[SPARK-1390] Refactoring of matrices backed by RDDs
This is to refactor interfaces for matrices backed by RDDs. It would be better if we had a clear separation of local matrices and those backed by RDDs. Right now, we have

1. `org.apache.spark.mllib.linalg.SparseMatrix`, which is a wrapper over an RDD of matrix entries, i.e., coordinate list format.
2. `org.apache.spark.mllib.linalg.TallSkinnyDenseMatrix`, which is a wrapper over RDD[Array[Double]], i.e., row-oriented format.

We will see a naming collision when we introduce a local `SparseMatrix`, and the name `TallSkinnyDenseMatrix` is no longer exact if we switch from `RDD[Array[Double]]` to `RDD[Vector]`. It would be better to have "RDD" in the class names to suggest that operations may trigger jobs. The proposed names are (all under `org.apache.spark.mllib.linalg.rdd`):

1. `RDDMatrix`: trait for matrices backed by one or more RDDs
2. `CoordinateRDDMatrix`: wrapper of `RDD[(Long, Long, Double)]`
3. `RowRDDMatrix`: wrapper of `RDD[Vector]` whose rows do not have special ordering
4. `IndexedRowRDDMatrix`: wrapper of `RDD[(Long, Vector)]` whose rows are associated with indices

The current code also introduces local matrices.

Author: Xiangrui Meng <meng@databricks.com>

Closes #296 from mengxr/mat and squashes the following commits:

24d8294 [Xiangrui Meng] fix for groupBy returning Iterable
bfc2b26 [Xiangrui Meng] merge master
8e4f1f5 [Xiangrui Meng] Merge branch 'master' into mat
0135193 [Xiangrui Meng] address Reza's comments
03cd7e1 [Xiangrui Meng] add pca/gram to IndexedRowMatrix; add toBreeze to DistributedMatrix for test; simplify tests
b177ff1 [Xiangrui Meng] address Matei's comments
be119fe [Xiangrui Meng] rename m/n to numRows/numCols for local matrix; add tests for matrices
b881506 [Xiangrui Meng] rename SparkPCA/SVD to TallSkinnyPCA/SVD
e7d0d4a [Xiangrui Meng] move IndexedRDDMatrixRow to IndexedRowRDDMatrix
0d1491c [Xiangrui Meng] fix test errors
a85262a [Xiangrui Meng] rename RDDMatrixRow to IndexedRDDMatrixRow
b8b6ac3 [Xiangrui Meng] Remove old code
4cf679c [Xiangrui Meng] port pca to RowRDDMatrix, and add multiply and covariance
7836e2f [Xiangrui Meng] initial refactoring of matrices backed by RDDs
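To make the resulting API concrete, here is a minimal, hypothetical usage sketch written against the classes this patch actually adds (the final names live under `org.apache.spark.mllib.linalg.distributed`, not the `linalg.rdd` package proposed above); it assumes an existing `SparkContext` named `sc`:
{{{
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed._

// RowMatrix wraps RDD[Vector]; rows carry no meaningful indices.
val rowMat = new RowMatrix(sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0),
  Vectors.dense(3.0, 4.0))))

// IndexedRowMatrix wraps RDD[IndexedRow], i.e. (Long, Vector) pairs.
val indexedMat = new IndexedRowMatrix(sc.parallelize(Seq(
  IndexedRow(0L, Vectors.dense(1.0, 2.0)),
  IndexedRow(3L, Vectors.dense(3.0, 4.0)))))

// CoordinateMatrix wraps RDD[MatrixEntry], i.e. coordinate list format.
val coordMat = new CoordinateMatrix(sc.parallelize(Seq(
  MatrixEntry(0L, 0L, 1.0), MatrixEntry(1L, 1L, 2.0))))

// Dimensions are computed lazily and may trigger Spark jobs.
println((rowMat.numRows(), rowMat.numCols()))
}}}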
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/mllib/SparkPCA.scala  51
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala  59
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala  64
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala  64
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala  101
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala  29
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala  120
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala  395
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/SingularValueDecomposition.scala (renamed from mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixRow.scala)  9
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyMatrixSVD.scala  31
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala  112
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrix.scala (renamed from mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala)  23
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala  148
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala  344
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala  67
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeMatrixConversionSuite.scala (renamed from mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala)  29
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala (renamed from mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyDenseMatrix.scala)  27
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala  124
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala  194
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala  98
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala  120
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala  173
22 files changed, 1280 insertions(+), 1102 deletions(-)
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SparkPCA.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SparkPCA.scala
deleted file mode 100644
index d4e08c5e12..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/SparkPCA.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.mllib
-
-import org.apache.spark.SparkContext
-import org.apache.spark.mllib.linalg.PCA
-import org.apache.spark.mllib.linalg.MatrixEntry
-import org.apache.spark.mllib.linalg.SparseMatrix
-import org.apache.spark.mllib.util._
-
-
-/**
- * Compute PCA of an example matrix.
- */
-object SparkPCA {
- def main(args: Array[String]) {
- if (args.length != 3) {
- System.err.println("Usage: SparkPCA <master> m n")
- System.exit(1)
- }
- val sc = new SparkContext(args(0), "PCA",
- System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
-
- val m = args(2).toInt
- val n = args(3).toInt
-
- // Make example matrix
- val data = Array.tabulate(m, n) { (a, b) =>
- (a + 2).toDouble * (b + 1) / (1 + a + b) }
-
- // recover top principal component
- val coeffs = new PCA().setK(1).compute(sc.makeRDD(data))
-
- println("top principal component = " + coeffs.mkString(", "))
- }
-}
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala
deleted file mode 100644
index 2933cec497..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.mllib
-
-import org.apache.spark.SparkContext
-import org.apache.spark.mllib.linalg.SVD
-import org.apache.spark.mllib.linalg.MatrixEntry
-import org.apache.spark.mllib.linalg.SparseMatrix
-
-/**
- * Compute SVD of an example matrix
- * Input file should be comma separated, 1 indexed of the form
- * i,j,value
- * Where i is the column, j the row, and value is the matrix entry
- *
- * For example input file, see:
- * mllib/data/als/test.data (example is 4 x 4)
- */
-object SparkSVD {
- def main(args: Array[String]) {
- if (args.length != 4) {
- System.err.println("Usage: SparkSVD <master> <file> m n")
- System.exit(1)
- }
- val sc = new SparkContext(args(0), "SVD",
- System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
-
- // Load and parse the data file
- val data = sc.textFile(args(1)).map { line =>
- val parts = line.split(',')
- MatrixEntry(parts(0).toInt - 1, parts(1).toInt - 1, parts(2).toDouble)
- }
- val m = args(2).toInt
- val n = args(3).toInt
-
- // recover largest singular vector
- val decomposed = new SVD().setK(1).compute(SparseMatrix(data, m, n))
- val u = decomposed.U.data
- val s = decomposed.S.data
- val v = decomposed.V.data
-
- println("singular values = " + s.collect().mkString)
- }
-}
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala
new file mode 100644
index 0000000000..a177435e60
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.mllib.linalg.distributed.RowMatrix
+import org.apache.spark.mllib.linalg.Vectors
+
+/**
+ * Compute the principal components of a tall-and-skinny matrix, whose rows are observations.
+ *
+ * The input matrix must be stored in row-oriented dense format, one line per row with its entries
+ * separated by space. For example,
+ * {{{
+ * 0.5 1.0
+ * 2.0 3.0
+ * 4.0 5.0
+ * }}}
+ * represents a 3-by-2 matrix, whose first row is (0.5, 1.0).
+ */
+object TallSkinnyPCA {
+ def main(args: Array[String]) {
+ if (args.length != 2) {
+ System.err.println("Usage: TallSkinnyPCA <master> <file>")
+ System.exit(1)
+ }
+
+ val conf = new SparkConf()
+ .setMaster(args(0))
+ .setAppName("TallSkinnyPCA")
+ .setSparkHome(System.getenv("SPARK_HOME"))
+ .setJars(SparkContext.jarOfClass(this.getClass))
+ val sc = new SparkContext(conf)
+
+ // Load and parse the data file.
+ val rows = sc.textFile(args(1)).map { line =>
+ val values = line.split(' ').map(_.toDouble)
+ Vectors.dense(values)
+ }
+ val mat = new RowMatrix(rows)
+
+ // Compute principal components.
+ val pc = mat.computePrincipalComponents(mat.numCols().toInt)
+
+ println("Principal components are:\n" + pc)
+
+ sc.stop()
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala
new file mode 100644
index 0000000000..49d09692c8
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.mllib.linalg.distributed.RowMatrix
+import org.apache.spark.mllib.linalg.Vectors
+
+/**
+ * Compute the singular value decomposition (SVD) of a tall-and-skinny matrix.
+ *
+ * The input matrix must be stored in row-oriented dense format, one line per row with its entries
+ * separated by space. For example,
+ * {{{
+ * 0.5 1.0
+ * 2.0 3.0
+ * 4.0 5.0
+ * }}}
+ * represents a 3-by-2 matrix, whose first row is (0.5, 1.0).
+ */
+object TallSkinnySVD {
+ def main(args: Array[String]) {
+ if (args.length != 2) {
+ System.err.println("Usage: TallSkinnySVD <master> <file>")
+ System.exit(1)
+ }
+
+ val conf = new SparkConf()
+ .setMaster(args(0))
+ .setAppName("TallSkinnySVD")
+ .setSparkHome(System.getenv("SPARK_HOME"))
+ .setJars(SparkContext.jarOfClass(this.getClass))
+ val sc = new SparkContext(conf)
+
+ // Load and parse the data file.
+ val rows = sc.textFile(args(1)).map { line =>
+ val values = line.split(' ').map(_.toDouble)
+ Vectors.dense(values)
+ }
+ val mat = new RowMatrix(rows)
+
+ // Compute SVD.
+ val svd = mat.computeSVD(mat.numCols().toInt)
+
+ println("Singular values are " + svd.s)
+
+ sc.stop()
+ }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
new file mode 100644
index 0000000000..b11ba5d30f
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import breeze.linalg.{Matrix => BM, DenseMatrix => BDM}
+
+/**
+ * Trait for a local matrix.
+ */
+trait Matrix extends Serializable {
+
+ /** Number of rows. */
+ def numRows: Int
+
+ /** Number of columns. */
+ def numCols: Int
+
+ /** Converts to a dense array in column major. */
+ def toArray: Array[Double]
+
+ /** Converts to a breeze matrix. */
+ private[mllib] def toBreeze: BM[Double]
+
+ /** Gets the (i, j)-th element. */
+ private[mllib] def apply(i: Int, j: Int): Double = toBreeze(i, j)
+
+ override def toString: String = toBreeze.toString()
+}
+
+/**
+ * Column-major dense matrix.
+ * The entry values are stored in a single array of doubles with columns listed in sequence.
+ * For example, the following matrix
+ * {{{
+ * 1.0 2.0
+ * 3.0 4.0
+ * 5.0 6.0
+ * }}}
+ * is stored as `[1.0, 3.0, 5.0, 2.0, 4.0, 6.0]`.
+ *
+ * @param numRows number of rows
+ * @param numCols number of columns
+ * @param values matrix entries in column major
+ */
+class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double]) extends Matrix {
+
+ require(values.length == numRows * numCols)
+
+ override def toArray: Array[Double] = values
+
+ private[mllib] override def toBreeze: BM[Double] = new BDM[Double](numRows, numCols, values)
+}
+
+/**
+ * Factory methods for [[org.apache.spark.mllib.linalg.Matrix]].
+ */
+object Matrices {
+
+ /**
+   * Creates a column-major dense matrix.
+ *
+ * @param numRows number of rows
+ * @param numCols number of columns
+ * @param values matrix entries in column major
+ */
+ def dense(numRows: Int, numCols: Int, values: Array[Double]): Matrix = {
+ new DenseMatrix(numRows, numCols, values)
+ }
+
+ /**
+ * Creates a Matrix instance from a breeze matrix.
+ * @param breeze a breeze matrix
+ * @return a Matrix instance
+ */
+ private[mllib] def fromBreeze(breeze: BM[Double]): Matrix = {
+ breeze match {
+ case dm: BDM[Double] =>
+ require(dm.majorStride == dm.rows,
+ "Do not support stride size different from the number of rows.")
+ new DenseMatrix(dm.rows, dm.cols, dm.data)
+ case _ =>
+ throw new UnsupportedOperationException(
+ s"Do not support conversion from type ${breeze.getClass.getName}.")
+ }
+ }
+}
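As a quick sanity check of the column-major layout documented above, the 3-by-2 example from the scaladoc can be constructed as follows (a sketch, not part of the patch):
{{{
import org.apache.spark.mllib.linalg.Matrices

// Entries listed column by column: first column (1, 3, 5), then (2, 4, 6).
val m = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
println(m.numRows + " x " + m.numCols)  // 3 x 2
println(m)  // toString delegates to the underlying Breeze matrix
}}}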
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala
deleted file mode 100644
index 319f82b449..0000000000
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.linalg
-
-/**
- * Class that represents the SV decomposition of a matrix
- *
- * @param U such that A = USV^T
- * @param S such that A = USV^T
- * @param V such that A = USV^T
- */
-case class MatrixSVD(val U: SparseMatrix,
- val S: SparseMatrix,
- val V: SparseMatrix)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala
deleted file mode 100644
index fe5b3f6c7e..0000000000
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.linalg
-
-import org.apache.spark.rdd.RDD
-
-
-import org.jblas.DoubleMatrix
-
-
-/**
- * Class used to obtain principal components
- */
-class PCA {
- private var k = 1
-
- /**
- * Set the number of top-k principle components to return
- */
- def setK(k: Int): PCA = {
- this.k = k
- this
- }
-
- /**
- * Compute PCA using the current set parameters
- */
- def compute(matrix: TallSkinnyDenseMatrix): Array[Array[Double]] = {
- computePCA(matrix)
- }
-
- /**
- * Compute PCA using the parameters currently set
- * See computePCA() for more details
- */
- def compute(matrix: RDD[Array[Double]]): Array[Array[Double]] = {
- computePCA(matrix)
- }
-
- /**
- * Computes the top k principal component coefficients for the m-by-n data matrix X.
- * Rows of X correspond to observations and columns correspond to variables.
- * The coefficient matrix is n-by-k. Each column of coeff contains coefficients
- * for one principal component, and the columns are in descending
- * order of component variance.
- * This function centers the data and uses the
- * singular value decomposition (SVD) algorithm.
- *
- * @param matrix dense matrix to perform PCA on
- * @return An nxk matrix with principal components in columns. Columns are inner arrays
- */
- private def computePCA(matrix: TallSkinnyDenseMatrix): Array[Array[Double]] = {
- val m = matrix.m
- val n = matrix.n
-
- if (m <= 0 || n <= 0) {
- throw new IllegalArgumentException("Expecting a well-formed matrix: m=$m n=$n")
- }
-
- computePCA(matrix.rows.map(_.data))
- }
-
- /**
- * Computes the top k principal component coefficients for the m-by-n data matrix X.
- * Rows of X correspond to observations and columns correspond to variables.
- * The coefficient matrix is n-by-k. Each column of coeff contains coefficients
- * for one principal component, and the columns are in descending
- * order of component variance.
- * This function centers the data and uses the
- * singular value decomposition (SVD) algorithm.
- *
- * @param matrix dense matrix to perform pca on
- * @return An nxk matrix of principal components
- */
- private def computePCA(matrix: RDD[Array[Double]]): Array[Array[Double]] = {
- val n = matrix.first.size
-
- // compute column sums and normalize matrix
- val colSumsTemp = matrix.map((_, 1)).fold((Array.ofDim[Double](n), 0)) {
- (a, b) =>
- val am = new DoubleMatrix(a._1)
- val bm = new DoubleMatrix(b._1)
- am.addi(bm)
- (a._1, a._2 + b._2)
- }
-
- val m = colSumsTemp._2
- val colSums = colSumsTemp._1.map(x => x / m)
-
- val data = matrix.map {
- x =>
- val row = Array.ofDim[Double](n)
- var i = 0
- while (i < n) {
- row(i) = x(i) - colSums(i)
- i += 1
- }
- row
- }
-
- val (u, s, v) = new SVD().setK(k).compute(data)
- v
- }
-}
-
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
deleted file mode 100644
index 0d97b7d92f..0000000000
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
+++ /dev/null
@@ -1,395 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.linalg
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}
-
-/**
- * Class used to obtain singular value decompositions
- */
-class SVD {
- private var k = 1
- private var computeU = true
-
- // All singular values smaller than rCond * sigma(0)
- // are treated as zero, where sigma(0) is the largest singular value.
- private var rCond = 1e-9
-
- /**
- * Set the number of top-k singular vectors to return
- */
- def setK(k: Int): SVD = {
- this.k = k
- this
- }
-
- /**
- * Sets the reciprocal condition number (rCond). All singular values
- * smaller than rCond * sigma(0) are treated as zero,
- * where sigma(0) is the largest singular value.
- */
- def setReciprocalConditionNumber(smallS: Double): SVD = {
- this.rCond = smallS
- this
- }
-
- /**
- * Should U be computed?
- */
- def setComputeU(compU: Boolean): SVD = {
- this.computeU = compU
- this
- }
-
- /**
- * Compute SVD using the current set parameters
- */
- def compute(matrix: TallSkinnyDenseMatrix): TallSkinnyMatrixSVD = {
- denseSVD(matrix)
- }
-
- /**
- * Compute SVD using the current set parameters
- * Returns (U, S, V) such that A = USV^T
- * U is a row-by-row dense matrix
- * S is a simple double array of singular values
- * V is a 2d array matrix
- * See [[denseSVD]] for more documentation
- */
- def compute(matrix: RDD[Array[Double]]):
- (RDD[Array[Double]], Array[Double], Array[Array[Double]]) = {
- denseSVD(matrix)
- }
-
- /**
- * See full paramter definition of sparseSVD for more description.
- *
- * @param matrix sparse matrix to factorize
- * @return Three sparse matrices: U, S, V such that A = USV^T
- */
- def compute(matrix: SparseMatrix): MatrixSVD = {
- sparseSVD(matrix)
- }
-
- /**
- * Singular Value Decomposition for Tall and Skinny matrices.
- * Given an m x n matrix A, this will compute matrices U, S, V such that
- * A = U * S * V'
- *
- * There is no restriction on m, but we require n^2 doubles to fit in memory.
- * Further, n should be less than m.
- *
- * The decomposition is computed by first computing A'A = V S^2 V',
- * computing svd locally on that (since n x n is small),
- * from which we recover S and V.
- * Then we compute U via easy matrix multiplication
- * as U = A * V * S^-1
- *
- * Only the k largest singular values and associated vectors are found.
- * If there are k such values, then the dimensions of the return will be:
- *
- * S is k x k and diagonal, holding the singular values on diagonal
- * U is m x k and satisfies U'U = eye(k)
- * V is n x k and satisfies V'V = eye(k)
- *
- * @param matrix dense matrix to factorize
- * @return See [[TallSkinnyMatrixSVD]] for the output matrices and arrays
- */
- private def denseSVD(matrix: TallSkinnyDenseMatrix): TallSkinnyMatrixSVD = {
- val m = matrix.m
- val n = matrix.n
-
- if (m < n || m <= 0 || n <= 0) {
- throw new IllegalArgumentException("Expecting a tall and skinny matrix m=$m n=$n")
- }
-
- if (k < 1 || k > n) {
- throw new IllegalArgumentException("Request up to n singular values n=$n k=$k")
- }
-
- val rowIndices = matrix.rows.map(_.i)
-
- // compute SVD
- val (u, sigma, v) = denseSVD(matrix.rows.map(_.data))
-
- if (computeU) {
- // prep u for returning
- val retU = TallSkinnyDenseMatrix(
- u.zip(rowIndices).map {
- case (row, i) => MatrixRow(i, row)
- },
- m,
- k)
-
- TallSkinnyMatrixSVD(retU, sigma, v)
- } else {
- TallSkinnyMatrixSVD(null, sigma, v)
- }
- }
-
- /**
- * Singular Value Decomposition for Tall and Skinny matrices.
- * Given an m x n matrix A, this will compute matrices U, S, V such that
- * A = U * S * V'
- *
- * There is no restriction on m, but we require n^2 doubles to fit in memory.
- * Further, n should be less than m.
- *
- * The decomposition is computed by first computing A'A = V S^2 V',
- * computing svd locally on that (since n x n is small),
- * from which we recover S and V.
- * Then we compute U via easy matrix multiplication
- * as U = A * V * S^-1
- *
- * Only the k largest singular values and associated vectors are found.
- * If there are k such values, then the dimensions of the return will be:
- *
- * S is k x k and diagonal, holding the singular values on diagonal
- * U is m x k and satisfies U'U = eye(k)
- * V is n x k and satisfies V'V = eye(k)
- *
- * The return values are as lean as possible: an RDD of rows for U,
- * a simple array for sigma, and a dense 2d matrix array for V
- *
- * @param matrix dense matrix to factorize
- * @return Three matrices: U, S, V such that A = USV^T
- */
- private def denseSVD(matrix: RDD[Array[Double]]):
- (RDD[Array[Double]], Array[Double], Array[Array[Double]]) = {
- val n = matrix.first.size
-
- if (k < 1 || k > n) {
- throw new IllegalArgumentException(
- "Request up to n singular values k=$k n=$n")
- }
-
- // Compute A^T A
- val fullata = matrix.mapPartitions {
- iter =>
- val localATA = Array.ofDim[Double](n, n)
- while (iter.hasNext) {
- val row = iter.next()
- var i = 0
- while (i < n) {
- var j = 0
- while (j < n) {
- localATA(i)(j) += row(i) * row(j)
- j += 1
- }
- i += 1
- }
- }
- Iterator(localATA)
- }.fold(Array.ofDim[Double](n, n)) {
- (a, b) =>
- var i = 0
- while (i < n) {
- var j = 0
- while (j < n) {
- a(i)(j) += b(i)(j)
- j += 1
- }
- i += 1
- }
- a
- }
-
- // Construct jblas A^T A locally
- val ata = new DoubleMatrix(fullata)
-
- // Since A^T A is small, we can compute its SVD directly
- val svd = Singular.sparseSVD(ata)
- val V = svd(0)
- val sigmas = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x / svd(1).get(0) > rCond)
-
- val sk = Math.min(k, sigmas.size)
- val sigma = sigmas.take(sk)
-
- // prepare V for returning
- val retV = Array.tabulate(n, sk)((i, j) => V.get(i, j))
-
- if (computeU) {
- // Compute U as U = A V S^-1
- // Compute VS^-1
- val vsinv = new DoubleMatrix(Array.tabulate(n, sk)((i, j) => V.get(i, j) / sigma(j)))
- val retU = matrix.map {
- x =>
- val v = new DoubleMatrix(Array(x))
- v.mmul(vsinv).data
- }
- (retU, sigma, retV)
- } else {
- (null, sigma, retV)
- }
- }
-
- /**
- * Singular Value Decomposition for Tall and Skinny sparse matrices.
- * Given an m x n matrix A, this will compute matrices U, S, V such that
- * A = U * S * V'
- *
- * There is no restriction on m, but we require O(n^2) doubles to fit in memory.
- * Further, n should be less than m.
- *
- * The decomposition is computed by first computing A'A = V S^2 V',
- * computing svd locally on that (since n x n is small),
- * from which we recover S and V.
- * Then we compute U via easy matrix multiplication
- * as U = A * V * S^-1
- *
- * Only the k largest singular values and associated vectors are found.
- * If there are k such values, then the dimensions of the return will be:
- *
- * S is k x k and diagonal, holding the singular values on diagonal
- * U is m x k and satisfies U'U = eye(k)
- * V is n x k and satisfies V'V = eye(k)
- *
- * All input and output is expected in sparse matrix format, 0-indexed
- * as tuples of the form ((i,j),value) all in RDDs using the
- * SparseMatrix class
- *
- * @param matrix sparse matrix to factorize
- * @return Three sparse matrices: U, S, V such that A = USV^T
- */
- private def sparseSVD(matrix: SparseMatrix): MatrixSVD = {
- val data = matrix.data
- val m = matrix.m
- val n = matrix.n
-
- if (m < n || m <= 0 || n <= 0) {
- throw new IllegalArgumentException("Expecting a tall and skinny matrix")
- }
-
- if (k < 1 || k > n) {
- throw new IllegalArgumentException("Must request up to n singular values")
- }
-
- // Compute A^T A, assuming rows are sparse enough to fit in memory
- val rows = data.map(entry =>
- (entry.i, (entry.j, entry.mval))).groupByKey()
- val emits = rows.flatMap {
- case (rowind, cols) =>
- cols.flatMap {
- case (colind1, mval1) =>
- cols.map {
- case (colind2, mval2) =>
- ((colind1, colind2), mval1 * mval2)
- }
- }
- }.reduceByKey(_ + _)
-
- // Construct jblas A^T A locally
- val ata = DoubleMatrix.zeros(n, n)
- for (entry <- emits.collect()) {
- ata.put(entry._1._1, entry._1._2, entry._2)
- }
-
- // Since A^T A is small, we can compute its SVD directly
- val svd = Singular.sparseSVD(ata)
- val V = svd(0)
- // This will be updated to rcond
- val sigmas = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x > 1e-9)
-
- if (sigmas.size < k) {
- throw new Exception("Not enough singular values to return k=" + k + " s=" + sigmas.size)
- }
-
- val sigma = sigmas.take(k)
-
- val sc = data.sparkContext
-
- // prepare V for returning
- val retVdata = sc.makeRDD(
- Array.tabulate(V.rows, sigma.length) {
- (i, j) =>
- MatrixEntry(i, j, V.get(i, j))
- }.flatten)
- val retV = SparseMatrix(retVdata, V.rows, sigma.length)
-
- val retSdata = sc.makeRDD(Array.tabulate(sigma.length) {
- x => MatrixEntry(x, x, sigma(x))
- })
-
- val retS = SparseMatrix(retSdata, sigma.length, sigma.length)
-
- // Compute U as U = A V S^-1
- // turn V S^-1 into an RDD as a sparse matrix
- val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length) {
- (i, j) => ((i, j), V.get(i, j) / sigma(j))
- }.flatten)
-
- if (computeU) {
- // Multiply A by VS^-1
- val aCols = data.map(entry => (entry.j, (entry.i, entry.mval)))
- val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
- val retUdata = aCols.join(bRows).map {
- case (key, ((rowInd, rowVal), (colInd, colVal))) =>
- ((rowInd, colInd), rowVal * colVal)
- }.reduceByKey(_ + _).map {
- case ((row, col), mval) => MatrixEntry(row, col, mval)
- }
-
- val retU = SparseMatrix(retUdata, m, sigma.length)
- MatrixSVD(retU, retS, retV)
- } else {
- MatrixSVD(null, retS, retV)
- }
- }
-}
-
-/**
- * Top-level methods for calling sparse Singular Value Decomposition
- * NOTE: All matrices are 0-indexed
- */
-object SVD {
- def main(args: Array[String]) {
- if (args.length < 8) {
- println("Usage: SVD <master> <matrix_file> <m> <n> " +
- "<k> <output_U_file> <output_S_file> <output_V_file>")
- System.exit(1)
- }
-
- val (master, inputFile, m, n, k, output_u, output_s, output_v) =
- (args(0), args(1), args(2).toInt, args(3).toInt,
- args(4).toInt, args(5), args(6), args(7))
-
- val sc = new SparkContext(master, "SVD")
-
- val rawData = sc.textFile(inputFile)
- val data = rawData.map {
- line =>
- val parts = line.split(',')
- MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
- }
-
- val decomposed = new SVD().setK(k).compute(SparseMatrix(data, m, n))
- val u = decomposed.U.data
- val s = decomposed.S.data
- val v = decomposed.V.data
-
- println("Computed " + s.collect().length + " singular values and vectors")
- u.saveAsTextFile(output_u)
- s.saveAsTextFile(output_s)
- v.saveAsTextFile(output_v)
- System.exit(0)
- }
-}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixRow.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SingularValueDecomposition.scala
index 2608a67bfe..46b1054574 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixRow.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SingularValueDecomposition.scala
@@ -17,10 +17,5 @@
package org.apache.spark.mllib.linalg
-/**
- * Class that represents a row of a dense matrix
- *
- * @param i row index (0 indexing used)
- * @param data entries of the row
- */
-case class MatrixRow(val i: Int, val data: Array[Double])
+/** Represents singular value decomposition (SVD) factors. */
+case class SingularValueDecomposition[UType, VType](U: UType, s: Vector, V: VType)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyMatrixSVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyMatrixSVD.scala
deleted file mode 100644
index b3a450e923..0000000000
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyMatrixSVD.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.linalg
-
-/**
- * Class that represents the singular value decomposition of a matrix
- *
- * @param U such that A = USV^T is a TallSkinnyDenseMatrix
- * @param S such that A = USV^T is a simple double array
- * @param V such that A = USV^T, V is a 2d array matrix that holds
- * singular vectors in columns. Columns are inner arrays
- * i.e. V(i)(j) is standard math notation V_{ij}
- */
-case class TallSkinnyMatrixSVD(val U: TallSkinnyDenseMatrix,
- val S: Array[Double],
- val V: Array[Array[Double]])
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala
new file mode 100644
index 0000000000..9194f65749
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg.distributed
+
+import breeze.linalg.{DenseMatrix => BDM}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.Vectors
+
+/**
+ * Represents an entry in a distributed matrix.
+ * @param i row index
+ * @param j column index
+ * @param value value of the entry
+ */
+case class MatrixEntry(i: Long, j: Long, value: Double)
+
+/**
+ * Represents a matrix in coordinate format.
+ *
+ * @param entries matrix entries
+ * @param nRows number of rows. A non-positive value means unknown, and then the number of rows will
+ * be determined by the max row index plus one.
+ * @param nCols number of columns. A non-positive value means unknown, and then the number of
+ * columns will be determined by the max column index plus one.
+ */
+class CoordinateMatrix(
+ val entries: RDD[MatrixEntry],
+ private var nRows: Long,
+ private var nCols: Long) extends DistributedMatrix {
+
+ /** Alternative constructor leaving matrix dimensions to be determined automatically. */
+ def this(entries: RDD[MatrixEntry]) = this(entries, 0L, 0L)
+
+ /** Gets or computes the number of columns. */
+ override def numCols(): Long = {
+ if (nCols <= 0L) {
+ computeSize()
+ }
+ nCols
+ }
+
+ /** Gets or computes the number of rows. */
+ override def numRows(): Long = {
+ if (nRows <= 0L) {
+ computeSize()
+ }
+ nRows
+ }
+
+ /** Converts to IndexedRowMatrix. The number of columns must be within the integer range. */
+ def toIndexedRowMatrix(): IndexedRowMatrix = {
+ val nl = numCols()
+ if (nl > Int.MaxValue) {
+ sys.error(s"Cannot convert to a row-oriented format because the number of columns $nl is " +
+ "too large.")
+ }
+ val n = nl.toInt
+ val indexedRows = entries.map(entry => (entry.i, (entry.j.toInt, entry.value)))
+ .groupByKey()
+ .map { case (i, vectorEntries) =>
+ IndexedRow(i, Vectors.sparse(n, vectorEntries.toSeq))
+ }
+ new IndexedRowMatrix(indexedRows, numRows(), n)
+ }
+
+ /**
+ * Converts to RowMatrix, dropping row indices after grouping by row index.
+ * The number of columns must be within the integer range.
+ */
+ def toRowMatrix(): RowMatrix = {
+ toIndexedRowMatrix().toRowMatrix()
+ }
+
+ /** Determines the size by computing the max row/column index. */
+ private def computeSize() {
+ // Reduce will throw an exception if `entries` is empty.
+ val (m1, n1) = entries.map(entry => (entry.i, entry.j)).reduce { case ((i1, j1), (i2, j2)) =>
+ (math.max(i1, i2), math.max(j1, j2))
+ }
+ // There may be empty columns at the very right and empty rows at the very bottom.
+ nRows = math.max(nRows, m1 + 1L)
+ nCols = math.max(nCols, n1 + 1L)
+ }
+
+ /** Collects data and assembles a local matrix. */
+ private[mllib] override def toBreeze(): BDM[Double] = {
+ val m = numRows().toInt
+ val n = numCols().toInt
+ val mat = BDM.zeros[Double](m, n)
+ entries.collect().foreach { case MatrixEntry(i, j, value) =>
+ mat(i.toInt, j.toInt) = value
+ }
+ mat
+ }
+}
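A short sketch (assuming a `SparkContext` named `sc`) of how a CoordinateMatrix infers its size from the maximum indices and converts to the row-oriented formats:
{{{
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

val coo = new CoordinateMatrix(sc.parallelize(Seq(
  MatrixEntry(0L, 1L, 2.0),
  MatrixEntry(4L, 2L, 3.0))))

// Size is max index + 1 per dimension: 5 rows, 3 columns.
println((coo.numRows(), coo.numCols()))

// Conversions group entries by row index into sparse vectors.
val indexed = coo.toIndexedRowMatrix()
val rowMat = coo.toRowMatrix()
}}}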
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrix.scala
index cbd1a2a5a4..13f72a3c72 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrix.scala
@@ -15,16 +15,23 @@
* limitations under the License.
*/
-package org.apache.spark.mllib.linalg
+package org.apache.spark.mllib.linalg.distributed
-import org.apache.spark.rdd.RDD
+import breeze.linalg.{DenseMatrix => BDM}
+import org.apache.spark.mllib.linalg.Matrix
/**
- * Class that represents a sparse matrix
- *
- * @param data RDD of nonzero entries
- * @param m number of rows
- * @param n numner of columns
+ * Represents a distributively stored matrix backed by one or more RDDs.
*/
-case class SparseMatrix(val data: RDD[MatrixEntry], val m: Int, val n: Int)
+trait DistributedMatrix extends Serializable {
+
+ /** Gets or computes the number of rows. */
+ def numRows(): Long
+
+ /** Gets or computes the number of columns. */
+ def numCols(): Long
+
+ /** Collects data and assembles a local dense breeze matrix (for test only). */
+ private[mllib] def toBreeze(): BDM[Double]
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
new file mode 100644
index 0000000000..e110f070bd
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg.distributed
+
+import breeze.linalg.{DenseMatrix => BDM}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg._
+import org.apache.spark.mllib.linalg.SingularValueDecomposition
+
+/** Represents a row of [[org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix]]. */
+case class IndexedRow(index: Long, vector: Vector)
+
+/**
+ * Represents a row-oriented [[org.apache.spark.mllib.linalg.distributed.DistributedMatrix]] with
+ * indexed rows.
+ *
+ * @param rows indexed rows of this matrix
+ * @param nRows number of rows. A non-positive value means unknown, and then the number of rows will
+ * be determined by the max row index plus one.
+ * @param nCols number of columns. A non-positive value means unknown, and then the number of
+ * columns will be determined by the size of the first row.
+ */
+class IndexedRowMatrix(
+ val rows: RDD[IndexedRow],
+ private var nRows: Long,
+ private var nCols: Int) extends DistributedMatrix {
+
+ /** Alternative constructor leaving matrix dimensions to be determined automatically. */
+ def this(rows: RDD[IndexedRow]) = this(rows, 0L, 0)
+
+ override def numCols(): Long = {
+ if (nCols <= 0) {
+ // Calling `first` will throw an exception if `rows` is empty.
+ nCols = rows.first().vector.size
+ }
+ nCols
+ }
+
+ override def numRows(): Long = {
+ if (nRows <= 0L) {
+ // Reduce will throw an exception if `rows` is empty.
+ nRows = rows.map(_.index).reduce(math.max) + 1L
+ }
+ nRows
+ }
+
+ /**
+ * Drops row indices and converts this matrix to a
+ * [[org.apache.spark.mllib.linalg.distributed.RowMatrix]].
+ */
+ def toRowMatrix(): RowMatrix = {
+ new RowMatrix(rows.map(_.vector), 0L, nCols)
+ }
+
+ /**
+ * Computes the singular value decomposition of this matrix.
+ * Denote this matrix by A (m x n), this will compute matrices U, S, V such that A = U * S * V'.
+ *
+ * There is no restriction on m, but we require `n^2` doubles to fit in memory.
+ * Further, n should be less than m.
+   *
+ * The decomposition is computed by first computing A'A = V S^2 V',
+ * computing svd locally on that (since n x n is small), from which we recover S and V.
+ * Then we compute U via easy matrix multiplication as U = A * (V * S^-1).
+ * Note that this approach requires `O(n^3)` time on the master node.
+ *
+ * At most k largest non-zero singular values and associated vectors are returned.
+ * If there are k such values, then the dimensions of the return will be:
+ *
+ * U is an [[org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix]] of size m x k that
+ * satisfies U'U = eye(k),
+ * s is a Vector of size k, holding the singular values in descending order,
+ * and V is a local Matrix of size n x k that satisfies V'V = eye(k).
+ *
+ * @param k number of singular values to keep. We might return less than k if there are
+ * numerically zero singular values. See rCond.
+ * @param computeU whether to compute U
+ * @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0)
+ * are treated as zero, where sigma(0) is the largest singular value.
+ * @return SingularValueDecomposition(U, s, V)
+ */
+ def computeSVD(
+ k: Int,
+ computeU: Boolean = false,
+ rCond: Double = 1e-9): SingularValueDecomposition[IndexedRowMatrix, Matrix] = {
+ val indices = rows.map(_.index)
+ val svd = toRowMatrix().computeSVD(k, computeU, rCond)
+ val U = if (computeU) {
+ val indexedRows = indices.zip(svd.U.rows).map { case (i, v) =>
+ IndexedRow(i, v)
+ }
+ new IndexedRowMatrix(indexedRows, nRows, nCols)
+ } else {
+ null
+ }
+ SingularValueDecomposition(U, svd.s, svd.V)
+ }
+
+ /**
+ * Multiply this matrix by a local matrix on the right.
+ *
+ * @param B a local matrix whose number of rows must match the number of columns of this matrix
+ * @return an IndexedRowMatrix representing the product, which preserves partitioning
+ */
+ def multiply(B: Matrix): IndexedRowMatrix = {
+ val mat = toRowMatrix().multiply(B)
+ val indexedRows = rows.map(_.index).zip(mat.rows).map { case (i, v) =>
+ IndexedRow(i, v)
+ }
+ new IndexedRowMatrix(indexedRows, nRows, nCols)
+ }
+
+ /**
+ * Computes the Gramian matrix `A^T A`.
+ */
+ def computeGramianMatrix(): Matrix = {
+ toRowMatrix().computeGramianMatrix()
+ }
+
+ private[mllib] override def toBreeze(): BDM[Double] = {
+ val m = numRows().toInt
+ val n = numCols().toInt
+ val mat = BDM.zeros[Double](m, n)
+ rows.collect().foreach { case IndexedRow(rowIndex, vector) =>
+ val i = rowIndex.toInt
+ vector.toBreeze.activeIterator.foreach { case (j, v) =>
+ mat(i, j) = v
+ }
+ }
+ mat
+ }
+}
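Since `computeSVD` here delegates to `RowMatrix` and zips the original row indices back onto U, the returned left factor is itself an IndexedRowMatrix. A hedged sketch, reusing the hypothetical `indexedMat` from the earlier example:
{{{
val svd = indexedMat.computeSVD(1, computeU = true)
val u = svd.U  // IndexedRowMatrix, rows keyed by the original indices
val s = svd.s  // singular values in descending order
val v = svd.V  // local n x k Matrix
}}}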
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
new file mode 100644
index 0000000000..f59811f18a
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg.distributed
+
+import java.util
+
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd}
+import breeze.numerics.{sqrt => brzSqrt}
+import com.github.fommil.netlib.BLAS.{getInstance => blas}
+
+import org.apache.spark.mllib.linalg._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.Logging
+
+/**
+ * Represents a row-oriented distributed Matrix with no meaningful row indices.
+ *
+ * @param rows rows stored as an RDD[Vector]
+ * @param nRows number of rows. A non-positive value means unknown, and then the number of rows will
+ * be determined by the number of records in the RDD `rows`.
+ * @param nCols number of columns. A non-positive value means unknown, and then the number of
+ * columns will be determined by the size of the first row.
+ */
+class RowMatrix(
+ val rows: RDD[Vector],
+ private var nRows: Long,
+ private var nCols: Int) extends DistributedMatrix with Logging {
+
+ /** Alternative constructor leaving matrix dimensions to be determined automatically. */
+ def this(rows: RDD[Vector]) = this(rows, 0L, 0)
+
+ /** Gets or computes the number of columns. */
+ override def numCols(): Long = {
+ if (nCols <= 0) {
+ // Calling `first` will throw an exception if `rows` is empty.
+ nCols = rows.first().size
+ }
+ nCols
+ }
+
+ /** Gets or computes the number of rows. */
+ override def numRows(): Long = {
+ if (nRows <= 0L) {
+ nRows = rows.count()
+ if (nRows == 0L) {
+ sys.error("Cannot determine the number of rows because it is not specified in the " +
+ "constructor and the rows RDD is empty.")
+ }
+ }
+ nRows
+ }
+
+ /**
+ * Computes the Gramian matrix `A^T A`.
+ */
+ def computeGramianMatrix(): Matrix = {
+ val n = numCols().toInt
+ val nt: Int = n * (n + 1) / 2
+
+ // Compute the upper triangular part of the gram matrix.
+ val GU = rows.aggregate(new BDV[Double](new Array[Double](nt)))(
+ seqOp = (U, v) => {
+ RowMatrix.dspr(1.0, v, U.data)
+ U
+ },
+ combOp = (U1, U2) => U1 += U2
+ )
+
+ RowMatrix.triuToFull(n, GU.data)
+ }
+
+ /**
+ * Computes the singular value decomposition of this matrix.
+ * Denote this matrix by A (m x n), this will compute matrices U, S, V such that A = U * S * V'.
+ *
+ * There is no restriction on m, but we require `n^2` doubles to fit in memory.
+ * Further, n should be less than m.
+   *
+ * The decomposition is computed by first computing A'A = V S^2 V',
+ * computing svd locally on that (since n x n is small), from which we recover S and V.
+ * Then we compute U via easy matrix multiplication as U = A * (V * S^-1).
+ * Note that this approach requires `O(n^3)` time on the master node.
+ *
+ * At most k largest non-zero singular values and associated vectors are returned.
+ * If there are k such values, then the dimensions of the return will be:
+ *
+ * U is a RowMatrix of size m x k that satisfies U'U = eye(k),
+ * s is a Vector of size k, holding the singular values in descending order,
+ * and V is a Matrix of size n x k that satisfies V'V = eye(k).
+ *
+ * @param k number of singular values to keep. We might return less than k if there are
+ * numerically zero singular values. See rCond.
+ * @param computeU whether to compute U
+ * @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0)
+ * are treated as zero, where sigma(0) is the largest singular value.
+ * @return SingularValueDecomposition(U, s, V)
+ */
+ def computeSVD(
+ k: Int,
+ computeU: Boolean = false,
+ rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = {
+ val n = numCols().toInt
+ require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.")
+
+ val G = computeGramianMatrix()
+
+ // TODO: Use sparse SVD instead.
+ val (u: BDM[Double], sigmaSquares: BDV[Double], v: BDM[Double]) =
+ brzSvd(G.toBreeze.asInstanceOf[BDM[Double]])
+ val sigmas: BDV[Double] = brzSqrt(sigmaSquares)
+
+ // Determine effective rank.
+ val sigma0 = sigmas(0)
+ val threshold = rCond * sigma0
+ var i = 0
+ while (i < k && sigmas(i) >= threshold) {
+ i += 1
+ }
+ val sk = i
+
+ if (sk < k) {
+ logWarning(s"Requested $k singular values but only found $sk nonzeros.")
+ }
+
+ val s = Vectors.dense(util.Arrays.copyOfRange(sigmas.data, 0, sk))
+ val V = Matrices.dense(n, sk, util.Arrays.copyOfRange(u.data, 0, n * sk))
+
+ if (computeU) {
+ // N = Vk * Sk^{-1}
+ val N = new BDM[Double](n, sk, util.Arrays.copyOfRange(u.data, 0, n * sk))
+ var i = 0
+ var j = 0
+ while (j < sk) {
+ i = 0
+ val sigma = sigmas(j)
+ while (i < n) {
+ N(i, j) /= sigma
+ i += 1
+ }
+ j += 1
+ }
+ val U = this.multiply(Matrices.fromBreeze(N))
+ SingularValueDecomposition(U, s, V)
+ } else {
+ SingularValueDecomposition(null, s, V)
+ }
+ }
+
+ /**
+ * Computes the covariance matrix, treating each row as an observation.
+ * @return a local dense matrix of size n x n
+ */
+ def computeCovariance(): Matrix = {
+ val n = numCols().toInt
+
+ if (n > 10000) {
+ val mem = n * n * java.lang.Double.SIZE / java.lang.Byte.SIZE
+ logWarning(s"The number of columns $n is greater than 10000! " +
+ s"We need at least $mem bytes of memory.")
+ }
+
+ val (m, mean) = rows.aggregate[(Long, BDV[Double])]((0L, BDV.zeros[Double](n)))(
+ seqOp = (s: (Long, BDV[Double]), v: Vector) => (s._1 + 1L, s._2 += v.toBreeze),
+ combOp = (s1: (Long, BDV[Double]), s2: (Long, BDV[Double])) => (s1._1 + s2._1, s1._2 += s2._2)
+ )
+
+ // Update _m if it is not set, or verify its value.
+ if (nRows <= 0L) {
+ nRows = m
+ } else {
+ require(nRows == m,
+ s"The number of rows $m is different from what specified or previously computed: ${nRows}.")
+ }
+
+ mean :/= m.toDouble
+
+ // We use the formula Cov(X, Y) = E[X * Y] - E[X] E[Y], which is not accurate if E[X * Y] is
+ // large but Cov(X, Y) is small, but it is good for sparse computation.
+ // TODO: find a fast and stable way for sparse data.
+
+ val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]]
+
+ var i = 0
+ var j = 0
+ val m1 = m - 1.0
+ var alpha = 0.0
+ while (i < n) {
+ alpha = m / m1 * mean(i)
+ j = 0
+ while (j < n) {
+ G(i, j) = G(i, j) / m1 - alpha * mean(j)
+ j += 1
+ }
+ i += 1
+ }
+
+ Matrices.fromBreeze(G)
+ }
+
+ /**
+ * Computes the top k principal components.
+ * Rows correspond to observations and columns correspond to variables.
+   * The principal components are stored as a local matrix of size n-by-k.
+   * Each column corresponds to one principal component,
+ * and the columns are in descending order of component variance.
+ *
+ * @param k number of top principal components.
+ * @return a matrix of size n-by-k, whose columns are principal components
+ */
+ def computePrincipalComponents(k: Int): Matrix = {
+ val n = numCols().toInt
+ require(k > 0 && k <= n, s"k = $k out of range (0, n = $n]")
+
+ val Cov = computeCovariance().toBreeze.asInstanceOf[BDM[Double]]
+
+ val (u: BDM[Double], _, _) = brzSvd(Cov)
+
+ if (k == n) {
+ Matrices.dense(n, k, u.data)
+ } else {
+ Matrices.dense(n, k, util.Arrays.copyOfRange(u.data, 0, n * k))
+ }
+ }
+
+ /**
+ * Multiply this matrix by a local matrix on the right.
+ *
+ * @param B a local matrix whose number of rows must match the number of columns of this matrix
+ * @return a [[org.apache.spark.mllib.linalg.distributed.RowMatrix]] representing the product,
+ * which preserves partitioning
+ */
+ def multiply(B: Matrix): RowMatrix = {
+ val n = numCols().toInt
+ require(n == B.numRows, s"Dimension mismatch: $n vs ${B.numRows}")
+
+ require(B.isInstanceOf[DenseMatrix],
+ s"Only support dense matrix at this time but found ${B.getClass.getName}.")
+
+ val Bb = rows.context.broadcast(B)
+ val AB = rows.mapPartitions({ iter =>
+ val Bi = Bb.value.toBreeze.asInstanceOf[BDM[Double]]
+ iter.map(v => Vectors.fromBreeze(Bi.t * v.toBreeze))
+ }, preservesPartitioning = true)
+
+ new RowMatrix(AB, nRows, B.numCols)
+ }
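
multiply needs no shuffle: row i of A * B equals B^T * a_i, so broadcasting B and mapping rows within each partition is enough. A small Breeze check of that row-wise identity, not part of the patch:

import breeze.linalg.{DenseMatrix => BDM}

object RowWiseMultiplySketch extends App {
  val A = BDM((1.0, 2.0), (3.0, 4.0))
  val B = BDM((5.0, 6.0), (7.0, 8.0))
  val AB = A * B
  val a0 = A.t(::, 0) // row 0 of A, as a column vector
  val row0 = B.t * a0 // row 0 of A * B, computed from row 0 of A alone
  println(s"${AB.t(::, 0)} vs $row0") // both (19.0, 22.0)
}
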
+
+ private[mllib] override def toBreeze(): BDM[Double] = {
+ val m = numRows().toInt
+ val n = numCols().toInt
+ val mat = BDM.zeros[Double](m, n)
+ var i = 0
+ rows.collect().foreach { v =>
+ v.toBreeze.activeIterator.foreach { case (j, value) =>
+ mat(i, j) = value
+ }
+ i += 1
+ }
+ mat
+ }
+}
+
+object RowMatrix {
+
+ /**
+ * Adds alpha * v * v.t to a matrix in-place. This is the same as BLAS's DSPR.
+ *
+ * @param alpha the scale factor
+ * @param v the vector whose outer product is added
+ * @param U the upper triangular part of the matrix packed in an array (column major)
+ */
+ private def dspr(alpha: Double, v: Vector, U: Array[Double]): Unit = {
+ // TODO: Find a better home (breeze?) for this method.
+ val n = v.size
+ v match {
+ case dv: DenseVector =>
+ blas.dspr("U", n, 1.0, dv.values, 1, U)
+ case sv: SparseVector =>
+ val indices = sv.indices
+ val values = sv.values
+ val nnz = indices.length
+ var colStartIdx = 0
+ var prevCol = 0
+ var col = 0
+ var j = 0
+ var i = 0
+ var av = 0.0
+ while (j < nnz) {
+ col = indices(j)
+ // Skip empty columns.
+ colStartIdx += (col - prevCol) * (col + prevCol + 1) / 2
+ av = alpha * values(j)
+ i = 0
+ while (i <= j) {
+ U(colStartIdx + indices(i)) += av * values(i)
+ i += 1
+ }
+ j += 1
+ prevCol = col
+ }
+ }
+ }
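
dspr and triuToFull below share the packed column-major layout: column j of the upper triangle starts at offset j * (j + 1) / 2, so entry (i, j) with i <= j lives at j * (j + 1) / 2 + i. A self-contained check of that layout, for illustration only:

object PackedUpperTriangleSketch extends App {
  val n = 3
  val x = Array(1.0, 2.0, 3.0)
  val alpha = 2.0
  val U = new Array[Double](n * (n + 1) / 2) // packed upper triangle, column major
  for (j <- 0 until n; i <- 0 to j) {
    U(j * (j + 1) / 2 + i) += alpha * x(i) * x(j) // dense-loop equivalent of dspr
  }
  println(U.mkString(", ")) // 2.0, 4.0, 8.0, 6.0, 12.0, 18.0
}
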
+
+ /**
+ * Fills a full square matrix from its upper triangular part.
+ *
+ * @param n the matrix dimension
+ * @param U the upper triangular part packed in an array (column major)
+ */
+ private def triuToFull(n: Int, U: Array[Double]): Matrix = {
+ val G = new BDM[Double](n, n)
+
+ var row = 0
+ var col = 0
+ var idx = 0
+ var value = 0.0
+ while (col < n) {
+ row = 0
+ while (row < col) {
+ value = U(idx)
+ G(row, col) = value
+ G(col, row) = value
+ idx += 1
+ row += 1
+ }
+ G(col, col) = U(idx)
+ idx += 1
+ col += 1
+ }
+
+ Matrices.dense(n, n, G.data)
+ }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala
deleted file mode 100644
index 87aac34757..0000000000
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.util
-
-import org.apache.spark.SparkContext._
-
-import org.apache.spark.mllib.linalg._
-
-/**
- * Helper methods for linear algebra
- */
-object LAUtils {
- /**
- * Convert a SparseMatrix into a TallSkinnyDenseMatrix
- *
- * @param sp Sparse matrix to be converted
- * @return dense version of the input
- */
- def sparseToTallSkinnyDense(sp: SparseMatrix): TallSkinnyDenseMatrix = {
- val m = sp.m
- val n = sp.n
- val rows = sp.data.map(x => (x.i, (x.j, x.mval))).groupByKey().map {
- case (i, cols) =>
- val rowArray = Array.ofDim[Double](n)
- var j = 0
- val colsItr = cols.iterator
- while (colsItr.hasNext) {
- val element = colsItr.next
- rowArray(element._1) = element._2
- j += 1
- }
- MatrixRow(i, rowArray)
- }
- TallSkinnyDenseMatrix(rows, m, n)
- }
-
- /**
- * Convert a TallSkinnyDenseMatrix to a SparseMatrix
- *
- * @param a matrix to be converted
- * @return sparse version of the input
- */
- def denseToSparse(a: TallSkinnyDenseMatrix): SparseMatrix = {
- val m = a.m
- val n = a.n
- val data = a.rows.flatMap {
- mrow => Array.tabulate(n)(j => MatrixEntry(mrow.i, j, mrow.data(j)))
- .filter(x => x.mval != 0)
- }
- SparseMatrix(data, m, n)
- }
-}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeMatrixConversionSuite.scala
index 416996fcbe..82d49c76ed 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeMatrixConversionSuite.scala
@@ -17,11 +17,24 @@
package org.apache.spark.mllib.linalg
-/**
- * Class that represents an entry in a sparse matrix of doubles.
- *
- * @param i row index (0 indexing used)
- * @param j column index (0 indexing used)
- * @param mval value of entry in matrix
- */
-case class MatrixEntry(val i: Int, val j: Int, val mval: Double)
+import org.scalatest.FunSuite
+
+import breeze.linalg.{DenseMatrix => BDM}
+
+class BreezeMatrixConversionSuite extends FunSuite {
+ test("dense matrix to breeze") {
+ val mat = Matrices.dense(3, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0))
+ val breeze = mat.toBreeze.asInstanceOf[BDM[Double]]
+ assert(breeze.rows === mat.numRows)
+ assert(breeze.cols === mat.numCols)
+ assert(breeze.data.eq(mat.asInstanceOf[DenseMatrix].values), "should not copy data")
+ }
+
+ test("dense breeze matrix to matrix") {
+ val breeze = new BDM[Double](3, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0))
+ val mat = Matrices.fromBreeze(breeze).asInstanceOf[DenseMatrix]
+ assert(mat.numRows === breeze.rows)
+ assert(mat.numCols === breeze.cols)
+ assert(mat.values.eq(breeze.data), "should not copy data")
+ }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyDenseMatrix.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
index e4ef3c58e8..9c66b4db9f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/TallSkinnyDenseMatrix.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
@@ -17,14 +17,23 @@
package org.apache.spark.mllib.linalg
-import org.apache.spark.rdd.RDD
+import org.scalatest.FunSuite
+class MatricesSuite extends FunSuite {
+ test("dense matrix construction") {
+ val m = 3
+ val n = 2
+ val values = Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0)
+ val mat = Matrices.dense(m, n, values).asInstanceOf[DenseMatrix]
+ assert(mat.numRows === m)
+ assert(mat.numCols === n)
+ assert(mat.values.eq(values), "should not copy data")
+ assert(mat.toArray.eq(values), "toArray should not copy data")
+ }
-/**
- * Class that represents a dense matrix
- *
- * @param rows RDD of rows
- * @param m number of rows
- * @param n number of columns
- */
-case class TallSkinnyDenseMatrix(val rows: RDD[MatrixRow], val m: Int, val n: Int)
+ test("dense matrix construction with wrong dimension") {
+ intercept[RuntimeException] {
+ Matrices.dense(3, 2, Array(0.0, 1.0, 2.0))
+ }
+ }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala
deleted file mode 100644
index 5e5086b1bf..0000000000
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.linalg
-
-import scala.util.Random
-
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.FunSuite
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-import org.apache.spark.mllib.util._
-
-import org.jblas._
-
-class PCASuite extends FunSuite with BeforeAndAfterAll {
- @transient private var sc: SparkContext = _
-
- override def beforeAll() {
- sc = new SparkContext("local", "test")
- }
-
- override def afterAll() {
- sc.stop()
- System.clearProperty("spark.driver.port")
- }
-
- val EPSILON = 1e-3
-
- // Return jblas matrix from sparse matrix RDD
- def getDenseMatrix(matrix: SparseMatrix) : DoubleMatrix = {
- val data = matrix.data
- val ret = DoubleMatrix.zeros(matrix.m, matrix.n)
- matrix.data.collect().map(x => ret.put(x.i, x.j, x.mval))
- ret
- }
-
- def assertMatrixApproximatelyEquals(a: DoubleMatrix, b: DoubleMatrix) {
- assert(a.rows == b.rows && a.columns == b.columns,
- "dimension mismatch: $a.rows vs $b.rows and $a.columns vs $b.columns")
- for (i <- 0 until a.columns) {
- val aCol = a.getColumn(i)
- val bCol = b.getColumn(i)
- val diff = Math.min(aCol.sub(bCol).norm1, aCol.add(bCol).norm1)
- assert(diff < EPSILON, "matrix mismatch: " + diff)
- }
- }
-
- test("full rank matrix pca") {
- val m = 5
- val n = 3
- val dataArr = Array.tabulate(m,n){ (a, b) =>
- MatrixEntry(a, b, Math.sin(a + b + a * b)) }.flatten
- val data = sc.makeRDD(dataArr, 3)
- val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n))
-
- val realPCAArray = Array((0,0,-0.2579), (0,1,-0.6602), (0,2,0.7054),
- (1,0,-0.1448), (1,1,0.7483), (1,2,0.6474),
- (2,0,0.9553), (2,1,-0.0649), (2,2,0.2886))
- val realPCA = sc.makeRDD(realPCAArray.map(x => MatrixEntry(x._1, x._2, x._3)), 3)
-
- val coeffs = new DoubleMatrix(new PCA().setK(n).compute(a))
-
- assertMatrixApproximatelyEquals(getDenseMatrix(SparseMatrix(realPCA,n,n)), coeffs)
- }
-
- test("sparse matrix full rank matrix pca") {
- val m = 5
- val n = 3
- // the entry that gets dropped is zero to test sparse support
- val dataArr = Array.tabulate(m,n){ (a, b) =>
- MatrixEntry(a, b, Math.sin(a + b + a * b)) }.flatten.drop(1)
- val data = sc.makeRDD(dataArr, 3)
- val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n))
-
- val realPCAArray = Array((0,0,-0.2579), (0,1,-0.6602), (0,2,0.7054),
- (1,0,-0.1448), (1,1,0.7483), (1,2,0.6474),
- (2,0,0.9553), (2,1,-0.0649), (2,2,0.2886))
- val realPCA = sc.makeRDD(realPCAArray.map(x => MatrixEntry(x._1, x._2, x._3)))
-
- val coeffs = new DoubleMatrix(new PCA().setK(n).compute(a))
-
- assertMatrixApproximatelyEquals(getDenseMatrix(SparseMatrix(realPCA,n,n)), coeffs)
- }
-
- test("truncated matrix pca") {
- val m = 5
- val n = 3
- val dataArr = Array.tabulate(m,n){ (a, b) =>
- MatrixEntry(a, b, Math.sin(a + b + a * b)) }.flatten
-
- val data = sc.makeRDD(dataArr, 3)
- val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n))
-
- val realPCAArray = Array((0,0,-0.2579), (0,1,-0.6602),
- (1,0,-0.1448), (1,1,0.7483),
- (2,0,0.9553), (2,1,-0.0649))
- val realPCA = sc.makeRDD(realPCAArray.map(x => MatrixEntry(x._1, x._2, x._3)))
-
- val k = 2
- val coeffs = new DoubleMatrix(new PCA().setK(k).compute(a))
-
- assertMatrixApproximatelyEquals(getDenseMatrix(SparseMatrix(realPCA,n,k)), coeffs)
- }
-}
-
-
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
deleted file mode 100644
index 20e2b0f84b..0000000000
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.linalg
-
-import scala.util.Random
-
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.FunSuite
-
-import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-import org.apache.spark.mllib.util._
-
-import org.jblas._
-
-class SVDSuite extends FunSuite with BeforeAndAfterAll {
- @transient private var sc: SparkContext = _
-
- override def beforeAll() {
- sc = new SparkContext("local", "test")
- }
-
- override def afterAll() {
- sc.stop()
- System.clearProperty("spark.driver.port")
- }
-
- val EPSILON = 1e-4
-
- // Return jblas matrix from sparse matrix RDD
- def getDenseMatrix(matrix: SparseMatrix) : DoubleMatrix = {
- val data = matrix.data
- val m = matrix.m
- val n = matrix.n
- val ret = DoubleMatrix.zeros(m, n)
- matrix.data.collect().map(x => ret.put(x.i, x.j, x.mval))
- ret
- }
-
- def assertMatrixApproximatelyEquals(a: DoubleMatrix, b: DoubleMatrix) {
- assert(a.rows == b.rows && a.columns == b.columns,
- "dimension mismatch: $a.rows vs $b.rows and $a.columns vs $b.columns")
- for (i <- 0 until a.columns) {
- val aCol = a.getColumn(i)
- val bCol = b.getColumn(i)
- val diff = Math.min(aCol.sub(bCol).norm1, aCol.add(bCol).norm1)
- assert(diff < EPSILON, "matrix mismatch: " + diff)
- }
- }
-
- test("full rank matrix svd") {
- val m = 10
- val n = 3
- val datarr = Array.tabulate(m,n){ (a, b) =>
- MatrixEntry(a, b, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten
- val data = sc.makeRDD(datarr, 3)
-
- val a = SparseMatrix(data, m, n)
-
- val decomposed = new SVD().setK(n).compute(a)
- val u = decomposed.U
- val v = decomposed.V
- val s = decomposed.S
-
- val denseA = getDenseMatrix(a)
- val svd = Singular.sparseSVD(denseA)
-
- val retu = getDenseMatrix(u)
- val rets = getDenseMatrix(s)
- val retv = getDenseMatrix(v)
-
-
- // check individual decomposition
- assertMatrixApproximatelyEquals(retu, svd(0))
- assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1)))
- assertMatrixApproximatelyEquals(retv, svd(2))
-
- // check multiplication guarantee
- assertMatrixApproximatelyEquals(retu.mmul(rets).mmul(retv.transpose), denseA)
- }
-
- test("dense full rank matrix svd") {
- val m = 10
- val n = 3
- val datarr = Array.tabulate(m,n){ (a, b) =>
- MatrixEntry(a, b, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten
- val data = sc.makeRDD(datarr, 3)
-
- val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n))
-
- val decomposed = new SVD().setK(n).setComputeU(true).compute(a)
- val u = LAUtils.denseToSparse(decomposed.U)
- val v = decomposed.V
- val s = decomposed.S
-
- val denseA = getDenseMatrix(LAUtils.denseToSparse(a))
- val svd = Singular.sparseSVD(denseA)
-
- val retu = getDenseMatrix(u)
- val rets = DoubleMatrix.diag(new DoubleMatrix(s))
- val retv = new DoubleMatrix(v)
-
-
- // check individual decomposition
- assertMatrixApproximatelyEquals(retu, svd(0))
- assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1)))
- assertMatrixApproximatelyEquals(retv, svd(2))
-
- // check multiplication guarantee
- assertMatrixApproximatelyEquals(retu.mmul(rets).mmul(retv.transpose), denseA)
- }
-
- test("rank one matrix svd") {
- val m = 10
- val n = 3
- val data = sc.makeRDD(Array.tabulate(m, n){ (a,b) =>
- MatrixEntry(a, b, 1.0) }.flatten )
- val k = 1
-
- val a = SparseMatrix(data, m, n)
-
- val decomposed = new SVD().setK(k).compute(a)
- val u = decomposed.U
- val s = decomposed.S
- val v = decomposed.V
- val retrank = s.data.collect().length
-
- assert(retrank == 1, "rank returned not one")
-
- val denseA = getDenseMatrix(a)
- val svd = Singular.sparseSVD(denseA)
-
- val retu = getDenseMatrix(u)
- val rets = getDenseMatrix(s)
- val retv = getDenseMatrix(v)
-
- // check individual decomposition
- assertMatrixApproximatelyEquals(retu, svd(0).getColumn(0))
- assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1).getRow(0)))
- assertMatrixApproximatelyEquals(retv, svd(2).getColumn(0))
-
- // check multiplication guarantee
- assertMatrixApproximatelyEquals(retu.mmul(rets).mmul(retv.transpose), denseA)
- }
-
- test("truncated with k") {
- val m = 10
- val n = 3
- val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) =>
- MatrixEntry(a, b, (a + 2).toDouble * (b + 1)/(1 + a + b)) }.flatten )
- val a = SparseMatrix(data, m, n)
-
- val k = 1 // only one svalue above this
-
- val decomposed = new SVD().setK(k).compute(a)
- val u = decomposed.U
- val s = decomposed.S
- val v = decomposed.V
- val retrank = s.data.collect().length
-
- val denseA = getDenseMatrix(a)
- val svd = Singular.sparseSVD(denseA)
-
- val retu = getDenseMatrix(u)
- val rets = getDenseMatrix(s)
- val retv = getDenseMatrix(v)
-
- assert(retrank == 1, "rank returned not one")
-
- // check individual decomposition
- assertMatrixApproximatelyEquals(retu, svd(0).getColumn(0))
- assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1).getRow(0)))
- assertMatrixApproximatelyEquals(retv, svd(2).getColumn(0))
- }
-}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala
new file mode 100644
index 0000000000..cd45438fb6
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg.distributed
+
+import org.scalatest.FunSuite
+
+import breeze.linalg.{DenseMatrix => BDM}
+
+import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.mllib.linalg.Vectors
+
+class CoordinateMatrixSuite extends FunSuite with LocalSparkContext {
+
+ val m = 5
+ val n = 4
+ var mat: CoordinateMatrix = _
+
+ override def beforeAll() {
+ super.beforeAll()
+ val entries = sc.parallelize(Seq(
+ (0, 0, 1.0),
+ (0, 1, 2.0),
+ (1, 1, 3.0),
+ (1, 2, 4.0),
+ (2, 2, 5.0),
+ (2, 3, 6.0),
+ (3, 0, 7.0),
+ (3, 3, 8.0),
+ (4, 1, 9.0)), 3).map { case (i, j, value) =>
+ MatrixEntry(i, j, value)
+ }
+ mat = new CoordinateMatrix(entries)
+ }
+
+ test("size") {
+ assert(mat.numRows() === m)
+ assert(mat.numCols() === n)
+ }
+
+ test("empty entries") {
+ val entries = sc.parallelize(Seq[MatrixEntry](), 1)
+ val emptyMat = new CoordinateMatrix(entries)
+ intercept[RuntimeException] {
+ emptyMat.numCols()
+ }
+ intercept[RuntimeException] {
+ emptyMat.numRows()
+ }
+ }
+
+ test("toBreeze") {
+ val expected = BDM(
+ (1.0, 2.0, 0.0, 0.0),
+ (0.0, 3.0, 4.0, 0.0),
+ (0.0, 0.0, 5.0, 6.0),
+ (7.0, 0.0, 0.0, 8.0),
+ (0.0, 9.0, 0.0, 0.0))
+ assert(mat.toBreeze() === expected)
+ }
+
+ test("toIndexedRowMatrix") {
+ val indexedRowMatrix = mat.toIndexedRowMatrix()
+ val expected = BDM(
+ (1.0, 2.0, 0.0, 0.0),
+ (0.0, 3.0, 4.0, 0.0),
+ (0.0, 0.0, 5.0, 6.0),
+ (7.0, 0.0, 0.0, 8.0),
+ (0.0, 9.0, 0.0, 0.0))
+ assert(indexedRowMatrix.toBreeze() === expected)
+ }
+
+ test("toRowMatrix") {
+ val rowMatrix = mat.toRowMatrix()
+ val rows = rowMatrix.rows.collect().toSet
+ val expected = Set(
+ Vectors.dense(1.0, 2.0, 0.0, 0.0),
+ Vectors.dense(0.0, 3.0, 4.0, 0.0),
+ Vectors.dense(0.0, 0.0, 5.0, 6.0),
+ Vectors.dense(7.0, 0.0, 0.0, 8.0),
+ Vectors.dense(0.0, 9.0, 0.0, 0.0))
+ assert(rows === expected)
+ }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala
new file mode 100644
index 0000000000..f7c46f23b7
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg.distributed
+
+import org.scalatest.FunSuite
+
+import breeze.linalg.{diag => brzDiag, DenseMatrix => BDM, DenseVector => BDV}
+
+import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.{Matrices, Vectors}
+
+class IndexedRowMatrixSuite extends FunSuite with LocalSparkContext {
+
+ val m = 4
+ val n = 3
+ val data = Seq(
+ (0L, Vectors.dense(0.0, 1.0, 2.0)),
+ (1L, Vectors.dense(3.0, 4.0, 5.0)),
+ (3L, Vectors.dense(9.0, 0.0, 1.0))
+ ).map(x => IndexedRow(x._1, x._2))
+ var indexedRows: RDD[IndexedRow] = _
+
+ override def beforeAll() {
+ super.beforeAll()
+ indexedRows = sc.parallelize(data, 2)
+ }
+
+ test("size") {
+ val mat1 = new IndexedRowMatrix(indexedRows)
+ assert(mat1.numRows() === m)
+ assert(mat1.numCols() === n)
+
+ val mat2 = new IndexedRowMatrix(indexedRows, 5, 0)
+ assert(mat2.numRows() === 5)
+ assert(mat2.numCols() === n)
+ }
+
+ test("empty rows") {
+ val rows = sc.parallelize(Seq[IndexedRow](), 1)
+ val mat = new IndexedRowMatrix(rows)
+ intercept[RuntimeException] {
+ mat.numRows()
+ }
+ intercept[RuntimeException] {
+ mat.numCols()
+ }
+ }
+
+ test("toBreeze") {
+ val mat = new IndexedRowMatrix(indexedRows)
+ val expected = BDM(
+ (0.0, 1.0, 2.0),
+ (3.0, 4.0, 5.0),
+ (0.0, 0.0, 0.0),
+ (9.0, 0.0, 1.0))
+ assert(mat.toBreeze() === expected)
+ }
+
+ test("toRowMatrix") {
+ val idxRowMat = new IndexedRowMatrix(indexedRows)
+ val rowMat = idxRowMat.toRowMatrix()
+ assert(rowMat.numCols() === n)
+ assert(rowMat.numRows() === 3, "should drop empty rows")
+ assert(rowMat.rows.collect().toSeq === data.map(_.vector).toSeq)
+ }
+
+ test("multiply a local matrix") {
+ val A = new IndexedRowMatrix(indexedRows)
+ val B = Matrices.dense(3, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0))
+ val C = A.multiply(B)
+ val localA = A.toBreeze()
+ val localC = C.toBreeze()
+ val expected = localA * B.toBreeze.asInstanceOf[BDM[Double]]
+ assert(localC === expected)
+ }
+
+ test("gram") {
+ val A = new IndexedRowMatrix(indexedRows)
+ val G = A.computeGramianMatrix()
+ val expected = BDM(
+ (90.0, 12.0, 24.0),
+ (12.0, 17.0, 22.0),
+ (24.0, 22.0, 30.0))
+ assert(G.toBreeze === expected)
+ }
+
+ test("svd") {
+ val A = new IndexedRowMatrix(indexedRows)
+ val svd = A.computeSVD(n, computeU = true)
+ assert(svd.U.isInstanceOf[IndexedRowMatrix])
+ val localA = A.toBreeze()
+ val U = svd.U.toBreeze()
+ val s = svd.s.toBreeze.asInstanceOf[BDV[Double]]
+ val V = svd.V.toBreeze.asInstanceOf[BDM[Double]]
+ assert(closeToZero(U.t * U - BDM.eye[Double](n)))
+ assert(closeToZero(V.t * V - BDM.eye[Double](n)))
+ assert(closeToZero(U * brzDiag(s) * V.t - localA))
+ }
+
+ def closeToZero(G: BDM[Double]): Boolean = {
+ G.valuesIterator.map(math.abs).sum < 1e-6
+ }
+}
+
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
new file mode 100644
index 0000000000..71ee8e8a4f
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg.distributed
+
+import org.scalatest.FunSuite
+
+import breeze.linalg.{DenseVector => BDV, DenseMatrix => BDM, norm => brzNorm, svd => brzSvd}
+
+import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.mllib.linalg.{Matrices, Vectors, Vector}
+
+class RowMatrixSuite extends FunSuite with LocalSparkContext {
+
+ val m = 4
+ val n = 3
+ val arr = Array(0.0, 3.0, 6.0, 9.0, 1.0, 4.0, 7.0, 0.0, 2.0, 5.0, 8.0, 1.0)
+ val denseData = Seq(
+ Vectors.dense(0.0, 1.0, 2.0),
+ Vectors.dense(3.0, 4.0, 5.0),
+ Vectors.dense(6.0, 7.0, 8.0),
+ Vectors.dense(9.0, 0.0, 1.0)
+ )
+ val sparseData = Seq(
+ Vectors.sparse(3, Seq((1, 1.0), (2, 2.0))),
+ Vectors.sparse(3, Seq((0, 3.0), (1, 4.0), (2, 5.0))),
+ Vectors.sparse(3, Seq((0, 6.0), (1, 7.0), (2, 8.0))),
+ Vectors.sparse(3, Seq((0, 9.0), (2, 1.0)))
+ )
+
+ val principalComponents = BDM(
+ (0.0, 1.0, 0.0),
+ (math.sqrt(2.0) / 2.0, 0.0, math.sqrt(2.0) / 2.0),
+ (math.sqrt(2.0) / 2.0, 0.0, -math.sqrt(2.0) / 2.0))
+
+ var denseMat: RowMatrix = _
+ var sparseMat: RowMatrix = _
+
+ override def beforeAll() {
+ super.beforeAll()
+ denseMat = new RowMatrix(sc.parallelize(denseData, 2))
+ sparseMat = new RowMatrix(sc.parallelize(sparseData, 2))
+ }
+
+ test("size") {
+ assert(denseMat.numRows() === m)
+ assert(denseMat.numCols() === n)
+ assert(sparseMat.numRows() === m)
+ assert(sparseMat.numCols() === n)
+ }
+
+ test("empty rows") {
+ val rows = sc.parallelize(Seq[Vector](), 1)
+ val emptyMat = new RowMatrix(rows)
+ intercept[RuntimeException] {
+ emptyMat.numCols()
+ }
+ intercept[RuntimeException] {
+ emptyMat.numRows()
+ }
+ }
+
+ test("toBreeze") {
+ val expected = BDM(
+ (0.0, 1.0, 2.0),
+ (3.0, 4.0, 5.0),
+ (6.0, 7.0, 8.0),
+ (9.0, 0.0, 1.0))
+ for (mat <- Seq(denseMat, sparseMat)) {
+ assert(mat.toBreeze() === expected)
+ }
+ }
+
+ test("gram") {
+ val expected =
+ Matrices.dense(n, n, Array(126.0, 54.0, 72.0, 54.0, 66.0, 78.0, 72.0, 78.0, 94.0))
+ for (mat <- Seq(denseMat, sparseMat)) {
+ val G = mat.computeGramianMatrix()
+ assert(G.toBreeze === expected.toBreeze)
+ }
+ }
+
+ test("svd of a full-rank matrix") {
+ for (mat <- Seq(denseMat, sparseMat)) {
+ val localMat = mat.toBreeze()
+ val (localU, localSigma, localVt) = brzSvd(localMat)
+ val localV: BDM[Double] = localVt.t.toDenseMatrix
+ for (k <- 1 to n) {
+ val svd = mat.computeSVD(k, computeU = true)
+ val U = svd.U
+ val s = svd.s
+ val V = svd.V
+ assert(U.numRows() === m)
+ assert(U.numCols() === k)
+ assert(s.size === k)
+ assert(V.numRows === n)
+ assert(V.numCols === k)
+ assertColumnEqualUpToSign(U.toBreeze(), localU, k)
+ assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k)
+ assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k)))
+ }
+ val svdWithoutU = mat.computeSVD(n)
+ assert(svdWithoutU.U === null)
+ }
+ }
+
+ test("svd of a low-rank matrix") {
+ val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0)), 2)
+ val mat = new RowMatrix(rows, 4, 2)
+ val svd = mat.computeSVD(2, computeU = true)
+ assert(svd.s.size === 1, "should not return zero singular values")
+ assert(svd.U.numRows() === 4)
+ assert(svd.U.numCols() === 1)
+ assert(svd.V.numRows === 2)
+ assert(svd.V.numCols === 1)
+ }
+
+ def closeToZero(G: BDM[Double]): Boolean = {
+ G.valuesIterator.map(math.abs).sum < 1e-6
+ }
+
+ def closeToZero(v: BDV[Double]): Boolean = {
+ brzNorm(v, 1.0) < 1e-6
+ }
+
+ def assertColumnEqualUpToSign(A: BDM[Double], B: BDM[Double], k: Int) {
+ assert(A.rows === B.rows)
+ for (j <- 0 until k) {
+ val aj = A(::, j)
+ val bj = B(::, j)
+ assert(closeToZero(aj - bj) || closeToZero(aj + bj),
+ s"The $j-th columns mismatch: $aj and $bj")
+ }
+ }
+
+ test("pca") {
+ for (mat <- Seq(denseMat, sparseMat); k <- 1 to n) {
+ val pc = mat.computePrincipalComponents(k)
+ assert(pc.numRows === n)
+ assert(pc.numCols === k)
+ assertColumnEqualUpToSign(pc.toBreeze.asInstanceOf[BDM[Double]], principalComponents, k)
+ }
+ }
+
+ test("multiply a local matrix") {
+ val B = Matrices.dense(n, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0))
+ for (mat <- Seq(denseMat, sparseMat)) {
+ val AB = mat.multiply(B)
+ assert(AB.numRows() === m)
+ assert(AB.numCols() === 2)
+ assert(AB.rows.collect().toSeq === Seq(
+ Vectors.dense(5.0, 14.0),
+ Vectors.dense(14.0, 50.0),
+ Vectors.dense(23.0, 86.0),
+ Vectors.dense(2.0, 32.0)
+ ))
+ }
+ }
+}