diff options
author | Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com> | 2013-04-04 23:51:06 -0700 |
---|---|---|
committer | Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com> | 2013-04-04 23:51:06 -0700 |
commit | 14205548ceecac042c1174304031e78d438d4230 (patch) | |
tree | f950fa79dc0d28ea8289d11f2c6b658f636c7176 | |
parent | 092708e57e47673636083f65a5b64700e15d361b (diff) | |
parent | e5dd61e720520dabea6f0b4d84eb135175c17f6a (diff) | |
download | spark-14205548ceecac042c1174304031e78d438d4230.tar.gz spark-14205548ceecac042c1174304031e78d438d4230.tar.bz2 spark-14205548ceecac042c1174304031e78d438d4230.zip |
Merged with master
Merge branch 'graph' of https://github.com/rxin/spark into graph
-rw-r--r-- | graph/src/main/scala/spark/graph/EdgeDirection.scala | 14 | ||||
-rw-r--r-- | graph/src/main/scala/spark/graph/Graph.scala | 13 | ||||
-rw-r--r-- | graph/src/main/scala/spark/graph/GraphLab.scala | 78 |
3 files changed, 49 insertions, 56 deletions
diff --git a/graph/src/main/scala/spark/graph/EdgeDirection.scala b/graph/src/main/scala/spark/graph/EdgeDirection.scala
index a5bd9749bf..a0c52c7038 100644
--- a/graph/src/main/scala/spark/graph/EdgeDirection.scala
+++ b/graph/src/main/scala/spark/graph/EdgeDirection.scala
@@ -1,9 +1,17 @@
 package spark.graph


