aboutsummaryrefslogtreecommitdiff
path: root/graphx/src
diff options
context:
space:
mode:
authorMichał Wesołowski <michal.wesolowski@bzwbk.pl>2016-07-19 12:18:42 +0100
committerSean Owen <sowen@cloudera.com>2016-07-19 12:18:42 +0100
commit5d92326be76cb15edc6e18e94a373e197f696803 (patch)
treef7d8bdccef69e22a825fd09b2b36ab816e5bd3f6 /graphx/src
parent6c4b9f4be6b429197c6a53f937a82c2ac5866d65 (diff)
downloadspark-5d92326be76cb15edc6e18e94a373e197f696803.tar.gz
spark-5d92326be76cb15edc6e18e94a373e197f696803.tar.bz2
spark-5d92326be76cb15edc6e18e94a373e197f696803.zip
[SPARK-16478] graphX (added graph caching in strongly connected components)
## What changes were proposed in this pull request? I added caching in every iteration for sccGraph that is returned in strongly connected components. Without this cache strongly connected components returned graph that needed to be computed from scratch when some intermediary caches didn't existed anymore. ## How was this patch tested? I tested it by running code similar to the one [on databrics](https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/4889410027417133/3634650767364730/3117184429335832/latest.html). Basically I generated large graph and computed strongly connected components with changed code, than simply run count on vertices and edges. Count after this update takes few seconds instead 20 minutes. # statement contribution is my original work and I license the work to the project under the project's open source license. Author: Michał Wesołowski <michal.wesolowski@bzwbk.pl> Closes #14137 from wesolowskim/SPARK-16478.
Diffstat (limited to 'graphx/src')
-rwxr-xr-x[-rw-r--r--]graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala86
1 files changed, 50 insertions, 36 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
index 1fa92b0195..e4f80ffcb4 100644..100755
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
@@ -44,6 +44,9 @@ object StronglyConnectedComponents {
// graph we are going to work with in our iterations
var sccWorkGraph = graph.mapVertices { case (vid, _) => (vid, false) }.cache()
+ // helper variables to unpersist cached graphs
+ var prevSccGraph = sccGraph
+
var numVertices = sccWorkGraph.numVertices
var iter = 0
while (sccWorkGraph.numVertices > 0 && iter < numIter) {
@@ -64,48 +67,59 @@ object StronglyConnectedComponents {
// write values to sccGraph
sccGraph = sccGraph.outerJoinVertices(finalVertices) {
(vid, scc, opt) => opt.getOrElse(scc)
- }
+ }.cache()
+ // materialize vertices and edges
+ sccGraph.vertices.count()
+ sccGraph.edges.count()
+ // sccGraph materialized so, unpersist can be done on previous
+ prevSccGraph.unpersist(blocking = false)
+ prevSccGraph = sccGraph
+
// only keep vertices that are not final
sccWorkGraph = sccWorkGraph.subgraph(vpred = (vid, data) => !data._2).cache()
} while (sccWorkGraph.numVertices < numVertices)
- sccWorkGraph = sccWorkGraph.mapVertices{ case (vid, (color, isFinal)) => (vid, isFinal) }
+ // if iter < numIter at this point sccGraph that is returned
+ // will not be recomputed and pregel executions are pointless
+ if (iter < numIter) {
+ sccWorkGraph = sccWorkGraph.mapVertices { case (vid, (color, isFinal)) => (vid, isFinal) }
- // collect min of all my neighbor's scc values, update if it's smaller than mine
- // then notify any neighbors with scc values larger than mine
- sccWorkGraph = Pregel[(VertexId, Boolean), ED, VertexId](
- sccWorkGraph, Long.MaxValue, activeDirection = EdgeDirection.Out)(
- (vid, myScc, neighborScc) => (math.min(myScc._1, neighborScc), myScc._2),
- e => {
- if (e.srcAttr._1 < e.dstAttr._1) {
- Iterator((e.dstId, e.srcAttr._1))
- } else {
- Iterator()
- }
- },
- (vid1, vid2) => math.min(vid1, vid2))
+ // collect min of all my neighbor's scc values, update if it's smaller than mine
+ // then notify any neighbors with scc values larger than mine
+ sccWorkGraph = Pregel[(VertexId, Boolean), ED, VertexId](
+ sccWorkGraph, Long.MaxValue, activeDirection = EdgeDirection.Out)(
+ (vid, myScc, neighborScc) => (math.min(myScc._1, neighborScc), myScc._2),
+ e => {
+ if (e.srcAttr._1 < e.dstAttr._1) {
+ Iterator((e.dstId, e.srcAttr._1))
+ } else {
+ Iterator()
+ }
+ },
+ (vid1, vid2) => math.min(vid1, vid2))
- // start at root of SCCs. Traverse values in reverse, notify all my neighbors
- // do not propagate if colors do not match!
- sccWorkGraph = Pregel[(VertexId, Boolean), ED, Boolean](
- sccWorkGraph, false, activeDirection = EdgeDirection.In)(
- // vertex is final if it is the root of a color
- // or it has the same color as a neighbor that is final
- (vid, myScc, existsSameColorFinalNeighbor) => {
- val isColorRoot = vid == myScc._1
- (myScc._1, myScc._2 || isColorRoot || existsSameColorFinalNeighbor)
- },
- // activate neighbor if they are not final, you are, and you have the same color
- e => {
- val sameColor = e.dstAttr._1 == e.srcAttr._1
- val onlyDstIsFinal = e.dstAttr._2 && !e.srcAttr._2
- if (sameColor && onlyDstIsFinal) {
- Iterator((e.srcId, e.dstAttr._2))
- } else {
- Iterator()
- }
- },
- (final1, final2) => final1 || final2)
+ // start at root of SCCs. Traverse values in reverse, notify all my neighbors
+ // do not propagate if colors do not match!
+ sccWorkGraph = Pregel[(VertexId, Boolean), ED, Boolean](
+ sccWorkGraph, false, activeDirection = EdgeDirection.In)(
+ // vertex is final if it is the root of a color
+ // or it has the same color as a neighbor that is final
+ (vid, myScc, existsSameColorFinalNeighbor) => {
+ val isColorRoot = vid == myScc._1
+ (myScc._1, myScc._2 || isColorRoot || existsSameColorFinalNeighbor)
+ },
+ // activate neighbor if they are not final, you are, and you have the same color
+ e => {
+ val sameColor = e.dstAttr._1 == e.srcAttr._1
+ val onlyDstIsFinal = e.dstAttr._2 && !e.srcAttr._2
+ if (sameColor && onlyDstIsFinal) {
+ Iterator((e.srcId, e.dstAttr._2))
+ } else {
+ Iterator()
+ }
+ },
+ (final1, final2) => final1 || final2)
+ }
}
sccGraph
}