author     Burak Yavuz <brkyvz@gmail.com>  2015-10-16 15:30:07 -0700
committer  Joseph K. Bradley <joseph@databricks.com>  2015-10-16 15:30:07 -0700
commit     10046ea76cf8f0d08fe7ef548e4dbec69d9c73b8 (patch)
tree       55a35672cffbfab9e941f010641e34689b616bf6 /mllib/src/test/scala/org
parent     1ec0a0dc2819d3db3555799cb78c2946f652bff4 (diff)
download   spark-10046ea76cf8f0d08fe7ef548e4dbec69d9c73b8.tar.gz
           spark-10046ea76cf8f0d08fe7ef548e4dbec69d9c73b8.tar.bz2
           spark-10046ea76cf8f0d08fe7ef548e4dbec69d9c73b8.zip
[SPARK-10599] [MLLIB] Lower communication for block matrix multiplication
This PR aims to decrease communication costs in BlockMatrix multiplication in two ways:
- Simulate the multiplication on the driver, and figure out which blocks actually need to be shuffled
- Send each block to a partition only once, and join inside the partition, rather than sending multiple copies to the same partition

**NOTE**: The old behavior of checking for multiple blocks with the same index is currently lost. It is not hard to add back, but it is a little more expensive than it used to be.

Initial benchmarking showed promising results (see below); however, I did hit some `FileNotFound` exceptions with the new implementation after the shuffle.

Size A: 1e5 x 1e5
Size B: 1e5 x 1e5
Block Sizes: 1024 x 1024
Sparsity: 0.01
Old implementation: 1m 13s
New implementation: 9s

cc avulanov Would you be interested in helping me benchmark this? I used your code from the mailing list (which you sent about 3 months ago?). The old implementation didn't even run, but the new implementation completed in 268s on a 120 GB / 16 core cluster.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #8757 from brkyvz/opt-bmm.
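For readers skimming the change, here is a minimal, self-contained sketch of the driver-side simulation idea described in the first bullet above. It is not the code in this patch: `SimulateMultiplySketch`, `partitionOf`, and the flat `(rowBlock, colBlock)` inputs are illustrative stand-ins, whereas the real change works on `BlockMatrix` metadata and a `GridPartitioner`.

```scala
// Hypothetical, driver-only sketch of "simulate the multiplication first":
// given only the block coordinates of A and B (no matrix data), compute the
// set of result partitions each block has to be shipped to.
object SimulateMultiplySketch {

  // Toy stand-in for a result partitioner: map result block (i, j) to a partition id.
  def partitionOf(i: Int, j: Int, resultColBlocks: Int, numPartitions: Int): Int =
    (i * resultColBlocks + j) % numPartitions

  def simulateMultiply(
      aBlocks: Seq[(Int, Int)],   // (rowBlock, colBlock) indices present in A
      bBlocks: Seq[(Int, Int)],   // (rowBlock, colBlock) indices present in B
      resultColBlocks: Int,
      numPartitions: Int): (Map[(Int, Int), Set[Int]], Map[(Int, Int), Set[Int]]) = {

    val bByRowBlock = bBlocks.groupBy(_._1)
    val aByColBlock = aBlocks.groupBy(_._2)

    // A block (i, k) is needed by every result block (i, j) for which B has a block (k, j).
    val destinationsA = aBlocks.map { case (i, k) =>
      val partitions = bByRowBlock.getOrElse(k, Seq.empty).map {
        case (_, j) => partitionOf(i, j, resultColBlocks, numPartitions)
      }
      (i, k) -> partitions.toSet
    }.toMap

    // B block (k, j) is needed by every result block (i, j) for which A has a block (i, k).
    val destinationsB = bBlocks.map { case (k, j) =>
      val partitions = aByColBlock.getOrElse(k, Seq.empty).map {
        case (i, _) => partitionOf(i, j, resultColBlocks, numPartitions)
      }
      (k, j) -> partitions.toSet
    }.toMap

    (destinationsA, destinationsB)
  }
}
```

A block that never meets a partner block simply ends up with an empty destination set, which is what lets the shuffle skip it entirely.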
Diffstat (limited to 'mllib/src/test/scala/org')
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala | 18
1 file changed, 18 insertions, 0 deletions
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
index 93fe04c139..b8eb103058 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
@@ -235,6 +235,24 @@ class BlockMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
assert(localC ~== result absTol 1e-8)
}
+ test("simulate multiply") {
+ val blocks: Seq[((Int, Int), Matrix)] = Seq(
+ ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 1.0))),
+ ((1, 1), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 1.0))))
+ val rdd = sc.parallelize(blocks, 2)
+ val B = new BlockMatrix(rdd, colPerPart, rowPerPart)
+ val resultPartitioner = GridPartitioner(gridBasedMat.numRowBlocks, B.numColBlocks,
+ math.max(numPartitions, 2))
+ val (destinationsA, destinationsB) = gridBasedMat.simulateMultiply(B, resultPartitioner)
+ assert(destinationsA((0, 0)) === Set(0))
+ assert(destinationsA((0, 1)) === Set(2))
+ assert(destinationsA((1, 0)) === Set(0))
+ assert(destinationsA((1, 1)) === Set(2))
+ assert(destinationsA((2, 1)) === Set(3))
+ assert(destinationsB((0, 0)) === Set(0))
+ assert(destinationsB((1, 1)) === Set(2, 3))
+ }
+
test("validate") {
// No error
gridBasedMat.validate()
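For the second half of the optimization, here is a hedged sketch (again, not the code in this patch) of how a destination map like the one asserted in the "simulate multiply" test above could be used to ship each block to a result partition exactly once and join inside the partition. `ShipOnceSketch` and its arguments are hypothetical names; the real logic lives in `BlockMatrix.multiply`.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.rdd.RDD

object ShipOnceSketch {
  // Hypothetical helper: key every block by each destination partition id, so
  // one copy per destination crosses the network, then cogroup the A and B
  // blocks that land in the same result partition.
  def shipOnce(
      aBlocks: RDD[((Int, Int), Matrix)],
      bBlocks: RDD[((Int, Int), Matrix)],
      destinationsA: Map[(Int, Int), Set[Int]],
      destinationsB: Map[(Int, Int), Set[Int]],
      numPartitions: Int)
    : RDD[(Int, (Iterable[((Int, Int), Matrix)], Iterable[((Int, Int), Matrix)]))] = {

    val keyedA = aBlocks.flatMap { case (blockIndex, matrix) =>
      destinationsA.getOrElse(blockIndex, Set.empty[Int]).map(p => (p, (blockIndex, matrix)))
    }
    val keyedB = bBlocks.flatMap { case (blockIndex, matrix) =>
      destinationsB.getOrElse(blockIndex, Set.empty[Int]).map(p => (p, (blockIndex, matrix)))
    }
    // Each result partition now sees every A and B block it needs exactly once;
    // the per-partition block multiplications can run without further shuffles.
    keyedA.cogroup(keyedB, new HashPartitioner(numPartitions))
  }
}
```

Compared with keying every block by each result block it contributes to, each block is serialized once per destination partition rather than once per result block, which is where the communication savings come from.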