diff options
author | Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com> | 2013-04-04 18:59:56 -0700 |
---|---|---|
committer | Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com> | 2013-04-04 18:59:56 -0700 |
commit | 4a2b8aa55781a0aaa4a63b5da806fc72c58f9837 (patch) | |
tree | d7389a527b8ce4eea651e66501c24980b20d468c /graph | |
parent | 0667986c8eb1f325d807eb9def0440bafdb7b44d (diff) | |
download | spark-4a2b8aa55781a0aaa4a63b5da806fc72c58f9837.tar.gz spark-4a2b8aa55781a0aaa4a63b5da806fc72c58f9837.tar.bz2 spark-4a2b8aa55781a0aaa4a63b5da806fc72c58f9837.zip |
added dynamic graphlab but I have not yet tested it.
Diffstat (limited to 'graph')
-rw-r--r-- | graph/src/main/scala/spark/graph/GraphLab.scala | 82 |
1 files changed, 79 insertions, 3 deletions
diff --git a/graph/src/main/scala/spark/graph/GraphLab.scala b/graph/src/main/scala/spark/graph/GraphLab.scala index 8899d9688a..d4fdb76040 100644 --- a/graph/src/main/scala/spark/graph/GraphLab.scala +++ b/graph/src/main/scala/spark/graph/GraphLab.scala @@ -32,6 +32,8 @@ object GraphLab { } + + def iterateGASOption[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]( rawGraph: Graph[VD, ED])( gather: (Vid, EdgeWithVertices[VD, ED]) => A, @@ -49,9 +51,7 @@ object GraphLab { val accUpdates: RDD[(Vid, A)] = graph.mapReduceNeighborhoodFilter(someGather, merge, gatherDirection) - - def applyFunc(v: Vertex[VD], update: Option[A]): VD = { apply(v, update) } - graph = graph.updateVertices(accUpdates, applyFunc).cache() + graph = graph.updateVertices(accUpdates, apply).cache() i += 1 } @@ -60,4 +60,80 @@ object GraphLab { + def iterateDynamic[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]( + rawGraph: Graph[VD, ED])( + rawGather: (Vid, EdgeWithVertices[VD, ED]) => A, + merge: (A, A) => A, + rawApply: (Vertex[VD], Option[A]) => VD, + rawScatter: (Vid, EdgeWithVertices[VD, ED]) => Boolean, + numIter: Int, + gatherDirection: EdgeDirection.EdgeDirection = EdgeDirection.In, + rawScatterDirection: EdgeDirection.EdgeDirection = EdgeDirection.Out) : Graph[VD, ED] = { + + var graph = rawGraph.mapVertices{ case Vertex(id,data) => Vertex(id, (true, data)) }.cache() + + def gather(vid: Vid, e: EdgeWithVertices[(Boolean, VD), ED]) = { + if(e.vertex(vid).data._1) { + val edge = new EdgeWithVertices[VD,ED] + edge.src = Vertex(e.src.id, e.src.data._2) + edge.dst = Vertex(e.dst.id, e.dst.data._2) + Some(rawGather(vid, edge)) + } else { + None + } + } + + def apply(v: Vertex[(Boolean, VD)], accum: Option[A]) = { + if(v.data._1) (true, rawApply(Vertex(v.id, v.data._2), accum)) + else (false, v.data._2) + } + + def scatter(rawVid: Vid, e: EdgeWithVertices[(Boolean, VD),ED]) = { + val vid = e.otherVertex(rawVid).id + if(e.vertex(vid).data._1) { + val edge = new EdgeWithVertices[VD,ED] + edge.src = Vertex(e.src.id, e.src.data._2) + edge.dst = Vertex(e.dst.id, e.dst.data._2) + Some(rawScatter(vid, edge)) + } else { + None + } + } + + val scatterDirection = rawScatterDirection match { + case EdgeDirection.In => EdgeDirection.Out + case EdgeDirection.Out => EdgeDirection.In + case _ => rawScatterDirection + } + + def applyActive(v: Vertex[(Boolean, VD)], accum: Option[Boolean]) = + (accum.getOrElse(false), v.data._2) + + + + var i = 0 + var numActive = graph.numVertices + while (i < numIter && numActive > 0) { + + val accUpdates: RDD[(Vid, A)] = + graph.mapReduceNeighborhoodFilter(gather, merge, gatherDirection) + + graph = graph.updateVertices(accUpdates, apply).cache() + + val activeVertices: RDD[(Vid, Boolean)] = + graph.mapReduceNeighborhoodFilter(scatter, _ || _, scatterDirection) + + graph = graph.updateVertices(activeVertices, applyActive).cache() + + numActive = graph.vertices.map(v => if(v.data._1) 1 else 0).reduce( _ + _ ) + + println("Number active vertices: " + numActive) + + i += 1 + } + graph.mapVertices(v => Vertex(v.id, v.data._2)) + } + + + } |