aboutsummaryrefslogtreecommitdiff
path: root/docs/graphx-programming-guide.md
diff options
context:
space:
mode:
authorAlexander Ulanov <nashb@yandex.ru>2015-08-18 22:13:52 -0700
committerReynold Xin <rxin@databricks.com>2015-08-18 22:13:52 -0700
commit1c843e284818004f16c3f1101c33b510f80722e3 (patch)
tree5a977efd6e3fcf6d2ff3bfbaa766ab3dae0b8784 /docs/graphx-programming-guide.md
parentde3223872a217c5224ba7136604f6b7753b29108 (diff)
downloadspark-1c843e284818004f16c3f1101c33b510f80722e3.tar.gz
spark-1c843e284818004f16c3f1101c33b510f80722e3.tar.bz2
spark-1c843e284818004f16c3f1101c33b510f80722e3.zip
[SPARK-9508] GraphX Pregel docs update with new Pregel code
SPARK-9436 simplifies the Pregel code. graphx-programming-guide needs to be modified accordingly since it lists the old Pregel code Author: Alexander Ulanov <nashb@yandex.ru> Closes #7831 from avulanov/SPARK-9508-pregel-doc2.
Diffstat (limited to 'docs/graphx-programming-guide.md')
-rw-r--r--docs/graphx-programming-guide.md18
1 files changed, 8 insertions, 10 deletions
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index 99f8c827f7..c861a763d6 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -768,16 +768,14 @@ class GraphOps[VD, ED] {
// Loop until no messages remain or maxIterations is achieved
var i = 0
while (activeMessages > 0 && i < maxIterations) {
- // Receive the messages: -----------------------------------------------------------------------
- // Run the vertex program on all vertices that receive messages
- val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
- // Merge the new vertex values back into the graph
- g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
- // Send Messages: ------------------------------------------------------------------------------
- // Vertices that didn't receive a message above don't appear in newVerts and therefore don't
- // get to send messages. More precisely the map phase of mapReduceTriplets is only invoked
- // on edges in the activeDir of vertices in newVerts
- messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
+ // Receive the messages and update the vertices.
+ g = g.joinVertices(messages)(vprog).cache()
+ val oldMessages = messages
+ // Send new messages, skipping edges where neither side received a message. We must cache
+ // messages so it can be materialized on the next line, allowing us to uncache the previous
+ // iteration.
+ messages = g.mapReduceTriplets(
+ sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
activeMessages = messages.count()
i += 1
}