From 5284ca78d17fb4de9a7019f3bbecf86484c13763 Mon Sep 17 00:00:00 2001
From: "Joseph E. Gonzalez"
Date: Tue, 3 Jun 2014 20:49:14 -0700
Subject: Enable repartitioning of graph over different number of partitions

It is currently very difficult to repartition a graph over a different number of
partitions. This PR adds an additional `partitionBy` function that takes the number
of partitions.

Author: Joseph E. Gonzalez

Closes #719 from jegonzal/graph_partitioning_options and squashes the following commits:

730b405 [Joseph E. Gonzalez] adding an additional number of partitions option to partitionBy
---
 graphx/src/main/scala/org/apache/spark/graphx/Graph.scala      | 10 ++++++++++
 .../main/scala/org/apache/spark/graphx/PartitionStrategy.scala |  8 +++++---
 .../main/scala/org/apache/spark/graphx/impl/GraphImpl.scala    |  6 +++++-
 3 files changed, 20 insertions(+), 4 deletions(-)

(limited to 'graphx')

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index c4f9d6514c..14ae50e665 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -106,9 +106,19 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
 
   /**
    * Repartitions the edges in the graph according to `partitionStrategy`.
+   *
+   * @param partitionStrategy the partitioning strategy to use when partitioning the edges in the graph.
    */
   def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
 
+  /**
+   * Repartitions the edges in the graph according to `partitionStrategy`.
+   *
+   * @param partitionStrategy the partitioning strategy to use when partitioning the edges in the graph.
+   * @param numPartitions the number of edge partitions in the new graph.
+   */
+  def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED]
+
   /**
    * Transforms each vertex attribute in the graph using the map function.
    *

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index ef412cfd4e..5e7e72a764 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -114,9 +114,11 @@ object PartitionStrategy {
    */
   case object CanonicalRandomVertexCut extends PartitionStrategy {
     override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
-      val lower = math.min(src, dst)
-      val higher = math.max(src, dst)
-      math.abs((lower, higher).hashCode()) % numParts
+      if (src < dst) {
+        math.abs((src, dst).hashCode()) % numParts
+      } else {
+        math.abs((dst, src).hashCode()) % numParts
+      }
     }
   }
 

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 59d9a8808e..15ea05cbe2 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -74,7 +74,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
   }
 
   override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
-    val numPartitions = edges.partitions.size
+    partitionBy(partitionStrategy, edges.partitions.size)
+  }
+
+  override def partitionBy(
+      partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED] = {
     val edTag = classTag[ED]
     val vdTag = classTag[VD]
     val newEdges = edges.withPartitionsRDD(edges.map { e =>
-- 
cgit v1.2.3
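
Usage sketch (not part of the patch above): assuming an existing Graph[VD, ED]
value named `graph`, the new overload lets the caller choose the edge partition
count directly instead of inheriting the current one. The names `graph` and
`numEdgePartitions` are illustrative.

    import org.apache.spark.graphx._

    // Existing single-argument overload: repartition the edges with the chosen
    // strategy, keeping the current number of edge partitions.
    val samePartitionCount = graph.partitionBy(PartitionStrategy.EdgePartition2D)

    // New two-argument overload added by this patch: same strategy, but the
    // edges are spread over a different number of edge partitions.
    val numEdgePartitions = 64
    val repartitioned =
      graph.partitionBy(PartitionStrategy.EdgePartition2D, numEdgePartitions)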