diff options
author | Alexander Ulanov <nashb@yandex.ru> | 2015-08-18 22:13:52 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-08-18 22:13:52 -0700 |
commit | 1c843e284818004f16c3f1101c33b510f80722e3 (patch) | |
tree | 5a977efd6e3fcf6d2ff3bfbaa766ab3dae0b8784 /docs | |
parent | de3223872a217c5224ba7136604f6b7753b29108 (diff) | |
download | spark-1c843e284818004f16c3f1101c33b510f80722e3.tar.gz spark-1c843e284818004f16c3f1101c33b510f80722e3.tar.bz2 spark-1c843e284818004f16c3f1101c33b510f80722e3.zip |
[SPARK-9508] GraphX Pregel docs update with new Pregel code
SPARK-9436 simplifies the Pregel code. graphx-programming-guide needs to be modified accordingly since it lists the old Pregel code
Author: Alexander Ulanov <nashb@yandex.ru>
Closes #7831 from avulanov/SPARK-9508-pregel-doc2.
Diffstat (limited to 'docs')
-rw-r--r-- | docs/graphx-programming-guide.md | 18 |
1 files changed, 8 insertions, 10 deletions
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md index 99f8c827f7..c861a763d6 100644 --- a/docs/graphx-programming-guide.md +++ b/docs/graphx-programming-guide.md @@ -768,16 +768,14 @@ class GraphOps[VD, ED] { // Loop until no messages remain or maxIterations is achieved var i = 0 while (activeMessages > 0 && i < maxIterations) { - // Receive the messages: ----------------------------------------------------------------------- - // Run the vertex program on all vertices that receive messages - val newVerts = g.vertices.innerJoin(messages)(vprog).cache() - // Merge the new vertex values back into the graph - g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache() - // Send Messages: ------------------------------------------------------------------------------ - // Vertices that didn't receive a message above don't appear in newVerts and therefore don't - // get to send messages. More precisely the map phase of mapReduceTriplets is only invoked - // on edges in the activeDir of vertices in newVerts - messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache() + // Receive the messages and update the vertices. + g = g.joinVertices(messages)(vprog).cache() + val oldMessages = messages + // Send new messages, skipping edges where neither side received a message. We must cache + // messages so it can be materialized on the next line, allowing us to uncache the previous + // iteration. + messages = g.mapReduceTriplets( + sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache() activeMessages = messages.count() i += 1 } |