aboutsummaryrefslogtreecommitdiff
path: root/graph
diff options
context:
space:
mode:
authorJoseph E. Gonzalez <joseph.e.gonzalez@gmail.com>2013-04-04 17:56:16 -0700
committerJoseph E. Gonzalez <joseph.e.gonzalez@gmail.com>2013-04-04 17:56:16 -0700
commitcb99fc193c489ad3b481c165fd0c23cd596808df (patch)
tree5a69cfe877965d568111e91792355f7ae6736be4 /graph
parentdb45cf3a497ffdde17a4964d8c31da6e995ad03a (diff)
downloadspark-cb99fc193c489ad3b481c165fd0c23cd596808df.tar.gz
spark-cb99fc193c489ad3b481c165fd0c23cd596808df.tar.bz2
spark-cb99fc193c489ad3b481c165fd0c23cd596808df.zip
Added graphlab style implementation of Pregel and a more sophisticated version of mapReduceNeighborhoodFilter
Diffstat (limited to 'graph')
-rw-r--r--graph/src/main/scala/spark/graph/Graph.scala55
-rw-r--r--graph/src/main/scala/spark/graph/GraphLab.scala53
-rw-r--r--graph/src/main/scala/spark/graph/Pregel.scala34
3 files changed, 129 insertions, 13 deletions
diff --git a/graph/src/main/scala/spark/graph/Graph.scala b/graph/src/main/scala/spark/graph/Graph.scala
index 1117cb5a68..b264a5e209 100644
--- a/graph/src/main/scala/spark/graph/Graph.scala
+++ b/graph/src/main/scala/spark/graph/Graph.scala
@@ -189,6 +189,61 @@ class Graph[VD: ClassManifest, ED: ClassManifest] protected (
}
+
+ def mapReduceNeighborhoodFilter[VD2: ClassManifest](
+ mapFunc: (Vid, EdgeWithVertices[VD, ED]) => Option[VD2],
+ reduceFunc: (VD2, VD2) => VD2,
+ gatherDirection: EdgeDirection.EdgeDirection): RDD[(Vid, VD2)] = {
+
+ ClosureCleaner.clean(mapFunc)
+ ClosureCleaner.clean(reduceFunc)
+
+ val newVTable = vTableReplicated.mapPartitions({ part =>
+ part.map { v => (v._1, MutableTuple2(v._2, Option.empty[VD2])) }
+ }, preservesPartitioning = true)
+
+ (new EdgeWithVerticesRDD[MutableTuple2[VD, Option[VD2]], ED](newVTable, eTable))
+ .mapPartitions { part =>
+ val (vmap, edges) = part.next()
+ val edgeSansAcc = new EdgeWithVertices[VD, ED]()
+ edgeSansAcc.src = new Vertex[VD]
+ edgeSansAcc.dst = new Vertex[VD]
+ edges.foreach { edge: EdgeWithVertices[MutableTuple2[VD, Option[VD2]], ED] =>
+ edgeSansAcc.data = edge.data
+ edgeSansAcc.src.data = edge.src.data._1
+ edgeSansAcc.dst.data = edge.dst.data._1
+ edgeSansAcc.src.id = edge.src.id
+ edgeSansAcc.dst.id = edge.dst.id
+ if (gatherDirection == EdgeDirection.In || gatherDirection == EdgeDirection.Both) {
+ edge.dst.data._2 =
+ if(edge.dst.data._2.isEmpty) mapFunc(edgeSansAcc.dst.id, edgeSansAcc)
+ else {
+ val tmp = mapFunc(edgeSansAcc.dst.id, edgeSansAcc)
+ if(!tmp.isEmpty) Some(reduceFunc(edge.dst.data._2.get, tmp.get))
+ else edge.dst.data._2
+ }
+ }
+ if (gatherDirection == EdgeDirection.Out || gatherDirection == EdgeDirection.Both) {
+ edge.dst.data._2 =
+ if(edge.dst.data._2.isEmpty) mapFunc(edgeSansAcc.src.id, edgeSansAcc)
+ else {
+ val tmp = mapFunc(edgeSansAcc.src.id, edgeSansAcc)
+ if(!tmp.isEmpty) Some(reduceFunc(edge.src.data._2.get, tmp.get))
+ else edge.src.data._2
+ }
+ }
+ }
+ vmap.int2ObjectEntrySet().fastIterator().filter{!_.getValue()._2.isEmpty}.map{ entry =>
+ (entry.getIntKey(), entry.getValue()._2)
+ }
+ }
+ .map{ case (vid, aOpt) => (vid, aOpt.get) }
+ .combineByKey((v: VD2) => v, reduceFunc, null, vertexPartitioner, false)
+
+ }
+
+
+
def updateVertices[U: ClassManifest, VD2: ClassManifest](
updates: RDD[(Vid, U)],
updateFunc: (Vertex[VD], Option[U]) => VD2)
diff --git a/graph/src/main/scala/spark/graph/GraphLab.scala b/graph/src/main/scala/spark/graph/GraphLab.scala
index eb992c464a..8899d9688a 100644
--- a/graph/src/main/scala/spark/graph/GraphLab.scala
+++ b/graph/src/main/scala/spark/graph/GraphLab.scala
@@ -7,30 +7,57 @@ import spark.RDD
object GraphLab {
def iterateGAS[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
- graph: Graph[VD, ED])(
- gather: (Vid, EdgeWithVertices[VD, ED]) => A,
- merge: (A, A) => A,
- default: A,
- apply: (Vertex[VD], A) => VD,
- numIter: Int,
- gatherDirection: EdgeDirection.EdgeDirection = EdgeDirection.In)
- : Graph[VD, ED] = {
+ rawGraph: Graph[VD, ED])(
+ gather: (Vid, EdgeWithVertices[VD, ED]) => A,
+ merge: (A, A) => A,
+ default: A,
+ apply: (Vertex[VD], A) => VD,
+ numIter: Int,
+ gatherDirection: EdgeDirection.EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = {
- var g = graph.cache()
+ var graph = rawGraph.cache()
var i = 0
while (i < numIter) {
- val accUpdates: RDD[(Vid, A)] = g.mapReduceNeighborhood(
- gather, merge, default, gatherDirection)
+ val accUpdates: RDD[(Vid, A)] =
+ graph.mapReduceNeighborhood(gather, merge, default, gatherDirection)
def applyFunc(v: Vertex[VD], update: Option[A]): VD = { apply(v, update.get) }
- g = g.updateVertices(accUpdates, applyFunc).cache()
+ graph = graph.updateVertices(accUpdates, applyFunc).cache()
i += 1
}
+ graph
+ }
+
+
+ def iterateGASOption[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
+ rawGraph: Graph[VD, ED])(
+ gather: (Vid, EdgeWithVertices[VD, ED]) => A,
+ merge: (A, A) => A,
+ apply: (Vertex[VD], Option[A]) => VD,
+ numIter: Int,
+ gatherDirection: EdgeDirection.EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = {
+
+ var graph = rawGraph.cache()
+
+ def someGather(vid: Vid, edge: EdgeWithVertices[VD,ED]) = Some(gather(vid, edge))
+
+ var i = 0
+ while (i < numIter) {
+
+ val accUpdates: RDD[(Vid, A)] =
+ graph.mapReduceNeighborhoodFilter(someGather, merge, gatherDirection)
- g
+ def applyFunc(v: Vertex[VD], update: Option[A]): VD = { apply(v, update) }
+ graph = graph.updateVertices(accUpdates, applyFunc).cache()
+
+ i += 1
+ }
+ graph
}
+
+
}
diff --git a/graph/src/main/scala/spark/graph/Pregel.scala b/graph/src/main/scala/spark/graph/Pregel.scala
new file mode 100644
index 0000000000..6c551c21dd
--- /dev/null
+++ b/graph/src/main/scala/spark/graph/Pregel.scala
@@ -0,0 +1,34 @@
+package spark.graph
+
+import scala.collection.JavaConversions._
+import spark.RDD
+
+
+object Pregel {
+
+ def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
+ rawGraph: Graph[VD, ED])(
+ vprog: ( Vertex[VD], A) => VD,
+ sendMsg: (Vid, EdgeWithVertices[VD, ED]) => Option[A],
+ mergeMsg: (A, A) => A,
+ numIter: Int) : Graph[VD, ED] = {
+
+ var graph = rawGraph.cache
+ var i = 0
+ while (i < numIter) {
+
+ val msgs: RDD[(Vid, A)] =
+ graph.mapReduceNeighborhoodFilter(sendMsg, mergeMsg, EdgeDirection.In)
+
+ def runProg(v: Vertex[VD], msg: Option[A]): VD =
+ if(msg.isEmpty) v.data else vprog(v, msg.get)
+
+ graph = graph.updateVertices(msgs, runProg).cache()
+
+ i += 1
+ }
+ graph
+
+ }
+
+}