aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEhsan M.Kermani <ehsanmo1367@gmail.com>2016-03-14 19:17:09 -0700
committerJoseph K. Bradley <joseph@databricks.com>2016-03-14 19:17:09 -0700
commit992142b87ed5b507493e4f9fac3f72ba14fafbbc (patch)
tree366bf4b9c707c0b06dd4dd5bff26148eed5617cb
parent06dec37455c3f800897defee6fad0da623f26050 (diff)
downloadspark-992142b87ed5b507493e4f9fac3f72ba14fafbbc.tar.gz
spark-992142b87ed5b507493e4f9fac3f72ba14fafbbc.tar.bz2
spark-992142b87ed5b507493e4f9fac3f72ba14fafbbc.zip
[SPARK-11826][MLLIB] Refactor add() and subtract() methods
srowen Could you please check this when you have time? Author: Ehsan M.Kermani <ehsanmo1367@gmail.com> Closes #9916 from ehsanmok/JIRA-11826.
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala58
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala43
2 files changed, 88 insertions, 13 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index ae1faf6a2d..6f0b0a9bc6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -19,7 +19,7 @@ package org.apache.spark.mllib.linalg.distributed
import scala.collection.mutable.ArrayBuffer
-import breeze.linalg.{DenseMatrix => BDM}
+import breeze.linalg.{DenseMatrix => BDM, Matrix => BM}
import org.apache.spark.{Logging, Partitioner, SparkException}
import org.apache.spark.annotation.Since
@@ -317,40 +317,72 @@ class BlockMatrix @Since("1.3.0") (
}
/**
- * Adds two block matrices together. The matrices must have the same size and matching
- * `rowsPerBlock` and `colsPerBlock` values. If one of the blocks that are being added are
- * instances of [[SparseMatrix]], the resulting sub matrix will also be a [[SparseMatrix]], even
- * if it is being added to a [[DenseMatrix]]. If two dense matrices are added, the output will
- * also be a [[DenseMatrix]].
+ * For given matrices `this` and `other` of compatible dimensions and compatible block dimensions,
+ * it applies a binary function on their corresponding blocks.
+ *
+ * @param other The second BlockMatrix argument for the operator specified by `binMap`
+ * @param binMap A function taking two breeze matrices and returning a breeze matrix
+ * @return A [[BlockMatrix]] whose blocks are the results of a specified binary map on blocks
+ * of `this` and `other`.
+ * Note: `blockMap` ONLY works for `add` and `subtract` methods and it does not support
+ * operators such as (a, b) => -a + b
+ * TODO: Make the use of zero matrices more storage efficient.
*/
- @Since("1.3.0")
- def add(other: BlockMatrix): BlockMatrix = {
+ private[mllib] def blockMap(
+ other: BlockMatrix,
+ binMap: (BM[Double], BM[Double]) => BM[Double]): BlockMatrix = {
require(numRows() == other.numRows(), "Both matrices must have the same number of rows. " +
s"A.numRows: ${numRows()}, B.numRows: ${other.numRows()}")
require(numCols() == other.numCols(), "Both matrices must have the same number of columns. " +
s"A.numCols: ${numCols()}, B.numCols: ${other.numCols()}")
if (rowsPerBlock == other.rowsPerBlock && colsPerBlock == other.colsPerBlock) {
- val addedBlocks = blocks.cogroup(other.blocks, createPartitioner())
+ val newBlocks = blocks.cogroup(other.blocks, createPartitioner())
.map { case ((blockRowIndex, blockColIndex), (a, b)) =>
if (a.size > 1 || b.size > 1) {
throw new SparkException("There are multiple MatrixBlocks with indices: " +
s"($blockRowIndex, $blockColIndex). Please remove them.")
}
if (a.isEmpty) {
- new MatrixBlock((blockRowIndex, blockColIndex), b.head)
+ val zeroBlock = BM.zeros[Double](b.head.numRows, b.head.numCols)
+ val result = binMap(zeroBlock, b.head.toBreeze)
+ new MatrixBlock((blockRowIndex, blockColIndex), Matrices.fromBreeze(result))
} else if (b.isEmpty) {
new MatrixBlock((blockRowIndex, blockColIndex), a.head)
} else {
- val result = a.head.toBreeze + b.head.toBreeze
+ val result = binMap(a.head.toBreeze, b.head.toBreeze)
new MatrixBlock((blockRowIndex, blockColIndex), Matrices.fromBreeze(result))
}
}
- new BlockMatrix(addedBlocks, rowsPerBlock, colsPerBlock, numRows(), numCols())
+ new BlockMatrix(newBlocks, rowsPerBlock, colsPerBlock, numRows(), numCols())
} else {
- throw new SparkException("Cannot add matrices with different block dimensions")
+ throw new SparkException("Cannot perform on matrices with different block dimensions")
}
}
+ /**
+ * Adds the given block matrix `other` to `this` block matrix: `this + other`.
+ * The matrices must have the same size and matching `rowsPerBlock` and `colsPerBlock`
+ * values. If one of the blocks that are being added are instances of [[SparseMatrix]],
+ * the resulting sub matrix will also be a [[SparseMatrix]], even if it is being added
+ * to a [[DenseMatrix]]. If two dense matrices are added, the output will also be a
+ * [[DenseMatrix]].
+ */
+ @Since("1.3.0")
+ def add(other: BlockMatrix): BlockMatrix =
+ blockMap(other, (x: BM[Double], y: BM[Double]) => x + y)
+
+ /**
+ * Subtracts the given block matrix `other` from `this` block matrix: `this - other`.
+ * The matrices must have the same size and matching `rowsPerBlock` and `colsPerBlock`
+ * values. If one of the blocks that are being subtracted are instances of [[SparseMatrix]],
+ * the resulting sub matrix will also be a [[SparseMatrix]], even if it is being subtracted
+ * from a [[DenseMatrix]]. If two dense matrices are subtracted, the output will also be a
+ * [[DenseMatrix]].
+ */
+ @Since("2.0.0")
+ def subtract(other: BlockMatrix): BlockMatrix =
+ blockMap(other, (x: BM[Double], y: BM[Double]) => x - y)
+
/** Block (i,j) --> Set of destination partitions */
private type BlockDestinations = Map[(Int, Int), Set[Int]]
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
index d91ba8a6fd..f737d2c51a 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
@@ -192,6 +192,49 @@ class BlockMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
assert(sparseBM.add(sparseBM).toBreeze() === sparseBM.add(denseBM).toBreeze())
}
+ test("subtract") {
+ val blocks: Seq[((Int, Int), Matrix)] = Seq(
+ ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
+ ((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
+ ((1, 0), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
+ ((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
+ ((2, 0), new DenseMatrix(1, 2, Array(1.0, 0.0))), // Added block that doesn't exist in A
+ ((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
+ val rdd = sc.parallelize(blocks, numPartitions)
+ val B = new BlockMatrix(rdd, rowPerPart, colPerPart)
+
+ val expected = BDM(
+ (0.0, 0.0, 0.0, 0.0),
+ (0.0, 0.0, 0.0, 0.0),
+ (0.0, 0.0, 0.0, 0.0),
+ (0.0, 0.0, 0.0, 0.0),
+ (-1.0, 0.0, 0.0, 0.0))
+
+ val AsubtractB = gridBasedMat.subtract(B)
+ assert(AsubtractB.numRows() === m)
+ assert(AsubtractB.numCols() === B.numCols())
+ assert(AsubtractB.toBreeze() === expected)
+
+ val C = new BlockMatrix(rdd, rowPerPart, colPerPart, m, n + 1) // columns don't match
+ intercept[IllegalArgumentException] {
+ gridBasedMat.subtract(C)
+ }
+ val largerBlocks: Seq[((Int, Int), Matrix)] = Seq(
+ ((0, 0), new DenseMatrix(4, 4, new Array[Double](16))),
+ ((1, 0), new DenseMatrix(1, 4, Array(1.0, 0.0, 1.0, 5.0))))
+ val C2 = new BlockMatrix(sc.parallelize(largerBlocks, numPartitions), 4, 4, m, n)
+ intercept[SparkException] { // partitioning doesn't match
+ gridBasedMat.subtract(C2)
+ }
+ // subtracting BlockMatrices composed of SparseMatrices
+ val sparseBlocks = for (i <- 0 until 4) yield ((i / 2, i % 2), SparseMatrix.speye(4))
+ val denseBlocks = for (i <- 0 until 4) yield ((i / 2, i % 2), DenseMatrix.eye(4))
+ val sparseBM = new BlockMatrix(sc.makeRDD(sparseBlocks, 4), 4, 4, 8, 8)
+ val denseBM = new BlockMatrix(sc.makeRDD(denseBlocks, 4), 4, 4, 8, 8)
+
+ assert(sparseBM.subtract(sparseBM).toBreeze() === sparseBM.subtract(denseBM).toBreeze())
+ }
+
test("multiply") {
// identity matrix
val blocks: Seq[((Int, Int), Matrix)] = Seq(