diff options
author | Ankur Dave <ankurdave@gmail.com> | 2014-01-13 14:59:30 -0800 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2014-01-13 14:59:30 -0800 |
commit | 8038da232870fe016e73122a2ef110ac8e56ca1e (patch) | |
tree | 16466a9ab2b468fb12432ea8e855efd894f420cc /graphx/src | |
parent | 97cd27e31b18f4c41ef556aee2ab65350694f8b8 (diff) | |
parent | 80e4d98dc656e20dacbd8cdbf92d4912673b42ae (diff) | |
download | spark-8038da232870fe016e73122a2ef110ac8e56ca1e.tar.gz spark-8038da232870fe016e73122a2ef110ac8e56ca1e.tar.bz2 spark-8038da232870fe016e73122a2ef110ac8e56ca1e.zip |
Merge pull request #2 from jegonzal/GraphXCCIssue
Improving documentation and identifying potential bug in CC calculation.
Diffstat (limited to 'graphx/src')
3 files changed, 62 insertions, 16 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 2b3b95e2ca..a0a40e2d9a 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -325,8 +325,8 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { * * @see [[org.apache.spark.graphx.lib.ConnectedComponents]] */ - def connectedComponents(): Graph[VertexID, ED] = { - ConnectedComponents.run(graph) + def connectedComponents(undirected: Boolean = true): Graph[VertexID, ED] = { + ConnectedComponents.run(graph, undirected) } /** diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala index 4a83e2dbb8..d078d2acdb 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala @@ -14,26 +14,42 @@ object ConnectedComponents { * @tparam ED the edge attribute type (preserved in the computation) * * @param graph the graph for which to compute the connected components + * @param undirected compute reachability ignoring edge direction. * * @return a graph with vertex attributes containing the smallest vertex in each * connected component */ - def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexID, ED] = { + def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], undirected: Boolean = true): + Graph[VertexID, ED] = { val ccGraph = graph.mapVertices { case (vid, _) => vid } - - def sendMessage(edge: EdgeTriplet[VertexID, ED]) = { - if (edge.srcAttr < edge.dstAttr) { - Iterator((edge.dstId, edge.srcAttr)) - } else if (edge.srcAttr > edge.dstAttr) { - Iterator((edge.srcId, edge.dstAttr)) - } else { - Iterator.empty + if (undirected) { + def sendMessage(edge: EdgeTriplet[VertexID, ED]) = { + if (edge.srcAttr < edge.dstAttr) { + Iterator((edge.dstId, edge.srcAttr)) + } else if (edge.srcAttr > edge.dstAttr) { + Iterator((edge.srcId, edge.dstAttr)) + } else { + Iterator.empty + } + } + val initialMessage = Long.MaxValue + Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Both)( + vprog = (id, attr, msg) => math.min(attr, msg), + sendMsg = sendMessage, + mergeMsg = (a, b) => math.min(a, b)) + } else { + def sendMessage(edge: EdgeTriplet[VertexID, ED]) = { + if (edge.srcAttr < edge.dstAttr) { + Iterator((edge.dstId, edge.srcAttr)) + } else { + Iterator.empty + } } + val initialMessage = Long.MaxValue + Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Out)( + vprog = (id, attr, msg) => math.min(attr, msg), + sendMsg = sendMessage, + mergeMsg = (a, b) => math.min(a, b)) } - val initialMessage = Long.MaxValue - Pregel(ccGraph, initialMessage)( - vprog = (id, attr, msg) => math.min(attr, msg), - sendMsg = sendMessage, - mergeMsg = (a, b) => math.min(a, b)) } // end of connectedComponents } diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala index 66612b381f..86da8f1b46 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala @@ -80,4 +80,34 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext { } } // end of reverse chain connected components + test("Connected Components on a Toy Connected Graph") { + withSpark { sc => + // Create an RDD for the vertices + val users: RDD[(VertexID, (String, String))] = + sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), + (5L, ("franklin", "prof")), (2L, ("istoica", "prof")), + (4L, ("peter", "student")))) + // Create an RDD for edges + val relationships: RDD[Edge[String]] = + sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), + Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"), + Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague"))) + // Edges are: + // 2 ---> 5 ---> 3 + // | \ + // V \| + // 4 ---> 0 7 + // + // Define a default user in case there are relationship with missing user + val defaultUser = ("John Doe", "Missing") + // Build the initial Graph + val graph = Graph(users, relationships, defaultUser) + val ccGraph = graph.connectedComponents(undirected = true) + val vertices = ccGraph.vertices.collect + for ( (id, cc) <- vertices ) { + assert(cc == 0) + } + } + } // end of toy connected components + } |