about | summary | refs | log | tree | commit | diff
path: root/bagel
diff options
context:
space:
mode:
author: Ankur Dave <ankurdave@gmail.com> 2011-04-12 23:38:58 -0700
committer: Ankur Dave <ankurdave@gmail.com> 2011-05-03 15:38:01 -0700
commit: 62ef620354d2e996721395e2a7d0d039a7e9e2be (patch)
tree: d0bb579b6e8573dba8c6dacbad8119a591d3351c /bagel
parent: c0736f6f68e47b82e3634252f8dba4f709a33ba5 (diff)
download: spark-62ef620354d2e996721395e2a7d0d039a7e9e2be.tar.gz
spark-62ef620354d2e996721395e2a7d0d039a7e9e2be.tar.bz2
spark-62ef620354d2e996721395e2a7d0d039a7e9e2be.zip
Clean up Pregel.run, add logging
Diffstat (limited to 'bagel')
-rw-r--r-- bagel/src/main/scala/bagel/Pregel.scala | 49
1 files changed, 23 insertions, 26 deletions
diff --git a/bagel/src/main/scala/bagel/Pregel.scala b/bagel/src/main/scala/bagel/Pregel.scala
index 02eec40b2c..b0f7a48b7a 100644
--- a/bagel/src/main/scala/bagel/Pregel.scala
+++ b/bagel/src/main/scala/bagel/Pregel.scala
@@ -18,61 +18,58 @@ object Pregel extends Logging {
* all vertices have voted to halt by setting their state to
* Inactive.
*/
- def run[V <: Vertex : Manifest, M <: Message : Manifest, C](sc: SparkContext, verts: RDD[(String, V)], msgs: RDD[(String, M)], splits: Int, messageCombiner: (C, M) => C, defaultCombined: () => C, mergeCombined: (C, C) => C, superstep: Int = 0)(compute: (V, C, Int) => (V, Iterable[M])): RDD[V] = {
- println("Starting superstep "+superstep+".")
+ def run[V <: Vertex : Manifest, M <: Message : Manifest, C](sc: SparkContext, verts: RDD[(String, V)], msgs: RDD[(String, M)], splits: Int, messageCombiner: (C, M) => C, defaultCombined: () => C, mergeCombined: (C, C) => C, maxSupersteps: Option[Int] = None, superstep: Int = 0)(compute: (V, C, Int) => (V, Iterable[M])): RDD[V] = {
+ logInfo("Starting superstep "+superstep+".")
val startTime = System.currentTimeMillis
// Bring together vertices and messages
- println("Joining vertices and messages...")
val combinedMsgs = msgs.combineByKey({x => messageCombiner(defaultCombined(), x)}, messageCombiner, mergeCombined, splits)
- println("verts.splits.size = " + verts.splits.size)
- println("combinedMsgs.splits.size = " + combinedMsgs.splits.size)
- println("verts.partitioner = " + verts.partitioner)
- println("combinedMsgs.partitioner = " + combinedMsgs.partitioner)
+ logDebug("verts.splits.size = " + verts.splits.size)
+ logDebug("combinedMsgs.splits.size = " + combinedMsgs.splits.size)
+ logDebug("verts.partitioner = " + verts.partitioner)
+ logDebug("combinedMsgs.partitioner = " + combinedMsgs.partitioner)
+
val joined = verts.groupWith(combinedMsgs)
- println("joined.splits.size = " + joined.splits.size)
- println("joined.partitioner = " + joined.partitioner)
- //val joined = graph.groupByKeyAsymmetrical(messageCombiner, defaultCombined, mergeCombined, splits)
- println("Done joining vertices and messages.")
+ logDebug("joined.splits.size = " + joined.splits.size)
+ logDebug("joined.partitioner = " + joined.partitioner)
// Run compute on each vertex
- println("Running compute on each vertex...")
var messageCount = sc.accumulator(0)
var activeVertexCount = sc.accumulator(0)
val processed = joined.flatMapValues {
case (Seq(), _) => None
case (Seq(v), Seq(comb)) =>
val (newVertex, newMessages) = compute(v, comb, superstep)
+
messageCount += newMessages.size
if (newVertex.active)
activeVertexCount += 1
+
Some((newVertex, newMessages))
- //val result = ArrayBuffer[(String, Either[V, M])]((newVertex.id, Left(newVertex)))
- //result ++= newMessages.map(m => (m.targetId, Right(m)))
case (Seq(v), Seq()) =>
val (newVertex, newMessages) = compute(v, defaultCombined(), superstep)
+
messageCount += newMessages.size
if (newVertex.active)
activeVertexCount += 1
+
Some((newVertex, newMessages))
}.cache
- //MATEI: Added this
+ // Force evaluation of processed RDD for accurate performance measurements
processed.foreach(x => {})
- println("Done running compute on each vertex.")
-
- println("Checking stopping condition...")
- val stop = messageCount.value == 0 && activeVertexCount.value == 0
val timeTaken = System.currentTimeMillis - startTime
- println("Superstep %d took %d s".format(superstep, timeTaken / 1000))
+ logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))
- val newVerts = processed.mapValues(_._1)
- val newMsgs = processed.flatMap(x => x._2._2.map(m => (m.targetId, m)))
-
- if (superstep >= 10)
+ // Check stopping condition and recurse
+ val stop = messageCount.value == 0 && activeVertexCount.value == 0
+ if (stop || (maxSupersteps.isDefined && superstep >= maxSupersteps.get)) {
processed.map { _._2._1 }
- else
- run(sc, newVerts, newMsgs, splits, messageCombiner, defaultCombined, mergeCombined, superstep + 1)(compute)
+ } else {
+ val newVerts = processed.mapValues(_._1)
+ val newMsgs = processed.flatMap(x => x._2._2.map(m => (m.targetId, m)))
+ run(sc, newVerts, newMsgs, splits, messageCombiner, defaultCombined, mergeCombined, maxSupersteps, superstep + 1)(compute)
+ }
}
}