diff options
author | Reynold Xin <rxin@apache.org> | 2014-01-13 18:32:04 -0800 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-01-13 18:32:04 -0800 |
commit | 1dce9ce446dd248755cd65b7a6a0729a4dca2d62 (patch) | |
tree | 234ff88342d597de0f8af395b96f620ea06167d9 /graphx | |
parent | ae06d2c22ffb8af3c27c29bc55aadfb73b56e9ff (diff) | |
download | spark-1dce9ce446dd248755cd65b7a6a0729a4dca2d62.tar.gz spark-1dce9ce446dd248755cd65b7a6a0729a4dca2d62.tar.bz2 spark-1dce9ce446dd248755cd65b7a6a0729a4dca2d62.zip |
Moved PartitionStrategy implementations into an object.
Diffstat (limited to 'graphx')
4 files changed, 85 insertions, 81 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala index b9ccd8765e..6d2990a3f6 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala @@ -4,96 +4,100 @@ package org.apache.spark.graphx * Represents the way edges are assigned to edge partitions based on their source and destination * vertex IDs. */ -sealed trait PartitionStrategy extends Serializable { +trait PartitionStrategy extends Serializable { /** Returns the partition number for a given edge. */ def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID } - /** - * Assigns edges to partitions using a 2D partitioning of the sparse edge adjacency matrix, - * guaranteeing a `2 * sqrt(numParts)` bound on vertex replication. - * - * Suppose we have a graph with 11 vertices that we want to partition - * over 9 machines. We can use the following sparse matrix representation: - * - * <pre> - * __________________________________ - * v0 | P0 * | P1 | P2 * | - * v1 | **** | * | | - * v2 | ******* | ** | **** | - * v3 | ***** | * * | * | - * ---------------------------------- - * v4 | P3 * | P4 *** | P5 ** * | - * v5 | * * | * | | - * v6 | * | ** | **** | - * v7 | * * * | * * | * | - * ---------------------------------- - * v8 | P6 * | P7 * | P8 * *| - * v9 | * | * * | | - * v10 | * | ** | * * | - * v11 | * <-E | *** | ** | - * ---------------------------------- - * </pre> - * - * The edge denoted by `E` connects `v11` with `v1` and is assigned to processor `P6`. To get the - * processor number we divide the matrix into `sqrt(numParts)` by `sqrt(numParts)` blocks. Notice - * that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3, P6)` or the last - * row of blocks `(P6, P7, P8)`. 
As a consequence we can guarantee that `v11` will need to be - * replicated to at most `2 * sqrt(numParts)` machines. - * - * Notice that `P0` has many edges and as a consequence this partitioning would lead to poor work - * balance. To improve balance we first multiply each vertex id by a large prime to shuffle the - * vertex locations. - * - * One of the limitations of this approach is that the number of machines must either be a perfect - * square. We partially address this limitation by computing the machine assignment to the next - * largest perfect square and then mapping back down to the actual number of machines. - * Unfortunately, this can also lead to work imbalance and so it is suggested that a perfect square - * is used. + * Collection of built-in [[PartitionStrategy]] implementations. */ -case object EdgePartition2D extends PartitionStrategy { - override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = { - val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt - val mixingPrime: VertexID = 1125899906842597L - val col: PartitionID = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt - val row: PartitionID = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt - (col * ceilSqrtNumParts + row) % numParts +object PartitionStrategy { + /** + * Assigns edges to partitions using a 2D partitioning of the sparse edge adjacency matrix, + * guaranteeing a `2 * sqrt(numParts)` bound on vertex replication. + * + * Suppose we have a graph with 11 vertices that we want to partition + * over 9 machines. 
We can use the following sparse matrix representation: + * + * <pre> + * __________________________________ + * v0 | P0 * | P1 | P2 * | + * v1 | **** | * | | + * v2 | ******* | ** | **** | + * v3 | ***** | * * | * | + * ---------------------------------- + * v4 | P3 * | P4 *** | P5 ** * | + * v5 | * * | * | | + * v6 | * | ** | **** | + * v7 | * * * | * * | * | + * ---------------------------------- + * v8 | P6 * | P7 * | P8 * *| + * v9 | * | * * | | + * v10 | * | ** | * * | + * v11 | * <-E | *** | ** | + * ---------------------------------- + * </pre> + * + * The edge denoted by `E` connects `v11` with `v1` and is assigned to processor `P6`. To get the + * processor number we divide the matrix into `sqrt(numParts)` by `sqrt(numParts)` blocks. Notice + * that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3, P6)` or the last + * row of blocks `(P6, P7, P8)`. As a consequence we can guarantee that `v11` will need to be + * replicated to at most `2 * sqrt(numParts)` machines. + * + * Notice that `P0` has many edges and as a consequence this partitioning would lead to poor work + * balance. To improve balance we first multiply each vertex id by a large prime to shuffle the + * vertex locations. + * + * One of the limitations of this approach is that the number of machines must either be a perfect + * square. We partially address this limitation by computing the machine assignment to the next + * largest perfect square and then mapping back down to the actual number of machines. + * Unfortunately, this can also lead to work imbalance and so it is suggested that a perfect square + * is used. 
+ */ + case object EdgePartition2D extends PartitionStrategy { + override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = { + val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt + val mixingPrime: VertexID = 1125899906842597L + val col: PartitionID = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt + val row: PartitionID = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt + (col * ceilSqrtNumParts + row) % numParts + } } -} -/** - * Assigns edges to partitions using only the source vertex ID, colocating edges with the same - * source. - */ -case object EdgePartition1D extends PartitionStrategy { - override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = { - val mixingPrime: VertexID = 1125899906842597L - (math.abs(src) * mixingPrime).toInt % numParts + /** + * Assigns edges to partitions using only the source vertex ID, colocating edges with the same + * source. + */ + case object EdgePartition1D extends PartitionStrategy { + override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = { + val mixingPrime: VertexID = 1125899906842597L + (math.abs(src) * mixingPrime).toInt % numParts + } } -} -/** - * Assigns edges to partitions by hashing the source and destination vertex IDs, resulting in a - * random vertex cut that colocates all same-direction edges between two vertices. - */ -case object RandomVertexCut extends PartitionStrategy { - override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = { - math.abs((src, dst).hashCode()) % numParts + /** + * Assigns edges to partitions by hashing the source and destination vertex IDs, resulting in a + * random vertex cut that colocates all same-direction edges between two vertices. 
+ */ + case object RandomVertexCut extends PartitionStrategy { + override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = { + math.abs((src, dst).hashCode()) % numParts + } } -} -/** - * Assigns edges to partitions by hashing the source and destination vertex IDs in a canonical - * direction, resulting in a random vertex cut that colocates all edges between two vertices, - * regardless of direction. - */ -case object CanonicalRandomVertexCut extends PartitionStrategy { - override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = { - val lower = math.min(src, dst) - val higher = math.max(src, dst) - math.abs((lower, higher).hashCode()) % numParts + /** + * Assigns edges to partitions by hashing the source and destination vertex IDs in a canonical + * direction, resulting in a random vertex cut that colocates all edges between two vertices, + * regardless of direction. + */ + case object CanonicalRandomVertexCut extends PartitionStrategy { + override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = { + val lower = math.min(src, dst) + val higher = math.max(src, dst) + math.abs((lower, higher).hashCode()) % numParts + } } } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala index d5e1de1ce0..e0aff5644e 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala @@ -2,6 +2,7 @@ package org.apache.spark.graphx.lib import org.apache.spark._ import org.apache.spark.graphx._ +import org.apache.spark.graphx.PartitionStrategy._ /** * Driver program for running graph algorithms. @@ -20,6 +21,7 @@ object Analytics extends Logging { } def pickPartitioner(v: String): PartitionStrategy = { + // TODO: Use reflection rather than listing all the partitioning strategies here. 
v match { case "RandomVertexCut" => RandomVertexCut case "EdgePartition1D" => EdgePartition1D diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index c32a6cbb81..9587f04c3e 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -4,6 +4,7 @@ import org.scalatest.FunSuite import org.apache.spark.SparkContext import org.apache.spark.graphx.Graph._ +import org.apache.spark.graphx.PartitionStrategy._ import org.apache.spark.rdd._ class GraphSuite extends FunSuite with LocalSparkContext { diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala index a286b7d03b..3452ce9764 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala @@ -2,11 +2,8 @@ package org.apache.spark.graphx.lib import org.scalatest.FunSuite -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ import org.apache.spark.graphx._ -import org.apache.spark.graphx.util.GraphGenerators -import org.apache.spark.rdd._ +import org.apache.spark.graphx.PartitionStrategy.RandomVertexCut class TriangleCountSuite extends FunSuite with LocalSparkContext { |