From 4d3bba3a1336bb2762e050ed515de0fac3add252 Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Sat, 14 Dec 2013 03:22:03 -0800 Subject: Add debug logging to Pregel --- graph/src/main/scala/org/apache/spark/graph/Pregel.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala index 285e857b69..31ca7ff09a 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala @@ -92,22 +92,29 @@ object Pregel { : Graph[VD, ED] = { var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ) + println("[pre] g: " + g.vertices.cache().collect.mkString(",")) // compute the messages var messages = g.mapReduceTriplets(sendMsg, mergeMsg).cache() + println("[pre] messages: " + messages.collect.mkString(",")) var activeMessages = messages.count() + println("Pregel pre-run, %d active messages".format(activeMessages)) // Loop var i = 0 while (activeMessages > 0 && i < maxIterations) { // Receive the messages. Vertices that didn't get any messages do not appear in newVerts. val newVerts = g.vertices.innerJoin(messages)(vprog).cache() + println("newVerts: " + newVerts.collect.mkString(",")) // Update the graph with the new vertices. g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) } + println("g: " + g.vertices.cache().collect.mkString(",")) val oldMessages = messages // Send new messages. Vertices that didn't get any messages don't appear in newVerts, so don't // get to send messages. messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, EdgeDirection.Out))).cache() + println("messages: " + messages.collect.mkString(",")) activeMessages = messages.count() + println("Pregel iter %d, %d active messages".format(i, activeMessages)) // after counting we can unpersist the old messages oldMessages.unpersist(blocking=false) // count the iteration -- cgit v1.2.3