diff options
author | Burak Yavuz <brkyvz@gmail.com> | 2015-01-31 00:47:30 -0800 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2015-01-31 00:47:30 -0800 |
commit | ef8974b1b7ff177d9636d091770dff64fedc385f (patch) | |
tree | 38ce3d36dbb724fd812c30d49ab3be7467a9739d /mllib/src/main | |
parent | 34250a613cee39b03f05f2d54ae37029abd8a502 (diff) | |
download | spark-ef8974b1b7ff177d9636d091770dff64fedc385f.tar.gz spark-ef8974b1b7ff177d9636d091770dff64fedc385f.tar.bz2 spark-ef8974b1b7ff177d9636d091770dff64fedc385f.zip |
[SPARK-3975] Added support for BlockMatrix addition and multiplication
Support for multiplying and adding large distributed matrices!
Author: Burak Yavuz <brkyvz@gmail.com>
Author: Burak Yavuz <brkyvz@dn51t42l.sunet>
Author: Burak Yavuz <brkyvz@dn51t4rd.sunet>
Author: Burak Yavuz <brkyvz@dn0a221430.sunet>
Author: Burak Yavuz <brkyvz@dn0a22b17d.sunet>
Closes #4274 from brkyvz/SPARK-3975PR2 and squashes the following commits:
17abd59 [Burak Yavuz] added indices to error message
ac25783 [Burak Yavuz] merged masyer
b66fd8b [Burak Yavuz] merged masyer
e39baff [Burak Yavuz] addressed code review v1
2dba642 [Burak Yavuz] [SPARK-3975] Added support for BlockMatrix addition and multiplication
fb7624b [Burak Yavuz] merged master
98c58ea [Burak Yavuz] added tests
cdeb5df [Burak Yavuz] before adding tests
c9bf247 [Burak Yavuz] fixed merge conflicts
1cb0d06 [Burak Yavuz] [SPARK-3976] Added doc
f92a916 [Burak Yavuz] merge upstream
1a63b20 [Burak Yavuz] [SPARK-3974] Remove setPartition method. Isn't required
1e8bb2a [Burak Yavuz] [SPARK-3974] Change return type of cache and persist
e3d24c3 [Burak Yavuz] [SPARK-3976] Pulled upstream changes
fa3774f [Burak Yavuz] [SPARK-3976] updated matrix multiplication and addition implementation
239ab4b [Burak Yavuz] [SPARK-3974] Addressed @jkbradley's comments
add7b05 [Burak Yavuz] [SPARK-3976] Updated code according to upstream changes
e29acfd [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into SPARK-3976
3127233 [Burak Yavuz] fixed merge conflicts with upstream
ba414d2 [Burak Yavuz] [SPARK-3974] fixed frobenius norm
ab6cde0 [Burak Yavuz] [SPARK-3974] Modifications cleaning code up, making size calculation more robust
9ae85aa [Burak Yavuz] [SPARK-3974] Made partitioner a variable inside BlockMatrix instead of a constructor variable
d033861 [Burak Yavuz] [SPARK-3974] Removed SubMatrixInfo and added constructor without partitioner
8e954ab [Burak Yavuz] save changes
bbeae8c [Burak Yavuz] merged master
987ea53 [Burak Yavuz] merged master
49b9586 [Burak Yavuz] [SPARK-3974] Updated testing utils from master
645afbe [Burak Yavuz] [SPARK-3974] Pull latest master
beb1edd [Burak Yavuz] merge conflicts fixed
f41d8db [Burak Yavuz] update tests
b05aabb [Burak Yavuz] [SPARK-3974] Updated tests to reflect changes
56b0546 [Burak Yavuz] updates from 3974 PR
b7b8a8f [Burak Yavuz] pull updates from master
b2dec63 [Burak Yavuz] Pull changes from 3974
19c17e8 [Burak Yavuz] [SPARK-3974] Changed blockIdRow and blockIdCol
5f062e6 [Burak Yavuz] updates with 3974
6729fbd [Burak Yavuz] Updated with respect to SPARK-3974 PR
589fbb6 [Burak Yavuz] [SPARK-3974] Code review feedback addressed
63a4858 [Burak Yavuz] added grid multiplication
aa8f086 [Burak Yavuz] [SPARK-3974] Additional comments added
7381b99 [Burak Yavuz] merge with PR1
f378e16 [Burak Yavuz] [SPARK-3974] Block Matrix Abstractions ready
b693209 [Burak Yavuz] Ready for Pull request
Diffstat (limited to 'mllib/src/main')
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala | 7 | ||||
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala | 104 |
2 files changed, 96 insertions, 15 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala index 34e0392f1b..079f7ca564 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala @@ -275,10 +275,8 @@ private[spark] object BLAS extends Serializable with Logging { logDebug("gemm: alpha is equal to 0. Returning C.") } else { A match { - case sparse: SparseMatrix => - gemm(alpha, sparse, B, beta, C) - case dense: DenseMatrix => - gemm(alpha, dense, B, beta, C) + case sparse: SparseMatrix => gemm(alpha, sparse, B, beta, C) + case dense: DenseMatrix => gemm(alpha, dense, B, beta, C) case _ => throw new IllegalArgumentException(s"gemm doesn't support matrix type ${A.getClass}.") } @@ -306,7 +304,6 @@ private[spark] object BLAS extends Serializable with Logging { s"The rows of C don't match the rows of A. C: ${C.numRows}, A: ${A.numRows}") require(B.numCols == C.numCols, s"The columns of C don't match the columns of B. C: ${C.numCols}, A: ${B.numCols}") - nativeBLAS.dgemm(tAstr, tBstr, A.numRows, B.numCols, A.numCols, alpha, A.values, lda, B.values, ldb, beta, C.values, C.numRows) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala index a6405975eb..3871152d06 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala @@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer import breeze.linalg.{DenseMatrix => BDM} import org.apache.spark.{SparkException, Logging, Partitioner} -import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix} +import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix, SparseMatrix} import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel @@ -45,8 +45,8 @@ private[mllib] class GridPartitioner( require(rowsPerPart > 0) require(colsPerPart > 0) - private val rowPartitions = math.ceil(rows / rowsPerPart).toInt - private val colPartitions = math.ceil(cols / colsPerPart).toInt + private val rowPartitions = math.ceil(rows * 1.0 / rowsPerPart).toInt + private val colPartitions = math.ceil(cols * 1.0 / colsPerPart).toInt override val numPartitions = rowPartitions * colPartitions @@ -106,8 +106,9 @@ private[mllib] object GridPartitioner { /** * Represents a distributed matrix in blocks of local matrices. * - * @param blocks The RDD of sub-matrix blocks (blockRowIndex, blockColIndex, sub-matrix) that form - * this distributed matrix. + * @param blocks The RDD of sub-matrix blocks ((blockRowIndex, blockColIndex), sub-matrix) that + * form this distributed matrix. If multiple blocks with the same index exist, the + * results for operations like add and multiply will be unpredictable. * @param rowsPerBlock Number of rows that make up each block. The blocks forming the final * rows are not required to have the given number of rows * @param colsPerBlock Number of columns that make up each block. The blocks forming the final @@ -129,17 +130,19 @@ class BlockMatrix( /** * Alternate constructor for BlockMatrix without the input of the number of rows and columns. * - * @param rdd The RDD of SubMatrices (local matrices) that form this matrix + * @param blocks The RDD of sub-matrix blocks ((blockRowIndex, blockColIndex), sub-matrix) that + * form this distributed matrix. If multiple blocks with the same index exist, the + * results for operations like add and multiply will be unpredictable. * @param rowsPerBlock Number of rows that make up each block. The blocks forming the final * rows are not required to have the given number of rows * @param colsPerBlock Number of columns that make up each block. The blocks forming the final * columns are not required to have the given number of columns */ def this( - rdd: RDD[((Int, Int), Matrix)], + blocks: RDD[((Int, Int), Matrix)], rowsPerBlock: Int, colsPerBlock: Int) = { - this(rdd, rowsPerBlock, colsPerBlock, 0L, 0L) + this(blocks, rowsPerBlock, colsPerBlock, 0L, 0L) } override def numRows(): Long = { @@ -155,7 +158,7 @@ class BlockMatrix( val numRowBlocks = math.ceil(numRows() * 1.0 / rowsPerBlock).toInt val numColBlocks = math.ceil(numCols() * 1.0 / colsPerBlock).toInt - private[mllib] var partitioner: GridPartitioner = + private[mllib] def createPartitioner(): GridPartitioner = GridPartitioner(numRowBlocks, numColBlocks, suggestedNumPartitions = blocks.partitions.size) private lazy val blockInfo = blocks.mapValues(block => (block.numRows, block.numCols)).cache() @@ -255,7 +258,6 @@ class BlockMatrix( val n = numCols().toInt val mem = m * n / 125000 if (mem > 500) logWarning(s"Storing this matrix will require $mem MB of memory!") - val localBlocks = blocks.collect() val values = new Array[Double](m * n) localBlocks.foreach { case ((blockRowIndex, blockColIndex), submat) => @@ -283,4 +285,86 @@ class BlockMatrix( val localMat = toLocalMatrix() new BDM[Double](localMat.numRows, localMat.numCols, localMat.toArray) } + + /** Adds two block matrices together. The matrices must have the same size and matching + * `rowsPerBlock` and `colsPerBlock` values. If one of the blocks that are being added are + * instances of [[SparseMatrix]], the resulting sub matrix will also be a [[SparseMatrix]], even + * if it is being added to a [[DenseMatrix]]. If two dense matrices are added, the output will + * also be a [[DenseMatrix]]. + */ + def add(other: BlockMatrix): BlockMatrix = { + require(numRows() == other.numRows(), "Both matrices must have the same number of rows. " + + s"A.numRows: ${numRows()}, B.numRows: ${other.numRows()}") + require(numCols() == other.numCols(), "Both matrices must have the same number of columns. " + + s"A.numCols: ${numCols()}, B.numCols: ${other.numCols()}") + if (rowsPerBlock == other.rowsPerBlock && colsPerBlock == other.colsPerBlock) { + val addedBlocks = blocks.cogroup(other.blocks, createPartitioner()) + .map { case ((blockRowIndex, blockColIndex), (a, b)) => + if (a.size > 1 || b.size > 1) { + throw new SparkException("There are multiple MatrixBlocks with indices: " + + s"($blockRowIndex, $blockColIndex). Please remove them.") + } + if (a.isEmpty) { + new MatrixBlock((blockRowIndex, blockColIndex), b.head) + } else if (b.isEmpty) { + new MatrixBlock((blockRowIndex, blockColIndex), a.head) + } else { + val result = a.head.toBreeze + b.head.toBreeze + new MatrixBlock((blockRowIndex, blockColIndex), Matrices.fromBreeze(result)) + } + } + new BlockMatrix(addedBlocks, rowsPerBlock, colsPerBlock, numRows(), numCols()) + } else { + throw new SparkException("Cannot add matrices with different block dimensions") + } + } + + /** Left multiplies this [[BlockMatrix]] to `other`, another [[BlockMatrix]]. The `colsPerBlock` + * of this matrix must equal the `rowsPerBlock` of `other`. If `other` contains + * [[SparseMatrix]], they will have to be converted to a [[DenseMatrix]]. The output + * [[BlockMatrix]] will only consist of blocks of [[DenseMatrix]]. This may cause + * some performance issues until support for multiplying two sparse matrices is added. + */ + def multiply(other: BlockMatrix): BlockMatrix = { + require(numCols() == other.numRows(), "The number of columns of A and the number of rows " + + s"of B must be equal. A.numCols: ${numCols()}, B.numRows: ${other.numRows()}. If you " + + "think they should be equal, try setting the dimensions of A and B explicitly while " + + "initializing them.") + if (colsPerBlock == other.rowsPerBlock) { + val resultPartitioner = GridPartitioner(numRowBlocks, other.numColBlocks, + math.max(blocks.partitions.length, other.blocks.partitions.length)) + // Each block of A must be multiplied with the corresponding blocks in each column of B. + // TODO: Optimize to send block to a partition once, similar to ALS + val flatA = blocks.flatMap { case ((blockRowIndex, blockColIndex), block) => + Iterator.tabulate(other.numColBlocks)(j => ((blockRowIndex, j, blockColIndex), block)) + } + // Each block of B must be multiplied with the corresponding blocks in each row of A. + val flatB = other.blocks.flatMap { case ((blockRowIndex, blockColIndex), block) => + Iterator.tabulate(numRowBlocks)(i => ((i, blockColIndex, blockRowIndex), block)) + } + val newBlocks: RDD[MatrixBlock] = flatA.cogroup(flatB, resultPartitioner) + .flatMap { case ((blockRowIndex, blockColIndex, _), (a, b)) => + if (a.size > 1 || b.size > 1) { + throw new SparkException("There are multiple MatrixBlocks with indices: " + + s"($blockRowIndex, $blockColIndex). Please remove them.") + } + if (a.nonEmpty && b.nonEmpty) { + val C = b.head match { + case dense: DenseMatrix => a.head.multiply(dense) + case sparse: SparseMatrix => a.head.multiply(sparse.toDense()) + case _ => throw new SparkException(s"Unrecognized matrix type ${b.head.getClass}.") + } + Iterator(((blockRowIndex, blockColIndex), C.toBreeze)) + } else { + Iterator() + } + }.reduceByKey(resultPartitioner, (a, b) => a + b) + .mapValues(Matrices.fromBreeze) + // TODO: Try to use aggregateByKey instead of reduceByKey to get rid of intermediate matrices + new BlockMatrix(newBlocks, rowsPerBlock, other.colsPerBlock, numRows(), other.numCols()) + } else { + throw new SparkException("colsPerBlock of A doesn't match rowsPerBlock of B. " + + s"A.colsPerBlock: $colsPerBlock, B.rowsPerBlock: ${other.rowsPerBlock}") + } + } } |