From 1dce9ce446dd248755cd65b7a6a0729a4dca2d62 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Mon, 13 Jan 2014 18:32:04 -0800
Subject: Moved PartitionStrategy's into an object.

---
 .../apache/spark/graphx/PartitionStrategy.scala    | 158 +++++++++++----
 .../org/apache/spark/graphx/lib/Analytics.scala    |   2 +
 .../scala/org/apache/spark/graphx/GraphSuite.scala |   1 +
 .../spark/graphx/lib/TriangleCountSuite.scala      |   5 +-
 4 files changed, 85 insertions(+), 81 deletions(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index b9ccd8765e..6d2990a3f6 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -4,96 +4,100 @@ package org.apache.spark.graphx
  * Represents the way edges are assigned to edge partitions based on their source and destination
  * vertex IDs.
  */
-sealed trait PartitionStrategy extends Serializable {
+trait PartitionStrategy extends Serializable {
   /** Returns the partition number for a given edge. */
   def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID
 }
 
-
 /**
- * Assigns edges to partitions using a 2D partitioning of the sparse edge adjacency matrix,
- * guaranteeing a `2 * sqrt(numParts)` bound on vertex replication.
- *
- * Suppose we have a graph with 11 vertices that we want to partition
- * over 9 machines. We can use the following sparse matrix representation:
- *
- * <pre>
- *       __________________________________
- *  v0   | P0 *     | P1       | P2    *  |
- *  v1   |  ****    |  *       |          |
- *  v2   |  ******* |      **  |  ****    |
- *  v3   |  *****   |  *  *    |       *  |
- *       ----------------------------------
- *  v4   | P3 *     | P4 ***   | P5 **  * |
- *  v5   |  *  *    |  *       |          |
- *  v6   |       *  |      **  |  ****    |
- *  v7   |  * * *   |  *  *    |       *  |
- *       ----------------------------------
- *  v8   | P6   *   | P7    *  | P8  *   *|
- *  v9   |     *    |  *    *  |          |
- *  v10  |       *  |      **  |  *  *    |
- *  v11  | * <-E    |  ***     |       ** |
- *       ----------------------------------
- * </pre>
- *
- * The edge denoted by `E` connects `v11` with `v1` and is assigned to processor `P6`. To get the
- * processor number we divide the matrix into `sqrt(numParts)` by `sqrt(numParts)` blocks. Notice
- * that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3, P6)` or the last
- * row of blocks `(P6, P7, P8)`. As a consequence we can guarantee that `v11` will need to be
- * replicated to at most `2 * sqrt(numParts)` machines.
- *
- * Notice that `P0` has many edges and as a consequence this partitioning would lead to poor work
- * balance. To improve balance we first multiply each vertex id by a large prime to shuffle the
- * vertex locations.
- *
- * One of the limitations of this approach is that the number of machines must either be a perfect
- * square. We partially address this limitation by computing the machine assignment to the next
- * largest perfect square and then mapping back down to the actual number of machines.
- * Unfortunately, this can also lead to work imbalance and so it is suggested that a perfect square
- * is used.
+ * Collection of built-in [[PartitionStrategy]] implementations.
  */
-case object EdgePartition2D extends PartitionStrategy {
-  override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
-    val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
-    val mixingPrime: VertexID = 1125899906842597L
-    val col: PartitionID = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
-    val row: PartitionID = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
-    (col * ceilSqrtNumParts + row) % numParts
+object PartitionStrategy {
+  /**
+   * Assigns edges to partitions using a 2D partitioning of the sparse edge adjacency matrix,
+   * guaranteeing a `2 * sqrt(numParts)` bound on vertex replication.
+   *
+   * Suppose we have a graph with 12 vertices that we want to partition
+   * over 9 machines. We can use the following sparse matrix representation:
+   *
+   * <pre>
+   *       __________________________________
+   *  v0   | P0 *     | P1       | P2    *  |
+   *  v1   |  ****    |  *       |          |
+   *  v2   |  ******* |      **  |  ****    |
+   *  v3   |  *****   |  *  *    |       *  |
+   *       ----------------------------------
+   *  v4   | P3 *     | P4 ***   | P5 **  * |
+   *  v5   |  *  *    |  *       |          |
+   *  v6   |       *  |      **  |  ****    |
+   *  v7   |  * * *   |  *  *    |       *  |
+   *       ----------------------------------
+   *  v8   | P6   *   | P7    *  | P8  *   *|
+   *  v9   |     *    |  *    *  |          |
+   *  v10  |       *  |      **  |  *  *    |
+   *  v11  | * <-E    |  ***     |       ** |
+   *       ----------------------------------
+   * </pre>
+   *
+   * The edge denoted by `E` connects `v11` with `v1` and is assigned to processor `P6`. To get the
+   * processor number we divide the matrix into `sqrt(numParts)` by `sqrt(numParts)` blocks. Notice
+   * that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3, P6)` or the
+   * last row of blocks `(P6, P7, P8)`. As a consequence we can guarantee that `v11` will need to be
+   * replicated to at most `2 * sqrt(numParts)` machines.
+   *
+   * Notice that `P0` has many edges and as a consequence this partitioning would lead to poor work
+   * balance. To improve balance we first multiply each vertex ID by a large prime to shuffle the
+   * vertex locations.
+   *
+   * One of the limitations of this approach is that the number of machines must be a perfect
+   * square. We partially address this limitation by computing the machine assignment for the next
+   * largest perfect square and then mapping back down to the actual number of machines.
+   * Unfortunately, this can also lead to work imbalance, so using a perfect square number of
+   * machines is recommended.
+   */
+  case object EdgePartition2D extends PartitionStrategy {
+    override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+      val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
+      val mixingPrime: VertexID = 1125899906842597L
+      val col: PartitionID = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
+      val row: PartitionID = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
+      (col * ceilSqrtNumParts + row) % numParts
+    }
   }
-}
 
-/**
- * Assigns edges to partitions using only the source vertex ID, colocating edges with the same
- * source.
- */
-case object EdgePartition1D extends PartitionStrategy {
-  override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
-    val mixingPrime: VertexID = 1125899906842597L
-    (math.abs(src) * mixingPrime).toInt % numParts
+  /**
+   * Assigns edges to partitions using only the source vertex ID, colocating edges with the same
+   * source.
+   */
+  case object EdgePartition1D extends PartitionStrategy {
+    override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+      val mixingPrime: VertexID = 1125899906842597L
+      (math.abs(src) * mixingPrime).toInt % numParts
+    }
   }
-}
 
-/**
- * Assigns edges to partitions by hashing the source and destination vertex IDs, resulting in a
- * random vertex cut that colocates all same-direction edges between two vertices.
- */
-case object RandomVertexCut extends PartitionStrategy {
-  override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
-    math.abs((src, dst).hashCode()) % numParts
+  /**
+   * Assigns edges to partitions by hashing the source and destination vertex IDs, resulting in a
+   * random vertex cut that colocates all same-direction edges between two vertices.
+   */
+  case object RandomVertexCut extends PartitionStrategy {
+    override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+      math.abs((src, dst).hashCode()) % numParts
+    }
   }
-}
 
-/**
- * Assigns edges to partitions by hashing the source and destination vertex IDs in a canonical
- * direction, resulting in a random vertex cut that colocates all edges between two vertices,
- * regardless of direction.
- */
-case object CanonicalRandomVertexCut extends PartitionStrategy {
-  override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
-    val lower = math.min(src, dst)
-    val higher = math.max(src, dst)
-    math.abs((lower, higher).hashCode()) % numParts
+  /**
+   * Assigns edges to partitions by hashing the source and destination vertex IDs in a canonical
+   * direction, resulting in a random vertex cut that colocates all edges between two vertices,
+   * regardless of direction.
+   */
+  case object CanonicalRandomVertexCut extends PartitionStrategy {
+    override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+      val lower = math.min(src, dst)
+      val higher = math.max(src, dst)
+      math.abs((lower, higher).hashCode()) % numParts
+    }
   }
 }
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
index d5e1de1ce0..e0aff5644e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
@@ -2,6 +2,7 @@ package org.apache.spark.graphx.lib
 
 import org.apache.spark._
 import org.apache.spark.graphx._
+import org.apache.spark.graphx.PartitionStrategy._
 
 /**
  * Driver program for running graph algorithms.
@@ -20,6 +21,7 @@ object Analytics extends Logging {
   }
 
   def pickPartitioner(v: String): PartitionStrategy = {
+    // TODO: Use reflection rather than listing all the partitioning strategies here.
     v match {
       case "RandomVertexCut" => RandomVertexCut
       case "EdgePartition1D" => EdgePartition1D
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index c32a6cbb81..9587f04c3e 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -4,6 +4,7 @@ import org.scalatest.FunSuite
 
 import org.apache.spark.SparkContext
 import org.apache.spark.graphx.Graph._
+import org.apache.spark.graphx.PartitionStrategy._
 import org.apache.spark.rdd._
 
 class GraphSuite extends FunSuite with LocalSparkContext {
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
index a286b7d03b..3452ce9764 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
@@ -2,11 +2,8 @@ package org.apache.spark.graphx.lib
 
 import org.scalatest.FunSuite
 
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.GraphGenerators
-import org.apache.spark.rdd._
+import org.apache.spark.graphx.PartitionStrategy.RandomVertexCut
 
 class TriangleCountSuite extends FunSuite with LocalSparkContext {
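
To make the 2D scheme documented above concrete, here is a small self-contained Scala sketch. It is not part of the patch: the object name, the local type aliases, and the choice of numParts = 9 and vertex v11 are illustrative assumptions. It mirrors the EdgePartition2D arithmetic and checks the replication bound for a single vertex.

// EdgePartition2DSketch.scala -- illustrative sketch, not part of the patch.
object EdgePartition2DSketch {
  type VertexID = Long    // stands in for org.apache.spark.graphx.VertexID
  type PartitionID = Int  // stands in for org.apache.spark.graphx.PartitionID

  // Same arithmetic as EdgePartition2D.getPartition above.
  def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
    val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
    val mixingPrime: VertexID = 1125899906842597L
    val col: PartitionID = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
    val row: PartitionID = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
    (col * ceilSqrtNumParts + row) % numParts
  }

  def main(args: Array[String]): Unit = {
    val numParts = 9  // a perfect square, as the scaladoc recommends
    // col depends only on src, so all out-edges of v11 land in one column of
    // blocks (at most sqrt(numParts) partitions); row depends only on dst, so
    // all in-edges land in one row. Together that gives the 2 * sqrt(numParts)
    // replication bound.
    val outParts = (0L until 12L).map(dst => getPartition(11L, dst, numParts)).toSet
    val inParts = (0L until 12L).map(src => getPartition(src, 11L, numParts)).toSet
    println(s"v11 out-edge partitions: $outParts") // at most 3 distinct values
    println(s"v11 in-edge partitions: $inParts")   // at most 3 distinct values
  }
}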
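
The difference between the two hash-based strategies is easiest to see on a single edge taken in both directions. Another illustrative sketch follows; the object name and the demo values are assumptions, not part of the patch.

// VertexCutSketch.scala -- illustrative sketch, not part of the patch.
object VertexCutSketch {
  // Same arithmetic as RandomVertexCut above: the hash is direction-sensitive.
  def randomCut(src: Long, dst: Long, numParts: Int): Int =
    math.abs((src, dst).hashCode()) % numParts

  // Same arithmetic as CanonicalRandomVertexCut above: ordering the endpoints
  // first makes (a, b) and (b, a) hash identically.
  def canonicalCut(src: Long, dst: Long, numParts: Int): Int = {
    val lower = math.min(src, dst)
    val higher = math.max(src, dst)
    math.abs((lower, higher).hashCode()) % numParts
  }

  def main(args: Array[String]): Unit = {
    val numParts = 8
    println(randomCut(1L, 2L, numParts) == randomCut(2L, 1L, numParts))       // may be false
    println(canonicalCut(1L, 2L, numParts) == canonicalCut(2L, 1L, numParts)) // always true
  }
}

After this change, call sites refer to the strategies through the enclosing object, either as PartitionStrategy.RandomVertexCut or via import org.apache.spark.graphx.PartitionStrategy._, as the updated files above do.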