From 10046ea76cf8f0d08fe7ef548e4dbec69d9c73b8 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Fri, 16 Oct 2015 15:30:07 -0700 Subject: [SPARK-10599] [MLLIB] Lower communication for block matrix multiplication This PR aims to decrease communication costs in BlockMatrix multiplication in two ways: - Simulate the multiplication on the driver, and figure out which blocks actually need to be shuffled - Send the block once to a partition, and join inside the partition rather than sending multiple copies to the same partition **NOTE**: One important note is that right now, the old behavior of checking for multiple blocks with the same index is lost. This is not hard to add, but is a little more expensive than how it was. Initial benchmarking showed promising results (look below), however I did hit some `FileNotFound` exceptions with the new implementation after the shuffle. Size A: 1e5 x 1e5 Size B: 1e5 x 1e5 Block Sizes: 1024 x 1024 Sparsity: 0.01 Old implementation: 1m 13s New implementation: 9s cc avulanov Would you be interested in helping me benchmark this? I used your code from the mailing list (which you sent about 3 months ago?), and the old implementation didn't even run, but the new implementation completed in 268s in a 120 GB / 16 core cluster Author: Burak Yavuz Closes #8757 from brkyvz/opt-bmm. --- .../mllib/linalg/distributed/BlockMatrixSuite.scala | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) (limited to 'mllib/src/test/scala/org') diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala index 93fe04c139..b8eb103058 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala @@ -235,6 +235,24 @@ class BlockMatrixSuite extends SparkFunSuite with MLlibTestSparkContext { assert(localC ~== result absTol 1e-8) } + test("simulate multiply") { + val blocks: Seq[((Int, Int), Matrix)] = Seq( + ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 1.0))), + ((1, 1), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 1.0)))) + val rdd = sc.parallelize(blocks, 2) + val B = new BlockMatrix(rdd, colPerPart, rowPerPart) + val resultPartitioner = GridPartitioner(gridBasedMat.numRowBlocks, B.numColBlocks, + math.max(numPartitions, 2)) + val (destinationsA, destinationsB) = gridBasedMat.simulateMultiply(B, resultPartitioner) + assert(destinationsA((0, 0)) === Set(0)) + assert(destinationsA((0, 1)) === Set(2)) + assert(destinationsA((1, 0)) === Set(0)) + assert(destinationsA((1, 1)) === Set(2)) + assert(destinationsA((2, 1)) === Set(3)) + assert(destinationsB((0, 0)) === Set(0)) + assert(destinationsB((1, 1)) === Set(2, 3)) + } + test("validate") { // No error gridBasedMat.validate() -- cgit v1.2.3