aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala23
1 files changed, 10 insertions, 13 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index cfcf7244ea..2ca60d51f8 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -127,28 +127,25 @@ object Pregel extends Logging {
var prevG: Graph[VD, ED] = null
var i = 0
while (activeMessages > 0 && i < maxIterations) {
- // Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
- val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
- // Update the graph with the new vertices.
+ // Receive the messages and update the vertices.
prevG = g
- g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
- g.cache()
+ g = g.joinVertices(messages)(vprog).cache()
val oldMessages = messages
- // Send new messages. Vertices that didn't get any messages don't appear in newVerts, so don't
- // get to send messages. We must cache messages so it can be materialized on the next line,
- // allowing us to uncache the previous iteration.
- messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDirection))).cache()
- // The call to count() materializes `messages`, `newVerts`, and the vertices of `g`. This
- // hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
- // vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).
+ // Send new messages, skipping edges where neither side received a message. We must cache
+ // messages so it can be materialized on the next line, allowing us to uncache the previous
+ // iteration.
+ messages = g.mapReduceTriplets(
+ sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
+ // The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
+ // (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
+ // and the vertices of g).
activeMessages = messages.count()
logInfo("Pregel finished iteration " + i)
// Unpersist the RDDs hidden by newly-materialized RDDs
oldMessages.unpersist(blocking = false)
- newVerts.unpersist(blocking = false)
prevG.unpersistVertices(blocking = false)
prevG.edges.unpersist(blocking = false)
// count the iteration