about summary refs log tree commit diff
path: root/mllib
diff options
context:
space:
mode:
author    Fokko Driesprong <f.driesprong@catawiki.nl>  2016-04-14 17:32:20 -0700
committer Xiangrui Meng <meng@databricks.com>  2016-04-14 17:32:20 -0700
commit    c80586d9e820d19fc328b3e4c6f1c1439f5583a7 (patch)
tree      64a8c9380e809885c18a8d30667ddba68f81b533 /mllib
parent    01dd1f5c07f5c9ba91389c1556f911b028475cd3 (diff)
download  spark-c80586d9e820d19fc328b3e4c6f1c1439f5583a7.tar.gz
          spark-c80586d9e820d19fc328b3e4c6f1c1439f5583a7.tar.bz2
          spark-c80586d9e820d19fc328b3e4c6f1c1439f5583a7.zip
[SPARK-12869] Implemented an improved version of the toIndexedRowMatrix
Hi guys, I've implemented an improved version of the `toIndexedRowMatrix` function on the `BlockMatrix`. I needed this for a project, but would like to share it with the rest of the community. In the case of dense matrices, it can increase performance up to 19 times: https://github.com/Fokko/BlockMatrixToIndexedRowMatrix If there are any questions or suggestions, please let me know. Keep up the good work! Cheers. Author: Fokko Driesprong <f.driesprong@catawiki.nl> Author: Fokko Driesprong <fokko@driesprongen.nl> Closes #10839 from Fokko/master.
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala      | 34
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala | 31
2 files changed, 57 insertions(+), 8 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index 89c332ae38..580d7a98fb 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -19,12 +19,12 @@ package org.apache.spark.mllib.linalg.distributed
import scala.collection.mutable.ArrayBuffer
-import breeze.linalg.{DenseMatrix => BDM, Matrix => BM}
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, Matrix => BM, SparseVector => BSV, Vector => BV}
import org.apache.spark.{Partitioner, SparkException}
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
-import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix, SparseMatrix}
+import org.apache.spark.mllib.linalg._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
@@ -264,13 +264,35 @@ class BlockMatrix @Since("1.3.0") (
new CoordinateMatrix(entryRDD, numRows(), numCols())
}
+
/** Converts to IndexedRowMatrix. The number of columns must be within the integer range. */
@Since("1.3.0")
def toIndexedRowMatrix(): IndexedRowMatrix = {
- require(numCols() < Int.MaxValue, "The number of columns must be within the integer range. " +
- s"numCols: ${numCols()}")
- // TODO: This implementation may be optimized
- toCoordinateMatrix().toIndexedRowMatrix()
+ val cols = numCols().toInt
+
+ require(cols < Int.MaxValue, s"The number of columns should be less than Int.MaxValue ($cols).")
+
+ val rows = blocks.flatMap { case ((blockRowIdx, blockColIdx), mat) =>
+ mat.rowIter.zipWithIndex.map {
+ case (vector, rowIdx) =>
+ blockRowIdx * rowsPerBlock + rowIdx -> (blockColIdx, vector.toBreeze)
+ }
+ }.groupByKey().map { case (rowIdx, vectors) =>
+ val numberNonZeroPerRow = vectors.map(_._2.activeSize).sum.toDouble / cols.toDouble
+
+ val wholeVector = if (numberNonZeroPerRow <= 0.1) { // Sparse at 1/10th nnz
+ BSV.zeros[Double](cols)
+ } else {
+ BDV.zeros[Double](cols)
+ }
+
+ vectors.foreach { case (blockColIdx: Int, vec: BV[Double]) =>
+ val offset = colsPerBlock * blockColIdx
+ wholeVector(offset until offset + colsPerBlock) := vec
+ }
+ new IndexedRow(rowIdx, Vectors.fromBreeze(wholeVector))
+ }
+ new IndexedRowMatrix(rows)
}
/** Collect the distributed matrix on the driver as a `DenseMatrix`. */
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
index f737d2c51a..f37eaf225a 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
@@ -19,10 +19,10 @@ package org.apache.spark.mllib.linalg.distributed
import java.{util => ju}
-import breeze.linalg.{DenseMatrix => BDM}
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV}
import org.apache.spark.{SparkException, SparkFunSuite}
-import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix, SparseMatrix}
+import org.apache.spark.mllib.linalg.{DenseMatrix, DenseVector, Matrices, Matrix, SparseMatrix, SparseVector, Vectors}
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.util.TestingUtils._
@@ -134,6 +134,33 @@ class BlockMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
assert(rowMat.numRows() === m)
assert(rowMat.numCols() === n)
assert(rowMat.toBreeze() === gridBasedMat.toBreeze())
+
+ val rows = 1
+ val cols = 10
+
+ val matDense = new DenseMatrix(rows, cols,
+ Array(1.0, 1.0, 3.0, 2.0, 5.0, 6.0, 7.0, 1.0, 2.0, 3.0))
+ val matSparse = new SparseMatrix(rows, cols,
+ Array(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1), Array(0), Array(1.0))
+
+ val vectors: Seq[((Int, Int), Matrix)] = Seq(
+ ((0, 0), matDense),
+ ((1, 0), matSparse))
+
+ val rdd = sc.parallelize(vectors)
+ val B = new BlockMatrix(rdd, rows, cols)
+
+ val C = B.toIndexedRowMatrix.rows.collect
+
+ (C(0).vector.toBreeze, C(1).vector.toBreeze) match {
+ case (denseVector: BDV[Double], sparseVector: BSV[Double]) =>
+ assert(denseVector.length === sparseVector.length)
+
+ assert(matDense.toArray === denseVector.toArray)
+ assert(matSparse.toArray === sparseVector.toArray)
+ case _ =>
+ throw new RuntimeException("IndexedRow returns vectors of unexpected type")
+ }
}
test("toBreeze and toLocalMatrix") {