aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com> 2013-04-04 19:18:15 -0700
committer Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com> 2013-04-04 19:18:15 -0700
commit b53761357019b1c0ce3dd3094876dcee702aff37 (patch)
tree f7c238278e6269bff15ff829f74d4d59ae586ebd
parent 4a2b8aa55781a0aaa4a63b5da806fc72c58f9837 (diff)
download spark-b53761357019b1c0ce3dd3094876dcee702aff37.tar.gz
spark-b53761357019b1c0ce3dd3094876dcee702aff37.tar.bz2
spark-b53761357019b1c0ce3dd3094876dcee702aff37.zip
added dynamic graphlab
-rw-r--r-- graph/src/main/scala/spark/graph/Analytics.scala 81
-rw-r--r-- graph/src/main/scala/spark/graph/GraphLab.scala 42
2 files changed, 57 insertions, 66 deletions
diff --git a/graph/src/main/scala/spark/graph/Analytics.scala b/graph/src/main/scala/spark/graph/Analytics.scala
index 27b8e371d4..40f378fbc2 100644
--- a/graph/src/main/scala/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/spark/graph/Analytics.scala
@@ -23,6 +23,7 @@ import spark.SparkContext._
object Analytics {
+
/**
* Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
*/
@@ -31,15 +32,36 @@ object Analytics {
val pagerankGraph = graph.updateVertices[Int, (Int, Float)](graph.outDegrees,
(vertex, degIter) => (degIter.sum, 1.0F)
)
- GraphLab.iterateGAS[(Int, Float), ED, Float](pagerankGraph)(
+ GraphLab.iterateGA[(Int, Float), ED, Float](pagerankGraph)(
(me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather
(a: Float, b: Float) => a + b, // merge
- 0F,
- (vertex, a: Float) => (vertex.data._1, (0.15F + 0.85F * a)), // apply
- numIter).vertices.map{ case Vertex(id, (outDeg, r)) => Vertex(id, r) }
+ (vertex, a: Option[Float]) => (vertex.data._1, (0.15F + 0.85F * a.getOrElse(0F))), // apply
+ numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) }
+ }
+
+
+ /**
+ * Tolerance-driven PageRank: returns the graph with each vertex's data replaced by its rank.
+ * Vertex data while iterating is (outDegree, rank, previousRank).
+ */
+ def dynamicPageRank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
+ tol: Float, maxIter: Int = 10) = {
+ // Compute the out degree of each vertex; rank and previous rank both start at 1.0
+ val pagerankGraph = graph.updateVertices[Int, (Int, Float, Float)](graph.outDegrees,
+ (vertex, degIter) => (degIter.sum, 1.0F, 1.0F)
+ )
+ GraphLab.iterateGAS(pagerankGraph)(
+ (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather: rank / own out-degree
+ (a: Float, b: Float) => a + b, // merge
+ (vertex, a: Option[Float]) => // apply: the outgoing rank is kept as previousRank
+ (vertex.data._1, (0.15F + 0.85F * a.getOrElse(0F)), vertex.data._2),
+ // scatter: true only while the source's rank moved by more than tol since the last apply
+ (me_id, edge) => math.abs(edge.src.data._2 - edge.src.data._3) > tol,
+ maxIter).mapVertices { case Vertex(vid, data) => Vertex(vid, data._2) }
 }
+
/**
* Compute the connected component membership of each vertex
* and return an RDD with the vertex value containing the
@@ -48,13 +70,12 @@ object Analytics {
*/
def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = {
val ccGraph = graph.mapVertices { case Vertex(vid, _) => Vertex(vid, vid) }
- GraphLab.iterateGAS[Int, ED, Int](ccGraph)(
+ GraphLab.iterateGA[Int, ED, Int](ccGraph)(
(me_id, edge) => edge.otherVertex(me_id).data, // gather
(a: Int, b: Int) => math.min(a, b), // merge
- Integer.MAX_VALUE,
- (v, a: Int) => math.min(v.data, a), // apply
+ (v, a: Option[Int]) => math.min(v.data, a.getOrElse(Integer.MAX_VALUE)), // apply
numIter,
- gatherDirection = EdgeDirection.Both).vertices
+ gatherDirection = EdgeDirection.Both)
}
@@ -66,47 +87,17 @@ object Analytics {
val spGraph = graph.mapVertices {
case Vertex(vid, _) => Vertex(vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue))
}
- GraphLab.iterateGAS[Float, Float, Float](spGraph)(
+ GraphLab.iterateGA[Float, Float, Float](spGraph)(
(me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather
(a: Float, b: Float) => math.min(a, b), // merge
- Float.MaxValue,
- (v, a: Float) => math.min(v.data, a), // apply
+ (v, a: Option[Float]) => math.min(v.data, a.getOrElse(Float.MaxValue)), // apply
numIter,
- gatherDirection = EdgeDirection.In).vertices
+ gatherDirection = EdgeDirection.In)
}
- // /**
- // * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
- // */
- // def dynamicPageRank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
- // tol: Float, maxIter: Int = 10) = {
- // graph.edges.cache
- // // Compute the out degree of each vertex
- // val outDegree = graph.edges.map { case (src, target, data) => (src, 1)}.reduceByKey(_ + _)
- // val vertices = graph.vertices.leftOuterJoin(outDegree).mapValues {
- // case (_, Some(deg)) => (deg, 1.0F, 1.0F)
- // case (_, None) => (0, 1.0F, 1.0F)
- // }.cache
-
- // val edges = graph.edges
- // val pageRankGraph = new Graph(vertices, edges)
- // pageRankGraph.numVPart = graph.numVPart
- // pageRankGraph.numEPart = graph.numEPart
-
- // // Run PageRank
- // pageRankGraph.iterateDynamic(
- // (me_id, edge) => edge.source.data._2 / edge.source.data._1, // gather
- // (a: Float, b: Float) => a + b, // merge
- // 0F,
- // (vertex, a: Float) => (vertex.data._1, (0.15F + 0.85F * a), vertex.data._2), // apply
- // (me_id, edge) => math.abs(edge.source.data._2 - edge.source.data._1) > tol, // scatter
- // maxIter).vertices.mapValues { case (degree, rank, oldRank) => rank }
- // // println("Computed graph: #edges: " + graph_ret.numEdges + " #vertices" + graph_ret.numVertices)
- // // graph_ret.vertices.take(10).foreach(println)
- // }
@@ -288,10 +279,10 @@ object Analytics {
val pr = Analytics.pagerank(graph, numIter)
// val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter)
// else Analytics.pagerank(graph, numIter)
- println("Total rank: " + pr.map{ case Vertex(id,r) => r }.reduce(_+_) )
+ println("Total rank: " + pr.vertices.map{ case Vertex(id,r) => r }.reduce(_+_) )
if(!outFname.isEmpty) {
println("Saving pageranks of pages to " + outFname)
- pr.map{case Vertex(id, r) => id + "\t" + r}.saveAsTextFile(outFname)
+ pr.vertices.map{case Vertex(id, r) => id + "\t" + r}.saveAsTextFile(outFname)
}
println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
sc.stop()
@@ -325,7 +316,7 @@ object Analytics {
val cc = Analytics.connectedComponents(graph, numIter)
// val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
// else Analytics.connectedComponents(graph, numIter)
- println("Components: " + cc.map(_.data).distinct())
+ println("Components: " + cc.vertices.map(_.data).distinct())
sc.stop()
}
@@ -368,7 +359,7 @@ object Analytics {
val sp = Analytics.shortestPath(graph, sources, numIter)
// val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter)
// else Analytics.shortestPath(graph, sources, numIter)
- println("Longest Path: " + sp.map(_.data).reduce(math.max(_,_)))
+ println("Longest Path: " + sp.vertices.map(_.data).reduce(math.max(_,_)))
sc.stop()
}
diff --git a/graph/src/main/scala/spark/graph/GraphLab.scala b/graph/src/main/scala/spark/graph/GraphLab.scala
index d4fdb76040..fa69e1ad9b 100644
--- a/graph/src/main/scala/spark/graph/GraphLab.scala
+++ b/graph/src/main/scala/spark/graph/GraphLab.scala
@@ -6,35 +6,35 @@ import spark.RDD
object GraphLab {
- def iterateGAS[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
- rawGraph: Graph[VD, ED])(
- gather: (Vid, EdgeWithVertices[VD, ED]) => A,
- merge: (A, A) => A,
- default: A,
- apply: (Vertex[VD], A) => VD,
- numIter: Int,
- gatherDirection: EdgeDirection.EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = {
+ // def iterateGA[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
+ // rawGraph: Graph[VD, ED])(
+ // gather: (Vid, EdgeWithVertices[VD, ED]) => A,
+ // merge: (A, A) => A,
+ // default: A,
+ // apply: (Vertex[VD], A) => VD,
+ // numIter: Int,
+ // gatherDirection: EdgeDirection.EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = {
- var graph = rawGraph.cache()
+ // var graph = rawGraph.cache()
- var i = 0
- while (i < numIter) {
+ // var i = 0
+ // while (i < numIter) {
- val accUpdates: RDD[(Vid, A)] =
- graph.mapReduceNeighborhood(gather, merge, default, gatherDirection)
+ // val accUpdates: RDD[(Vid, A)] =
+ // graph.mapReduceNeighborhood(gather, merge, default, gatherDirection)
- def applyFunc(v: Vertex[VD], update: Option[A]): VD = { apply(v, update.get) }
- graph = graph.updateVertices(accUpdates, applyFunc).cache()
+ // def applyFunc(v: Vertex[VD], update: Option[A]): VD = { apply(v, update.get) }
+ // graph = graph.updateVertices(accUpdates, applyFunc).cache()
- i += 1
- }
- graph
- }
+ // i += 1
+ // }
+ // graph
+ // }
- def iterateGASOption[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
+ def iterateGA[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
rawGraph: Graph[VD, ED])(
gather: (Vid, EdgeWithVertices[VD, ED]) => A,
merge: (A, A) => A,
@@ -60,7 +60,7 @@ object GraphLab {
- def iterateDynamic[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
+ def iterateGAS[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
rawGraph: Graph[VD, ED])(
rawGather: (Vid, EdgeWithVertices[VD, ED]) => A,
merge: (A, A) => A,