aboutsummaryrefslogtreecommitdiff
path: root/graph
diff options
context:
space:
mode:
authorJoseph E. Gonzalez <joseph.e.gonzalez@gmail.com>2013-06-29 21:53:38 -0700
committerJoseph E. Gonzalez <joseph.e.gonzalez@gmail.com>2013-06-29 21:53:38 -0700
commitf776301241ee92eccd4f1c90d511b3ae568242eb (patch)
treeebd567bff2ecbc7b99dd663521a0fc40bcf9efd2 /graph
parentf269e5975bee4581297e017fc0617df00e726c62 (diff)
downloadspark-f776301241ee92eccd4f1c90d511b3ae568242eb.tar.gz
spark-f776301241ee92eccd4f1c90d511b3ae568242eb.tar.bz2
spark-f776301241ee92eccd4f1c90d511b3ae568242eb.zip
Resurrecting the GraphLab gather-apply-scatter api
Diffstat (limited to 'graph')
-rw-r--r--graph/src/main/scala/spark/graph/GraphLab.scala232
1 files changed, 119 insertions, 113 deletions
diff --git a/graph/src/main/scala/spark/graph/GraphLab.scala b/graph/src/main/scala/spark/graph/GraphLab.scala
index 9c157b9361..504cd162ab 100644
--- a/graph/src/main/scala/spark/graph/GraphLab.scala
+++ b/graph/src/main/scala/spark/graph/GraphLab.scala
@@ -3,119 +3,125 @@ package spark.graph
import scala.collection.JavaConversions._
import spark.RDD
-
+/**
+ * This object implement the graphlab gather-apply-scatter api.
+ */
object GraphLab {
- // def iterateGA2[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
- // gather: (Vid, EdgeWithVertices[VD, ED]) => A,
- // merge: (A, A) => A,
- // default: A,
- // apply: (Vertex[VD], A) => VD,
- // numIter: Int,
- // gatherDirection: EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = {
-
- // var g = graph.cache()
-
- // var i = 0
- // while (i < numIter) {
-
- // val accUpdates: RDD[(Vid, A)] =
- // g.aggregateNeighbors(gather, merge, default, gatherDirection)
-
- // def applyFunc(v: Vertex[VD], update: Option[A]): VD = { apply(v, update.get) }
- // g = g.updateVertices(accUpdates, applyFunc).cache()
-
- // i += 1
- // }
- // g
- // }
-
- // def iterateGA[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
- // gatherFunc: (Vid, EdgeWithVertices[VD, ED]) => A,
- // mergeFunc: (A, A) => A,
- // applyFunc: (Vertex[VD], Option[A]) => VD,
- // numIter: Int,
- // gatherDirection: EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = {
-
- // var g = graph.cache()
-
- // def someGather(vid: Vid, edge: EdgeWithVertices[VD, ED]) = Some(gatherFunc(vid, edge))
-
- // var i = 0
- // while (i < numIter) {
-
- // val accUpdates: RDD[(Vid, A)] =
- // g.flatMapReduceNeighborhood(someGather, mergeFunc, gatherDirection)
-
- // g = g.updateVertices(accUpdates, applyFunc).cache()
-
- // i += 1
- // }
- // g
- // }
-
- // def iterateGAS[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
- // gatherFunc: (Vid, EdgeWithVertices[VD, ED]) => A,
- // mergeFunc: (A, A) => A,
- // applyFunc: (Vertex[VD], Option[A]) => VD,
- // scatterFunc: (Vid, EdgeWithVertices[VD, ED]) => Boolean,
- // numIter: Int,
- // gatherDirection: EdgeDirection = EdgeDirection.In,
- // scatterDirection: EdgeDirection = EdgeDirection.Out) : Graph[VD, ED] = {
-
- // var g = graph.mapVertices{ case Vertex(id,data) => Vertex(id, (true, data)) }.cache()
-
- // def gather(vid: Vid, e: EdgeWithVertices[(Boolean, VD), ED]) = {
- // if(e.vertex(vid).data._1) {
- // val edge = new EdgeWithVertices[VD,ED]
- // edge.src = Vertex(e.src.id, e.src.data._2)
- // edge.dst = Vertex(e.dst.id, e.dst.data._2)
- // Some(gatherFunc(vid, edge))
- // } else {
- // None
- // }
- // }
-
- // def apply(v: Vertex[(Boolean, VD)], accum: Option[A]) = {
- // if(v.data._1) (true, applyFunc(Vertex(v.id, v.data._2), accum))
- // else (false, v.data._2)
- // }
-
- // def scatter(rawVid: Vid, e: EdgeWithVertices[(Boolean, VD),ED]) = {
- // val vid = e.otherVertex(rawVid).id
- // if(e.vertex(vid).data._1) {
- // val edge = new EdgeWithVertices[VD,ED]
- // edge.src = Vertex(e.src.id, e.src.data._2)
- // edge.dst = Vertex(e.dst.id, e.dst.data._2)
- // Some(scatterFunc(vid, edge))
- // } else {
- // None
- // }
- // }
-
- // def applyActive(v: Vertex[(Boolean, VD)], accum: Option[Boolean]) =
- // (accum.getOrElse(false), v.data._2)
-
- // var i = 0
- // var numActive = g.numVertices
- // while (i < numIter && numActive > 0) {
-
- // val accUpdates: RDD[(Vid, A)] =
- // g.flatMapReduceNeighborhood(gather, mergeFunc, gatherDirection)
-
- // g = g.updateVertices(accUpdates, apply).cache()
-
- // // Scatter is basically a gather in the opposite direction so we reverse the edge direction
- // val activeVertices: RDD[(Vid, Boolean)] =
- // g.flatMapReduceNeighborhood(scatter, _ || _, scatterDirection.reverse)
-
- // g = g.updateVertices(activeVertices, applyActive).cache()
-
- // numActive = g.vertices.map(v => if (v.data._1) 1 else 0).reduce( _ + _ )
- // println("Number active vertices: " + numActive)
- // i += 1
- // }
-
- // g.mapVertices(v => Vertex(v.id, v.data._2))
- // }
+ /**
+ * Execute the GraphLab Gather-Apply-Scatter API
+ *
+ * @todo finish documenting GraphLab Gather-Apply-Scatter API
+ *
+ * @param graph The graph on which to execute the GraphLab API
+ * @param gatherFunc The gather function is executed on each edge triplet
+ * adjacent to a vertex and returns an accumulator which
+ * is then merged using the merge function.
+ * @param mergeFunc An accumulative associative operation on the result of
+ * the gather type.
+ * @param applyFunc Takes a vertex and the final result of the merge operations
+ * on the adjacent edges and returns a new vertex value.
+ * @param scatterFunc Executed after the apply function the scatter function takes
+ * a triplet and signals whether the neighboring vertex program
+ * must be recomputed.
+ * @param numIter The maximum number of iterations to run.
+ * @param gatherDirection The direction of edges to consider during the gather phase
+ * @param scatterDirection The direction of edges to consider during the scatter phase
+ *
+ * @tparam VD The graph vertex attribute type
+ * @tparam ED The graph edge attribute type
+ * @tparam A The type accumulated during the gather phase
+ * @return the resulting graph after the algorithm converges
+ */
+ def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
+ gatherFunc: (Vid, EdgeTriplet[VD, ED]) => A,
+ mergeFunc: (A, A) => A,
+ applyFunc: (Vertex[VD], Option[A]) => VD,
+ scatterFunc: (Vid, EdgeTriplet[VD, ED]) => Boolean,
+ numIter: Int,
+ gatherDirection: EdgeDirection = EdgeDirection.In,
+ scatterDirection: EdgeDirection = EdgeDirection.Out): Graph[VD, ED] = {
+
+
+ // Add an active attribute to all vertices to track convergence.
+ var activeGraph = graph.mapVertices {
+ case Vertex(id, data) => (true, data)
+ }.cache()
+
+ // The gather function wrapper strips the active attribute and
+ // only invokes the gather function on active vertices
+ def gather(vid: Vid, e: EdgeTriplet[(Boolean, VD), ED]) = {
+ if (e.vertex(vid).data._1) {
+ val edge = new EdgeTriplet[VD,ED]
+ edge.src = Vertex(e.src.id, e.src.data._2)
+ edge.dst = Vertex(e.dst.id, e.dst.data._2)
+ edge.data = e.data
+ Some(gatherFunc(vid, edge))
+ } else {
+ None
+ }
+ }
+
+ // The apply function wrapper strips the vertex of the active attribute
+ // and only invokes the apply function on active vertices
+ def apply(v: Vertex[(Boolean, VD)], accum: Option[A]) = {
+ if (v.data._1) (true, applyFunc(Vertex(v.id, v.data._2), accum))
+ else (false, v.data._2)
+ }
+
+ // The scatter function wrapper strips the vertex of the active attribute
+ // and only invokes the scatter function on active vertices
+ def scatter(rawVid: Vid, e: EdgeTriplet[(Boolean, VD), ED]) = {
+ val vid = e.otherVertex(rawVid).id
+ if (e.vertex(vid).data._1) {
+ val edge = new EdgeTriplet[VD,ED]
+ edge.src = Vertex(e.src.id, e.src.data._2)
+ edge.dst = Vertex(e.dst.id, e.dst.data._2)
+ edge.data = e.data
+// val src = Vertex(e.src.id, e.src.data._2)
+// val dst = Vertex(e.dst.id, e.dst.data._2)
+// val edge = new EdgeTriplet[VD,ED](src, dst, e.data)
+ Some(scatterFunc(vid, edge))
+ } else {
+ None
+ }
+ }
+
+ // Used to set the active status of vertices for the next round
+ def applyActive(v: Vertex[(Boolean, VD)], accum: Option[Boolean]) =
+ (accum.getOrElse(false), v.data._2)
+
+ // Main Loop ---------------------------------------------------------------------
+ var i = 0
+ var numActive = activeGraph.numVertices
+ while (i < numIter && numActive > 0) {
+
+ val accUpdates: RDD[(Vid, A)] =
+ activeGraph.aggregateNeighbors(gather, mergeFunc, gatherDirection)
+
+ activeGraph = activeGraph.leftJoinVertices(accUpdates, apply).cache()
+
+ // Scatter is basically a gather in the opposite direction so we reverse the edge direction
+ val activeVertices: RDD[(Vid, Boolean)] =
+ activeGraph.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse)
+
+ activeGraph = activeGraph.leftJoinVertices(activeVertices, applyActive).cache()
+
+ numActive = activeGraph.vertices.map(v => if (v.data._1) 1 else 0).reduce(_ + _)
+ println("Number active vertices: " + numActive)
+ i += 1
+ }
+
+ // Remove the active attribute from the vertex data before returning the graph
+ activeGraph.mapVertices(v => v.data._2)
+ }
}
+
+
+
+
+
+
+
+
+