aboutsummaryrefslogtreecommitdiff
path: root/graph/src
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2013-12-07 18:19:30 -0800
committerAnkur Dave <ankurdave@gmail.com>2013-12-12 18:03:19 -0800
commit5b0d6f0ad52f3d27eb9001a6f7d88c3281e93014 (patch)
treec244262680780cd229f4a06011c126036cba6d20 /graph/src
parentce6ca4ea6177432aef694f6213e36c75ac213bf4 (diff)
downloadspark-5b0d6f0ad52f3d27eb9001a6f7d88c3281e93014.tar.gz
spark-5b0d6f0ad52f3d27eb9001a6f7d88c3281e93014.tar.bz2
spark-5b0d6f0ad52f3d27eb9001a6f7d88c3281e93014.zip
Remove static Pregel; take maxIters in dynamic Pregel
Diffstat (limited to 'graph/src')
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/Pregel.scala85
1 files changed, 5 insertions, 80 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
index 8e9ca89efc..9fb1d3fd8c 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
@@ -11,10 +11,6 @@ package org.apache.spark.graph
* execution while also exposing greater flexibility for graph based
* computation.
*
- * This object present several variants of the bulk synchronous
- * execution that differ only in the edge direction along which
- * messages are sent and whether a fixed number of iterations is used.
- *
* @example We can use the Pregel abstraction to implement PageRank
* {{{
* val pagerankGraph: Graph[Double, Double] = graph
@@ -41,7 +37,6 @@ package org.apache.spark.graph
*/
object Pregel {
-
/**
* Execute a Pregel-like iterative vertex-parallel abstraction. The
* user-defined vertex-program `vprog` is executed in parallel on
@@ -56,7 +51,8 @@ object Pregel {
* on subsequent iterations if a vertex does not receive a message
* then the vertex-program is not invoked.
*
- * This function iterates a fixed number (`numIter`) of iterations.
+ * This function iterates until there are no remaining messages, or
+ * for maxIterations iterations.
*
* @tparam VD the vertex data type
* @tparam ED the edge data type
@@ -67,78 +63,7 @@ object Pregel {
* @param initialMsg the message each vertex will receive at the on
* the first iteration.
*
- * @param numIter the number of iterations to run this computation.
- *
- * @param vprog the user-defined vertex program which runs on each
- * vertex and receives the inbound message and computes a new vertex
- * value. On the first iteration the vertex program is invoked on
- * all vertices and is passed the default message. On subsequent
- * iterations the vertex program is only invoked on those vertices
- * that receive messages.
- *
- * @param sendMsg a user supplied function that is applied to out
- * edges of vertices that received messages in the current
- * iteration.
- *
- * @param mergeMsg a user supplied function that takes two incoming
- * messages of type A and merges them into a single message of type
- * A. ''This function must be commutative and associative and
- * ideally the size of A should not increase.''
- *
- * @return the resulting graph at the end of the computation
- *
- */
- def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
- (graph: Graph[VD, ED], initialMsg: A, numIter: Int)(
- vprog: (Vid, VD, A) => VD,
- sendMsg: EdgeTriplet[VD, ED] => Iterator[(Vid,A)],
- mergeMsg: (A, A) => A)
- : Graph[VD, ED] = {
-
- // Receive the first set of messages
- var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
-
- var i = 0
- while (i < numIter) {
- // compute the messages
- val messages = g.mapReduceTriplets(sendMsg, mergeMsg) // broadcast & aggregation
- // receive the messages
- val changedVerts = g.vertices.deltaJoin(messages)(vprog).cache() // updating the vertices
- // replicate the changed vertices
- g = g.deltaJoinVertices(changedVerts)
- // count the iteration
- i += 1
- }
-
- // Return the final graph
- g
- } // end of apply
-
-
- /**
- * Execute a Pregel-like iterative vertex-parallel abstraction. The
- * user-defined vertex-program `vprog` is executed in parallel on
- * each vertex receiving any inbound messages and computing a new
- * value for the vertex. The `sendMsg` function is then invoked on
- * all out-edges and is used to compute an optional message to the
- * destination vertex. The `mergeMsg` function is a commutative
- * associative function used to combine messages destined to the
- * same vertex.
- *
- * On the first iteration all vertices receive the `initialMsg` and
- * on subsequent iterations if a vertex does not receive a message
- * then the vertex-program is not invoked.
- *
- * This function iterates until there are no remaining messages.
- *
- * @tparam VD the vertex data type
- * @tparam ED the edge data type
- * @tparam A the Pregel message type
- *
- * @param graph the input graph.
- *
- * @param initialMsg the message each vertex will receive at the on
- * the first iteration.
+ * @param maxIterations the maximum number of iterations to run for.
*
* @param vprog the user-defined vertex program which runs on each
* vertex and receives the inbound message and computes a new vertex
@@ -160,7 +85,7 @@ object Pregel {
*
*/
def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
- (graph: Graph[VD, ED], initialMsg: A)(
+ (graph: Graph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue)(
vprog: (Vid, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(Vid,A)],
mergeMsg: (A, A) => A)
@@ -180,7 +105,7 @@ object Pregel {
var activeMessages = messages.count()
// Loop
var i = 0
- while (activeMessages > 0) {
+ while (activeMessages > 0 && i < maxIterations) {
// receive the messages
val changedVerts = g.vertices.deltaJoin(messages)(vprog).cache() // updating the vertices
// replicate the changed vertices