From 6ee8338b377de9dc0adb5b26d9ea9e8519eb58ab Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Fri, 30 Jan 2015 13:59:10 -0800
Subject: [SPARK-5486] Added validate method to BlockMatrix

The `validate` method will allow users to debug their `BlockMatrix`, if operations like `add` or `multiply` return unexpected results. It checks the following properties in a `BlockMatrix`:
- Are the dimensions of the `BlockMatrix` consistent with what the user entered: (`nRows`, `nCols`)
- Are the dimensions of each `MatrixBlock` consistent with what the user entered: (`rowsPerBlock`, `colsPerBlock`)
- Are there blocks with duplicate indices

Author: Burak Yavuz

Closes #4279 from brkyvz/SPARK-5486 and squashes the following commits:

c152a73 [Burak Yavuz] addressed code review v2
598c583 [Burak Yavuz] merged master
b55ac5c [Burak Yavuz] addressed code review v1
25f083b [Burak Yavuz] simplify implementation
0aa519a [Burak Yavuz] [SPARK-5486] Added validate method to BlockMatrix
---
 .../mllib/linalg/distributed/BlockMatrix.scala | 47 +++++++++++++++++++---
 .../linalg/distributed/BlockMatrixSuite.scala | 42 +++++++++++++++++++
 2 files changed, 84 insertions(+), 5 deletions(-)

(limited to 'mllib')

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index 693419f827..a6405975eb 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -21,8 +21,8 @@ import scala.collection.mutable.ArrayBuffer
 
 import breeze.linalg.{DenseMatrix => BDM}
 
-import org.apache.spark.{Logging, Partitioner}
-import org.apache.spark.mllib.linalg.{SparseMatrix, DenseMatrix, Matrix}
+import org.apache.spark.{SparkException, Logging, Partitioner}
+import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
@@ -158,11 +158,13 @@ class BlockMatrix(
   private[mllib] var partitioner: GridPartitioner =
     GridPartitioner(numRowBlocks, numColBlocks, suggestedNumPartitions = blocks.partitions.size)
 
+  private lazy val blockInfo = blocks.mapValues(block => (block.numRows, block.numCols)).cache()
+
   /** Estimates the dimensions of the matrix. */
   private def estimateDim(): Unit = {
-    val (rows, cols) = blocks.map { case ((blockRowIndex, blockColIndex), mat) =>
-      (blockRowIndex.toLong * rowsPerBlock + mat.numRows,
-        blockColIndex.toLong * colsPerBlock + mat.numCols)
+    val (rows, cols) = blockInfo.map { case ((blockRowIndex, blockColIndex), (m, n)) =>
+      (blockRowIndex.toLong * rowsPerBlock + m,
+        blockColIndex.toLong * colsPerBlock + n)
     }.reduce { (x0, x1) =>
       (math.max(x0._1, x1._1), math.max(x0._2, x1._2))
     }
@@ -172,6 +174,41 @@ class BlockMatrix(
     assert(cols <= nCols, s"The number of columns $cols is more than claimed $nCols.")
   }
 
+  def validate(): Unit = {
+    logDebug("Validating BlockMatrix...")
+    // check if the matrix is larger than the claimed dimensions
+    estimateDim()
+    logDebug("BlockMatrix dimensions are okay...")
+
+    // Check if there are multiple MatrixBlocks with the same index.
+    blockInfo.countByKey().foreach { case (key, cnt) =>
+      if (cnt > 1) {
+        throw new SparkException(s"Found multiple MatrixBlocks with the indices $key. Please " +
+          "remove blocks with duplicate indices.")
+      }
+    }
+    logDebug("MatrixBlock indices are okay...")
+    // Check if each MatrixBlock (except edges) has the dimensions rowsPerBlock x colsPerBlock
+    // The first tuple is the index and the second tuple is the dimensions of the MatrixBlock
+    val dimensionMsg = s"dimensions different than rowsPerBlock: $rowsPerBlock, and " +
+      s"colsPerBlock: $colsPerBlock. Blocks on the right and bottom edges can have smaller " +
+      s"dimensions. You may use the repartition method to fix this issue."
+    blockInfo.foreach { case ((blockRowIndex, blockColIndex), (m, n)) =>
+      if ((blockRowIndex < numRowBlocks - 1 && m != rowsPerBlock) ||
+          (blockRowIndex == numRowBlocks - 1 && (m <= 0 || m > rowsPerBlock))) {
+        throw new SparkException(s"The MatrixBlock at ($blockRowIndex, $blockColIndex) has " +
+          dimensionMsg)
+      }
+      if ((blockColIndex < numColBlocks - 1 && n != colsPerBlock) ||
+          (blockColIndex == numColBlocks - 1 && (n <= 0 || n > colsPerBlock))) {
+        throw new SparkException(s"The MatrixBlock at ($blockRowIndex, $blockColIndex) has " +
+          dimensionMsg)
+      }
+    }
+    logDebug("MatrixBlock dimensions are okay...")
+    logDebug("BlockMatrix is valid!")
+  }
+
   /** Caches the underlying RDD. */
   def cache(): this.type = {
     blocks.cache()
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
index 03f34308dd..461f1f92df 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
@@ -22,6 +22,7 @@ import scala.util.Random
 import breeze.linalg.{DenseMatrix => BDM}
 import org.scalatest.FunSuite
 
+import org.apache.spark.SparkException
 import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 
@@ -147,6 +148,47 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
     assert(gridBasedMat.toBreeze() === expected)
   }
 
+  test("validate") {
+    // No error
+    gridBasedMat.validate()
+    // Wrong MatrixBlock dimensions
+    val blocks: Seq[((Int, Int), Matrix)] = Seq(
+      ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
+      ((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
+      ((1, 0), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
+      ((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
+      ((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
+    val rdd = sc.parallelize(blocks, numPartitions)
+    val wrongRowPerParts = new BlockMatrix(rdd, rowPerPart + 1, colPerPart)
+    val wrongColPerParts = new BlockMatrix(rdd, rowPerPart, colPerPart + 1)
+    intercept[SparkException] {
+      wrongRowPerParts.validate()
+    }
+    intercept[SparkException] {
+      wrongColPerParts.validate()
+    }
+    // Wrong BlockMatrix dimensions
+    val wrongRowSize = new BlockMatrix(rdd, rowPerPart, colPerPart, 4, 4)
+    intercept[AssertionError] {
+      wrongRowSize.validate()
+    }
+    val wrongColSize = new BlockMatrix(rdd, rowPerPart, colPerPart, 5, 2)
+    intercept[AssertionError] {
+      wrongColSize.validate()
+    }
+    // Duplicate indices
+    val duplicateBlocks: Seq[((Int, Int), Matrix)] = Seq(
+      ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
+      ((0, 0), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
+      ((1, 1), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
+      ((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
+      ((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
+    val dupMatrix = new BlockMatrix(sc.parallelize(duplicateBlocks, numPartitions), 2, 2)
+    intercept[SparkException] {
+      dupMatrix.validate()
+    }
+  }
+
   test("transpose") {
     val expected = BDM(
       (1.0, 0.0, 3.0, 0.0, 0.0),
-- 
cgit v1.2.3