aboutsummaryrefslogtreecommitdiff
path: root/graph
diff options
context:
space:
mode:
author Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com> 2013-04-04 23:51:06 -0700
committer Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com> 2013-04-04 23:51:06 -0700
commit 14205548ceecac042c1174304031e78d438d4230 (patch)
tree f950fa79dc0d28ea8289d11f2c6b658f636c7176 /graph
parent 092708e57e47673636083f65a5b64700e15d361b (diff)
parent e5dd61e720520dabea6f0b4d84eb135175c17f6a (diff)
download spark-14205548ceecac042c1174304031e78d438d4230.tar.gz
spark-14205548ceecac042c1174304031e78d438d4230.tar.bz2
spark-14205548ceecac042c1174304031e78d438d4230.zip
Merged with master
Merge branch 'graph' of https://github.com/rxin/spark into graph
Diffstat (limited to 'graph')
-rw-r--r-- graph/src/main/scala/spark/graph/EdgeDirection.scala 14
-rw-r--r-- graph/src/main/scala/spark/graph/Graph.scala 13
-rw-r--r-- graph/src/main/scala/spark/graph/GraphLab.scala 78
3 files changed, 49 insertions, 56 deletions
diff --git a/graph/src/main/scala/spark/graph/EdgeDirection.scala b/graph/src/main/scala/spark/graph/EdgeDirection.scala
index a5bd9749bf..a0c52c7038 100644
--- a/graph/src/main/scala/spark/graph/EdgeDirection.scala
+++ b/graph/src/main/scala/spark/graph/EdgeDirection.scala
@@ -1,9 +1,17 @@
package spark.graph
-object EdgeDirection extends Enumeration {
+sealed abstract class EdgeDirection { // direction of an edge relative to a vertex: In, Out or Both
+ def reverse: EdgeDirection = this match { // opposite direction: In <-> Out, Both is unchanged
+ case EdgeDirection.In => EdgeDirection.Out
+ case EdgeDirection.Out => EdgeDirection.In
+ case EdgeDirection.Both => EdgeDirection.Both
+ }
+}
- type EdgeDirection = Value
- val None, In, Out, Both = Value
+object EdgeDirection {
+ case object In extends EdgeDirection
+ case object Out extends EdgeDirection
+ case object Both extends EdgeDirection
}
diff --git a/graph/src/main/scala/spark/graph/Graph.scala b/graph/src/main/scala/spark/graph/Graph.scala
index 4bd3cc73e3..c22982d862 100644
--- a/graph/src/main/scala/spark/graph/Graph.scala
+++ b/graph/src/main/scala/spark/graph/Graph.scala
@@ -38,8 +38,9 @@ class EdgeWithVertices[@specialized(Char, Int, Boolean, Byte, Long, Float, Doubl
def vertex(vid: Vid): Vertex[VD] = if (src.id == vid) src else dst
- def relativeDirection(vid: Vid): EdgeDirection.EdgeDirection =
- if(vid == src.id) EdgeDirection.Out else EdgeDirection.In
+ def relativeDirection(vid: Vid): EdgeDirection = {
+ if (vid == src.id) EdgeDirection.Out else EdgeDirection.In
+ }
}
@@ -122,7 +123,7 @@ class Graph[VD: ClassManifest, ED: ClassManifest] protected (
newGraph(vertices, edges.map{ case Edge(s, t, e) => Edge(t, s, e) })
}
- def collectNeighborIds(edgeDirection: EdgeDirection.EdgeDirection) : RDD[(Vid, Array[Vid])] = {
+ def collectNeighborIds(edgeDirection: EdgeDirection) : RDD[(Vid, Array[Vid])] = {
mapReduceNeighborhood[Array[Vid]](
(vid, edge) => Array(edge.otherVertex(vid).id),
(a, b) => a ++ b,
@@ -146,7 +147,7 @@ class Graph[VD: ClassManifest, ED: ClassManifest] protected (
mapFunc: (Vid, EdgeWithVertices[VD, ED]) => VD2,
reduceFunc: (VD2, VD2) => VD2,
default: VD2,
- gatherDirection: EdgeDirection.EdgeDirection): RDD[(Vid, VD2)] = {
+ gatherDirection: EdgeDirection): RDD[(Vid, VD2)] = {
ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)
@@ -188,7 +189,7 @@ class Graph[VD: ClassManifest, ED: ClassManifest] protected (
def flatMapReduceNeighborhood[VD2: ClassManifest](
mapFunc: (Vid, EdgeWithVertices[VD, ED]) => Option[VD2],
reduceFunc: (VD2, VD2) => VD2,
- gatherDirection: EdgeDirection.EdgeDirection): RDD[(Vid, VD2)] = {
+ gatherDirection: EdgeDirection): RDD[(Vid, VD2)] = {
ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)
@@ -374,7 +375,7 @@ object Graph {
(if(c+1 < numCols) List(Edge(vid, index(r,c+1), 1.0F)) else List.empty)
}
new Graph(vertices, edges)
- }
+ }
/**
diff --git a/graph/src/main/scala/spark/graph/GraphLab.scala b/graph/src/main/scala/spark/graph/GraphLab.scala
index 61e9709477..b0efdadce9 100644
--- a/graph/src/main/scala/spark/graph/GraphLab.scala
+++ b/graph/src/main/scala/spark/graph/GraphLab.scala
@@ -31,58 +31,54 @@ object GraphLab {
// graph
// }
-
-
-
- def iterateGA[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](rawGraph: Graph[VD, ED])(
- gather: (Vid, EdgeWithVertices[VD, ED]) => A,
- merge: (A, A) => A,
- apply: (Vertex[VD], Option[A]) => VD,
+ def iterateGA[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
+ gatherFunc: (Vid, EdgeWithVertices[VD, ED]) => A,
+ mergeFunc: (A, A) => A,
+ applyFunc: (Vertex[VD], Option[A]) => VD,
numIter: Int,
- gatherDirection: EdgeDirection.EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = {
+ gatherDirection: EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = {
- var graph = rawGraph.cache()
+ var g = graph.cache()
- def someGather(vid: Vid, edge: EdgeWithVertices[VD,ED]) = Some(gather(vid, edge))
+ def someGather(vid: Vid, edge: EdgeWithVertices[VD, ED]) = Some(gatherFunc(vid, edge))
var i = 0
while (i < numIter) {
val accUpdates: RDD[(Vid, A)] =
- graph.flatMapReduceNeighborhood(someGather, merge, gatherDirection)
- graph = graph.updateVertices(accUpdates, apply).cache()
+ g.flatMapReduceNeighborhood(someGather, mergeFunc, gatherDirection)
+
+ g = g.updateVertices(accUpdates, applyFunc).cache()
i += 1
}
- graph
+ g
}
-
-
- def iterateGAS[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](rawGraph: Graph[VD, ED])(
- rawGather: (Vid, EdgeWithVertices[VD, ED]) => A,
- merge: (A, A) => A,
- rawApply: (Vertex[VD], Option[A]) => VD,
- rawScatter: (Vid, EdgeWithVertices[VD, ED]) => Boolean,
+ def iterateGAS[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
+ gatherFunc: (Vid, EdgeWithVertices[VD, ED]) => A,
+ mergeFunc: (A, A) => A,
+ applyFunc: (Vertex[VD], Option[A]) => VD,
+ scatterFunc: (Vid, EdgeWithVertices[VD, ED]) => Boolean,
numIter: Int,
- gatherDirection: EdgeDirection.EdgeDirection = EdgeDirection.In,
- rawScatterDirection: EdgeDirection.EdgeDirection = EdgeDirection.Out) : Graph[VD, ED] = {
+ gatherDirection: EdgeDirection = EdgeDirection.In,
+ scatterDirection: EdgeDirection = EdgeDirection.Out) : Graph[VD, ED] = {
- var graph = rawGraph.mapVertices{ case Vertex(id,data) => Vertex(id, (true, data)) }.cache()
+ var g = graph.mapVertices{ case Vertex(id,data) => Vertex(id, (true, data)) }.cache()
def gather(vid: Vid, e: EdgeWithVertices[(Boolean, VD), ED]) = {
if(e.vertex(vid).data._1) {
val edge = new EdgeWithVertices[VD,ED]
edge.src = Vertex(e.src.id, e.src.data._2)
edge.dst = Vertex(e.dst.id, e.dst.data._2)
- Some(rawGather(vid, edge))
+ Some(gatherFunc(vid, edge))
} else {
None
}
}
def apply(v: Vertex[(Boolean, VD)], accum: Option[A]) = {
- if(v.data._1) (true, rawApply(Vertex(v.id, v.data._2), accum))
+ if(v.data._1) (true, applyFunc(Vertex(v.id, v.data._2), accum))
else (false, v.data._2)
}
@@ -92,47 +88,35 @@ object GraphLab {
val edge = new EdgeWithVertices[VD,ED]
edge.src = Vertex(e.src.id, e.src.data._2)
edge.dst = Vertex(e.dst.id, e.dst.data._2)
- Some(rawScatter(vid, edge))
+ Some(scatterFunc(vid, edge))
} else {
None
}
}
- // Scatter is basically a gather in the opposite direction so we reverse the edge direction
- val scatterDirection = rawScatterDirection match {
- case EdgeDirection.In => EdgeDirection.Out
- case EdgeDirection.Out => EdgeDirection.In
- case _ => rawScatterDirection
- }
-
def applyActive(v: Vertex[(Boolean, VD)], accum: Option[Boolean]) =
(accum.getOrElse(false), v.data._2)
-
-
var i = 0
- var numActive = graph.numVertices
+ var numActive = g.numVertices
while (i < numIter && numActive > 0) {
val accUpdates: RDD[(Vid, A)] =
- graph.flatMapReduceNeighborhood(gather, merge, gatherDirection)
+ g.flatMapReduceNeighborhood(gather, mergeFunc, gatherDirection)
- graph = graph.updateVertices(accUpdates, apply).cache()
+ g = g.updateVertices(accUpdates, apply).cache()
+ // Scatter is basically a gather in the opposite direction so we reverse the edge direction
val activeVertices: RDD[(Vid, Boolean)] =
- graph.flatMapReduceNeighborhood(scatter, _ || _, scatterDirection)
+ g.flatMapReduceNeighborhood(scatter, _ || _, scatterDirection.reverse)
- graph = graph.updateVertices(activeVertices, applyActive).cache()
-
- numActive = graph.vertices.map(v => if(v.data._1) 1 else 0).reduce( _ + _ )
+ g = g.updateVertices(activeVertices, applyActive).cache()
+ numActive = g.vertices.map(v => if (v.data._1) 1 else 0).reduce( _ + _ )
println("Number active vertices: " + numActive)
-
i += 1
}
- graph.mapVertices(v => Vertex(v.id, v.data._2))
- }
-
-
+ g.mapVertices(v => Vertex(v.id, v.data._2))
+ }
}