diff options
author | Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com> | 2014-06-03 20:49:14 -0700 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2014-06-03 20:49:14 -0700 |
commit | 5284ca78d17fb4de9a7019f3bbecf86484c13763 (patch) | |
tree | 10622b49f26c49e705b930bba55a32e0a48a1e15 /graphx/src | |
parent | e8d93ee5284cb6a1d4551effe91ee8d233323329 (diff) | |
download | spark-5284ca78d17fb4de9a7019f3bbecf86484c13763.tar.gz spark-5284ca78d17fb4de9a7019f3bbecf86484c13763.tar.bz2 spark-5284ca78d17fb4de9a7019f3bbecf86484c13763.zip |
Enable repartitioning of graph over different number of partitions
It is currently very difficult to repartition a graph over a different number of partitions. This PR adds an additional `partitionBy` function that takes the number of partitions.
Author: Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com>
Closes #719 from jegonzal/graph_partitioning_options and squashes the following commits:
730b405 [Joseph E. Gonzalez] adding an additional number of partitions option to partitionBy
Diffstat (limited to 'graphx/src')
3 files changed, 20 insertions, 4 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index c4f9d6514c..14ae50e665 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -106,10 +106,20 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab /** * Repartitions the edges in the graph according to `partitionStrategy`. + * + * @param the partitioning strategy to use when partitioning the edges in the graph. */ def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] /** + * Repartitions the edges in the graph according to `partitionStrategy`. + * + * @param the partitioning strategy to use when partitioning the edges in the graph. + * @param numPartitions the number of edge partitions in the new graph. + */ + def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED] + + /** * Transforms each vertex attribute in the graph using the map function. * * @note The new graph has the same structure. As a consequence the underlying index structures diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala index ef412cfd4e..5e7e72a764 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala @@ -114,9 +114,11 @@ object PartitionStrategy { */ case object CanonicalRandomVertexCut extends PartitionStrategy { override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = { - val lower = math.min(src, dst) - val higher = math.max(src, dst) - math.abs((lower, higher).hashCode()) % numParts + if (src < dst) { + math.abs((src, dst).hashCode()) % numParts + } else { + math.abs((dst, src).hashCode()) % numParts + } } } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 59d9a8808e..15ea05cbe2 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -74,7 +74,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( } override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = { - val numPartitions = edges.partitions.size + partitionBy(partitionStrategy, edges.partitions.size) + } + + override def partitionBy( + partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED] = { val edTag = classTag[ED] val vdTag = classTag[VD] val newEdges = edges.withPartitionsRDD(edges.map { e => |