aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2013-12-14 03:22:03 -0800
committerAnkur Dave <ankurdave@gmail.com>2013-12-14 15:28:23 -0800
commit4d3bba3a1336bb2762e050ed515de0fac3add252 (patch)
tree85b5436d5fcf9333a9f6348939f253beda3e5a15
parentcf6288b993033d087dfb243c590c9428d91004b8 (diff)
downloadspark-4d3bba3a1336bb2762e050ed515de0fac3add252.tar.gz
spark-4d3bba3a1336bb2762e050ed515de0fac3add252.tar.bz2
spark-4d3bba3a1336bb2762e050ed515de0fac3add252.zip
Add debug logging to Pregel
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/Pregel.scala7
1 files changed, 7 insertions, 0 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
index 285e857b69..31ca7ff09a 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
@@ -92,22 +92,29 @@ object Pregel {
: Graph[VD, ED] = {
var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) )
+ println("[pre] g: " + g.vertices.cache().collect.mkString(","))
// compute the messages
var messages = g.mapReduceTriplets(sendMsg, mergeMsg).cache()
+ println("[pre] messages: " + messages.collect.mkString(","))
var activeMessages = messages.count()
+ println("Pregel pre-run, %d active messages".format(activeMessages))
// Loop
var i = 0
while (activeMessages > 0 && i < maxIterations) {
// Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
+ println("newVerts: " + newVerts.collect.mkString(","))
// Update the graph with the new vertices.
g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
+ println("g: " + g.vertices.cache().collect.mkString(","))
val oldMessages = messages
// Send new messages. Vertices that didn't get any messages don't appear in newVerts, so don't
// get to send messages.
messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, EdgeDirection.Out))).cache()
+ println("messages: " + messages.collect.mkString(","))
activeMessages = messages.count()
+ println("Pregel iter %d, %d active messages".format(i, activeMessages))
// after counting we can unpersist the old messages
oldMessages.unpersist(blocking=false)
// count the iteration