aboutsummaryrefslogtreecommitdiff
path: root/graphx
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2014-01-13 14:59:30 -0800
committerAnkur Dave <ankurdave@gmail.com>2014-01-13 14:59:30 -0800
commit8038da232870fe016e73122a2ef110ac8e56ca1e (patch)
tree16466a9ab2b468fb12432ea8e855efd894f420cc /graphx
parent97cd27e31b18f4c41ef556aee2ab65350694f8b8 (diff)
parent80e4d98dc656e20dacbd8cdbf92d4912673b42ae (diff)
downloadspark-8038da232870fe016e73122a2ef110ac8e56ca1e.tar.gz
spark-8038da232870fe016e73122a2ef110ac8e56ca1e.tar.bz2
spark-8038da232870fe016e73122a2ef110ac8e56ca1e.zip
Merge pull request #2 from jegonzal/GraphXCCIssue
Improving documentation and identifying potential bug in CC calculation.
Diffstat (limited to 'graphx')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala4
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala44
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala30
3 files changed, 62 insertions, 16 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 2b3b95e2ca..a0a40e2d9a 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -325,8 +325,8 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
*
* @see [[org.apache.spark.graphx.lib.ConnectedComponents]]
*/
- def connectedComponents(): Graph[VertexID, ED] = {
- ConnectedComponents.run(graph)
+ def connectedComponents(undirected: Boolean = true): Graph[VertexID, ED] = {
+ ConnectedComponents.run(graph, undirected)
}
/**
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
index 4a83e2dbb8..d078d2acdb 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
@@ -14,26 +14,42 @@ object ConnectedComponents {
* @tparam ED the edge attribute type (preserved in the computation)
*
* @param graph the graph for which to compute the connected components
+ * @param undirected compute reachability ignoring edge direction.
*
* @return a graph with vertex attributes containing the smallest vertex in each
* connected component
*/
- def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexID, ED] = {
+ def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], undirected: Boolean = true):
+ Graph[VertexID, ED] = {
val ccGraph = graph.mapVertices { case (vid, _) => vid }
-
- def sendMessage(edge: EdgeTriplet[VertexID, ED]) = {
- if (edge.srcAttr < edge.dstAttr) {
- Iterator((edge.dstId, edge.srcAttr))
- } else if (edge.srcAttr > edge.dstAttr) {
- Iterator((edge.srcId, edge.dstAttr))
- } else {
- Iterator.empty
+ if (undirected) {
+ def sendMessage(edge: EdgeTriplet[VertexID, ED]) = {
+ if (edge.srcAttr < edge.dstAttr) {
+ Iterator((edge.dstId, edge.srcAttr))
+ } else if (edge.srcAttr > edge.dstAttr) {
+ Iterator((edge.srcId, edge.dstAttr))
+ } else {
+ Iterator.empty
+ }
+ }
+ val initialMessage = Long.MaxValue
+ Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Both)(
+ vprog = (id, attr, msg) => math.min(attr, msg),
+ sendMsg = sendMessage,
+ mergeMsg = (a, b) => math.min(a, b))
+ } else {
+ def sendMessage(edge: EdgeTriplet[VertexID, ED]) = {
+ if (edge.srcAttr < edge.dstAttr) {
+ Iterator((edge.dstId, edge.srcAttr))
+ } else {
+ Iterator.empty
+ }
}
+ val initialMessage = Long.MaxValue
+ Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Out)(
+ vprog = (id, attr, msg) => math.min(attr, msg),
+ sendMsg = sendMessage,
+ mergeMsg = (a, b) => math.min(a, b))
}
- val initialMessage = Long.MaxValue
- Pregel(ccGraph, initialMessage)(
- vprog = (id, attr, msg) => math.min(attr, msg),
- sendMsg = sendMessage,
- mergeMsg = (a, b) => math.min(a, b))
} // end of connectedComponents
}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
index 66612b381f..86da8f1b46 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
@@ -80,4 +80,34 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
}
} // end of reverse chain connected components
+ test("Connected Components on a Toy Connected Graph") {
+ withSpark { sc =>
+ // Create an RDD for the vertices
+ val users: RDD[(VertexID, (String, String))] =
+ sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
+ (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
+ (4L, ("peter", "student"))))
+ // Create an RDD for edges
+ val relationships: RDD[Edge[String]] =
+ sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
+ Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
+ Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
+ // Edges are:
+ // 2 ---> 5 ---> 3
+ // | \
+ // V \|
+ // 4 ---> 0 7
+ //
+ // Define a default user in case there are relationship with missing user
+ val defaultUser = ("John Doe", "Missing")
+ // Build the initial Graph
+ val graph = Graph(users, relationships, defaultUser)
+ val ccGraph = graph.connectedComponents(undirected = true)
+ val vertices = ccGraph.vertices.collect
+ for ( (id, cc) <- vertices ) {
+ assert(cc == 0)
+ }
+ }
+ } // end of toy connected components
+
}