aboutsummaryrefslogtreecommitdiff
path: root/graphx
diff options
context:
space:
mode:
authorZheng RuiFeng <ruifengz@foxmail.com>2016-02-20 12:24:10 -0800
committerReynold Xin <rxin@databricks.com>2016-02-20 12:24:10 -0800
commit6ce7c481dc3a94af503d0f3f86e2be7ba82b3bbc (patch)
tree6be925e5342c746ac2a92c65a794f76538971e39 /graphx
parent9ca79c1ece5ad139719e4eea9f7d1b59aed01b20 (diff)
downloadspark-6ce7c481dc3a94af503d0f3f86e2be7ba82b3bbc.tar.gz
spark-6ce7c481dc3a94af503d0f3f86e2be7ba82b3bbc.tar.bz2
spark-6ce7c481dc3a94af503d0f3f86e2be7ba82b3bbc.zip
[SPARK-13386][GRAPHX] ConnectedComponents should support maxIteration option
JIRA: https://issues.apache.org/jira/browse/SPARK-13386 ## What changes were proposed in this pull request? add maxIteration option for ConnectedComponents algorithm ## How was the this patch tested? unit tests passed Author: Zheng RuiFeng <ruifengz@foxmail.com> Closes #11268 from zhengruifeng/ccwithmax.
Diffstat (limited to 'graphx')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala14
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala24
2 files changed, 32 insertions, 6 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index d048fb5d56..97a82239a9 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -406,13 +406,23 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
}
/**
+ * Compute the connected component membership of each vertex and return a graph with the vertex
+ * value containing the lowest vertex id in the connected component containing that vertex.
+ *
+ * @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]]
+ */
+ def connectedComponents(): Graph[VertexId, ED] = {
+ ConnectedComponents.run(graph)
+ }
+
+ /**
* Compute the connected component membership of each vertex and return a graph with the vertex
* value containing the lowest vertex id in the connected component containing that vertex.
*
* @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]]
*/
- def connectedComponents(): Graph[VertexId, ED] = {
- ConnectedComponents.run(graph)
+ def connectedComponents(maxIterations: Int): Graph[VertexId, ED] = {
+ ConnectedComponents.run(graph, maxIterations)
}
/**
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
index f72cbb1524..40cf0735e2 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
@@ -29,13 +29,14 @@ object ConnectedComponents {
*
* @tparam VD the vertex attribute type (discarded in the computation)
* @tparam ED the edge attribute type (preserved in the computation)
- *
* @param graph the graph for which to compute the connected components
- *
+ * @param maxIterations the maximum number of iterations to run for
* @return a graph with vertex attributes containing the smallest vertex in each
* connected component
*/
- def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
+ def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
+ maxIterations: Int): Graph[VertexId, ED] = {
+ require(maxIterations > 0)
val ccGraph = graph.mapVertices { case (vid, _) => vid }
def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = {
if (edge.srcAttr < edge.dstAttr) {
@@ -47,11 +48,26 @@ object ConnectedComponents {
}
}
val initialMessage = Long.MaxValue
- val pregelGraph = Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)(
+ val pregelGraph = Pregel(ccGraph, initialMessage,
+ maxIterations, EdgeDirection.Either)(
vprog = (id, attr, msg) => math.min(attr, msg),
sendMsg = sendMessage,
mergeMsg = (a, b) => math.min(a, b))
ccGraph.unpersist()
pregelGraph
} // end of connectedComponents
+
+ /**
+ * Compute the connected component membership of each vertex and return a graph with the vertex
+ * value containing the lowest vertex id in the connected component containing that vertex.
+ *
+ * @tparam VD the vertex attribute type (discarded in the computation)
+ * @tparam ED the edge attribute type (preserved in the computation)
+ * @param graph the graph for which to compute the connected components
+ * @return a graph with vertex attributes containing the smallest vertex in each
+ * connected component
+ */
+ def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
+ run(graph, Int.MaxValue)
+ }
}