diff options
author | Andrew Ray <ray.andrew@gmail.com> | 2015-07-14 13:14:47 -0700 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2015-07-14 13:14:47 -0700 |
commit | 0a4071eab30db1db80f61ed2cb2e7243291183ce (patch) | |
tree | 1c321675f2113e13b7529e557df575a678c59f14 /graphx/src | |
parent | d267c2834a639aaebd0559355c6a82613abb689b (diff) | |
download | spark-0a4071eab30db1db80f61ed2cb2e7243291183ce.tar.gz spark-0a4071eab30db1db80f61ed2cb2e7243291183ce.tar.bz2 spark-0a4071eab30db1db80f61ed2cb2e7243291183ce.zip |
[SPARK-8718] [GRAPHX] Improve EdgePartition2D for non perfect square number of partitions
See https://github.com/aray/e2d/blob/master/EdgePartition2D.ipynb
Author: Andrew Ray <ray.andrew@gmail.com>
Closes #7104 from aray/edge-partition-2d-improvement and squashes the following commits:
3729f84 [Andrew Ray] correct bounds and remove unneeded comments
97f8464 [Andrew Ray] change less
5141ab4 [Andrew Ray] Merge branch 'master' into edge-partition-2d-improvement
925fd2c [Andrew Ray] use new interface for partitioning
001bfd0 [Andrew Ray] Refactor PartitionStrategy so that we can return a prtition function for a given number of parts. To keep compatibility we define default methods that translate between the two implementation options. Made EdgePartition2D use old strategy when we have a perfect square and implement new interface.
5d42105 [Andrew Ray] % -> /
3560084 [Andrew Ray] Merge branch 'master' into edge-partition-2d-improvement
f006364 [Andrew Ray] remove unneeded comments
cfa2c5e [Andrew Ray] Modifications to EdgePartition2D so that it works for non perfect squares.
Diffstat (limited to 'graphx/src')
-rw-r--r-- | graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala | 32 |
1 files changed, 21 insertions, 11 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala index 7372dfbd9f..70a7592da8 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala @@ -32,7 +32,7 @@ trait PartitionStrategy extends Serializable { object PartitionStrategy { /** * Assigns edges to partitions using a 2D partitioning of the sparse edge adjacency matrix, - * guaranteeing a `2 * sqrt(numParts) - 1` bound on vertex replication. + * guaranteeing a `2 * sqrt(numParts)` bound on vertex replication. * * Suppose we have a graph with 12 vertices that we want to partition * over 9 machines. We can use the following sparse matrix representation: @@ -61,26 +61,36 @@ object PartitionStrategy { * that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3, * P6)` or the last * row of blocks `(P6, P7, P8)`. As a consequence we can guarantee that `v11` will need to be - * replicated to at most `2 * sqrt(numParts) - 1` machines. + * replicated to at most `2 * sqrt(numParts)` machines. * * Notice that `P0` has many edges and as a consequence this partitioning would lead to poor work * balance. To improve balance we first multiply each vertex id by a large prime to shuffle the * vertex locations. * - * One of the limitations of this approach is that the number of machines must either be a - * perfect square. We partially address this limitation by computing the machine assignment to - * the next - * largest perfect square and then mapping back down to the actual number of machines. - * Unfortunately, this can also lead to work imbalance and so it is suggested that a perfect - * square is used. + * When the number of partitions requested is not a perfect square we use a slightly different + * method where the last column can have a different number of rows than the others while still + * maintaining the same size per block. */ case object EdgePartition2D extends PartitionStrategy { override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = { val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt val mixingPrime: VertexId = 1125899906842597L - val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt - val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt - (col * ceilSqrtNumParts + row) % numParts + if (numParts == ceilSqrtNumParts * ceilSqrtNumParts) { + // Use old method for perfect squared to ensure we get same results + val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt + val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt + (col * ceilSqrtNumParts + row) % numParts + + } else { + // Otherwise use new method + val cols = ceilSqrtNumParts + val rows = (numParts + cols - 1) / cols + val lastColRows = numParts - rows * (cols - 1) + val col = (math.abs(src * mixingPrime) % numParts / rows).toInt + val row = (math.abs(dst * mixingPrime) % (if (col < cols - 1) rows else lastColRows)).toInt + col * rows + row + + } } } |