diff options
author | Akihiro Matsukawa <amatsukawa@twitter.com> | 2013-12-18 18:22:43 -0800 |
---|---|---|
committer | Akihiro Matsukawa <amatsukawa@twitter.com> | 2013-12-18 18:22:43 -0800 |
commit | b1db1b6c84cfaa0c45cd99386c4500dc6ccf0bf6 (patch) | |
tree | 65ece22e35f58b35fc5fb02ec53a7b8813962a26 /graph | |
parent | a645ef633d4244d4e954e31129cae16884158176 (diff) | |
download | spark-b1db1b6c84cfaa0c45cd99386c4500dc6ccf0bf6.tar.gz spark-b1db1b6c84cfaa0c45cd99386c4500dc6ccf0bf6.tar.bz2 spark-b1db1b6c84cfaa0c45cd99386c4500dc6ccf0bf6.zip |
scc algorithm
Diffstat (limited to 'graph')
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponents.scala | 87 |
1 files changed, 87 insertions, 0 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponents.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponents.scala new file mode 100644 index 0000000000..c324c984d7 --- /dev/null +++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponents.scala @@ -0,0 +1,87 @@ +package org.apache.spark.graph.algorithms + +import org.apache.spark.graph._ + +object StronglyConnectedComponents { + + /** + * Compute the strongly connected component (SCC) of each vertex and return an RDD with the vertex + * value containing the lowest vertex id in the SCC containing that vertex. + * + * @tparam VD the vertex attribute type (discarded in the computation) + * @tparam ED the edge attribute type (preserved in the computation) + * + * @param graph the graph for which to compute the SCC + * + * @return a graph with vertex attributes containing the smallest vertex id in each SCC + */ + def run[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int): Graph[Vid, ED] = { + + // the graph we update with final SCC ids, and the graph we return at the end + var sccGraph = graph.mapVertices { case (vid, _) => vid } + // graph we are going to work with in our iterations + var sccWorkGraph = graph.mapVertices { case (vid, _) => (vid, false) } + + var numVertices = sccWorkGraph.numVertices + var iter = 0 + while (sccWorkGraph.numVertices > 0 && iter < numIter) { + iter += 1 + do { + numVertices = sccWorkGraph.numVertices + sccWorkGraph = sccWorkGraph.outerJoinVertices(sccWorkGraph.outDegrees) { + (vid, data, degreeOpt) => if (degreeOpt.isDefined) data else (vid, true) + } + sccWorkGraph = sccWorkGraph.outerJoinVertices(sccWorkGraph.inDegrees) { + (vid, data, degreeOpt) => if (degreeOpt.isDefined) data else (vid, true) + } + + // get all vertices to be removed + val finalVertices = sccWorkGraph.vertices + .filter { case (vid, (scc, isFinal)) => isFinal} + .mapValues { (vid, data) => data._1} + + // write values to sccGraph + sccGraph = sccGraph.outerJoinVertices(finalVertices) { + (vid, scc, opt) => opt.getOrElse(scc) + } + // only keep vertices that are not final + sccWorkGraph = sccWorkGraph.subgraph(vpred = (vid, data) => !data._2) + } while (sccWorkGraph.numVertices < numVertices) + + sccWorkGraph = sccWorkGraph.mapVertices{ case (vid, (color, isFinal)) => (vid, isFinal) } + + // collect min of all my neighbor's scc values, update if it's smaller than mine + // then notify any neighbors with scc values larger than mine + sccWorkGraph = GraphLab[(Vid, Boolean), ED, Vid](sccWorkGraph, Integer.MAX_VALUE)( + (vid, e) => e.otherVertexAttr(vid)._1, + (vid1, vid2) => math.min(vid1, vid2), + (vid, scc, optScc) => + (math.min(scc._1, optScc.getOrElse(scc._1)), scc._2), + (vid, e) => e.vertexAttr(vid)._1 < e.otherVertexAttr(vid)._1 + ) + + // start at root of SCCs. Traverse values in reverse, notify all my neighbors + // do not propagate if colors do not match! + sccWorkGraph = GraphLab[(Vid, Boolean), ED, Boolean]( + sccWorkGraph, + Integer.MAX_VALUE, + EdgeDirection.Out, + EdgeDirection.In + )( + // vertex is final if it is the root of a color + // or it has the same color as a neighbor that is final + (vid, e) => (vid == e.vertexAttr(vid)._1) || (e.vertexAttr(vid)._1 == e.otherVertexAttr(vid)._1), + (final1, final2) => final1 || final2, + (vid, scc, optFinal) => + (scc._1, scc._2 || optFinal.getOrElse(false)), + // activate neighbor if they are not final, you are, and you have the same color + (vid, e) => e.vertexAttr(vid)._2 && + !e.otherVertexAttr(vid)._2 && (e.vertexAttr(vid)._1 == e.otherVertexAttr(vid)._1), + // start at root of colors + (vid, data) => vid == data._1 + ) + } + sccGraph + } + +} |