diff options
author | Burak Yavuz <brkyvz@gmail.com> | 2015-01-28 23:42:07 -0800 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2015-01-28 23:42:07 -0800 |
commit | a63be1a18f7b7d77f7deef2abc9a5be6ad24ae28 (patch) | |
tree | ad6511934a211842ef9fd9ba979389b87c2a6abe /mllib | |
parent | 5b9760de8dd2dab7cf9a4f5c65869e4ed296a938 (diff) | |
download | spark-a63be1a18f7b7d77f7deef2abc9a5be6ad24ae28.tar.gz spark-a63be1a18f7b7d77f7deef2abc9a5be6ad24ae28.tar.bz2 spark-a63be1a18f7b7d77f7deef2abc9a5be6ad24ae28.zip |
[SPARK-3977] Conversion methods for BlockMatrix to other Distributed Matrices
The conversion methods for `BlockMatrix`. Conversions go through `CoordinateMatrix` in order to cause a shuffle so that intermediate operations will be stored on disk and the expensive initial computation will be mitigated.
Author: Burak Yavuz <brkyvz@gmail.com>
Closes #4256 from brkyvz/SPARK-3977PR and squashes the following commits:
4df37fe [Burak Yavuz] moved TODO inside code block
b049c07 [Burak Yavuz] addressed code review feedback v1
66cb755 [Burak Yavuz] added default toBlockMatrix conversion
851f2a2 [Burak Yavuz] added better comments and checks
cdb9895 [Burak Yavuz] [SPARK-3977] Conversion methods for BlockMatrix to other Distributed Matrices
Diffstat (limited to 'mllib')
6 files changed, 127 insertions, 3 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala index 0ab74ba294..426dbf4805 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala @@ -17,10 +17,12 @@ package org.apache.spark.mllib.linalg.distributed +import scala.collection.mutable.ArrayBuffer + import breeze.linalg.{DenseMatrix => BDM} import org.apache.spark.{Logging, Partitioner} -import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix} +import org.apache.spark.mllib.linalg.{SparseMatrix, DenseMatrix, Matrix} import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel @@ -182,6 +184,28 @@ class BlockMatrix( this } + /** Converts to CoordinateMatrix. */ + def toCoordinateMatrix(): CoordinateMatrix = { + val entryRDD = blocks.flatMap { case ((blockRowIndex, blockColIndex), mat) => + val rowStart = blockRowIndex.toLong * rowsPerBlock + val colStart = blockColIndex.toLong * colsPerBlock + val entryValues = new ArrayBuffer[MatrixEntry]() + mat.foreachActive { (i, j, v) => + if (v != 0.0) entryValues.append(new MatrixEntry(rowStart + i, colStart + j, v)) + } + entryValues + } + new CoordinateMatrix(entryRDD, numRows(), numCols()) + } + + /** Converts to IndexedRowMatrix. The number of columns must be within the integer range. */ + def toIndexedRowMatrix(): IndexedRowMatrix = { + require(numCols() < Int.MaxValue, "The number of columns must be within the integer range. " + + s"numCols: ${numCols()}") + // TODO: This implementation may be optimized + toCoordinateMatrix().toIndexedRowMatrix() + } + /** Collect the distributed matrix on the driver as a `DenseMatrix`. */ def toLocalMatrix(): Matrix = { require(numRows() < Int.MaxValue, "The number of rows of this matrix should be less than " + diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala index b60559c853..078d1fac44 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala @@ -21,8 +21,7 @@ import breeze.linalg.{DenseMatrix => BDM} import org.apache.spark.annotation.Experimental import org.apache.spark.rdd.RDD -import org.apache.spark.SparkContext._ -import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.linalg.{Matrix, SparseMatrix, Vectors} /** * :: Experimental :: @@ -98,6 +97,46 @@ class CoordinateMatrix( toIndexedRowMatrix().toRowMatrix() } + /** Converts to BlockMatrix. Creates blocks of [[SparseMatrix]] with size 1024 x 1024. */ + def toBlockMatrix(): BlockMatrix = { + toBlockMatrix(1024, 1024) + } + + /** + * Converts to BlockMatrix. Creates blocks of [[SparseMatrix]]. + * @param rowsPerBlock The number of rows of each block. The blocks at the bottom edge may have + * a smaller value. Must be an integer value greater than 0. + * @param colsPerBlock The number of columns of each block. The blocks at the right edge may have + * a smaller value. Must be an integer value greater than 0. + * @return a [[BlockMatrix]] + */ + def toBlockMatrix(rowsPerBlock: Int, colsPerBlock: Int): BlockMatrix = { + require(rowsPerBlock > 0, + s"rowsPerBlock needs to be greater than 0. rowsPerBlock: $rowsPerBlock") + require(colsPerBlock > 0, + s"colsPerBlock needs to be greater than 0. colsPerBlock: $colsPerBlock") + val m = numRows() + val n = numCols() + val numRowBlocks = math.ceil(m.toDouble / rowsPerBlock).toInt + val numColBlocks = math.ceil(n.toDouble / colsPerBlock).toInt + val partitioner = GridPartitioner(numRowBlocks, numColBlocks, entries.partitions.length) + + val blocks: RDD[((Int, Int), Matrix)] = entries.map { entry => + val blockRowIndex = (entry.i / rowsPerBlock).toInt + val blockColIndex = (entry.j / colsPerBlock).toInt + + val rowId = entry.i % rowsPerBlock + val colId = entry.j % colsPerBlock + + ((blockRowIndex, blockColIndex), (rowId.toInt, colId.toInt, entry.value)) + }.groupByKey(partitioner).map { case ((blockRowIndex, blockColIndex), entry) => + val effRows = math.min(m - blockRowIndex.toLong * rowsPerBlock, rowsPerBlock).toInt + val effCols = math.min(n - blockColIndex.toLong * colsPerBlock, colsPerBlock).toInt + ((blockRowIndex, blockColIndex), SparseMatrix.fromCOO(effRows, effCols, entry)) + } + new BlockMatrix(blocks, rowsPerBlock, colsPerBlock, m, n) + } + /** Determines the size by computing the max row/column index. */ private def computeSize() { // Reduce will throw an exception if `entries` is empty. diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala index c518271f04..3be530fa07 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala @@ -75,6 +75,24 @@ class IndexedRowMatrix( new RowMatrix(rows.map(_.vector), 0L, nCols) } + /** Converts to BlockMatrix. Creates blocks of [[SparseMatrix]] with size 1024 x 1024. */ + def toBlockMatrix(): BlockMatrix = { + toBlockMatrix(1024, 1024) + } + + /** + * Converts to BlockMatrix. Creates blocks of [[SparseMatrix]]. + * @param rowsPerBlock The number of rows of each block. The blocks at the bottom edge may have + * a smaller value. Must be an integer value greater than 0. + * @param colsPerBlock The number of columns of each block. The blocks at the right edge may have + * a smaller value. Must be an integer value greater than 0. + * @return a [[BlockMatrix]] + */ + def toBlockMatrix(rowsPerBlock: Int, colsPerBlock: Int): BlockMatrix = { + // TODO: This implementation may be optimized + toCoordinateMatrix().toBlockMatrix(rowsPerBlock, colsPerBlock) + } + /** * Converts this matrix to a * [[org.apache.spark.mllib.linalg.distributed.CoordinateMatrix]]. diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala index 05efbc8e8d..7284d03d24 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala @@ -120,6 +120,20 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext { } } + test("toCoordinateMatrix") { + val coordMat = gridBasedMat.toCoordinateMatrix() + assert(coordMat.numRows() === m) + assert(coordMat.numCols() === n) + assert(coordMat.toBreeze() === gridBasedMat.toBreeze()) + } + + test("toIndexedRowMatrix") { + val rowMat = gridBasedMat.toIndexedRowMatrix() + assert(rowMat.numRows() === m) + assert(rowMat.numCols() === n) + assert(rowMat.toBreeze() === gridBasedMat.toBreeze()) + } + test("toBreeze and toLocalMatrix") { val expected = BDM( (1.0, 0.0, 0.0, 0.0), diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala index 80bef814ce..04b36a9ef9 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala @@ -100,4 +100,18 @@ class CoordinateMatrixSuite extends FunSuite with MLlibTestSparkContext { Vectors.dense(0.0, 9.0, 0.0, 0.0)) assert(rows === expected) } + + test("toBlockMatrix") { + val blockMat = mat.toBlockMatrix(2, 2) + assert(blockMat.numRows() === m) + assert(blockMat.numCols() === n) + assert(blockMat.toBreeze() === mat.toBreeze()) + + intercept[IllegalArgumentException] { + mat.toBlockMatrix(-1, 2) + } + intercept[IllegalArgumentException] { + mat.toBlockMatrix(2, 0) + } + } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala index b86c2ca5ff..2ab53cc13d 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala @@ -88,6 +88,21 @@ class IndexedRowMatrixSuite extends FunSuite with MLlibTestSparkContext { assert(coordMat.toBreeze() === idxRowMat.toBreeze()) } + test("toBlockMatrix") { + val idxRowMat = new IndexedRowMatrix(indexedRows) + val blockMat = idxRowMat.toBlockMatrix(2, 2) + assert(blockMat.numRows() === m) + assert(blockMat.numCols() === n) + assert(blockMat.toBreeze() === idxRowMat.toBreeze()) + + intercept[IllegalArgumentException] { + idxRowMat.toBlockMatrix(-1, 2) + } + intercept[IllegalArgumentException] { + idxRowMat.toBlockMatrix(2, 0) + } + } + test("multiply a local matrix") { val A = new IndexedRowMatrix(indexedRows) val B = Matrices.dense(3, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0)) |