about | summary | refs | log | tree | commit | diff
path: root/graphx
diff options
context:
space:
mode:
author Reynold Xin <rxin@apache.org> 2014-01-13 18:32:04 -0800
committer Reynold Xin <rxin@apache.org> 2014-01-13 18:32:04 -0800
commit 1dce9ce446dd248755cd65b7a6a0729a4dca2d62 (patch)
tree 234ff88342d597de0f8af395b96f620ea06167d9 /graphx
parent ae06d2c22ffb8af3c27c29bc55aadfb73b56e9ff (diff)
download spark-1dce9ce446dd248755cd65b7a6a0729a4dca2d62.tar.gz
spark-1dce9ce446dd248755cd65b7a6a0729a4dca2d62.tar.bz2
spark-1dce9ce446dd248755cd65b7a6a0729a4dca2d62.zip
Moved PartitionStrategy's into an object.
Diffstat (limited to 'graphx')
-rw-r--r-- graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala | 158
-rw-r--r-- graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala | 2
-rw-r--r-- graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala | 1
-rw-r--r-- graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala | 5
4 files changed, 85 insertions, 81 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index b9ccd8765e..6d2990a3f6 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -4,96 +4,100 @@ package org.apache.spark.graphx
* Represents the way edges are assigned to edge partitions based on their source and destination
* vertex IDs.
*/
-sealed trait PartitionStrategy extends Serializable {
+trait PartitionStrategy extends Serializable {
/** Returns the partition number for a given edge. */
def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID
}
-
/**
- * Assigns edges to partitions using a 2D partitioning of the sparse edge adjacency matrix,
- * guaranteeing a `2 * sqrt(numParts)` bound on vertex replication.
- *
- * Suppose we have a graph with 11 vertices that we want to partition
- * over 9 machines. We can use the following sparse matrix representation:
- *
- * <pre>
- * __________________________________
- * v0 | P0 * | P1 | P2 * |
- * v1 | **** | * | |
- * v2 | ******* | ** | **** |
- * v3 | ***** | * * | * |
- * ----------------------------------
- * v4 | P3 * | P4 *** | P5 ** * |
- * v5 | * * | * | |
- * v6 | * | ** | **** |
- * v7 | * * * | * * | * |
- * ----------------------------------
- * v8 | P6 * | P7 * | P8 * *|
- * v9 | * | * * | |
- * v10 | * | ** | * * |
- * v11 | * <-E | *** | ** |
- * ----------------------------------
- * </pre>
- *
- * The edge denoted by `E` connects `v11` with `v1` and is assigned to processor `P6`. To get the
- * processor number we divide the matrix into `sqrt(numParts)` by `sqrt(numParts)` blocks. Notice
- * that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3, P6)` or the last
- * row of blocks `(P6, P7, P8)`. As a consequence we can guarantee that `v11` will need to be
- * replicated to at most `2 * sqrt(numParts)` machines.
- *
- * Notice that `P0` has many edges and as a consequence this partitioning would lead to poor work
- * balance. To improve balance we first multiply each vertex id by a large prime to shuffle the
- * vertex locations.
- *
- * One of the limitations of this approach is that the number of machines must either be a perfect
- * square. We partially address this limitation by computing the machine assignment to the next
- * largest perfect square and then mapping back down to the actual number of machines.
- * Unfortunately, this can also lead to work imbalance and so it is suggested that a perfect square
- * is used.
+ * Collection of built-in [[PartitionStrategy]] implementations.
*/
-case object EdgePartition2D extends PartitionStrategy {
- override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
- val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
- val mixingPrime: VertexID = 1125899906842597L
- val col: PartitionID = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
- val row: PartitionID = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
- (col * ceilSqrtNumParts + row) % numParts
+object PartitionStrategy {
+ /**
+ * Assigns edges to partitions using a 2D partitioning of the sparse edge adjacency matrix,
+ * guaranteeing a `2 * sqrt(numParts)` bound on vertex replication.
+ *
+ * Suppose we have a graph with 12 vertices that we want to partition
+ * over 9 machines. We can use the following sparse matrix representation:
+ *
+ * <pre>
+ * __________________________________
+ * v0 | P0 * | P1 | P2 * |
+ * v1 | **** | * | |
+ * v2 | ******* | ** | **** |
+ * v3 | ***** | * * | * |
+ * ----------------------------------
+ * v4 | P3 * | P4 *** | P5 ** * |
+ * v5 | * * | * | |
+ * v6 | * | ** | **** |
+ * v7 | * * * | * * | * |
+ * ----------------------------------
+ * v8 | P6 * | P7 * | P8 * *|
+ * v9 | * | * * | |
+ * v10 | * | ** | * * |
+ * v11 | * <-E | *** | ** |
+ * ----------------------------------
+ * </pre>
+ *
+ * The edge denoted by `E` connects `v11` with `v1` and is assigned to processor `P6`. To get the
+ * processor number we divide the matrix into `sqrt(numParts)` by `sqrt(numParts)` blocks. Notice
+ * that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3, P6)` or the last
+ * row of blocks `(P6, P7, P8)`. As a consequence we can guarantee that `v11` will need to be
+ * replicated to at most `2 * sqrt(numParts)` machines.
+ *
+ * Notice that `P0` has many edges and as a consequence this partitioning would lead to poor work
+ * balance. To improve balance we first multiply each vertex id by a large prime to shuffle the
+ * vertex locations.
+ *
+ * One of the limitations of this approach is that the number of machines must be a perfect
+ * square. We partially address this limitation by computing the machine assignment to the next
+ * largest perfect square and then mapping back down to the actual number of machines.
+ * Unfortunately, this can also lead to work imbalance and so it is suggested that a perfect square
+ * is used.
+ */
+ case object EdgePartition2D extends PartitionStrategy {
+ override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+ val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
+ val mixingPrime: VertexID = 1125899906842597L
+ val col: PartitionID = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
+ val row: PartitionID = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
+ (col * ceilSqrtNumParts + row) % numParts
+ }
}
-}
-/**
- * Assigns edges to partitions using only the source vertex ID, colocating edges with the same
- * source.
- */
-case object EdgePartition1D extends PartitionStrategy {
- override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
- val mixingPrime: VertexID = 1125899906842597L
- (math.abs(src) * mixingPrime).toInt % numParts
+ /**
+ * Assigns edges to partitions using only the source vertex ID, colocating edges with the same
+ * source.
+ */
+ case object EdgePartition1D extends PartitionStrategy {
+ override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+ val mixingPrime: VertexID = 1125899906842597L
+ (math.abs(src) * mixingPrime).toInt % numParts
+ }
}
-}
-/**
- * Assigns edges to partitions by hashing the source and destination vertex IDs, resulting in a
- * random vertex cut that colocates all same-direction edges between two vertices.
- */
-case object RandomVertexCut extends PartitionStrategy {
- override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
- math.abs((src, dst).hashCode()) % numParts
+ /**
+ * Assigns edges to partitions by hashing the source and destination vertex IDs, resulting in a
+ * random vertex cut that colocates all same-direction edges between two vertices.
+ */
+ case object RandomVertexCut extends PartitionStrategy {
+ override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+ math.abs((src, dst).hashCode()) % numParts
+ }
}
-}
-/**
- * Assigns edges to partitions by hashing the source and destination vertex IDs in a canonical
- * direction, resulting in a random vertex cut that colocates all edges between two vertices,
- * regardless of direction.
- */
-case object CanonicalRandomVertexCut extends PartitionStrategy {
- override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
- val lower = math.min(src, dst)
- val higher = math.max(src, dst)
- math.abs((lower, higher).hashCode()) % numParts
+ /**
+ * Assigns edges to partitions by hashing the source and destination vertex IDs in a canonical
+ * direction, resulting in a random vertex cut that colocates all edges between two vertices,
+ * regardless of direction.
+ */
+ case object CanonicalRandomVertexCut extends PartitionStrategy {
+ override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+ val lower = math.min(src, dst)
+ val higher = math.max(src, dst)
+ math.abs((lower, higher).hashCode()) % numParts
+ }
}
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
index d5e1de1ce0..e0aff5644e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
@@ -2,6 +2,7 @@ package org.apache.spark.graphx.lib
import org.apache.spark._
import org.apache.spark.graphx._
+import org.apache.spark.graphx.PartitionStrategy._
/**
* Driver program for running graph algorithms.
@@ -20,6 +21,7 @@ object Analytics extends Logging {
}
def pickPartitioner(v: String): PartitionStrategy = {
+ // TODO: Use reflection rather than listing all the partitioning strategies here.
v match {
case "RandomVertexCut" => RandomVertexCut
case "EdgePartition1D" => EdgePartition1D
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index c32a6cbb81..9587f04c3e 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -4,6 +4,7 @@ import org.scalatest.FunSuite
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph._
+import org.apache.spark.graphx.PartitionStrategy._
import org.apache.spark.rdd._
class GraphSuite extends FunSuite with LocalSparkContext {
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
index a286b7d03b..3452ce9764 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
@@ -2,11 +2,8 @@ package org.apache.spark.graphx.lib
import org.scalatest.FunSuite
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.GraphGenerators
-import org.apache.spark.rdd._
+import org.apache.spark.graphx.PartitionStrategy.RandomVertexCut
class TriangleCountSuite extends FunSuite with LocalSparkContext {