aboutsummaryrefslogtreecommitdiff
path: root/mllib/src/test/scala
diff options
context:
space:
mode:
authorXiangrui Meng <meng@databricks.com>2014-04-08 23:01:15 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-04-08 23:01:15 -0700
commit9689b663a2a4947ad60795321c770052f3c637f1 (patch)
treef2647f7b1ae3a3d11d3ecb29e764214b7cb589ca /mllib/src/test/scala
parentfa0524fd02eedd0bbf1edc750dc3997a86ea25f5 (diff)
downloadspark-9689b663a2a4947ad60795321c770052f3c637f1.tar.gz
spark-9689b663a2a4947ad60795321c770052f3c637f1.tar.bz2
spark-9689b663a2a4947ad60795321c770052f3c637f1.zip
[SPARK-1390] Refactoring of matrices backed by RDDs
This is to refactor interfaces for matrices backed by RDDs. It would be better if we have a clear separation of local matrices and those backed by RDDs. Right now, we have 1. `org.apache.spark.mllib.linalg.SparseMatrix`, which is a wrapper over an RDD of matrix entries, i.e., coordinate list format. 2. `org.apache.spark.mllib.linalg.TallSkinnyDenseMatrix`, which is a wrapper over RDD[Array[Double]], i.e. row-oriented format. We will see naming collision when we introduce local `SparseMatrix`, and the name `TallSkinnyDenseMatrix` is not exact if we switch to `RDD[Vector]` from `RDD[Array[Double]]`. It would be better to have "RDD" in the class name to suggest that operations may trigger jobs. The proposed names are (all under `org.apache.spark.mllib.linalg.rdd`): 1. `RDDMatrix`: trait for matrices backed by one or more RDDs 2. `CoordinateRDDMatrix`: wrapper of `RDD[(Long, Long, Double)]` 3. `RowRDDMatrix`: wrapper of `RDD[Vector]` whose rows do not have special ordering 4. `IndexedRowRDDMatrix`: wrapper of `RDD[(Long, Vector)]` whose rows are associated with indices The current code also introduces local matrices. Author: Xiangrui Meng <meng@databricks.com> Closes #296 from mengxr/mat and squashes the following commits: 24d8294 [Xiangrui Meng] fix for groupBy returning Iterable bfc2b26 [Xiangrui Meng] merge master 8e4f1f5 [Xiangrui Meng] Merge branch 'master' into mat 0135193 [Xiangrui Meng] address Reza's comments 03cd7e1 [Xiangrui Meng] add pca/gram to IndexedRowMatrix add toBreeze to DistributedMatrix for test simplify tests b177ff1 [Xiangrui Meng] address Matei's comments be119fe [Xiangrui Meng] rename m/n to numRows/numCols for local matrix add tests for matrices b881506 [Xiangrui Meng] rename SparkPCA/SVD to TallSkinnyPCA/SVD e7d0d4a [Xiangrui Meng] move IndexedRDDMatrixRow to IndexedRowRDDMatrix 0d1491c [Xiangrui Meng] fix test errors a85262a [Xiangrui Meng] rename RDDMatrixRow to IndexedRDDMatrixRow b8b6ac3 [Xiangrui Meng] Remove old code 4cf679c [Xiangrui Meng] port pca to RowRDDMatrix, and add multiply and covariance 7836e2f [Xiangrui Meng] initial refactoring of matrices backed by RDDs
Diffstat (limited to 'mllib/src/test/scala')
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeMatrixConversionSuite.scala40
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala39
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala124
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala194
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala98
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala120
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala173
7 files changed, 470 insertions, 318 deletions
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeMatrixConversionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeMatrixConversionSuite.scala
new file mode 100644
index 0000000000..82d49c76ed
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeMatrixConversionSuite.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import org.scalatest.FunSuite
+
+import breeze.linalg.{DenseMatrix => BDM}
+
+class BreezeMatrixConversionSuite extends FunSuite {
+ test("dense matrix to breeze") {
+ val mat = Matrices.dense(3, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0))
+ val breeze = mat.toBreeze.asInstanceOf[BDM[Double]]
+ assert(breeze.rows === mat.numRows)
+ assert(breeze.cols === mat.numCols)
+ assert(breeze.data.eq(mat.asInstanceOf[DenseMatrix].values), "should not copy data")
+ }
+
+ test("dense breeze matrix to matrix") {
+ val breeze = new BDM[Double](3, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0))
+ val mat = Matrices.fromBreeze(breeze).asInstanceOf[DenseMatrix]
+ assert(mat.numRows === breeze.rows)
+ assert(mat.numCols === breeze.cols)
+ assert(mat.values.eq(breeze.data), "should not copy data")
+ }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
new file mode 100644
index 0000000000..9c66b4db9f
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import org.scalatest.FunSuite
+
+class MatricesSuite extends FunSuite {
+ test("dense matrix construction") {
+ val m = 3
+ val n = 2
+ val values = Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0)
+ val mat = Matrices.dense(m, n, values).asInstanceOf[DenseMatrix]
+ assert(mat.numRows === m)
+ assert(mat.numCols === n)
+ assert(mat.values.eq(values), "should not copy data")
+ assert(mat.toArray.eq(values), "toArray should not copy data")
+ }
+
+ test("dense matrix construction with wrong dimension") {
+ intercept[RuntimeException] {
+ Matrices.dense(3, 2, Array(0.0, 1.0, 2.0))
+ }
+ }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala
deleted file mode 100644
index 5e5086b1bf..0000000000
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.linalg
-
-import scala.util.Random
-
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.FunSuite
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-import org.apache.spark.mllib.util._
-
-import org.jblas._
-
-class PCASuite extends FunSuite with BeforeAndAfterAll {
- @transient private var sc: SparkContext = _
-
- override def beforeAll() {
- sc = new SparkContext("local", "test")
- }
-
- override def afterAll() {
- sc.stop()
- System.clearProperty("spark.driver.port")
- }
-
- val EPSILON = 1e-3
-
- // Return jblas matrix from sparse matrix RDD
- def getDenseMatrix(matrix: SparseMatrix) : DoubleMatrix = {
- val data = matrix.data
- val ret = DoubleMatrix.zeros(matrix.m, matrix.n)
- matrix.data.collect().map(x => ret.put(x.i, x.j, x.mval))
- ret
- }
-
- def assertMatrixApproximatelyEquals(a: DoubleMatrix, b: DoubleMatrix) {
- assert(a.rows == b.rows && a.columns == b.columns,
- "dimension mismatch: $a.rows vs $b.rows and $a.columns vs $b.columns")
- for (i <- 0 until a.columns) {
- val aCol = a.getColumn(i)
- val bCol = b.getColumn(i)
- val diff = Math.min(aCol.sub(bCol).norm1, aCol.add(bCol).norm1)
- assert(diff < EPSILON, "matrix mismatch: " + diff)
- }
- }
-
- test("full rank matrix pca") {
- val m = 5
- val n = 3
- val dataArr = Array.tabulate(m,n){ (a, b) =>
- MatrixEntry(a, b, Math.sin(a + b + a * b)) }.flatten
- val data = sc.makeRDD(dataArr, 3)
- val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n))
-
- val realPCAArray = Array((0,0,-0.2579), (0,1,-0.6602), (0,2,0.7054),
- (1,0,-0.1448), (1,1,0.7483), (1,2,0.6474),
- (2,0,0.9553), (2,1,-0.0649), (2,2,0.2886))
- val realPCA = sc.makeRDD(realPCAArray.map(x => MatrixEntry(x._1, x._2, x._3)), 3)
-
- val coeffs = new DoubleMatrix(new PCA().setK(n).compute(a))
-
- assertMatrixApproximatelyEquals(getDenseMatrix(SparseMatrix(realPCA,n,n)), coeffs)
- }
-
- test("sparse matrix full rank matrix pca") {
- val m = 5
- val n = 3
- // the entry that gets dropped is zero to test sparse support
- val dataArr = Array.tabulate(m,n){ (a, b) =>
- MatrixEntry(a, b, Math.sin(a + b + a * b)) }.flatten.drop(1)
- val data = sc.makeRDD(dataArr, 3)
- val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n))
-
- val realPCAArray = Array((0,0,-0.2579), (0,1,-0.6602), (0,2,0.7054),
- (1,0,-0.1448), (1,1,0.7483), (1,2,0.6474),
- (2,0,0.9553), (2,1,-0.0649), (2,2,0.2886))
- val realPCA = sc.makeRDD(realPCAArray.map(x => MatrixEntry(x._1, x._2, x._3)))
-
- val coeffs = new DoubleMatrix(new PCA().setK(n).compute(a))
-
- assertMatrixApproximatelyEquals(getDenseMatrix(SparseMatrix(realPCA,n,n)), coeffs)
- }
-
- test("truncated matrix pca") {
- val m = 5
- val n = 3
- val dataArr = Array.tabulate(m,n){ (a, b) =>
- MatrixEntry(a, b, Math.sin(a + b + a * b)) }.flatten
-
- val data = sc.makeRDD(dataArr, 3)
- val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n))
-
- val realPCAArray = Array((0,0,-0.2579), (0,1,-0.6602),
- (1,0,-0.1448), (1,1,0.7483),
- (2,0,0.9553), (2,1,-0.0649))
- val realPCA = sc.makeRDD(realPCAArray.map(x => MatrixEntry(x._1, x._2, x._3)))
-
- val k = 2
- val coeffs = new DoubleMatrix(new PCA().setK(k).compute(a))
-
- assertMatrixApproximatelyEquals(getDenseMatrix(SparseMatrix(realPCA,n,k)), coeffs)
- }
-}
-
-
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
deleted file mode 100644
index 20e2b0f84b..0000000000
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.linalg
-
-import scala.util.Random
-
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.FunSuite
-
-import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-import org.apache.spark.mllib.util._
-
-import org.jblas._
-
-class SVDSuite extends FunSuite with BeforeAndAfterAll {
- @transient private var sc: SparkContext = _
-
- override def beforeAll() {
- sc = new SparkContext("local", "test")
- }
-
- override def afterAll() {
- sc.stop()
- System.clearProperty("spark.driver.port")
- }
-
- val EPSILON = 1e-4
-
- // Return jblas matrix from sparse matrix RDD
- def getDenseMatrix(matrix: SparseMatrix) : DoubleMatrix = {
- val data = matrix.data
- val m = matrix.m
- val n = matrix.n
- val ret = DoubleMatrix.zeros(m, n)
- matrix.data.collect().map(x => ret.put(x.i, x.j, x.mval))
- ret
- }
-
- def assertMatrixApproximatelyEquals(a: DoubleMatrix, b: DoubleMatrix) {
- assert(a.rows == b.rows && a.columns == b.columns,
- "dimension mismatch: $a.rows vs $b.rows and $a.columns vs $b.columns")
- for (i <- 0 until a.columns) {
- val aCol = a.getColumn(i)
- val bCol = b.getColumn(i)
- val diff = Math.min(aCol.sub(bCol).norm1, aCol.add(bCol).norm1)
- assert(diff < EPSILON, "matrix mismatch: " + diff)
- }
- }
-
- test("full rank matrix svd") {
- val m = 10
- val n = 3
- val datarr = Array.tabulate(m,n){ (a, b) =>
- MatrixEntry(a, b, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten
- val data = sc.makeRDD(datarr, 3)
-
- val a = SparseMatrix(data, m, n)
-
- val decomposed = new SVD().setK(n).compute(a)
- val u = decomposed.U
- val v = decomposed.V
- val s = decomposed.S
-
- val denseA = getDenseMatrix(a)
- val svd = Singular.sparseSVD(denseA)
-
- val retu = getDenseMatrix(u)
- val rets = getDenseMatrix(s)
- val retv = getDenseMatrix(v)
-
-
- // check individual decomposition
- assertMatrixApproximatelyEquals(retu, svd(0))
- assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1)))
- assertMatrixApproximatelyEquals(retv, svd(2))
-
- // check multiplication guarantee
- assertMatrixApproximatelyEquals(retu.mmul(rets).mmul(retv.transpose), denseA)
- }
-
- test("dense full rank matrix svd") {
- val m = 10
- val n = 3
- val datarr = Array.tabulate(m,n){ (a, b) =>
- MatrixEntry(a, b, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten
- val data = sc.makeRDD(datarr, 3)
-
- val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n))
-
- val decomposed = new SVD().setK(n).setComputeU(true).compute(a)
- val u = LAUtils.denseToSparse(decomposed.U)
- val v = decomposed.V
- val s = decomposed.S
-
- val denseA = getDenseMatrix(LAUtils.denseToSparse(a))
- val svd = Singular.sparseSVD(denseA)
-
- val retu = getDenseMatrix(u)
- val rets = DoubleMatrix.diag(new DoubleMatrix(s))
- val retv = new DoubleMatrix(v)
-
-
- // check individual decomposition
- assertMatrixApproximatelyEquals(retu, svd(0))
- assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1)))
- assertMatrixApproximatelyEquals(retv, svd(2))
-
- // check multiplication guarantee
- assertMatrixApproximatelyEquals(retu.mmul(rets).mmul(retv.transpose), denseA)
- }
-
- test("rank one matrix svd") {
- val m = 10
- val n = 3
- val data = sc.makeRDD(Array.tabulate(m, n){ (a,b) =>
- MatrixEntry(a, b, 1.0) }.flatten )
- val k = 1
-
- val a = SparseMatrix(data, m, n)
-
- val decomposed = new SVD().setK(k).compute(a)
- val u = decomposed.U
- val s = decomposed.S
- val v = decomposed.V
- val retrank = s.data.collect().length
-
- assert(retrank == 1, "rank returned not one")
-
- val denseA = getDenseMatrix(a)
- val svd = Singular.sparseSVD(denseA)
-
- val retu = getDenseMatrix(u)
- val rets = getDenseMatrix(s)
- val retv = getDenseMatrix(v)
-
- // check individual decomposition
- assertMatrixApproximatelyEquals(retu, svd(0).getColumn(0))
- assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1).getRow(0)))
- assertMatrixApproximatelyEquals(retv, svd(2).getColumn(0))
-
- // check multiplication guarantee
- assertMatrixApproximatelyEquals(retu.mmul(rets).mmul(retv.transpose), denseA)
- }
-
- test("truncated with k") {
- val m = 10
- val n = 3
- val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) =>
- MatrixEntry(a, b, (a + 2).toDouble * (b + 1)/(1 + a + b)) }.flatten )
- val a = SparseMatrix(data, m, n)
-
- val k = 1 // only one svalue above this
-
- val decomposed = new SVD().setK(k).compute(a)
- val u = decomposed.U
- val s = decomposed.S
- val v = decomposed.V
- val retrank = s.data.collect().length
-
- val denseA = getDenseMatrix(a)
- val svd = Singular.sparseSVD(denseA)
-
- val retu = getDenseMatrix(u)
- val rets = getDenseMatrix(s)
- val retv = getDenseMatrix(v)
-
- assert(retrank == 1, "rank returned not one")
-
- // check individual decomposition
- assertMatrixApproximatelyEquals(retu, svd(0).getColumn(0))
- assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1).getRow(0)))
- assertMatrixApproximatelyEquals(retv, svd(2).getColumn(0))
- }
-}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala
new file mode 100644
index 0000000000..cd45438fb6
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg.distributed
+
+import org.scalatest.FunSuite
+
+import breeze.linalg.{DenseMatrix => BDM}
+
+import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.mllib.linalg.Vectors
+
+class CoordinateMatrixSuite extends FunSuite with LocalSparkContext {
+
+ val m = 5
+ val n = 4
+ var mat: CoordinateMatrix = _
+
+ override def beforeAll() {
+ super.beforeAll()
+ val entries = sc.parallelize(Seq(
+ (0, 0, 1.0),
+ (0, 1, 2.0),
+ (1, 1, 3.0),
+ (1, 2, 4.0),
+ (2, 2, 5.0),
+ (2, 3, 6.0),
+ (3, 0, 7.0),
+ (3, 3, 8.0),
+ (4, 1, 9.0)), 3).map { case (i, j, value) =>
+ MatrixEntry(i, j, value)
+ }
+ mat = new CoordinateMatrix(entries)
+ }
+
+ test("size") {
+ assert(mat.numRows() === m)
+ assert(mat.numCols() === n)
+ }
+
+ test("empty entries") {
+ val entries = sc.parallelize(Seq[MatrixEntry](), 1)
+ val emptyMat = new CoordinateMatrix(entries)
+ intercept[RuntimeException] {
+ emptyMat.numCols()
+ }
+ intercept[RuntimeException] {
+ emptyMat.numRows()
+ }
+ }
+
+ test("toBreeze") {
+ val expected = BDM(
+ (1.0, 2.0, 0.0, 0.0),
+ (0.0, 3.0, 4.0, 0.0),
+ (0.0, 0.0, 5.0, 6.0),
+ (7.0, 0.0, 0.0, 8.0),
+ (0.0, 9.0, 0.0, 0.0))
+ assert(mat.toBreeze() === expected)
+ }
+
+ test("toIndexedRowMatrix") {
+ val indexedRowMatrix = mat.toIndexedRowMatrix()
+ val expected = BDM(
+ (1.0, 2.0, 0.0, 0.0),
+ (0.0, 3.0, 4.0, 0.0),
+ (0.0, 0.0, 5.0, 6.0),
+ (7.0, 0.0, 0.0, 8.0),
+ (0.0, 9.0, 0.0, 0.0))
+ assert(indexedRowMatrix.toBreeze() === expected)
+ }
+
+ test("toRowMatrix") {
+ val rowMatrix = mat.toRowMatrix()
+ val rows = rowMatrix.rows.collect().toSet
+ val expected = Set(
+ Vectors.dense(1.0, 2.0, 0.0, 0.0),
+ Vectors.dense(0.0, 3.0, 4.0, 0.0),
+ Vectors.dense(0.0, 0.0, 5.0, 6.0),
+ Vectors.dense(7.0, 0.0, 0.0, 8.0),
+ Vectors.dense(0.0, 9.0, 0.0, 0.0))
+ assert(rows === expected)
+ }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala
new file mode 100644
index 0000000000..f7c46f23b7
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg.distributed
+
+import org.scalatest.FunSuite
+
+import breeze.linalg.{diag => brzDiag, DenseMatrix => BDM, DenseVector => BDV}
+
+import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.{Matrices, Vectors}
+
+class IndexedRowMatrixSuite extends FunSuite with LocalSparkContext {
+
+ val m = 4
+ val n = 3
+ val data = Seq(
+ (0L, Vectors.dense(0.0, 1.0, 2.0)),
+ (1L, Vectors.dense(3.0, 4.0, 5.0)),
+ (3L, Vectors.dense(9.0, 0.0, 1.0))
+ ).map(x => IndexedRow(x._1, x._2))
+ var indexedRows: RDD[IndexedRow] = _
+
+ override def beforeAll() {
+ super.beforeAll()
+ indexedRows = sc.parallelize(data, 2)
+ }
+
+ test("size") {
+ val mat1 = new IndexedRowMatrix(indexedRows)
+ assert(mat1.numRows() === m)
+ assert(mat1.numCols() === n)
+
+ val mat2 = new IndexedRowMatrix(indexedRows, 5, 0)
+ assert(mat2.numRows() === 5)
+ assert(mat2.numCols() === n)
+ }
+
+ test("empty rows") {
+ val rows = sc.parallelize(Seq[IndexedRow](), 1)
+ val mat = new IndexedRowMatrix(rows)
+ intercept[RuntimeException] {
+ mat.numRows()
+ }
+ intercept[RuntimeException] {
+ mat.numCols()
+ }
+ }
+
+ test("toBreeze") {
+ val mat = new IndexedRowMatrix(indexedRows)
+ val expected = BDM(
+ (0.0, 1.0, 2.0),
+ (3.0, 4.0, 5.0),
+ (0.0, 0.0, 0.0),
+ (9.0, 0.0, 1.0))
+ assert(mat.toBreeze() === expected)
+ }
+
+ test("toRowMatrix") {
+ val idxRowMat = new IndexedRowMatrix(indexedRows)
+ val rowMat = idxRowMat.toRowMatrix()
+ assert(rowMat.numCols() === n)
+ assert(rowMat.numRows() === 3, "should drop empty rows")
+ assert(rowMat.rows.collect().toSeq === data.map(_.vector).toSeq)
+ }
+
+ test("multiply a local matrix") {
+ val A = new IndexedRowMatrix(indexedRows)
+ val B = Matrices.dense(3, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0))
+ val C = A.multiply(B)
+ val localA = A.toBreeze()
+ val localC = C.toBreeze()
+ val expected = localA * B.toBreeze.asInstanceOf[BDM[Double]]
+ assert(localC === expected)
+ }
+
+ test("gram") {
+ val A = new IndexedRowMatrix(indexedRows)
+ val G = A.computeGramianMatrix()
+ val expected = BDM(
+ (90.0, 12.0, 24.0),
+ (12.0, 17.0, 22.0),
+ (24.0, 22.0, 30.0))
+ assert(G.toBreeze === expected)
+ }
+
+ test("svd") {
+ val A = new IndexedRowMatrix(indexedRows)
+ val svd = A.computeSVD(n, computeU = true)
+ assert(svd.U.isInstanceOf[IndexedRowMatrix])
+ val localA = A.toBreeze()
+ val U = svd.U.toBreeze()
+ val s = svd.s.toBreeze.asInstanceOf[BDV[Double]]
+ val V = svd.V.toBreeze.asInstanceOf[BDM[Double]]
+ assert(closeToZero(U.t * U - BDM.eye[Double](n)))
+ assert(closeToZero(V.t * V - BDM.eye[Double](n)))
+ assert(closeToZero(U * brzDiag(s) * V.t - localA))
+ }
+
+ def closeToZero(G: BDM[Double]): Boolean = {
+ G.valuesIterator.map(math.abs).sum < 1e-6
+ }
+}
+
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
new file mode 100644
index 0000000000..71ee8e8a4f
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg.distributed
+
+import org.scalatest.FunSuite
+
+import breeze.linalg.{DenseVector => BDV, DenseMatrix => BDM, norm => brzNorm, svd => brzSvd}
+
+import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.mllib.linalg.{Matrices, Vectors, Vector}
+
+class RowMatrixSuite extends FunSuite with LocalSparkContext {
+
+ val m = 4
+ val n = 3
+ val arr = Array(0.0, 3.0, 6.0, 9.0, 1.0, 4.0, 7.0, 0.0, 2.0, 5.0, 8.0, 1.0)
+ val denseData = Seq(
+ Vectors.dense(0.0, 1.0, 2.0),
+ Vectors.dense(3.0, 4.0, 5.0),
+ Vectors.dense(6.0, 7.0, 8.0),
+ Vectors.dense(9.0, 0.0, 1.0)
+ )
+ val sparseData = Seq(
+ Vectors.sparse(3, Seq((1, 1.0), (2, 2.0))),
+ Vectors.sparse(3, Seq((0, 3.0), (1, 4.0), (2, 5.0))),
+ Vectors.sparse(3, Seq((0, 6.0), (1, 7.0), (2, 8.0))),
+ Vectors.sparse(3, Seq((0, 9.0), (2, 1.0)))
+ )
+
+ val principalComponents = BDM(
+ (0.0, 1.0, 0.0),
+ (math.sqrt(2.0) / 2.0, 0.0, math.sqrt(2.0) / 2.0),
+ (math.sqrt(2.0) / 2.0, 0.0, - math.sqrt(2.0) / 2.0))
+
+ var denseMat: RowMatrix = _
+ var sparseMat: RowMatrix = _
+
+ override def beforeAll() {
+ super.beforeAll()
+ denseMat = new RowMatrix(sc.parallelize(denseData, 2))
+ sparseMat = new RowMatrix(sc.parallelize(sparseData, 2))
+ }
+
+ test("size") {
+ assert(denseMat.numRows() === m)
+ assert(denseMat.numCols() === n)
+ assert(sparseMat.numRows() === m)
+ assert(sparseMat.numCols() === n)
+ }
+
+ test("empty rows") {
+ val rows = sc.parallelize(Seq[Vector](), 1)
+ val emptyMat = new RowMatrix(rows)
+ intercept[RuntimeException] {
+ emptyMat.numCols()
+ }
+ intercept[RuntimeException] {
+ emptyMat.numRows()
+ }
+ }
+
+ test("toBreeze") {
+ val expected = BDM(
+ (0.0, 1.0, 2.0),
+ (3.0, 4.0, 5.0),
+ (6.0, 7.0, 8.0),
+ (9.0, 0.0, 1.0))
+ for (mat <- Seq(denseMat, sparseMat)) {
+ assert(mat.toBreeze() === expected)
+ }
+ }
+
+ test("gram") {
+ val expected =
+ Matrices.dense(n, n, Array(126.0, 54.0, 72.0, 54.0, 66.0, 78.0, 72.0, 78.0, 94.0))
+ for (mat <- Seq(denseMat, sparseMat)) {
+ val G = mat.computeGramianMatrix()
+ assert(G.toBreeze === expected.toBreeze)
+ }
+ }
+
+ test("svd of a full-rank matrix") {
+ for (mat <- Seq(denseMat, sparseMat)) {
+ val localMat = mat.toBreeze()
+ val (localU, localSigma, localVt) = brzSvd(localMat)
+ val localV: BDM[Double] = localVt.t.toDenseMatrix
+ for (k <- 1 to n) {
+ val svd = mat.computeSVD(k, computeU = true)
+ val U = svd.U
+ val s = svd.s
+ val V = svd.V
+ assert(U.numRows() === m)
+ assert(U.numCols() === k)
+ assert(s.size === k)
+ assert(V.numRows === n)
+ assert(V.numCols === k)
+ assertColumnEqualUpToSign(U.toBreeze(), localU, k)
+ assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k)
+ assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k)))
+ }
+ val svdWithoutU = mat.computeSVD(n)
+ assert(svdWithoutU.U === null)
+ }
+ }
+
+ test("svd of a low-rank matrix") {
+ val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0)), 2)
+ val mat = new RowMatrix(rows, 4, 2)
+ val svd = mat.computeSVD(2, computeU = true)
+ assert(svd.s.size === 1, "should not return zero singular values")
+ assert(svd.U.numRows() === 4)
+ assert(svd.U.numCols() === 1)
+ assert(svd.V.numRows === 2)
+ assert(svd.V.numCols === 1)
+ }
+
+ def closeToZero(G: BDM[Double]): Boolean = {
+ G.valuesIterator.map(math.abs).sum < 1e-6
+ }
+
+ def closeToZero(v: BDV[Double]): Boolean = {
+ brzNorm(v, 1.0) < 1e-6
+ }
+
+ def assertColumnEqualUpToSign(A: BDM[Double], B: BDM[Double], k: Int) {
+ assert(A.rows === B.rows)
+ for (j <- 0 until k) {
+ val aj = A(::, j)
+ val bj = B(::, j)
+ assert(closeToZero(aj - bj) || closeToZero(aj + bj),
+ s"The $j-th columns mismatch: $aj and $bj")
+ }
+ }
+
+ test("pca") {
+ for (mat <- Seq(denseMat, sparseMat); k <- 1 to n) {
+ val pc = denseMat.computePrincipalComponents(k)
+ assert(pc.numRows === n)
+ assert(pc.numCols === k)
+ assertColumnEqualUpToSign(pc.toBreeze.asInstanceOf[BDM[Double]], principalComponents, k)
+ }
+ }
+
+ test("multiply a local matrix") {
+ val B = Matrices.dense(n, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0))
+ for (mat <- Seq(denseMat, sparseMat)) {
+ val AB = mat.multiply(B)
+ assert(AB.numRows() === m)
+ assert(AB.numCols() === 2)
+ assert(AB.rows.collect().toSeq === Seq(
+ Vectors.dense(5.0, 14.0),
+ Vectors.dense(14.0, 50.0),
+ Vectors.dense(23.0, 86.0),
+ Vectors.dense(2.0, 32.0)
+ ))
+ }
+ }
+}