diff options
author | Burak Yavuz <brkyvz@gmail.com> | 2015-01-31 00:47:30 -0800 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2015-01-31 00:47:30 -0800 |
commit | ef8974b1b7ff177d9636d091770dff64fedc385f (patch) | |
tree | 38ce3d36dbb724fd812c30d49ab3be7467a9739d /mllib/src/test | |
parent | 34250a613cee39b03f05f2d54ae37029abd8a502 (diff) | |
download | spark-ef8974b1b7ff177d9636d091770dff64fedc385f.tar.gz spark-ef8974b1b7ff177d9636d091770dff64fedc385f.tar.bz2 spark-ef8974b1b7ff177d9636d091770dff64fedc385f.zip |
[SPARK-3975] Added support for BlockMatrix addition and multiplication
Support for multiplying and adding large distributed matrices!
Author: Burak Yavuz <brkyvz@gmail.com>
Author: Burak Yavuz <brkyvz@dn51t42l.sunet>
Author: Burak Yavuz <brkyvz@dn51t4rd.sunet>
Author: Burak Yavuz <brkyvz@dn0a221430.sunet>
Author: Burak Yavuz <brkyvz@dn0a22b17d.sunet>
Closes #4274 from brkyvz/SPARK-3975PR2 and squashes the following commits:
17abd59 [Burak Yavuz] added indices to error message
ac25783 [Burak Yavuz] merged masyer
b66fd8b [Burak Yavuz] merged masyer
e39baff [Burak Yavuz] addressed code review v1
2dba642 [Burak Yavuz] [SPARK-3975] Added support for BlockMatrix addition and multiplication
fb7624b [Burak Yavuz] merged master
98c58ea [Burak Yavuz] added tests
cdeb5df [Burak Yavuz] before adding tests
c9bf247 [Burak Yavuz] fixed merge conflicts
1cb0d06 [Burak Yavuz] [SPARK-3976] Added doc
f92a916 [Burak Yavuz] merge upstream
1a63b20 [Burak Yavuz] [SPARK-3974] Remove setPartition method. Isn't required
1e8bb2a [Burak Yavuz] [SPARK-3974] Change return type of cache and persist
e3d24c3 [Burak Yavuz] [SPARK-3976] Pulled upstream changes
fa3774f [Burak Yavuz] [SPARK-3976] updated matrix multiplication and addition implementation
239ab4b [Burak Yavuz] [SPARK-3974] Addressed @jkbradley's comments
add7b05 [Burak Yavuz] [SPARK-3976] Updated code according to upstream changes
e29acfd [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into SPARK-3976
3127233 [Burak Yavuz] fixed merge conflicts with upstream
ba414d2 [Burak Yavuz] [SPARK-3974] fixed frobenius norm
ab6cde0 [Burak Yavuz] [SPARK-3974] Modifications cleaning code up, making size calculation more robust
9ae85aa [Burak Yavuz] [SPARK-3974] Made partitioner a variable inside BlockMatrix instead of a constructor variable
d033861 [Burak Yavuz] [SPARK-3974] Removed SubMatrixInfo and added constructor without partitioner
8e954ab [Burak Yavuz] save changes
bbeae8c [Burak Yavuz] merged master
987ea53 [Burak Yavuz] merged master
49b9586 [Burak Yavuz] [SPARK-3974] Updated testing utils from master
645afbe [Burak Yavuz] [SPARK-3974] Pull latest master
beb1edd [Burak Yavuz] merge conflicts fixed
f41d8db [Burak Yavuz] update tests
b05aabb [Burak Yavuz] [SPARK-3974] Updated tests to reflect changes
56b0546 [Burak Yavuz] updates from 3974 PR
b7b8a8f [Burak Yavuz] pull updates from master
b2dec63 [Burak Yavuz] Pull changes from 3974
19c17e8 [Burak Yavuz] [SPARK-3974] Changed blockIdRow and blockIdCol
5f062e6 [Burak Yavuz] updates with 3974
6729fbd [Burak Yavuz] Updated with respect to SPARK-3974 PR
589fbb6 [Burak Yavuz] [SPARK-3974] Code review feedback addressed
63a4858 [Burak Yavuz] added grid multiplication
aa8f086 [Burak Yavuz] [SPARK-3974] Additional comments added
7381b99 [Burak Yavuz] merge with PR1
f378e16 [Burak Yavuz] [SPARK-3974] Block Matrix Abstractions ready
b693209 [Burak Yavuz] Ready for Pull request
Diffstat (limited to 'mllib/src/test')
-rw-r--r-- | mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala | 102 |
1 files changed, 90 insertions, 12 deletions
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala index 461f1f92df..949d1c9939 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala @@ -17,14 +17,15 @@ package org.apache.spark.mllib.linalg.distributed -import scala.util.Random +import java.{util => ju} import breeze.linalg.{DenseMatrix => BDM} import org.scalatest.FunSuite import org.apache.spark.SparkException -import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix} +import org.apache.spark.mllib.linalg.{SparseMatrix, DenseMatrix, Matrices, Matrix} import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.mllib.util.TestingUtils._ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext { @@ -37,7 +38,6 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext { override def beforeAll() { super.beforeAll() - val blocks: Seq[((Int, Int), Matrix)] = Seq( ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))), ((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))), @@ -54,7 +54,7 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext { } test("grid partitioner") { - val random = new Random() + val random = new ju.Random() // This should generate a 4x4 grid of 1x2 blocks. val part0 = GridPartitioner(4, 7, suggestedNumPartitions = 12) val expected0 = Array( @@ -148,6 +148,92 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext { assert(gridBasedMat.toBreeze() === expected) } + test("add") { + val blocks: Seq[((Int, Int), Matrix)] = Seq( + ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))), + ((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))), + ((1, 0), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))), + ((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))), + ((2, 0), new DenseMatrix(1, 2, Array(1.0, 0.0))), // Added block that doesn't exist in A + ((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0)))) + val rdd = sc.parallelize(blocks, numPartitions) + val B = new BlockMatrix(rdd, rowPerPart, colPerPart) + + val expected = BDM( + (2.0, 0.0, 0.0, 0.0), + (0.0, 4.0, 2.0, 0.0), + (6.0, 2.0, 2.0, 0.0), + (0.0, 2.0, 4.0, 2.0), + (1.0, 0.0, 2.0, 10.0)) + + val AplusB = gridBasedMat.add(B) + assert(AplusB.numRows() === m) + assert(AplusB.numCols() === B.numCols()) + assert(AplusB.toBreeze() === expected) + + val C = new BlockMatrix(rdd, rowPerPart, colPerPart, m, n + 1) // columns don't match + intercept[IllegalArgumentException] { + gridBasedMat.add(C) + } + val largerBlocks: Seq[((Int, Int), Matrix)] = Seq( + ((0, 0), new DenseMatrix(4, 4, new Array[Double](16))), + ((1, 0), new DenseMatrix(1, 4, Array(1.0, 0.0, 1.0, 5.0)))) + val C2 = new BlockMatrix(sc.parallelize(largerBlocks, numPartitions), 4, 4, m, n) + intercept[SparkException] { // partitioning doesn't match + gridBasedMat.add(C2) + } + // adding BlockMatrices composed of SparseMatrices + val sparseBlocks = for (i <- 0 until 4) yield ((i / 2, i % 2), SparseMatrix.speye(4)) + val denseBlocks = for (i <- 0 until 4) yield ((i / 2, i % 2), DenseMatrix.eye(4)) + val sparseBM = new BlockMatrix(sc.makeRDD(sparseBlocks, 4), 4, 4, 8, 8) + val denseBM = new BlockMatrix(sc.makeRDD(denseBlocks, 4), 4, 4, 8, 8) + + assert(sparseBM.add(sparseBM).toBreeze() === sparseBM.add(denseBM).toBreeze()) + } + + test("multiply") { + // identity matrix + val blocks: Seq[((Int, Int), Matrix)] = Seq( + ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 1.0))), + ((1, 1), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 1.0)))) + val rdd = sc.parallelize(blocks, 2) + val B = new BlockMatrix(rdd, colPerPart, rowPerPart) + val expected = BDM( + (1.0, 0.0, 0.0, 0.0), + (0.0, 2.0, 1.0, 0.0), + (3.0, 1.0, 1.0, 0.0), + (0.0, 1.0, 2.0, 1.0), + (0.0, 0.0, 1.0, 5.0)) + + val AtimesB = gridBasedMat.multiply(B) + assert(AtimesB.numRows() === m) + assert(AtimesB.numCols() === n) + assert(AtimesB.toBreeze() === expected) + val C = new BlockMatrix(rdd, rowPerPart, colPerPart, m + 1, n) // dimensions don't match + intercept[IllegalArgumentException] { + gridBasedMat.multiply(C) + } + val largerBlocks = Seq(((0, 0), DenseMatrix.eye(4))) + val C2 = new BlockMatrix(sc.parallelize(largerBlocks, numPartitions), 4, 4) + intercept[SparkException] { + // partitioning doesn't match + gridBasedMat.multiply(C2) + } + val rand = new ju.Random(42) + val largerAblocks = for (i <- 0 until 20) yield ((i % 5, i / 5), DenseMatrix.rand(6, 4, rand)) + val largerBblocks = for (i <- 0 until 16) yield ((i % 4, i / 4), DenseMatrix.rand(4, 4, rand)) + + // Try it with increased number of partitions + val largeA = new BlockMatrix(sc.parallelize(largerAblocks, 10), 6, 4) + val largeB = new BlockMatrix(sc.parallelize(largerBblocks, 8), 4, 4) + val largeC = largeA.multiply(largeB) + val localC = largeC.toLocalMatrix() + val result = largeA.toLocalMatrix().multiply(largeB.toLocalMatrix().asInstanceOf[DenseMatrix]) + assert(largeC.numRows() === largeA.numRows()) + assert(largeC.numCols() === largeB.numCols()) + assert(localC ~== result absTol 1e-8) + } + test("validate") { // No error gridBasedMat.validate() @@ -201,14 +287,6 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext { assert(AT.numCols() === gridBasedMat.numRows()) assert(AT.toBreeze() === expected) - // partitioner must update as well - val originalPartitioner = gridBasedMat.partitioner - val ATpartitioner = AT.partitioner - assert(originalPartitioner.colsPerPart === ATpartitioner.rowsPerPart) - assert(originalPartitioner.rowsPerPart === ATpartitioner.colsPerPart) - assert(originalPartitioner.cols === ATpartitioner.rows) - assert(originalPartitioner.rows === ATpartitioner.cols) - // make sure it works when matrices are cached as well gridBasedMat.cache() val AT2 = gridBasedMat.transpose |