diff options
author | Alexander Ulanov <nashb@yandex.ru> | 2015-07-29 13:59:00 -0700 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2015-07-29 13:59:00 -0700 |
commit | b715933fc69a49653abdb2fba0818dfc4f35d358 (patch) | |
tree | 91ef137dad1860050c9dfb50135da41b488814ec /graphx/src/main/scala/org | |
parent | 5340dfaf94a3c54199f8cc3c78e11f61e34d0a67 (diff) | |
download | spark-b715933fc69a49653abdb2fba0818dfc4f35d358.tar.gz spark-b715933fc69a49653abdb2fba0818dfc4f35d358.tar.bz2 spark-b715933fc69a49653abdb2fba0818dfc4f35d358.zip |
[SPARK-9436] [GRAPHX] Pregel simplification patch
Pregel code contains two consecutive joins:
```
g.vertices.innerJoin(messages)(vprog)
...
g = g.outerJoinVertices(newVerts)
{ (vid, old, newOpt) => newOpt.getOrElse(old) }
```
This can be simplified to a single join. ankurdave proposed a patch based on our discussion on the mailing list: https://www.mail-archive.com/dev@spark.apache.org/msg10316.html
Author: Alexander Ulanov <nashb@yandex.ru>
Closes #7749 from avulanov/SPARK-9436-pregel and squashes the following commits:
8568e06 [Alexander Ulanov] Pregel simplification patch
Diffstat (limited to 'graphx/src/main/scala/org')
-rw-r--r-- | graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala | 23 |
1 files changed, 10 insertions, 13 deletions
```diff
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index cfcf7244ea..2ca60d51f8 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -127,28 +127,25 @@ object Pregel extends Logging {
     var prevG: Graph[VD, ED] = null
     var i = 0
     while (activeMessages > 0 && i < maxIterations) {
-      // Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
-      val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
-      // Update the graph with the new vertices.
+      // Receive the messages and update the vertices.
       prevG = g
-      g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
-      g.cache()
+      g = g.joinVertices(messages)(vprog).cache()
 
       val oldMessages = messages
-      // Send new messages. Vertices that didn't get any messages don't appear in newVerts, so don't
-      // get to send messages. We must cache messages so it can be materialized on the next line,
-      // allowing us to uncache the previous iteration.
-      messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDirection))).cache()
-      // The call to count() materializes `messages`, `newVerts`, and the vertices of `g`. This
-      // hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
-      // vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).
+      // Send new messages, skipping edges where neither side received a message. We must cache
+      // messages so it can be materialized on the next line, allowing us to uncache the previous
+      // iteration.
+      messages = g.mapReduceTriplets(
+        sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
+      // The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
+      // (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
+      // and the vertices of g).
       activeMessages = messages.count()
 
       logInfo("Pregel finished iteration " + i)
 
       // Unpersist the RDDs hidden by newly-materialized RDDs
       oldMessages.unpersist(blocking = false)
-      newVerts.unpersist(blocking = false)
       prevG.unpersistVertices(blocking = false)
       prevG.edges.unpersist(blocking = false)
       // count the iteration
```