aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorBurak Yavuz <brkyvz@gmail.com>2015-01-30 13:59:10 -0800
committerXiangrui Meng <meng@databricks.com>2015-01-30 13:59:10 -0800
commit6ee8338b377de9dc0adb5b26d9ea9e8519eb58ab (patch)
tree17f3a0992afef3197877a916f960bcf4de996c02 /mllib
parent0a95085f09754c7b883f29a2babb17209c6541bd (diff)
downloadspark-6ee8338b377de9dc0adb5b26d9ea9e8519eb58ab.tar.gz
spark-6ee8338b377de9dc0adb5b26d9ea9e8519eb58ab.tar.bz2
spark-6ee8338b377de9dc0adb5b26d9ea9e8519eb58ab.zip
[SPARK-5486] Added validate method to BlockMatrix
The `validate` method will allow users to debug their `BlockMatrix`, if operations like `add` or `multiply` return unexpected results. It checks the following properties in a `BlockMatrix`: - Are the dimensions of the `BlockMatrix` consistent with what the user entered: (`nRows`, `nCols`) - Are the dimensions of each `MatrixBlock` consistent with what the user entered: (`rowsPerBlock`, `colsPerBlock`) - Are there blocks with duplicate indices Author: Burak Yavuz <brkyvz@gmail.com> Closes #4279 from brkyvz/SPARK-5486 and squashes the following commits: c152a73 [Burak Yavuz] addressed code review v2 598c583 [Burak Yavuz] merged master b55ac5c [Burak Yavuz] addressed code review v1 25f083b [Burak Yavuz] simplify implementation 0aa519a [Burak Yavuz] [SPARK-5486] Added validate method to BlockMatrix
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala47
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala42
2 files changed, 84 insertions, 5 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index 693419f827..a6405975eb 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -21,8 +21,8 @@ import scala.collection.mutable.ArrayBuffer
import breeze.linalg.{DenseMatrix => BDM}
-import org.apache.spark.{Logging, Partitioner}
-import org.apache.spark.mllib.linalg.{SparseMatrix, DenseMatrix, Matrix}
+import org.apache.spark.{SparkException, Logging, Partitioner}
+import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
@@ -158,11 +158,13 @@ class BlockMatrix(
private[mllib] var partitioner: GridPartitioner =
GridPartitioner(numRowBlocks, numColBlocks, suggestedNumPartitions = blocks.partitions.size)
+ private lazy val blockInfo = blocks.mapValues(block => (block.numRows, block.numCols)).cache()
+
/** Estimates the dimensions of the matrix. */
private def estimateDim(): Unit = {
- val (rows, cols) = blocks.map { case ((blockRowIndex, blockColIndex), mat) =>
- (blockRowIndex.toLong * rowsPerBlock + mat.numRows,
- blockColIndex.toLong * colsPerBlock + mat.numCols)
+ val (rows, cols) = blockInfo.map { case ((blockRowIndex, blockColIndex), (m, n)) =>
+ (blockRowIndex.toLong * rowsPerBlock + m,
+ blockColIndex.toLong * colsPerBlock + n)
}.reduce { (x0, x1) =>
(math.max(x0._1, x1._1), math.max(x0._2, x1._2))
}
@@ -172,6 +174,41 @@ class BlockMatrix(
assert(cols <= nCols, s"The number of columns $cols is more than claimed $nCols.")
}
+ def validate(): Unit = {
+ logDebug("Validating BlockMatrix...")
+ // check if the matrix is larger than the claimed dimensions
+ estimateDim()
+ logDebug("BlockMatrix dimensions are okay...")
+
+ // Check if there are multiple MatrixBlocks with the same index.
+ blockInfo.countByKey().foreach { case (key, cnt) =>
+ if (cnt > 1) {
+ throw new SparkException(s"Found multiple MatrixBlocks with the indices $key. Please " +
+ "remove blocks with duplicate indices.")
+ }
+ }
+ logDebug("MatrixBlock indices are okay...")
+ // Check if each MatrixBlock (except edges) has the dimensions rowsPerBlock x colsPerBlock
+ // The first tuple is the index and the second tuple is the dimensions of the MatrixBlock
+ val dimensionMsg = s"dimensions different than rowsPerBlock: $rowsPerBlock, and " +
+ s"colsPerBlock: $colsPerBlock. Blocks on the right and bottom edges can have smaller " +
+ s"dimensions. You may use the repartition method to fix this issue."
+ blockInfo.foreach { case ((blockRowIndex, blockColIndex), (m, n)) =>
+ if ((blockRowIndex < numRowBlocks - 1 && m != rowsPerBlock) ||
+ (blockRowIndex == numRowBlocks - 1 && (m <= 0 || m > rowsPerBlock))) {
+ throw new SparkException(s"The MatrixBlock at ($blockRowIndex, $blockColIndex) has " +
+ dimensionMsg)
+ }
+ if ((blockColIndex < numColBlocks - 1 && n != colsPerBlock) ||
+ (blockColIndex == numColBlocks - 1 && (n <= 0 || n > colsPerBlock))) {
+ throw new SparkException(s"The MatrixBlock at ($blockRowIndex, $blockColIndex) has " +
+ dimensionMsg)
+ }
+ }
+ logDebug("MatrixBlock dimensions are okay...")
+ logDebug("BlockMatrix is valid!")
+ }
+
/** Caches the underlying RDD. */
def cache(): this.type = {
blocks.cache()
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
index 03f34308dd..461f1f92df 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
@@ -22,6 +22,7 @@ import scala.util.Random
import breeze.linalg.{DenseMatrix => BDM}
import org.scalatest.FunSuite
+import org.apache.spark.SparkException
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix}
import org.apache.spark.mllib.util.MLlibTestSparkContext
@@ -147,6 +148,47 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
assert(gridBasedMat.toBreeze() === expected)
}
+ test("validate") {
+ // No error
+ gridBasedMat.validate()
+ // Wrong MatrixBlock dimensions
+ val blocks: Seq[((Int, Int), Matrix)] = Seq(
+ ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
+ ((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
+ ((1, 0), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
+ ((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
+ ((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
+ val rdd = sc.parallelize(blocks, numPartitions)
+ val wrongRowPerParts = new BlockMatrix(rdd, rowPerPart + 1, colPerPart)
+ val wrongColPerParts = new BlockMatrix(rdd, rowPerPart, colPerPart + 1)
+ intercept[SparkException] {
+ wrongRowPerParts.validate()
+ }
+ intercept[SparkException] {
+ wrongColPerParts.validate()
+ }
+ // Wrong BlockMatrix dimensions
+ val wrongRowSize = new BlockMatrix(rdd, rowPerPart, colPerPart, 4, 4)
+ intercept[AssertionError] {
+ wrongRowSize.validate()
+ }
+ val wrongColSize = new BlockMatrix(rdd, rowPerPart, colPerPart, 5, 2)
+ intercept[AssertionError] {
+ wrongColSize.validate()
+ }
+ // Duplicate indices
+ val duplicateBlocks: Seq[((Int, Int), Matrix)] = Seq(
+ ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
+ ((0, 0), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
+ ((1, 1), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
+ ((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
+ ((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
+ val dupMatrix = new BlockMatrix(sc.parallelize(duplicateBlocks, numPartitions), 2, 2)
+ intercept[SparkException] {
+ dupMatrix.validate()
+ }
+ }
+
test("transpose") {
val expected = BDM(
(1.0, 0.0, 3.0, 0.0, 0.0),