about summary refs log tree commit diff
path: root/mllib
diff options
context:
space:
mode:
authorBurak Yavuz <brkyvz@gmail.com>2015-01-28 23:42:07 -0800
committerXiangrui Meng <meng@databricks.com>2015-01-28 23:42:07 -0800
commita63be1a18f7b7d77f7deef2abc9a5be6ad24ae28 (patch)
treead6511934a211842ef9fd9ba979389b87c2a6abe /mllib
parent5b9760de8dd2dab7cf9a4f5c65869e4ed296a938 (diff)
downloadspark-a63be1a18f7b7d77f7deef2abc9a5be6ad24ae28.tar.gz
spark-a63be1a18f7b7d77f7deef2abc9a5be6ad24ae28.tar.bz2
spark-a63be1a18f7b7d77f7deef2abc9a5be6ad24ae28.zip
[SPARK-3977] Conversion methods for BlockMatrix to other Distributed Matrices
The conversion methods for `BlockMatrix`. Conversions go through `CoordinateMatrix` in order to cause a shuffle so that intermediate operations will be stored on disk and the expensive initial computation will be mitigated. Author: Burak Yavuz <brkyvz@gmail.com> Closes #4256 from brkyvz/SPARK-3977PR and squashes the following commits: 4df37fe [Burak Yavuz] moved TODO inside code block b049c07 [Burak Yavuz] addressed code review feedback v1 66cb755 [Burak Yavuz] added default toBlockMatrix conversion 851f2a2 [Burak Yavuz] added better comments and checks cdb9895 [Burak Yavuz] [SPARK-3977] Conversion methods for BlockMatrix to other Distributed Matrices
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala26
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala43
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala18
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala14
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala14
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala15
6 files changed, 127 insertions, 3 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index 0ab74ba294..426dbf4805 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -17,10 +17,12 @@
package org.apache.spark.mllib.linalg.distributed
+import scala.collection.mutable.ArrayBuffer
+
import breeze.linalg.{DenseMatrix => BDM}
import org.apache.spark.{Logging, Partitioner}
-import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
+import org.apache.spark.mllib.linalg.{SparseMatrix, DenseMatrix, Matrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
@@ -182,6 +184,28 @@ class BlockMatrix(
this
}
+ /** Converts to CoordinateMatrix. */
+ def toCoordinateMatrix(): CoordinateMatrix = {
+ val entryRDD = blocks.flatMap { case ((blockRowIndex, blockColIndex), mat) =>
+ val rowStart = blockRowIndex.toLong * rowsPerBlock
+ val colStart = blockColIndex.toLong * colsPerBlock
+ val entryValues = new ArrayBuffer[MatrixEntry]()
+ mat.foreachActive { (i, j, v) =>
+ if (v != 0.0) entryValues.append(new MatrixEntry(rowStart + i, colStart + j, v))
+ }
+ entryValues
+ }
+ new CoordinateMatrix(entryRDD, numRows(), numCols())
+ }
+
+ /** Converts to IndexedRowMatrix. The number of columns must be within the integer range. */
+ def toIndexedRowMatrix(): IndexedRowMatrix = {
+ require(numCols() < Int.MaxValue, "The number of columns must be within the integer range. " +
+ s"numCols: ${numCols()}")
+ // TODO: This implementation may be optimized
+ toCoordinateMatrix().toIndexedRowMatrix()
+ }
+
/** Collect the distributed matrix on the driver as a `DenseMatrix`. */
def toLocalMatrix(): Matrix = {
require(numRows() < Int.MaxValue, "The number of rows of this matrix should be less than " +
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala
index b60559c853..078d1fac44 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala
@@ -21,8 +21,7 @@ import breeze.linalg.{DenseMatrix => BDM}
import org.apache.spark.annotation.Experimental
import org.apache.spark.rdd.RDD
-import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.linalg.{Matrix, SparseMatrix, Vectors}
/**
* :: Experimental ::
@@ -98,6 +97,46 @@ class CoordinateMatrix(
toIndexedRowMatrix().toRowMatrix()
}
+ /** Converts to BlockMatrix. Creates blocks of [[SparseMatrix]] with size 1024 x 1024. */
+ def toBlockMatrix(): BlockMatrix = {
+ toBlockMatrix(1024, 1024)
+ }
+
+ /**
+ * Converts to BlockMatrix. Creates blocks of [[SparseMatrix]].
+ * @param rowsPerBlock The number of rows of each block. The blocks at the bottom edge may have
+ * a smaller value. Must be an integer value greater than 0.
+ * @param colsPerBlock The number of columns of each block. The blocks at the right edge may have
+ * a smaller value. Must be an integer value greater than 0.
+ * @return a [[BlockMatrix]]
+ */
+ def toBlockMatrix(rowsPerBlock: Int, colsPerBlock: Int): BlockMatrix = {
+ require(rowsPerBlock > 0,
+ s"rowsPerBlock needs to be greater than 0. rowsPerBlock: $rowsPerBlock")
+ require(colsPerBlock > 0,
+ s"colsPerBlock needs to be greater than 0. colsPerBlock: $colsPerBlock")
+ val m = numRows()
+ val n = numCols()
+ val numRowBlocks = math.ceil(m.toDouble / rowsPerBlock).toInt
+ val numColBlocks = math.ceil(n.toDouble / colsPerBlock).toInt
+ val partitioner = GridPartitioner(numRowBlocks, numColBlocks, entries.partitions.length)
+
+ val blocks: RDD[((Int, Int), Matrix)] = entries.map { entry =>
+ val blockRowIndex = (entry.i / rowsPerBlock).toInt
+ val blockColIndex = (entry.j / colsPerBlock).toInt
+
+ val rowId = entry.i % rowsPerBlock
+ val colId = entry.j % colsPerBlock
+
+ ((blockRowIndex, blockColIndex), (rowId.toInt, colId.toInt, entry.value))
+ }.groupByKey(partitioner).map { case ((blockRowIndex, blockColIndex), entry) =>
+ val effRows = math.min(m - blockRowIndex.toLong * rowsPerBlock, rowsPerBlock).toInt
+ val effCols = math.min(n - blockColIndex.toLong * colsPerBlock, colsPerBlock).toInt
+ ((blockRowIndex, blockColIndex), SparseMatrix.fromCOO(effRows, effCols, entry))
+ }
+ new BlockMatrix(blocks, rowsPerBlock, colsPerBlock, m, n)
+ }
+
/** Determines the size by computing the max row/column index. */
private def computeSize() {
// Reduce will throw an exception if `entries` is empty.
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
index c518271f04..3be530fa07 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
@@ -75,6 +75,24 @@ class IndexedRowMatrix(
new RowMatrix(rows.map(_.vector), 0L, nCols)
}
+ /** Converts to BlockMatrix. Creates blocks of [[SparseMatrix]] with size 1024 x 1024. */
+ def toBlockMatrix(): BlockMatrix = {
+ toBlockMatrix(1024, 1024)
+ }
+
+ /**
+ * Converts to BlockMatrix. Creates blocks of [[SparseMatrix]].
+ * @param rowsPerBlock The number of rows of each block. The blocks at the bottom edge may have
+ * a smaller value. Must be an integer value greater than 0.
+ * @param colsPerBlock The number of columns of each block. The blocks at the right edge may have
+ * a smaller value. Must be an integer value greater than 0.
+ * @return a [[BlockMatrix]]
+ */
+ def toBlockMatrix(rowsPerBlock: Int, colsPerBlock: Int): BlockMatrix = {
+ // TODO: This implementation may be optimized
+ toCoordinateMatrix().toBlockMatrix(rowsPerBlock, colsPerBlock)
+ }
+
/**
* Converts this matrix to a
* [[org.apache.spark.mllib.linalg.distributed.CoordinateMatrix]].
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
index 05efbc8e8d..7284d03d24 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
@@ -120,6 +120,20 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
}
}
+ test("toCoordinateMatrix") {
+ val coordMat = gridBasedMat.toCoordinateMatrix()
+ assert(coordMat.numRows() === m)
+ assert(coordMat.numCols() === n)
+ assert(coordMat.toBreeze() === gridBasedMat.toBreeze())
+ }
+
+ test("toIndexedRowMatrix") {
+ val rowMat = gridBasedMat.toIndexedRowMatrix()
+ assert(rowMat.numRows() === m)
+ assert(rowMat.numCols() === n)
+ assert(rowMat.toBreeze() === gridBasedMat.toBreeze())
+ }
+
test("toBreeze and toLocalMatrix") {
val expected = BDM(
(1.0, 0.0, 0.0, 0.0),
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala
index 80bef814ce..04b36a9ef9 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala
@@ -100,4 +100,18 @@ class CoordinateMatrixSuite extends FunSuite with MLlibTestSparkContext {
Vectors.dense(0.0, 9.0, 0.0, 0.0))
assert(rows === expected)
}
+
+ test("toBlockMatrix") {
+ val blockMat = mat.toBlockMatrix(2, 2)
+ assert(blockMat.numRows() === m)
+ assert(blockMat.numCols() === n)
+ assert(blockMat.toBreeze() === mat.toBreeze())
+
+ intercept[IllegalArgumentException] {
+ mat.toBlockMatrix(-1, 2)
+ }
+ intercept[IllegalArgumentException] {
+ mat.toBlockMatrix(2, 0)
+ }
+ }
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala
index b86c2ca5ff..2ab53cc13d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala
@@ -88,6 +88,21 @@ class IndexedRowMatrixSuite extends FunSuite with MLlibTestSparkContext {
assert(coordMat.toBreeze() === idxRowMat.toBreeze())
}
+ test("toBlockMatrix") {
+ val idxRowMat = new IndexedRowMatrix(indexedRows)
+ val blockMat = idxRowMat.toBlockMatrix(2, 2)
+ assert(blockMat.numRows() === m)
+ assert(blockMat.numCols() === n)
+ assert(blockMat.toBreeze() === idxRowMat.toBreeze())
+
+ intercept[IllegalArgumentException] {
+ idxRowMat.toBlockMatrix(-1, 2)
+ }
+ intercept[IllegalArgumentException] {
+ idxRowMat.toBlockMatrix(2, 0)
+ }
+ }
+
test("multiply a local matrix") {
val A = new IndexedRowMatrix(indexedRows)
val B = Matrices.dense(3, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0))