aboutsummaryrefslogtreecommitdiff
path: root/graph
diff options
context:
space:
mode:
authorJoseph E. Gonzalez <joseph.e.gonzalez@gmail.com>2013-04-04 18:59:56 -0700
committerJoseph E. Gonzalez <joseph.e.gonzalez@gmail.com>2013-04-04 18:59:56 -0700
commit4a2b8aa55781a0aaa4a63b5da806fc72c58f9837 (patch)
treed7389a527b8ce4eea651e66501c24980b20d468c /graph
parent0667986c8eb1f325d807eb9def0440bafdb7b44d (diff)
downloadspark-4a2b8aa55781a0aaa4a63b5da806fc72c58f9837.tar.gz
spark-4a2b8aa55781a0aaa4a63b5da806fc72c58f9837.tar.bz2
spark-4a2b8aa55781a0aaa4a63b5da806fc72c58f9837.zip
added dynamic graphlab but I have not yet tested it.
Diffstat (limited to 'graph')
-rw-r--r--graph/src/main/scala/spark/graph/GraphLab.scala82
1 files changed, 79 insertions, 3 deletions
diff --git a/graph/src/main/scala/spark/graph/GraphLab.scala b/graph/src/main/scala/spark/graph/GraphLab.scala
index 8899d9688a..d4fdb76040 100644
--- a/graph/src/main/scala/spark/graph/GraphLab.scala
+++ b/graph/src/main/scala/spark/graph/GraphLab.scala
@@ -32,6 +32,8 @@ object GraphLab {
}
+
+
def iterateGASOption[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
rawGraph: Graph[VD, ED])(
gather: (Vid, EdgeWithVertices[VD, ED]) => A,
@@ -49,9 +51,7 @@ object GraphLab {
val accUpdates: RDD[(Vid, A)] =
graph.mapReduceNeighborhoodFilter(someGather, merge, gatherDirection)
-
- def applyFunc(v: Vertex[VD], update: Option[A]): VD = { apply(v, update) }
- graph = graph.updateVertices(accUpdates, applyFunc).cache()
+ graph = graph.updateVertices(accUpdates, apply).cache()
i += 1
}
@@ -60,4 +60,80 @@ object GraphLab {
+ def iterateDynamic[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
+ rawGraph: Graph[VD, ED])(
+ rawGather: (Vid, EdgeWithVertices[VD, ED]) => A,
+ merge: (A, A) => A,
+ rawApply: (Vertex[VD], Option[A]) => VD,
+ rawScatter: (Vid, EdgeWithVertices[VD, ED]) => Boolean,
+ numIter: Int,
+ gatherDirection: EdgeDirection.EdgeDirection = EdgeDirection.In,
+ rawScatterDirection: EdgeDirection.EdgeDirection = EdgeDirection.Out) : Graph[VD, ED] = {
+
+ var graph = rawGraph.mapVertices{ case Vertex(id,data) => Vertex(id, (true, data)) }.cache()
+
+ def gather(vid: Vid, e: EdgeWithVertices[(Boolean, VD), ED]) = {
+ if(e.vertex(vid).data._1) {
+ val edge = new EdgeWithVertices[VD,ED]
+ edge.src = Vertex(e.src.id, e.src.data._2)
+ edge.dst = Vertex(e.dst.id, e.dst.data._2)
+ Some(rawGather(vid, edge))
+ } else {
+ None
+ }
+ }
+
+ def apply(v: Vertex[(Boolean, VD)], accum: Option[A]) = {
+ if(v.data._1) (true, rawApply(Vertex(v.id, v.data._2), accum))
+ else (false, v.data._2)
+ }
+
+ def scatter(rawVid: Vid, e: EdgeWithVertices[(Boolean, VD),ED]) = {
+ val vid = e.otherVertex(rawVid).id
+ if(e.vertex(vid).data._1) {
+ val edge = new EdgeWithVertices[VD,ED]
+ edge.src = Vertex(e.src.id, e.src.data._2)
+ edge.dst = Vertex(e.dst.id, e.dst.data._2)
+ Some(rawScatter(vid, edge))
+ } else {
+ None
+ }
+ }
+
+ val scatterDirection = rawScatterDirection match {
+ case EdgeDirection.In => EdgeDirection.Out
+ case EdgeDirection.Out => EdgeDirection.In
+ case _ => rawScatterDirection
+ }
+
+ def applyActive(v: Vertex[(Boolean, VD)], accum: Option[Boolean]) =
+ (accum.getOrElse(false), v.data._2)
+
+
+
+ var i = 0
+ var numActive = graph.numVertices
+ while (i < numIter && numActive > 0) {
+
+ val accUpdates: RDD[(Vid, A)] =
+ graph.mapReduceNeighborhoodFilter(gather, merge, gatherDirection)
+
+ graph = graph.updateVertices(accUpdates, apply).cache()
+
+ val activeVertices: RDD[(Vid, Boolean)] =
+ graph.mapReduceNeighborhoodFilter(scatter, _ || _, scatterDirection)
+
+ graph = graph.updateVertices(activeVertices, applyActive).cache()
+
+ numActive = graph.vertices.map(v => if(v.data._1) 1 else 0).reduce( _ + _ )
+
+ println("Number active vertices: " + numActive)
+
+ i += 1
+ }
+ graph.mapVertices(v => Vertex(v.id, v.data._2))
+ }
+
+
+
}