diff options
author | Burak Yavuz <brkyvz@gmail.com> | 2015-10-16 15:30:07 -0700 |
---|---|---|
committer | Joseph K. Bradley <joseph@databricks.com> | 2015-10-16 15:30:07 -0700 |
commit | 10046ea76cf8f0d08fe7ef548e4dbec69d9c73b8 (patch) | |
tree | 55a35672cffbfab9e941f010641e34689b616bf6 /mllib/src/test/scala/org | |
parent | 1ec0a0dc2819d3db3555799cb78c2946f652bff4 (diff) | |
download | spark-10046ea76cf8f0d08fe7ef548e4dbec69d9c73b8.tar.gz spark-10046ea76cf8f0d08fe7ef548e4dbec69d9c73b8.tar.bz2 spark-10046ea76cf8f0d08fe7ef548e4dbec69d9c73b8.zip |
[SPARK-10599] [MLLIB] Lower communication for block matrix multiplication
This PR aims to decrease communication costs in BlockMatrix multiplication in two ways:
- Simulate the multiplication on the driver, and figure out which blocks actually need to be shuffled
- Send the block once to a partition, and join inside the partition rather than sending multiple copies to the same partition
**NOTE**: One important note is that right now, the old behavior of checking for multiple blocks with the same index is lost. This is not hard to add, but is a little more expensive than how it was.
Initial benchmarking showed promising results (look below), however I did hit some `FileNotFound` exceptions with the new implementation after the shuffle.
Size A: 1e5 x 1e5
Size B: 1e5 x 1e5
Block Sizes: 1024 x 1024
Sparsity: 0.01
Old implementation: 1m 13s
New implementation: 9s
cc avulanov Would you be interested in helping me benchmark this? I used your code from the mailing list (which you sent about 3 months ago?), and the old implementation didn't even run, but the new implementation completed in 268s in a 120 GB / 16 core cluster
Author: Burak Yavuz <brkyvz@gmail.com>
Closes #8757 from brkyvz/opt-bmm.
Diffstat (limited to 'mllib/src/test/scala/org')
-rw-r--r-- | mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala | 18 |
1 files changed, 18 insertions, 0 deletions
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala index 93fe04c139..b8eb103058 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala @@ -235,6 +235,24 @@ class BlockMatrixSuite extends SparkFunSuite with MLlibTestSparkContext { assert(localC ~== result absTol 1e-8) } + test("simulate multiply") { + val blocks: Seq[((Int, Int), Matrix)] = Seq( + ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 1.0))), + ((1, 1), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 1.0)))) + val rdd = sc.parallelize(blocks, 2) + val B = new BlockMatrix(rdd, colPerPart, rowPerPart) + val resultPartitioner = GridPartitioner(gridBasedMat.numRowBlocks, B.numColBlocks, + math.max(numPartitions, 2)) + val (destinationsA, destinationsB) = gridBasedMat.simulateMultiply(B, resultPartitioner) + assert(destinationsA((0, 0)) === Set(0)) + assert(destinationsA((0, 1)) === Set(2)) + assert(destinationsA((1, 0)) === Set(0)) + assert(destinationsA((1, 1)) === Set(2)) + assert(destinationsA((2, 1)) === Set(3)) + assert(destinationsB((0, 0)) === Set(0)) + assert(destinationsB((1, 1)) === Set(2, 3)) + } + test("validate") { // No error gridBasedMat.validate() |