diff options
author | Ankur Dave <ankurdave@gmail.com> | 2011-04-12 23:38:58 -0700 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2011-05-03 15:38:01 -0700 |
commit | 62ef620354d2e996721395e2a7d0d039a7e9e2be (patch) | |
tree | d0bb579b6e8573dba8c6dacbad8119a591d3351c /bagel/src | |
parent | c0736f6f68e47b82e3634252f8dba4f709a33ba5 (diff) | |
download | spark-62ef620354d2e996721395e2a7d0d039a7e9e2be.tar.gz spark-62ef620354d2e996721395e2a7d0d039a7e9e2be.tar.bz2 spark-62ef620354d2e996721395e2a7d0d039a7e9e2be.zip |
Clean up Pregel.run, add logging
Diffstat (limited to 'bagel/src')
-rw-r--r-- | bagel/src/main/scala/bagel/Pregel.scala | 49 |
1 files changed, 23 insertions, 26 deletions
diff --git a/bagel/src/main/scala/bagel/Pregel.scala b/bagel/src/main/scala/bagel/Pregel.scala index 02eec40b2c..b0f7a48b7a 100644 --- a/bagel/src/main/scala/bagel/Pregel.scala +++ b/bagel/src/main/scala/bagel/Pregel.scala @@ -18,61 +18,58 @@ object Pregel extends Logging { * all vertices have voted to halt by setting their state to * Inactive. */ - def run[V <: Vertex : Manifest, M <: Message : Manifest, C](sc: SparkContext, verts: RDD[(String, V)], msgs: RDD[(String, M)], splits: Int, messageCombiner: (C, M) => C, defaultCombined: () => C, mergeCombined: (C, C) => C, superstep: Int = 0)(compute: (V, C, Int) => (V, Iterable[M])): RDD[V] = { - println("Starting superstep "+superstep+".") + def run[V <: Vertex : Manifest, M <: Message : Manifest, C](sc: SparkContext, verts: RDD[(String, V)], msgs: RDD[(String, M)], splits: Int, messageCombiner: (C, M) => C, defaultCombined: () => C, mergeCombined: (C, C) => C, maxSupersteps: Option[Int] = None, superstep: Int = 0)(compute: (V, C, Int) => (V, Iterable[M])): RDD[V] = { + logInfo("Starting superstep "+superstep+".") val startTime = System.currentTimeMillis // Bring together vertices and messages - println("Joining vertices and messages...") val combinedMsgs = msgs.combineByKey({x => messageCombiner(defaultCombined(), x)}, messageCombiner, mergeCombined, splits) - println("verts.splits.size = " + verts.splits.size) - println("combinedMsgs.splits.size = " + combinedMsgs.splits.size) - println("verts.partitioner = " + verts.partitioner) - println("combinedMsgs.partitioner = " + combinedMsgs.partitioner) + logDebug("verts.splits.size = " + verts.splits.size) + logDebug("combinedMsgs.splits.size = " + combinedMsgs.splits.size) + logDebug("verts.partitioner = " + verts.partitioner) + logDebug("combinedMsgs.partitioner = " + combinedMsgs.partitioner) + val joined = verts.groupWith(combinedMsgs) - println("joined.splits.size = " + joined.splits.size) - println("joined.partitioner = " + joined.partitioner) - //val joined = graph.groupByKeyAsymmetrical(messageCombiner, defaultCombined, mergeCombined, splits) - println("Done joining vertices and messages.") + logDebug("joined.splits.size = " + joined.splits.size) + logDebug("joined.partitioner = " + joined.partitioner) // Run compute on each vertex - println("Running compute on each vertex...") var messageCount = sc.accumulator(0) var activeVertexCount = sc.accumulator(0) val processed = joined.flatMapValues { case (Seq(), _) => None case (Seq(v), Seq(comb)) => val (newVertex, newMessages) = compute(v, comb, superstep) + messageCount += newMessages.size if (newVertex.active) activeVertexCount += 1 + Some((newVertex, newMessages)) - //val result = ArrayBuffer[(String, Either[V, M])]((newVertex.id, Left(newVertex))) - //result ++= newMessages.map(m => (m.targetId, Right(m))) case (Seq(v), Seq()) => val (newVertex, newMessages) = compute(v, defaultCombined(), superstep) + messageCount += newMessages.size if (newVertex.active) activeVertexCount += 1 + Some((newVertex, newMessages)) }.cache - //MATEI: Added this + // Force evaluation of processed RDD for accurate performance measurements processed.foreach(x => {}) - println("Done running compute on each vertex.") - - println("Checking stopping condition...") - val stop = messageCount.value == 0 && activeVertexCount.value == 0 val timeTaken = System.currentTimeMillis - startTime - println("Superstep %d took %d s".format(superstep, timeTaken / 1000)) + logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000)) - val newVerts = processed.mapValues(_._1) - val newMsgs = processed.flatMap(x => x._2._2.map(m => (m.targetId, m))) - - if (superstep >= 10) + // Check stopping condition and recurse + val stop = messageCount.value == 0 && activeVertexCount.value == 0 + if (stop || (maxSupersteps.isDefined && superstep >= maxSupersteps.get)) { processed.map { _._2._1 } - else - run(sc, newVerts, newMsgs, splits, messageCombiner, defaultCombined, mergeCombined, superstep + 1)(compute) + } else { + val newVerts = processed.mapValues(_._1) + val newMsgs = processed.flatMap(x => x._2._2.map(m => (m.targetId, m))) + run(sc, newVerts, newMsgs, splits, messageCombiner, defaultCombined, mergeCombined, maxSupersteps, superstep + 1)(compute) + } } } |