author    Burak Yavuz <brkyvz@gmail.com>    2015-01-28 10:06:37 -0800
committer Xiangrui Meng <meng@databricks.com>    2015-01-28 10:06:37 -0800
commit    eeb53bf90e93b298eff48387d2e9ad699b52d001 (patch)
tree      d73dadc6186e24e0f93efe4c3fc71facc898c8d4 /mllib
parent    622ff09d036b40caa4c177508e8a948beccfd88f (diff)
[SPARK-3974][MLlib] Distributed Block Matrix Abstractions
This pull request includes the abstractions for the distributed BlockMatrix representation. `BlockMatrix` allows users to store very large matrices as small blocks of local matrices. Specific partitioners, such as `RowBasedPartitioner` and `ColumnBasedPartitioner`, are implemented to optimize the addition and multiplication operations that will be added in a follow-up PR. This work is based on the ml-matrix repo developed at the AMPLab at UC Berkeley: https://github.com/amplab/ml-matrix

Additional thanks to rezazadeh, shivaram, and mengxr for guidance on the design.

Author: Burak Yavuz <brkyvz@gmail.com>
Author: Xiangrui Meng <meng@databricks.com>
Author: Burak Yavuz <brkyvz@dn51t42l.sunet>
Author: Burak Yavuz <brkyvz@dn51t4rd.sunet>
Author: Burak Yavuz <brkyvz@dn0a221430.sunet>

Closes #3200 from brkyvz/SPARK-3974 and squashes the following commits:

a8eace2 [Burak Yavuz] Merge pull request #2 from mengxr/brkyvz-SPARK-3974
feb32a7 [Xiangrui Meng] update tests
e1d3ee8 [Xiangrui Meng] minor updates
24ec7b8 [Xiangrui Meng] update grid partitioner
5eecd48 [Burak Yavuz] fixed gridPartitioner and added tests
140f20e [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into SPARK-3974
1694c9e [Burak Yavuz] almost finished addressing comments
f9d664b [Burak Yavuz] updated API and modified partitioning scheme
eebbdf7 [Burak Yavuz] preliminary changes addressing code review
1a63b20 [Burak Yavuz] [SPARK-3974] Remove setPartition method. Isn't required
1e8bb2a [Burak Yavuz] [SPARK-3974] Change return type of cache and persist
239ab4b [Burak Yavuz] [SPARK-3974] Addressed @jkbradley's comments
ba414d2 [Burak Yavuz] [SPARK-3974] fixed frobenius norm
ab6cde0 [Burak Yavuz] [SPARK-3974] Modifications cleaning code up, making size calculation more robust
9ae85aa [Burak Yavuz] [SPARK-3974] Made partitioner a variable inside BlockMatrix instead of a constructor variable
d033861 [Burak Yavuz] [SPARK-3974] Removed SubMatrixInfo and added constructor without partitioner
49b9586 [Burak Yavuz] [SPARK-3974] Updated testing utils from master
645afbe [Burak Yavuz] [SPARK-3974] Pull latest master
b05aabb [Burak Yavuz] [SPARK-3974] Updated tests to reflect changes
19c17e8 [Burak Yavuz] [SPARK-3974] Changed blockIdRow and blockIdCol
589fbb6 [Burak Yavuz] [SPARK-3974] Code review feedback addressed
aa8f086 [Burak Yavuz] [SPARK-3974] Additional comments added
f378e16 [Burak Yavuz] [SPARK-3974] Block Matrix Abstractions ready
b693209 [Burak Yavuz] Ready for Pull request
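A minimal usage sketch of the API this patch introduces, assuming an active SparkContext `sc` (DenseMatrix values are column-major, as in the tests below):

    import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
    import org.apache.spark.mllib.linalg.distributed.BlockMatrix

    // A 3x2 matrix stored as one 2x2 block and one 1x2 block (2x2 block size).
    val blocks: Seq[((Int, Int), Matrix)] = Seq(
      ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
      ((1, 0), new DenseMatrix(1, 2, Array(3.0, 4.0))))
    val mat = new BlockMatrix(sc.parallelize(blocks, 2), 2, 2)
    assert(mat.numRows() == 3 && mat.numCols() == 2)
    val local = mat.toLocalMatrix() // collects all blocks to the driver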
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala       216
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala  135
2 files changed, 351 insertions(+), 0 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
new file mode 100644
index 0000000000..0ab74ba294
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg.distributed
+
+import breeze.linalg.{DenseMatrix => BDM}
+
+import org.apache.spark.{Logging, Partitioner}
+import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * A grid partitioner, which uses a regular grid to partition coordinates.
+ *
+ * @param rows Number of rows.
+ * @param cols Number of columns.
+ * @param rowsPerPart Number of rows per partition, which may be fewer at the bottom edge.
+ * @param colsPerPart Number of columns per partition, which may be fewer at the right edge.
+ */
+private[mllib] class GridPartitioner(
+ val rows: Int,
+ val cols: Int,
+ val rowsPerPart: Int,
+ val colsPerPart: Int) extends Partitioner {
+
+ require(rows > 0)
+ require(cols > 0)
+ require(rowsPerPart > 0)
+ require(colsPerPart > 0)
+
+ private val rowPartitions = math.ceil(rows * 1.0 / rowsPerPart).toInt
+ private val colPartitions = math.ceil(cols * 1.0 / colsPerPart).toInt
+
+ override val numPartitions = rowPartitions * colPartitions
+
+ /**
+ * Returns the index of the partition the input coordinate belongs to.
+ *
+ * @param key The coordinate (i, j) or a tuple (i, j, k), where k is the inner index used in
+ * multiplication. k is ignored in computing partitions.
+ * @return The index of the partition that the coordinate belongs to.
+ */
+ override def getPartition(key: Any): Int = {
+ key match {
+ case (i: Int, j: Int) =>
+ getPartitionId(i, j)
+ case (i: Int, j: Int, _: Int) =>
+ getPartitionId(i, j)
+ case _ =>
+ throw new IllegalArgumentException(s"Unrecognized key: $key.")
+ }
+ }
+
+ /** Returns the partition ID, mapping neighboring sub-matrices to the same partition. */
+ private def getPartitionId(i: Int, j: Int): Int = {
+ require(0 <= i && i < rows, s"Row index $i out of range [0, $rows).")
+ require(0 <= j && j < cols, s"Column index $j out of range [0, $cols).")
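+ // Partition IDs are laid out column-major over the grid of partitions. E.g., with
+ // rowsPerPart = 1, colsPerPart = 2, rowPartitions = 4: (i, j) = (2, 5) maps to
+ // 2 / 1 + 5 / 2 * 4 = 2 + 8 = 10.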
+ i / rowsPerPart + j / colsPerPart * rowPartitions
+ }
+
+ override def equals(obj: Any): Boolean = {
+ obj match {
+ case r: GridPartitioner =>
+ (this.rows == r.rows) && (this.cols == r.cols) &&
+ (this.rowsPerPart == r.rowsPerPart) && (this.colsPerPart == r.colsPerPart)
+ case _ =>
+ false
+ }
+ }
+
+ // equals is overridden above, so hashCode must be consistent with it.
+ override def hashCode: Int =
+ 41 * (41 * (41 * (41 + rows) + cols) + rowsPerPart) + colsPerPart
+}
+
+private[mllib] object GridPartitioner {
+
+ /** Creates a new [[GridPartitioner]] instance. */
+ def apply(rows: Int, cols: Int, rowsPerPart: Int, colsPerPart: Int): GridPartitioner = {
+ new GridPartitioner(rows, cols, rowsPerPart, colsPerPart)
+ }
+
+ /** Creates a new [[GridPartitioner]] instance with a suggested number of partitions. */
+ def apply(rows: Int, cols: Int, suggestedNumPartitions: Int): GridPartitioner = {
+ require(suggestedNumPartitions > 0)
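+ // Heuristic: scale both dimensions by 1 / sqrt(n), so the resulting grid has roughly
+ // sqrt(n) x sqrt(n) = n partitions for n = suggestedNumPartitions.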
+ val scale = 1.0 / math.sqrt(suggestedNumPartitions)
+ val rowsPerPart = math.round(math.max(scale * rows, 1.0)).toInt
+ val colsPerPart = math.round(math.max(scale * cols, 1.0)).toInt
+ new GridPartitioner(rows, cols, rowsPerPart, colsPerPart)
+ }
+}
+
+/**
+ * Represents a distributed matrix in blocks of local matrices.
+ *
+ * @param blocks The RDD of sub-matrix blocks (blockRowIndex, blockColIndex, sub-matrix) that form
+ * this distributed matrix.
+ * @param rowsPerBlock Number of rows that make up each block. The blocks forming the final
+ * rows are not required to have the given number of rows.
+ * @param colsPerBlock Number of columns that make up each block. The blocks forming the final
+ * columns are not required to have the given number of columns.
+ * @param nRows Number of rows of this matrix. If the supplied value is less than or equal to zero,
+ * the number of rows will be calculated when `numRows` is invoked.
+ * @param nCols Number of columns of this matrix. If the supplied value is less than or equal to
+ * zero, the number of columns will be calculated when `numCols` is invoked.
+ */
+class BlockMatrix(
+ val blocks: RDD[((Int, Int), Matrix)],
+ val rowsPerBlock: Int,
+ val colsPerBlock: Int,
+ private var nRows: Long,
+ private var nCols: Long) extends DistributedMatrix with Logging {
+
+ private type MatrixBlock = ((Int, Int), Matrix) // ((blockRowIndex, blockColIndex), sub-matrix)
+
+ /**
+ * Alternate constructor for BlockMatrix that does not require the number of rows and columns;
+ * they will be calculated when `numRows` or `numCols` is invoked.
+ *
+ * @param rdd The RDD of sub-matrix blocks ((blockRowIndex, blockColIndex), sub-matrix) that
+ * form this matrix.
+ * @param rowsPerBlock Number of rows that make up each block. The blocks forming the final
+ * rows are not required to have the given number of rows.
+ * @param colsPerBlock Number of columns that make up each block. The blocks forming the final
+ * columns are not required to have the given number of columns.
+ */
+ def this(
+ rdd: RDD[((Int, Int), Matrix)],
+ rowsPerBlock: Int,
+ colsPerBlock: Int) = {
+ this(rdd, rowsPerBlock, colsPerBlock, 0L, 0L)
+ }
+
+ override def numRows(): Long = {
+ if (nRows <= 0L) estimateDim()
+ nRows
+ }
+
+ override def numCols(): Long = {
+ if (nCols <= 0L) estimateDim()
+ nCols
+ }
+
+ val numRowBlocks = math.ceil(numRows() * 1.0 / rowsPerBlock).toInt
+ val numColBlocks = math.ceil(numCols() * 1.0 / colsPerBlock).toInt
+
+ private[mllib] var partitioner: GridPartitioner =
+ GridPartitioner(numRowBlocks, numColBlocks, suggestedNumPartitions = blocks.partitions.size)
+
+ /** Estimates the dimensions of the matrix. */
+ private def estimateDim(): Unit = {
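+ // The dimensions reach the bottom-right edge of the furthest blocks: take the maximum
+ // of (block offset + block size) over all blocks, in each dimension.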
+ val (rows, cols) = blocks.map { case ((blockRowIndex, blockColIndex), mat) =>
+ (blockRowIndex.toLong * rowsPerBlock + mat.numRows,
+ blockColIndex.toLong * colsPerBlock + mat.numCols)
+ }.reduce { (x0, x1) =>
+ (math.max(x0._1, x1._1), math.max(x0._2, x1._2))
+ }
+ if (nRows <= 0L) nRows = rows
+ assert(rows <= nRows, s"The number of rows $rows is more than claimed $nRows.")
+ if (nCols <= 0L) nCols = cols
+ assert(cols <= nCols, s"The number of columns $cols is more than claimed $nCols.")
+ }
+
+ /** Caches the underlying RDD. */
+ def cache(): this.type = {
+ blocks.cache()
+ this
+ }
+
+ /** Persists the underlying RDD with the specified storage level. */
+ def persist(storageLevel: StorageLevel): this.type = {
+ blocks.persist(storageLevel)
+ this
+ }
+
+ /** Collects the distributed matrix on the driver as a `DenseMatrix`. */
+ def toLocalMatrix(): Matrix = {
+ require(numRows() < Int.MaxValue, "The number of rows of this matrix should be less than " +
+ s"Int.MaxValue. Currently numRows: ${numRows()}")
+ require(numCols() < Int.MaxValue, "The number of columns of this matrix should be less than " +
+ s"Int.MaxValue. Currently numCols: ${numCols()}")
+ require(numRows() * numCols() < Int.MaxValue, "The length of the values array must be " +
+ s"less than Int.MaxValue. Currently numRows * numCols: ${numRows() * numCols()}")
+ val m = numRows().toInt
+ val n = numCols().toInt
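+ // Estimated memory footprint in MB: m * n doubles at 8 bytes each (m * n * 8 / 1e6).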
+ val mem = m * n / 125000
+ if (mem > 500) logWarning(s"Storing this matrix will require $mem MB of memory!")
+
+ val localBlocks = blocks.collect()
+ val values = new Array[Double](m * n)
+ localBlocks.foreach { case ((blockRowIndex, blockColIndex), submat) =>
+ val rowOffset = blockRowIndex * rowsPerBlock
+ val colOffset = blockColIndex * colsPerBlock
+ submat.foreachActive { (i, j, v) =>
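+ // DenseMatrix values are column-major: entry (row, col) is stored at col * m + row.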
+ val indexOffset = (j + colOffset) * m + rowOffset + i
+ values(indexOffset) = v
+ }
+ }
+ new DenseMatrix(m, n, values)
+ }
+
+ /** Collects data and assembles a local dense breeze matrix (for test only). */
+ private[mllib] def toBreeze(): BDM[Double] = {
+ val localMat = toLocalMatrix()
+ new BDM[Double](localMat.numRows, localMat.numCols, localMat.toArray)
+ }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
new file mode 100644
index 0000000000..05efbc8e8d
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg.distributed
+
+import scala.util.Random
+
+import breeze.linalg.{DenseMatrix => BDM}
+import org.scalatest.FunSuite
+
+import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix}
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+
+class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
+
+ val m = 5
+ val n = 4
+ val rowPerPart = 2
+ val colPerPart = 2
+ val numPartitions = 3
+ var gridBasedMat: BlockMatrix = _
+
+ override def beforeAll() {
+ super.beforeAll()
+
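+ // Note that block (2, 0) is intentionally absent; missing blocks are treated as zero.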
+ val blocks: Seq[((Int, Int), Matrix)] = Seq(
+ ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
+ ((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
+ ((1, 0), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
+ ((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
+ ((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
+
+ gridBasedMat = new BlockMatrix(sc.parallelize(blocks, numPartitions), rowPerPart, colPerPart)
+ }
+
+ test("size") {
+ assert(gridBasedMat.numRows() === m)
+ assert(gridBasedMat.numCols() === n)
+ }
+
+ test("grid partitioner") {
+ val random = new Random()
+ // This should generate a 4x4 grid of 1x2 blocks.
+ val part0 = GridPartitioner(4, 7, suggestedNumPartitions = 12)
+ val expected0 = Array(
+ Array(0, 0, 4, 4, 8, 8, 12),
+ Array(1, 1, 5, 5, 9, 9, 13),
+ Array(2, 2, 6, 6, 10, 10, 14),
+ Array(3, 3, 7, 7, 11, 11, 15))
+ for (i <- 0 until 4; j <- 0 until 7) {
+ assert(part0.getPartition((i, j)) === expected0(i)(j))
+ assert(part0.getPartition((i, j, random.nextInt())) === expected0(i)(j))
+ }
+
+ intercept[IllegalArgumentException] {
+ part0.getPartition((-1, 0))
+ }
+
+ intercept[IllegalArgumentException] {
+ part0.getPartition((4, 0))
+ }
+
+ intercept[IllegalArgumentException] {
+ part0.getPartition((0, -1))
+ }
+
+ intercept[IllegalArgumentException] {
+ part0.getPartition((0, 7))
+ }
+
+ val part1 = GridPartitioner(2, 2, suggestedNumPartitions = 5)
+ val expected1 = Array(
+ Array(0, 2),
+ Array(1, 3))
+ for (i <- 0 until 2; j <- 0 until 2) {
+ assert(part1.getPartition((i, j)) === expected1(i)(j))
+ assert(part1.getPartition((i, j, random.nextInt())) === expected1(i)(j))
+ }
+
+ val part2 = GridPartitioner(2, 2, suggestedNumPartitions = 5)
+ assert(part0 !== part2)
+ assert(part1 === part2)
+
+ val part3 = new GridPartitioner(2, 3, rowsPerPart = 1, colsPerPart = 2)
+ val expected3 = Array(
+ Array(0, 0, 2),
+ Array(1, 1, 3))
+ for (i <- 0 until 2; j <- 0 until 3) {
+ assert(part3.getPartition((i, j)) === expected3(i)(j))
+ assert(part3.getPartition((i, j, random.nextInt())) === expected3(i)(j))
+ }
+
+ val part4 = GridPartitioner(2, 3, rowsPerPart = 1, colsPerPart = 2)
+ assert(part3 === part4)
+
+ intercept[IllegalArgumentException] {
+ new GridPartitioner(2, 2, rowsPerPart = 0, colsPerPart = 1)
+ }
+
+ intercept[IllegalArgumentException] {
+ GridPartitioner(2, 2, rowsPerPart = 1, colsPerPart = 0)
+ }
+
+ intercept[IllegalArgumentException] {
+ GridPartitioner(2, 2, suggestedNumPartitions = 0)
+ }
+ }
+
+ test("toBreeze and toLocalMatrix") {
+ val expected = BDM(
+ (1.0, 0.0, 0.0, 0.0),
+ (0.0, 2.0, 1.0, 0.0),
+ (3.0, 1.0, 1.0, 0.0),
+ (0.0, 1.0, 2.0, 1.0),
+ (0.0, 0.0, 1.0, 5.0))
+
+ val dense = Matrices.fromBreeze(expected).asInstanceOf[DenseMatrix]
+ assert(gridBasedMat.toLocalMatrix() === dense)
+ assert(gridBasedMat.toBreeze() === expected)
+ }
+}