diff options
author | Alexander Ulanov <nashb@yandex.ru> | 2015-08-18 22:13:52 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-08-18 22:13:57 -0700 |
commit | 416392697f20de27b37db0cf0bad15a0e5ac9ebe (patch) | |
tree | 0445fe1b5498ee55e4ae2075070b9b2d9f0c7fe6 | |
parent | 03a8a889a98ab30e4d33dc1a415aa84253111ffa (diff) | |
download | spark-416392697f20de27b37db0cf0bad15a0e5ac9ebe.tar.gz spark-416392697f20de27b37db0cf0bad15a0e5ac9ebe.tar.bz2 spark-416392697f20de27b37db0cf0bad15a0e5ac9ebe.zip |
[SPARK-9508] GraphX Pregel docs update with new Pregel code
SPARK-9436 simplifies the Pregel code. graphx-programming-guide needs to be modified accordingly since it lists the old Pregel code
Author: Alexander Ulanov <nashb@yandex.ru>
Closes #7831 from avulanov/SPARK-9508-pregel-doc2.
(cherry picked from commit 1c843e284818004f16c3f1101c33b510f80722e3)
Signed-off-by: Reynold Xin <rxin@databricks.com>
-rw-r--r-- | docs/graphx-programming-guide.md | 18 |
1 files changed, 8 insertions, 10 deletions
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md index 99f8c827f7..c861a763d6 100644 --- a/docs/graphx-programming-guide.md +++ b/docs/graphx-programming-guide.md @@ -768,16 +768,14 @@ class GraphOps[VD, ED] { // Loop until no messages remain or maxIterations is achieved var i = 0 while (activeMessages > 0 && i < maxIterations) { - // Receive the messages: ----------------------------------------------------------------------- - // Run the vertex program on all vertices that receive messages - val newVerts = g.vertices.innerJoin(messages)(vprog).cache() - // Merge the new vertex values back into the graph - g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache() - // Send Messages: ------------------------------------------------------------------------------ - // Vertices that didn't receive a message above don't appear in newVerts and therefore don't - // get to send messages. More precisely the map phase of mapReduceTriplets is only invoked - // on edges in the activeDir of vertices in newVerts - messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache() + // Receive the messages and update the vertices. + g = g.joinVertices(messages)(vprog).cache() + val oldMessages = messages + // Send new messages, skipping edges where neither side received a message. We must cache + // messages so it can be materialized on the next line, allowing us to uncache the previous + // iteration. + messages = g.mapReduceTriplets( + sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache() activeMessages = messages.count() i += 1 } |