aboutsummaryrefslogtreecommitdiff
path: root/mllib/src/test/scala/org/apache
diff options
context:
space:
mode:
authorWeichenXu <WeichenXu123@outlook.com>2017-01-26 20:10:17 -0800
committerBurak Yavuz <brkyvz@gmail.com>2017-01-26 20:10:17 -0800
commit1191fe267d2faad2a99a83f3375ce2d9d382cfa0 (patch)
tree785944036930e9732bd407b48d3e057a468c66ab /mllib/src/test/scala/org/apache
parent9f523d3192c71a728fd8a2a64f52bbc337f2f026 (diff)
downloadspark-1191fe267d2faad2a99a83f3375ce2d9d382cfa0.tar.gz
spark-1191fe267d2faad2a99a83f3375ce2d9d382cfa0.tar.bz2
spark-1191fe267d2faad2a99a83f3375ce2d9d382cfa0.zip
[SPARK-18218][ML][MLLIB] Reduce shuffled data size of BlockMatrix multiplication and solve potential OOM and low parallelism usage problem By split middle dimension in matrix multiplication
## What changes were proposed in this pull request? ### The problem in current block matrix mulitiplication As in JIRA https://issues.apache.org/jira/browse/SPARK-18218 described, block matrix multiplication in spark may cause some problem, suppose we have `M*N` dimensions matrix A multiply `N*P` dimensions matrix B, when N is much larger than M and P, then the following problem may occur: - when the middle dimension N is too large, it will cause reducer OOM. - even if OOM do not occur, it will still cause parallism too low. - when N is much large than M and P, and matrix A and B have many partitions, it may cause too many partition on M and P dimension, it will cause much larger shuffled data size. (I will expain this in detail in the following.) ### Key point of my improvement In this PR, I introduce `midDimSplitNum` parameter, and improve the algorithm, to resolve this problem. In order to understand the improvement in this PR, first let me give a simple case to explain how the current mulitiplication works and what cause the problems above: suppose we have block matrix A, contains 200 blocks (`2 numRowBlocks * 100 numColBlocks`), blocks arranged in 2 rows, 100 cols: ``` A00 A01 A02 ... A0,99 A10 A11 A12 ... A1,99 ``` and we have block matrix B, also contains 200 blocks (`100 numRowBlocks * 2 numColBlocks`), blocks arranged in 100 rows, 2 cols: ``` B00 B01 B10 B11 B20 B21 ... B99,0 B99,1 ``` Suppose all blocks in the two matrices are dense for now. Now we call A.multiply(B), suppose the generated `resultPartitioner` contains 2 rowPartitions and 2 colPartitions (can't be more partitions because the result matrix only contains `2 * 2` blocks), the current algorithm will contains two shuffle steps: **step-1** Step-1 will generate 4 reducer, I tag them as reducer-00, reducer-01, reducer-10, reducer-11, and shuffle data as following: ``` A00 A01 A02 ... A0,99 B00 B10 B20 ... B99,0 shuffled into reducer-00 A00 A01 A02 ... A0,99 B01 B11 B21 ... B99,1 shuffled into reducer-01 A10 A11 A12 ... A1,99 B00 B10 B20 ... B99,0 shuffled into reducer-10 A10 A11 A12 ... A1,99 B01 B11 B21 ... B99,1 shuffled into reducer-11 ``` and the shuffling above is a `cogroup` transform, note that each reducer contains **only one group**. **step-2** Step-2 will do an `aggregateByKey` transform on the result of step-1, will also generate 4 reducers, and generate the final result RDD, contains 4 partitions, each partition contains one block. The main problems are in step-1. Now we have only 4 reducers, but matrix A and B have 400 blocks in total, obviously the reducer number is too small. and, we can see that, each reducer contains only one group(the group concept in `coGroup` transform), each group contains 200 blocks. This is terrible because we know that `coGroup` transformer will load each group into memory when computing. It is un-extensable in the algorithm level. Suppose matrix A has 10000 cols blocks or more instead of 100? Than each reducer will load 20000 blocks into memory. It will easily cause reducer OOM. This PR try to resolve the problem described above. When matrix A with dimension M * N multiply matrix B with dimension N * P, the middle dimension N is the keypoint. If N is large, the current mulitiplication implementation works badly. In this PR, I introduce a `numMidDimSplits` parameter, represent how many splits it will cut on the middle dimension N. Still using the example described above, now we set `numMidDimSplits = 10`, now we can generate 40 reducers in **step-1**: the reducer-ij above now will be splited into 10 reducers: reducer-ij0, reducer-ij1, ... reducer-ij9, each reducer will receive 20 blocks. now the shuffle works as following: **reducer-000 to reducer-009** ``` A0,0 A0,10 A0,20 ... A0,90 B0,0 B10,0 B20,0 ... B90,0 shuffled into reducer-000 A0,1 A0,11 A0,21 ... A0,91 B1,0 B11,0 B21,0 ... B91,0 shuffled into reducer-001 A0,2 A0,12 A0,22 ... A0,92 B2,0 B12,0 B22,0 ... B92,0 shuffled into reducer-002 ... A0,9 A0,19 A0,29 ... A0,99 B9,0 B19,0 B29,0 ... B99,0 shuffled into reducer-009 ``` **reducer-010 to reducer-019** ``` A0,0 A0,10 A0,20 ... A0,90 B0,1 B10,1 B20,1 ... B90,1 shuffled into reducer-010 A0,1 A0,11 A0,21 ... A0,91 B1,1 B11,1 B21,1 ... B91,1 shuffled into reducer-011 A0,2 A0,12 A0,22 ... A0,92 B2,1 B12,1 B22,1 ... B92,1 shuffled into reducer-012 ... A0,9 A0,19 A0,29 ... A0,99 B9,1 B19,1 B29,1 ... B99,1 shuffled into reducer-019 ``` **reducer-100 to reducer-109** and **reducer-110 to reducer-119** is similar to the above, I omit to write them out. ### API for this optimized algorithm I add a new API as following: ``` def multiply( other: BlockMatrix, numMidDimSplits: Int // middle dimension split number, expained above ): BlockMatrix ``` ### Shuffled data size analysis (compared under the same parallelism) The optimization has some subtle influence on the total shuffled data size. Appropriate `numMidDimSplits` will significantly reduce the shuffled data size, but too large `numMidDimSplits` may increase the shuffled data in reverse. For now I don't want to introduce formula to make thing too complex, I only use a simple case to represent it here: Suppose we have two same size square matrices X and Y, both have `16 numRowBlocks * 16 numColBlocks`. X and Y are both dense matrix. Now let me analysis the shuffling data size in the following case: **case 1: X and Y both partitioned in 16 rowPartitions and 16 colPartitions, numMidDimSplits = 1** ShufflingDataSize = (16 * 16 * (16 + 16) + 16 * 16) blocks = 8448 blocks parallelism = 16 * 16 * 1 = 256 //use step-1 reducers number as the parallism because it cost most of the computation time in this algorithm. **case 2: X and Y both partitioned in 8 rowPartitions and 8 colPartitions, numMidDimSplits = 4** ShufflingDataSize = (8 * 8 * (32 + 32) + 16 * 16 * 4) blocks = 5120 blocks parallelism = 8 * 8 * 4 = 256 //use step-1 reducers number as the parallism because it cost most of the computation time in this algorithm. **The two cases above all have parallism = 256**, case 1 `numMidDimSplits = 1` is equivalent with current implementation in mllib, but case 2 shuffling data is 60.6% of case 1, **it shows that under the same parallelism, proper `numMidDimSplits` will significantly reduce the shuffling data size**. ## How was this patch tested? Test suites added. Running result: ![blockmatrix](https://cloud.githubusercontent.com/assets/19235986/21600989/5e162cc2-d1bf-11e6-868c-0ec29190b605.png) Author: WeichenXu <WeichenXu123@outlook.com> Closes #15730 from WeichenXu123/optim_block_matrix.
Diffstat (limited to 'mllib/src/test/scala/org/apache')
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala22
1 files changed, 16 insertions, 6 deletions
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
index 61266f3c78..f6a9969402 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
@@ -267,6 +267,15 @@ class BlockMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
assert(sparseBM.subtract(sparseBM).toBreeze() === sparseBM.subtract(denseBM).toBreeze())
}
+ def testMultiply(A: BlockMatrix, B: BlockMatrix, expectedResult: Matrix,
+ numMidDimSplits: Int): Unit = {
+ val C = A.multiply(B, numMidDimSplits)
+ val localC = C.toLocalMatrix()
+ assert(C.numRows() === A.numRows())
+ assert(C.numCols() === B.numCols())
+ assert(localC ~== expectedResult absTol 1e-8)
+ }
+
test("multiply") {
// identity matrix
val blocks: Seq[((Int, Int), Matrix)] = Seq(
@@ -302,12 +311,13 @@ class BlockMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
// Try it with increased number of partitions
val largeA = new BlockMatrix(sc.parallelize(largerAblocks, 10), 6, 4)
val largeB = new BlockMatrix(sc.parallelize(largerBblocks, 8), 4, 4)
- val largeC = largeA.multiply(largeB)
- val localC = largeC.toLocalMatrix()
+
val result = largeA.toLocalMatrix().multiply(largeB.toLocalMatrix().asInstanceOf[DenseMatrix])
- assert(largeC.numRows() === largeA.numRows())
- assert(largeC.numCols() === largeB.numCols())
- assert(localC ~== result absTol 1e-8)
+
+ testMultiply(largeA, largeB, result, 1)
+ testMultiply(largeA, largeB, result, 2)
+ testMultiply(largeA, largeB, result, 3)
+ testMultiply(largeA, largeB, result, 4)
}
test("simulate multiply") {
@@ -318,7 +328,7 @@ class BlockMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
val B = new BlockMatrix(rdd, colPerPart, rowPerPart)
val resultPartitioner = GridPartitioner(gridBasedMat.numRowBlocks, B.numColBlocks,
math.max(numPartitions, 2))
- val (destinationsA, destinationsB) = gridBasedMat.simulateMultiply(B, resultPartitioner)
+ val (destinationsA, destinationsB) = gridBasedMat.simulateMultiply(B, resultPartitioner, 1)
assert(destinationsA((0, 0)) === Set(0))
assert(destinationsA((0, 1)) === Set(2))
assert(destinationsA((1, 0)) === Set(0))