-object EdgeDirection extends Enumeration {
+sealed abstract class EdgeDirection {
+  def reverse: EdgeDirection = this match {  // the same edge set, seen from the opposite endpoint
+    case EdgeDirection.In => EdgeDirection.Out  // incoming becomes outgoing
+    case EdgeDirection.Out => EdgeDirection.In  // outgoing becomes incoming
+    case EdgeDirection.Both => EdgeDirection.Both  // symmetric, so unchanged
+  }
+}

-  type EdgeDirection = Value
-
-  val None, In, Out, Both = Value
+object EdgeDirection {
+  case object In extends EdgeDirection
+  case object Out extends EdgeDirection
+  case object Both extends EdgeDirection
 }
diff --git a/graph/src/main/scala/spark/graph/Graph.scala b/graph/src/main/scala/spark/graph/Graph.scala
index 4bd3cc73e3..c22982d862 100644
--- a/graph/src/main/scala/spark/graph/Graph.scala
+++ b/graph/src/main/scala/spark/graph/Graph.scala
@@ -38,8 +38,9 @@ class EdgeWithVertices[@specialized(Char, Int, Boolean, Byte, Long, Float, Doubl

   def vertex(vid: Vid): Vertex[VD] = if (src.id == vid) src else dst

-  def relativeDirection(vid: Vid): EdgeDirection.EdgeDirection =
-    if(vid == src.id) EdgeDirection.Out else EdgeDirection.In
+  def relativeDirection(vid: Vid): EdgeDirection = {
+    if (vid == src.id) EdgeDirection.Out else EdgeDirection.In
+  }

 }

@@ -122,7 +123,7 @@ class Graph[VD: ClassManifest, ED: ClassManifest] protected (
     newGraph(vertices, edges.map{ case Edge(s, t, e) => Edge(t, s, e) })
   }

-  def collectNeighborIds(edgeDirection: EdgeDirection.EdgeDirection) : RDD[(Vid, Array[Vid])] = {
+  def collectNeighborIds(edgeDirection: EdgeDirection) : RDD[(Vid, Array[Vid])] = {
     mapReduceNeighborhood[Array[Vid]](
       (vid, edge) => Array(edge.otherVertex(vid).id),
       (a, b) => a ++ b,
@@ -146,7 +147,7 @@ class Graph[VD: ClassManifest, ED: ClassManifest] protected (
     mapFunc: (Vid, EdgeWithVertices[VD, ED]) => VD2,
     reduceFunc: (VD2, VD2) => VD2,
     default: VD2,
-    gatherDirection: EdgeDirection.EdgeDirection): RDD[(Vid, VD2)] = {
+    gatherDirection: EdgeDirection): RDD[(Vid, VD2)] = {

     ClosureCleaner.clean(mapFunc)
     ClosureCleaner.clean(reduceFunc)
@@ -188,7 +189,7 @@ class Graph[VD: ClassManifest, ED: ClassManifest] protected (
   def flatMapReduceNeighborhood[VD2: ClassManifest](
     mapFunc: (Vid, EdgeWithVertices[VD, ED]) => Option[VD2],
     reduceFunc: (VD2, VD2) => VD2,
-    gatherDirection: EdgeDirection.EdgeDirection): RDD[(Vid, VD2)] = {
+    gatherDirection: EdgeDirection): RDD[(Vid, VD2)] = {

     ClosureCleaner.clean(mapFunc)
     ClosureCleaner.clean(reduceFunc)
@@ -374,7 +375,7 @@ object Graph {
       (if(c+1 < numCols) List(Edge(vid, index(r,c+1), 1.0F)) else List.empty)
     }
     new Graph(vertices, edges)
-  }
+  }


   /**
diff --git a/graph/src/main/scala/spark/graph/GraphLab.scala b/graph/src/main/scala/spark/graph/GraphLab.scala
index 61e9709477..b0efdadce9 100644
--- a/graph/src/main/scala/spark/graph/GraphLab.scala
+++ b/graph/src/main/scala/spark/graph/GraphLab.scala
@@ -31,58 +31,54 @@ object GraphLab {
   //   graph
   // }

-
-
-
-  def iterateGA[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](rawGraph: Graph[VD, ED])(
-    gather: (Vid, EdgeWithVertices[VD, ED]) => A,
-    merge: (A, A) => A,
-    apply: (Vertex[VD], Option[A]) => VD,
+  def iterateGA[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
+    gatherFunc: (Vid, EdgeWithVertices[VD, ED]) => A,
+    mergeFunc: (A, A) => A,
+    applyFunc: (Vertex[VD], Option[A]) => VD,
     numIter: Int,
-    gatherDirection: EdgeDirection.EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = {
+    gatherDirection: EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = {

-    var graph = rawGraph.cache()
+    var g = graph.cache()

-    def someGather(vid: Vid, edge: EdgeWithVertices[VD,ED]) = Some(gather(vid, edge))
+    def someGather(vid: Vid, edge: EdgeWithVertices[VD, ED]) = Some(gatherFunc(vid, edge))

     var i = 0
     while (i < numIter) {

       val accUpdates: RDD[(Vid, A)] =
-        graph.flatMapReduceNeighborhood(someGather, merge, gatherDirection)
-      graph = graph.updateVertices(accUpdates, apply).cache()
+        g.flatMapReduceNeighborhood(someGather, mergeFunc, gatherDirection)
+
+      g = g.updateVertices(accUpdates, applyFunc).cache()

       i += 1
     }
-    graph
+    g
   }

-
-
-  def iterateGAS[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](rawGraph: Graph[VD, ED])(
-    rawGather: (Vid, EdgeWithVertices[VD, ED]) => A,
-    merge: (A, A) => A,
-    rawApply: (Vertex[VD], Option[A]) => VD,
-    rawScatter: (Vid, EdgeWithVertices[VD, ED]) => Boolean,
+  def iterateGAS[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
+    gatherFunc: (Vid, EdgeWithVertices[VD, ED]) => A,
+    mergeFunc: (A, A) => A,
+    applyFunc: (Vertex[VD], Option[A]) => VD,
+    scatterFunc: (Vid, EdgeWithVertices[VD, ED]) => Boolean,
     numIter: Int,
-    gatherDirection: EdgeDirection.EdgeDirection = EdgeDirection.In,
-    rawScatterDirection: EdgeDirection.EdgeDirection = EdgeDirection.Out) : Graph[VD, ED] = {
+    gatherDirection: EdgeDirection = EdgeDirection.In,
+    scatterDirection: EdgeDirection = EdgeDirection.Out) : Graph[VD, ED] = {

-    var graph = rawGraph.mapVertices{ case Vertex(id,data) => Vertex(id, (true, data)) }.cache()
+    var g = graph.mapVertices{ case Vertex(id,data) => Vertex(id, (true, data)) }.cache()

     def gather(vid: Vid, e: EdgeWithVertices[(Boolean, VD), ED]) = {
       if(e.vertex(vid).data._1) {
         val edge = new EdgeWithVertices[VD,ED]
         edge.src = Vertex(e.src.id, e.src.data._2)
         edge.dst = Vertex(e.dst.id, e.dst.data._2)
-        Some(rawGather(vid, edge))
+        Some(gatherFunc(vid, edge))
       } else {
         None
       }
     }

     def apply(v: Vertex[(Boolean, VD)], accum: Option[A]) = {
-      if(v.data._1) (true, rawApply(Vertex(v.id, v.data._2), accum))
+      if(v.data._1) (true, applyFunc(Vertex(v.id, v.data._2), accum))
       else (false, v.data._2)
     }

@@ -92,47 +88,35 @@ object GraphLab {
         val edge = new EdgeWithVertices[VD,ED]
         edge.src = Vertex(e.src.id, e.src.data._2)
         edge.dst = Vertex(e.dst.id, e.dst.data._2)
-        Some(rawScatter(vid, edge))
+        Some(scatterFunc(vid, edge))
       } else {
         None
       }
     }

-    // Scatter is basically a gather in the opposite direction so we reverse the edge direction
-    val scatterDirection = rawScatterDirection match {
-      case EdgeDirection.In => EdgeDirection.Out
-      case EdgeDirection.Out => EdgeDirection.In
-      case _ => rawScatterDirection
-    }
-
     def applyActive(v: Vertex[(Boolean, VD)], accum: Option[Boolean]) =
       (accum.getOrElse(false), v.data._2)

-
-
     var i = 0
-    var numActive = graph.numVertices
+    var numActive = g.numVertices
     while (i < numIter && numActive > 0) {

       val accUpdates: RDD[(Vid, A)] =
-        graph.flatMapReduceNeighborhood(gather, merge, gatherDirection)
+        g.flatMapReduceNeighborhood(gather, mergeFunc, gatherDirection)

-      graph = graph.updateVertices(accUpdates, apply).cache()
+      g = g.updateVertices(accUpdates, apply).cache()

+      // Scatter is basically a gather in the opposite direction so we reverse the edge direction
       val activeVertices: RDD[(Vid, Boolean)] =
-        graph.flatMapReduceNeighborhood(scatter, _ || _, scatterDirection)
+        g.flatMapReduceNeighborhood(scatter, _ || _, scatterDirection.reverse)

-      graph = graph.updateVertices(activeVertices, applyActive).cache()
-
-      numActive = graph.vertices.map(v => if(v.data._1) 1 else 0).reduce( _ + _ )
+      g = g.updateVertices(activeVertices, applyActive).cache()

+      numActive = g.vertices.map(v => if (v.data._1) 1 else 0).reduce( _ + _ )
       println("Number active vertices: " + numActive)
-
       i += 1
     }

-    graph.mapVertices(v => Vertex(v.id, v.data._2))
-  }
-
-
+    g.mapVertices(v => Vertex(v.id, v.data._2))
+  }


 }