aboutsummaryrefslogtreecommitdiff
path: root/graphx/src
diff options
context:
space:
mode:
Diffstat (limited to 'graphx/src')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala32
1 file changed, 21 insertions, 11 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index 7372dfbd9f..70a7592da8 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -32,7 +32,7 @@ trait PartitionStrategy extends Serializable {
object PartitionStrategy {
/**
* Assigns edges to partitions using a 2D partitioning of the sparse edge adjacency matrix,
- * guaranteeing a `2 * sqrt(numParts) - 1` bound on vertex replication.
+ * guaranteeing a `2 * sqrt(numParts)` bound on vertex replication.
*
* Suppose we have a graph with 12 vertices that we want to partition
* over 9 machines. We can use the following sparse matrix representation:
@@ -61,26 +61,36 @@ object PartitionStrategy {
* that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3,
* P6)` or the last
* row of blocks `(P6, P7, P8)`. As a consequence we can guarantee that `v11` will need to be
- * replicated to at most `2 * sqrt(numParts) - 1` machines.
+ * replicated to at most `2 * sqrt(numParts)` machines.
*
* Notice that `P0` has many edges and as a consequence this partitioning would lead to poor work
* balance. To improve balance we first multiply each vertex id by a large prime to shuffle the
* vertex locations.
*
- * One of the limitations of this approach is that the number of machines must either be a
- * perfect square. We partially address this limitation by computing the machine assignment to
- * the next
- * largest perfect square and then mapping back down to the actual number of machines.
- * Unfortunately, this can also lead to work imbalance and so it is suggested that a perfect
- * square is used.
+ * When the number of partitions requested is not a perfect square we use a slightly different
+ * method where the last column can have a different number of rows than the others while still
+ * maintaining the same size per block.
*/
case object EdgePartition2D extends PartitionStrategy {
override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
val mixingPrime: VertexId = 1125899906842597L
- val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
- val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
- (col * ceilSqrtNumParts + row) % numParts
+ if (numParts == ceilSqrtNumParts * ceilSqrtNumParts) {
+ // Use the old method for perfect squares to ensure we get the same results
+ val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
+ val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
+ (col * ceilSqrtNumParts + row) % numParts
+
+ } else {
+ // Otherwise, use the new method for non-perfect-square partition counts
+ val cols = ceilSqrtNumParts
+ val rows = (numParts + cols - 1) / cols
+ val lastColRows = numParts - rows * (cols - 1)
+ val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
+ val row = (math.abs(dst * mixingPrime) % (if (col < cols - 1) rows else lastColRows)).toInt
+ col * rows + row
+
+ }
}
}