aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2014-01-10 13:00:28 -0800
committerAnkur Dave <ankurdave@gmail.com>2014-01-10 13:00:28 -0800
commit7bda99778566b48355bc17f59a2e941eda70ad85 (patch)
treefbbc129afa3b2080680aceb5c9f77cc835c1de6b
parenteb4b46f8d1f5c5593ec327ccd8f816f784cce6b1 (diff)
downloadspark-7bda99778566b48355bc17f59a2e941eda70ad85.tar.gz
spark-7bda99778566b48355bc17f59a2e941eda70ad85.tar.bz2
spark-7bda99778566b48355bc17f59a2e941eda70ad85.zip
Improve docs for PartitionStrategy
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala58
1 files changed, 31 insertions, 27 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index 5e80a535f1..fc7635a033 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -1,16 +1,22 @@
package org.apache.spark.graphx
-
+/**
+ * Represents the way edges are assigned to edge partitions based on their source and destination
+ * vertex IDs.
+ */
sealed trait PartitionStrategy extends Serializable {
def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID
}
/**
- * This function implements a classic 2D-Partitioning of a sparse matrix.
+ * Assigns edges to partitions using a 2D partitioning of the sparse edge adjacency matrix,
+ * guaranteeing a `2 * sqrt(numParts)` bound on vertex replication.
+ *
* Suppose we have a graph with 11 vertices that we want to partition
* over 9 machines. We can use the following sparse matrix representation:
*
+ * <pre>
* __________________________________
* v0 | P0 * | P1 | P2 * |
* v1 | **** | * | |
@@ -27,28 +33,23 @@ sealed trait PartitionStrategy extends Serializable {
* v10 | * | ** | * * |
* v11 | * <-E | *** | ** |
* ----------------------------------
+ * </pre>
*
- * The edge denoted by E connects v11 with v1 and is assigned to
- * processor P6. To get the processor number we divide the matrix
- * into sqrt(numProc) by sqrt(numProc) blocks. Notice that edges
- * adjacent to v11 can only be in the first colum of
- * blocks (P0, P3, P6) or the last row of blocks (P6, P7, P8).
- * As a consequence we can guarantee that v11 will need to be
- * replicated to at most 2 * sqrt(numProc) machines.
- *
- * Notice that P0 has many edges and as a consequence this
- * partitioning would lead to poor work balance. To improve
- * balance we first multiply each vertex id by a large prime
- * to effectively shuffle the vertex locations.
- *
- * One of the limitations of this approach is that the number of
- * machines must either be a perfect square. We partially address
- * this limitation by computing the machine assignment to the next
- * largest perfect square and then mapping back down to the actual
- * number of machines. Unfortunately, this can also lead to work
- * imbalance and so it is suggested that a perfect square is used.
+ * The edge denoted by `E` connects `v11` with `v1` and is assigned to processor `P6`. To get the
+ * processor number we divide the matrix into `sqrt(numParts)` by `sqrt(numParts)` blocks. Notice
+ * that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3, P6)` or the last
+ * row of blocks `(P6, P7, P8)`. As a consequence we can guarantee that `v11` will need to be
+ * replicated to at most `2 * sqrt(numParts)` machines.
*
+ * Notice that `P0` has many edges and as a consequence this partitioning would lead to poor work
+ * balance. To improve balance we first multiply each vertex id by a large prime to shuffle the
+ * vertex locations.
*
+ * One of the limitations of this approach is that the number of machines must either be a perfect
+ * square. We partially address this limitation by computing the machine assignment to the next
+ * largest perfect square and then mapping back down to the actual number of machines.
+ * Unfortunately, this can also lead to work imbalance and so it is suggested that a perfect square
+ * is used.
*/
case object EdgePartition2D extends PartitionStrategy {
override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
@@ -60,7 +61,10 @@ case object EdgePartition2D extends PartitionStrategy {
}
}
-
+/**
+ * Assigns edges to partitions using only the source vertex ID, colocating edges with the same
+ * source.
+ */
case object EdgePartition1D extends PartitionStrategy {
override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
val mixingPrime: VertexID = 1125899906842597L
@@ -70,8 +74,8 @@ case object EdgePartition1D extends PartitionStrategy {
/**
- * Assign edges to an aribtrary machine corresponding to a
- * random vertex cut.
+ * Assigns edges to partitions by hashing the source and destination vertex IDs, resulting in a
+ * random vertex cut that colocates all same-direction edges between two vertices.
*/
case object RandomVertexCut extends PartitionStrategy {
override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
@@ -81,9 +85,9 @@ case object RandomVertexCut extends PartitionStrategy {
/**
- * Assign edges to an arbitrary machine corresponding to a random vertex cut. This
- * function ensures that edges of opposite direction between the same two vertices
- * will end up on the same partition.
+ * Assigns edges to partitions by hashing the source and destination vertex IDs in a canonical
+ * direction, resulting in a random vertex cut that colocates all edges between two vertices,
+ * regardless of direction.
*/
case object CanonicalRandomVertexCut extends PartitionStrategy {
override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {