diff options
author | Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com> | 2013-06-29 21:53:38 -0700 |
---|---|---|
committer | Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com> | 2013-06-29 21:53:38 -0700 |
commit | f776301241ee92eccd4f1c90d511b3ae568242eb (patch) | |
tree | ebd567bff2ecbc7b99dd663521a0fc40bcf9efd2 /graph | |
parent | f269e5975bee4581297e017fc0617df00e726c62 (diff) | |
download | spark-f776301241ee92eccd4f1c90d511b3ae568242eb.tar.gz spark-f776301241ee92eccd4f1c90d511b3ae568242eb.tar.bz2 spark-f776301241ee92eccd4f1c90d511b3ae568242eb.zip |
Resurrecting the GraphLab gather-apply-scatter api
Diffstat (limited to 'graph')
-rw-r--r-- | graph/src/main/scala/spark/graph/GraphLab.scala | 232 |
1 files changed, 119 insertions, 113 deletions
diff --git a/graph/src/main/scala/spark/graph/GraphLab.scala b/graph/src/main/scala/spark/graph/GraphLab.scala index 9c157b9361..504cd162ab 100644 --- a/graph/src/main/scala/spark/graph/GraphLab.scala +++ b/graph/src/main/scala/spark/graph/GraphLab.scala @@ -3,119 +3,125 @@ package spark.graph import scala.collection.JavaConversions._ import spark.RDD - +/** + * This object implements the GraphLab gather-apply-scatter API. + */ object GraphLab { - // def iterateGA2[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])( - // gather: (Vid, EdgeWithVertices[VD, ED]) => A, - // merge: (A, A) => A, - // default: A, - // apply: (Vertex[VD], A) => VD, - // numIter: Int, - // gatherDirection: EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = { - - // var g = graph.cache() - - // var i = 0 - // while (i < numIter) { - - // val accUpdates: RDD[(Vid, A)] = - // g.aggregateNeighbors(gather, merge, default, gatherDirection) - - // def applyFunc(v: Vertex[VD], update: Option[A]): VD = { apply(v, update.get) } - // g = g.updateVertices(accUpdates, applyFunc).cache() - - // i += 1 - // } - // g - // } - - // def iterateGA[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])( - // gatherFunc: (Vid, EdgeWithVertices[VD, ED]) => A, - // mergeFunc: (A, A) => A, - // applyFunc: (Vertex[VD], Option[A]) => VD, - // numIter: Int, - // gatherDirection: EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = { - - // var g = graph.cache() - - // def someGather(vid: Vid, edge: EdgeWithVertices[VD, ED]) = Some(gatherFunc(vid, edge)) - - // var i = 0 - // while (i < numIter) { - - // val accUpdates: RDD[(Vid, A)] = - // g.flatMapReduceNeighborhood(someGather, mergeFunc, gatherDirection) - - // g = g.updateVertices(accUpdates, applyFunc).cache() - - // i += 1 - // } - // g - // } - - // def iterateGAS[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])( - // gatherFunc: (Vid, EdgeWithVertices[VD, ED]) => A, - // mergeFunc: 
(A, A) => A, - // applyFunc: (Vertex[VD], Option[A]) => VD, - // scatterFunc: (Vid, EdgeWithVertices[VD, ED]) => Boolean, - // numIter: Int, - // gatherDirection: EdgeDirection = EdgeDirection.In, - // scatterDirection: EdgeDirection = EdgeDirection.Out) : Graph[VD, ED] = { - - // var g = graph.mapVertices{ case Vertex(id,data) => Vertex(id, (true, data)) }.cache() - - // def gather(vid: Vid, e: EdgeWithVertices[(Boolean, VD), ED]) = { - // if(e.vertex(vid).data._1) { - // val edge = new EdgeWithVertices[VD,ED] - // edge.src = Vertex(e.src.id, e.src.data._2) - // edge.dst = Vertex(e.dst.id, e.dst.data._2) - // Some(gatherFunc(vid, edge)) - // } else { - // None - // } - // } - - // def apply(v: Vertex[(Boolean, VD)], accum: Option[A]) = { - // if(v.data._1) (true, applyFunc(Vertex(v.id, v.data._2), accum)) - // else (false, v.data._2) - // } - - // def scatter(rawVid: Vid, e: EdgeWithVertices[(Boolean, VD),ED]) = { - // val vid = e.otherVertex(rawVid).id - // if(e.vertex(vid).data._1) { - // val edge = new EdgeWithVertices[VD,ED] - // edge.src = Vertex(e.src.id, e.src.data._2) - // edge.dst = Vertex(e.dst.id, e.dst.data._2) - // Some(scatterFunc(vid, edge)) - // } else { - // None - // } - // } - - // def applyActive(v: Vertex[(Boolean, VD)], accum: Option[Boolean]) = - // (accum.getOrElse(false), v.data._2) - - // var i = 0 - // var numActive = g.numVertices - // while (i < numIter && numActive > 0) { - - // val accUpdates: RDD[(Vid, A)] = - // g.flatMapReduceNeighborhood(gather, mergeFunc, gatherDirection) - - // g = g.updateVertices(accUpdates, apply).cache() - - // // Scatter is basically a gather in the opposite direction so we reverse the edge direction - // val activeVertices: RDD[(Vid, Boolean)] = - // g.flatMapReduceNeighborhood(scatter, _ || _, scatterDirection.reverse) - - // g = g.updateVertices(activeVertices, applyActive).cache() - - // numActive = g.vertices.map(v => if (v.data._1) 1 else 0).reduce( _ + _ ) - // println("Number active vertices: " + 
numActive) - // i += 1 - // } - - // g.mapVertices(v => Vertex(v.id, v.data._2)) - // } + /** + * Execute the GraphLab Gather-Apply-Scatter API + * + * @todo finish documenting GraphLab Gather-Apply-Scatter API + * + * @param graph The graph on which to execute the GraphLab API + * @param gatherFunc The gather function is executed on each edge triplet + * adjacent to a vertex and returns an accumulator which + * is then merged using the merge function. + * @param mergeFunc An accumulative, associative operation on values of + * the gather result type. + * @param applyFunc Takes a vertex and the final result of the merge operations + * on the adjacent edges and returns a new vertex value. + * @param scatterFunc Executed after the apply function, the scatter function takes + * a triplet and signals whether the neighboring vertex program + * must be recomputed. + * @param numIter The maximum number of iterations to run. + * @param gatherDirection The direction of edges to consider during the gather phase + * @param scatterDirection The direction of edges to consider during the scatter phase + * + * @tparam VD The graph vertex attribute type + * @tparam ED The graph edge attribute type + * @tparam A The type accumulated during the gather phase + * @return the resulting graph after the algorithm converges or numIter iterations have run + */ + def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])( + gatherFunc: (Vid, EdgeTriplet[VD, ED]) => A, + mergeFunc: (A, A) => A, + applyFunc: (Vertex[VD], Option[A]) => VD, + scatterFunc: (Vid, EdgeTriplet[VD, ED]) => Boolean, + numIter: Int, + gatherDirection: EdgeDirection = EdgeDirection.In, + scatterDirection: EdgeDirection = EdgeDirection.Out): Graph[VD, ED] = { + + + // Add an active attribute to all vertices to track convergence. 
+ var activeGraph = graph.mapVertices { + case Vertex(id, data) => (true, data) + }.cache() + + // The gather function wrapper strips the active attribute and + // only invokes the gather function on active vertices + def gather(vid: Vid, e: EdgeTriplet[(Boolean, VD), ED]) = { + if (e.vertex(vid).data._1) { + val edge = new EdgeTriplet[VD,ED] + edge.src = Vertex(e.src.id, e.src.data._2) + edge.dst = Vertex(e.dst.id, e.dst.data._2) + edge.data = e.data + Some(gatherFunc(vid, edge)) + } else { + None + } + } + + // The apply function wrapper strips the vertex of the active attribute + // and only invokes the apply function on active vertices + def apply(v: Vertex[(Boolean, VD)], accum: Option[A]) = { + if (v.data._1) (true, applyFunc(Vertex(v.id, v.data._2), accum)) + else (false, v.data._2) + } + + // The scatter function wrapper strips the vertex of the active attribute + // and only invokes the scatter function on active vertices + def scatter(rawVid: Vid, e: EdgeTriplet[(Boolean, VD), ED]) = { + val vid = e.otherVertex(rawVid).id + if (e.vertex(vid).data._1) { + val edge = new EdgeTriplet[VD,ED] + edge.src = Vertex(e.src.id, e.src.data._2) + edge.dst = Vertex(e.dst.id, e.dst.data._2) + edge.data = e.data +// val src = Vertex(e.src.id, e.src.data._2) +// val dst = Vertex(e.dst.id, e.dst.data._2) +// val edge = new EdgeTriplet[VD,ED](src, dst, e.data) + Some(scatterFunc(vid, edge)) + } else { + None + } + } + + // Used to set the active status of vertices for the next round + def applyActive(v: Vertex[(Boolean, VD)], accum: Option[Boolean]) = + (accum.getOrElse(false), v.data._2) + + // Main Loop --------------------------------------------------------------------- + var i = 0 + var numActive = activeGraph.numVertices + while (i < numIter && numActive > 0) { + + val accUpdates: RDD[(Vid, A)] = + activeGraph.aggregateNeighbors(gather, mergeFunc, gatherDirection) + + activeGraph = activeGraph.leftJoinVertices(accUpdates, apply).cache() + + // Scatter is basically a 
gather in the opposite direction so we reverse the edge direction + val activeVertices: RDD[(Vid, Boolean)] = + activeGraph.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse) + + activeGraph = activeGraph.leftJoinVertices(activeVertices, applyActive).cache() + + numActive = activeGraph.vertices.map(v => if (v.data._1) 1 else 0).reduce(_ + _) + println("Number active vertices: " + numActive) + i += 1 + } + + // Remove the active attribute from the vertex data before returning the graph + activeGraph.mapVertices(v => v.data._2) + } } + + + + + + + + + |