aboutsummaryrefslogtreecommitdiff
path: root/mllib/src/test
diff options
context:
space:
mode:
authorBurak Yavuz <brkyvz@gmail.com>2015-01-31 00:47:30 -0800
committerXiangrui Meng <meng@databricks.com>2015-01-31 00:47:30 -0800
commitef8974b1b7ff177d9636d091770dff64fedc385f (patch)
tree38ce3d36dbb724fd812c30d49ab3be7467a9739d /mllib/src/test
parent34250a613cee39b03f05f2d54ae37029abd8a502 (diff)
downloadspark-ef8974b1b7ff177d9636d091770dff64fedc385f.tar.gz
spark-ef8974b1b7ff177d9636d091770dff64fedc385f.tar.bz2
spark-ef8974b1b7ff177d9636d091770dff64fedc385f.zip
[SPARK-3975] Added support for BlockMatrix addition and multiplication
Support for multiplying and adding large distributed matrices! Author: Burak Yavuz <brkyvz@gmail.com> Author: Burak Yavuz <brkyvz@dn51t42l.sunet> Author: Burak Yavuz <brkyvz@dn51t4rd.sunet> Author: Burak Yavuz <brkyvz@dn0a221430.sunet> Author: Burak Yavuz <brkyvz@dn0a22b17d.sunet> Closes #4274 from brkyvz/SPARK-3975PR2 and squashes the following commits: 17abd59 [Burak Yavuz] added indices to error message ac25783 [Burak Yavuz] merged masyer b66fd8b [Burak Yavuz] merged masyer e39baff [Burak Yavuz] addressed code review v1 2dba642 [Burak Yavuz] [SPARK-3975] Added support for BlockMatrix addition and multiplication fb7624b [Burak Yavuz] merged master 98c58ea [Burak Yavuz] added tests cdeb5df [Burak Yavuz] before adding tests c9bf247 [Burak Yavuz] fixed merge conflicts 1cb0d06 [Burak Yavuz] [SPARK-3976] Added doc f92a916 [Burak Yavuz] merge upstream 1a63b20 [Burak Yavuz] [SPARK-3974] Remove setPartition method. Isn't required 1e8bb2a [Burak Yavuz] [SPARK-3974] Change return type of cache and persist e3d24c3 [Burak Yavuz] [SPARK-3976] Pulled upstream changes fa3774f [Burak Yavuz] [SPARK-3976] updated matrix multiplication and addition implementation 239ab4b [Burak Yavuz] [SPARK-3974] Addressed @jkbradley's comments add7b05 [Burak Yavuz] [SPARK-3976] Updated code according to upstream changes e29acfd [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into SPARK-3976 3127233 [Burak Yavuz] fixed merge conflicts with upstream ba414d2 [Burak Yavuz] [SPARK-3974] fixed frobenius norm ab6cde0 [Burak Yavuz] [SPARK-3974] Modifications cleaning code up, making size calculation more robust 9ae85aa [Burak Yavuz] [SPARK-3974] Made partitioner a variable inside BlockMatrix instead of a constructor variable d033861 [Burak Yavuz] [SPARK-3974] Removed SubMatrixInfo and added constructor without partitioner 8e954ab [Burak Yavuz] save changes bbeae8c [Burak Yavuz] merged master 987ea53 [Burak Yavuz] merged master 49b9586 [Burak Yavuz] [SPARK-3974] Updated testing utils from master 645afbe [Burak Yavuz] [SPARK-3974] Pull latest master beb1edd [Burak Yavuz] merge conflicts fixed f41d8db [Burak Yavuz] update tests b05aabb [Burak Yavuz] [SPARK-3974] Updated tests to reflect changes 56b0546 [Burak Yavuz] updates from 3974 PR b7b8a8f [Burak Yavuz] pull updates from master b2dec63 [Burak Yavuz] Pull changes from 3974 19c17e8 [Burak Yavuz] [SPARK-3974] Changed blockIdRow and blockIdCol 5f062e6 [Burak Yavuz] updates with 3974 6729fbd [Burak Yavuz] Updated with respect to SPARK-3974 PR 589fbb6 [Burak Yavuz] [SPARK-3974] Code review feedback addressed 63a4858 [Burak Yavuz] added grid multiplication aa8f086 [Burak Yavuz] [SPARK-3974] Additional comments added 7381b99 [Burak Yavuz] merge with PR1 f378e16 [Burak Yavuz] [SPARK-3974] Block Matrix Abstractions ready b693209 [Burak Yavuz] Ready for Pull request
Diffstat (limited to 'mllib/src/test')
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala102
1 files changed, 90 insertions, 12 deletions
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
index 461f1f92df..949d1c9939 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
@@ -17,14 +17,15 @@
package org.apache.spark.mllib.linalg.distributed
-import scala.util.Random
+import java.{util => ju}
import breeze.linalg.{DenseMatrix => BDM}
import org.scalatest.FunSuite
import org.apache.spark.SparkException
-import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix}
+import org.apache.spark.mllib.linalg.{SparseMatrix, DenseMatrix, Matrices, Matrix}
import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.mllib.util.TestingUtils._
class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
@@ -37,7 +38,6 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
override def beforeAll() {
super.beforeAll()
-
val blocks: Seq[((Int, Int), Matrix)] = Seq(
((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
@@ -54,7 +54,7 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
}
test("grid partitioner") {
- val random = new Random()
+ val random = new ju.Random()
// This should generate a 4x4 grid of 1x2 blocks.
val part0 = GridPartitioner(4, 7, suggestedNumPartitions = 12)
val expected0 = Array(
@@ -148,6 +148,92 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
assert(gridBasedMat.toBreeze() === expected)
}
+ test("add") {
+ val blocks: Seq[((Int, Int), Matrix)] = Seq(
+ ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
+ ((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
+ ((1, 0), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
+ ((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
+ ((2, 0), new DenseMatrix(1, 2, Array(1.0, 0.0))), // Added block that doesn't exist in A
+ ((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
+ val rdd = sc.parallelize(blocks, numPartitions)
+ val B = new BlockMatrix(rdd, rowPerPart, colPerPart)
+
+ val expected = BDM(
+ (2.0, 0.0, 0.0, 0.0),
+ (0.0, 4.0, 2.0, 0.0),
+ (6.0, 2.0, 2.0, 0.0),
+ (0.0, 2.0, 4.0, 2.0),
+ (1.0, 0.0, 2.0, 10.0))
+
+ val AplusB = gridBasedMat.add(B)
+ assert(AplusB.numRows() === m)
+ assert(AplusB.numCols() === B.numCols())
+ assert(AplusB.toBreeze() === expected)
+
+ val C = new BlockMatrix(rdd, rowPerPart, colPerPart, m, n + 1) // columns don't match
+ intercept[IllegalArgumentException] {
+ gridBasedMat.add(C)
+ }
+ val largerBlocks: Seq[((Int, Int), Matrix)] = Seq(
+ ((0, 0), new DenseMatrix(4, 4, new Array[Double](16))),
+ ((1, 0), new DenseMatrix(1, 4, Array(1.0, 0.0, 1.0, 5.0))))
+ val C2 = new BlockMatrix(sc.parallelize(largerBlocks, numPartitions), 4, 4, m, n)
+ intercept[SparkException] { // partitioning doesn't match
+ gridBasedMat.add(C2)
+ }
+ // adding BlockMatrices composed of SparseMatrices
+ val sparseBlocks = for (i <- 0 until 4) yield ((i / 2, i % 2), SparseMatrix.speye(4))
+ val denseBlocks = for (i <- 0 until 4) yield ((i / 2, i % 2), DenseMatrix.eye(4))
+ val sparseBM = new BlockMatrix(sc.makeRDD(sparseBlocks, 4), 4, 4, 8, 8)
+ val denseBM = new BlockMatrix(sc.makeRDD(denseBlocks, 4), 4, 4, 8, 8)
+
+ assert(sparseBM.add(sparseBM).toBreeze() === sparseBM.add(denseBM).toBreeze())
+ }
+
+ test("multiply") {
+ // identity matrix
+ val blocks: Seq[((Int, Int), Matrix)] = Seq(
+ ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 1.0))),
+ ((1, 1), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 1.0))))
+ val rdd = sc.parallelize(blocks, 2)
+ val B = new BlockMatrix(rdd, colPerPart, rowPerPart)
+ val expected = BDM(
+ (1.0, 0.0, 0.0, 0.0),
+ (0.0, 2.0, 1.0, 0.0),
+ (3.0, 1.0, 1.0, 0.0),
+ (0.0, 1.0, 2.0, 1.0),
+ (0.0, 0.0, 1.0, 5.0))
+
+ val AtimesB = gridBasedMat.multiply(B)
+ assert(AtimesB.numRows() === m)
+ assert(AtimesB.numCols() === n)
+ assert(AtimesB.toBreeze() === expected)
+ val C = new BlockMatrix(rdd, rowPerPart, colPerPart, m + 1, n) // dimensions don't match
+ intercept[IllegalArgumentException] {
+ gridBasedMat.multiply(C)
+ }
+ val largerBlocks = Seq(((0, 0), DenseMatrix.eye(4)))
+ val C2 = new BlockMatrix(sc.parallelize(largerBlocks, numPartitions), 4, 4)
+ intercept[SparkException] {
+ // partitioning doesn't match
+ gridBasedMat.multiply(C2)
+ }
+ val rand = new ju.Random(42)
+ val largerAblocks = for (i <- 0 until 20) yield ((i % 5, i / 5), DenseMatrix.rand(6, 4, rand))
+ val largerBblocks = for (i <- 0 until 16) yield ((i % 4, i / 4), DenseMatrix.rand(4, 4, rand))
+
+ // Try it with increased number of partitions
+ val largeA = new BlockMatrix(sc.parallelize(largerAblocks, 10), 6, 4)
+ val largeB = new BlockMatrix(sc.parallelize(largerBblocks, 8), 4, 4)
+ val largeC = largeA.multiply(largeB)
+ val localC = largeC.toLocalMatrix()
+ val result = largeA.toLocalMatrix().multiply(largeB.toLocalMatrix().asInstanceOf[DenseMatrix])
+ assert(largeC.numRows() === largeA.numRows())
+ assert(largeC.numCols() === largeB.numCols())
+ assert(localC ~== result absTol 1e-8)
+ }
+
test("validate") {
// No error
gridBasedMat.validate()
@@ -201,14 +287,6 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
assert(AT.numCols() === gridBasedMat.numRows())
assert(AT.toBreeze() === expected)
- // partitioner must update as well
- val originalPartitioner = gridBasedMat.partitioner
- val ATpartitioner = AT.partitioner
- assert(originalPartitioner.colsPerPart === ATpartitioner.rowsPerPart)
- assert(originalPartitioner.rowsPerPart === ATpartitioner.colsPerPart)
- assert(originalPartitioner.cols === ATpartitioner.rows)
- assert(originalPartitioner.rows === ATpartitioner.cols)
-
// make sure it works when matrices are cached as well
gridBasedMat.cache()
val AT2 = gridBasedMat.transpose