diff options
author | Zheng RuiFeng <ruifengz@foxmail.com> | 2016-02-20 12:24:10 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-02-20 12:24:10 -0800 |
commit | 6ce7c481dc3a94af503d0f3f86e2be7ba82b3bbc (patch) | |
tree | 6be925e5342c746ac2a92c65a794f76538971e39 /graphx/src | |
parent | 9ca79c1ece5ad139719e4eea9f7d1b59aed01b20 (diff) | |
download | spark-6ce7c481dc3a94af503d0f3f86e2be7ba82b3bbc.tar.gz spark-6ce7c481dc3a94af503d0f3f86e2be7ba82b3bbc.tar.bz2 spark-6ce7c481dc3a94af503d0f3f86e2be7ba82b3bbc.zip |
[SPARK-13386][GRAPHX] ConnectedComponents should support maxIteration option
JIRA: https://issues.apache.org/jira/browse/SPARK-13386
## What changes were proposed in this pull request?
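Add a `maxIterations` option to the ConnectedComponents algorithm, exposed through `ConnectedComponents.run(graph, maxIterations)` and `GraphOps.connectedComponents(maxIterations)`.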
## How was this patch tested?
unit tests passed
Author: Zheng RuiFeng <ruifengz@foxmail.com>
Closes #11268 from zhengruifeng/ccwithmax.
Diffstat (limited to 'graphx/src')
-rw-r--r-- | graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala | 14 | ||||
-rw-r--r-- | graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala | 24 |
2 files changed, 32 insertions, 6 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index d048fb5d56..97a82239a9 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -406,13 +406,23 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali } /** + * Compute the connected component membership of each vertex and return a graph with the vertex + * value containing the lowest vertex id in the connected component containing that vertex. + * + * @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]] + */ + def connectedComponents(): Graph[VertexId, ED] = { + ConnectedComponents.run(graph) + } + + /** * Compute the connected component membership of each vertex and return a graph with the vertex * value containing the lowest vertex id in the connected component containing that vertex. * * @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]] */ - def connectedComponents(): Graph[VertexId, ED] = { - ConnectedComponents.run(graph) + def connectedComponents(maxIterations: Int): Graph[VertexId, ED] = { + ConnectedComponents.run(graph, maxIterations) } /** diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala index f72cbb1524..40cf0735e2 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala @@ -29,13 +29,14 @@ object ConnectedComponents { * * @tparam VD the vertex attribute type (discarded in the computation) * @tparam ED the edge attribute type (preserved in the computation) - * * @param graph the graph for which to compute the connected components - * + * @param maxIterations the maximum number of iterations to run for * @return a graph with vertex attributes 
containing the smallest vertex in each * connected component */ - def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = { + def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], + maxIterations: Int): Graph[VertexId, ED] = { + require(maxIterations > 0) val ccGraph = graph.mapVertices { case (vid, _) => vid } def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = { if (edge.srcAttr < edge.dstAttr) { @@ -47,11 +48,26 @@ object ConnectedComponents { } } val initialMessage = Long.MaxValue - val pregelGraph = Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)( + val pregelGraph = Pregel(ccGraph, initialMessage, + maxIterations, EdgeDirection.Either)( vprog = (id, attr, msg) => math.min(attr, msg), sendMsg = sendMessage, mergeMsg = (a, b) => math.min(a, b)) ccGraph.unpersist() pregelGraph } // end of connectedComponents + + /** + * Compute the connected component membership of each vertex and return a graph with the vertex + * value containing the lowest vertex id in the connected component containing that vertex. + * + * @tparam VD the vertex attribute type (discarded in the computation) + * @tparam ED the edge attribute type (preserved in the computation) + * @param graph the graph for which to compute the connected components + * @return a graph with vertex attributes containing the smallest vertex in each + * connected component + */ + def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = { + run(graph, Int.MaxValue) + } } |