-rw-r--r--  bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala |  6
-rw-r--r--  graph/src/main/scala/spark/graph/Analytics.scala              | 16
-rw-r--r--  graph/src/main/scala/spark/graph/BagelTest.scala              | 71
-rw-r--r--  graph/src/main/scala/spark/graph/GraphLab.scala               | 47
-rw-r--r--  project/SparkBuild.scala                                      |  2
5 files changed, 112 insertions(+), 30 deletions(-)
diff --git a/bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala b/bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala
index b97d786ed4..df63000150 100644
--- a/bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala
+++ b/bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala
@@ -13,16 +13,16 @@ import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream}
import com.esotericsoftware.kryo._
class PageRankUtils extends Serializable {
- def computeWithCombiner(numVertices: Long, epsilon: Double)(
+ def computeWithCombiner(numVertices: Long, epsilon: Double, terminateSteps: Int = 10)(
self: PRVertex, messageSum: Option[Double], superstep: Int
): (PRVertex, Array[PRMessage]) = {
val newValue = messageSum match {
case Some(msgSum) if msgSum != 0 =>
- 0.15 / numVertices + 0.85 * msgSum
+ 0.15 + 0.85 * msgSum
case _ => self.value
}
- val terminate = superstep >= 10
+ val terminate = superstep >= terminateSteps
val outbox: Array[PRMessage] =
if (!terminate)
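
Note: the new terminateSteps parameter lifts the previously hard-coded
superstep limit of 10 into the caller's hands. A minimal usage sketch,
reusing the sc, vertices, messages, epsilon, and numVPart values set up
in BagelTest below (terminateSteps = 20 is an arbitrary illustrative
value, not part of this commit):

    // Run PageRank through Bagel with the combiner, stopping after
    // 20 supersteps instead of the old fixed 10.
    val utils = new PageRankUtils
    val result = Bagel.run(
      sc, vertices, messages, combiner = new PRCombiner(),
      numPartitions = numVPart)(
      utils.computeWithCombiner(numVertices, epsilon, terminateSteps = 20))
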
diff --git a/graph/src/main/scala/spark/graph/Analytics.scala b/graph/src/main/scala/spark/graph/Analytics.scala
index 8f2844f34f..4a7449ba4f 100644
--- a/graph/src/main/scala/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/spark/graph/Analytics.scala
@@ -10,15 +10,27 @@ object Analytics {
/**
* Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
*/
+ // def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = {
+ // // Compute the out degree of each vertex
+ // val pagerankGraph = graph.updateVertices[Int, (Int, Float)](graph.outDegrees,
+ // (vertex, deg) => (deg.getOrElse(0), 1.0F)
+ // )
+ // GraphLab.iterateGA[(Int, Float), ED, Float](pagerankGraph)(
+ // (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather
+ // (a: Float, b: Float) => a + b, // merge
+ // (vertex, a: Option[Float]) => (vertex.data._1, (0.15F + 0.85F * a.getOrElse(0F))), // apply
+ // numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) }
+ // }
def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = {
// Compute the out degree of each vertex
val pagerankGraph = graph.updateVertices[Int, (Int, Float)](graph.outDegrees,
(vertex, deg) => (deg.getOrElse(0), 1.0F)
)
- GraphLab.iterateGA[(Int, Float), ED, Float](pagerankGraph)(
+ GraphLab.iterateGA2[(Int, Float), ED, Float](pagerankGraph)(
(me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather
(a: Float, b: Float) => a + b, // merge
- (vertex, a: Option[Float]) => (vertex.data._1, (0.15F + 0.85F * a.getOrElse(0F))), // apply
+ 0.0F, // default
+ (vertex, a: Float) => (vertex.data._1, (0.15F + 0.85F * a)), // apply
numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) }
}
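
The switch from iterateGA to iterateGA2 threads a default accumulator
value through the loop, so the apply function now receives a plain
Float instead of an Option[Float]. A sketch of driving the updated
pagerank, borrowing Graph.textFile from BagelTest below (the sc value
and the edge-file path are assumed placeholders):

    // Load an edge list, run 10 PageRank iterations, and pull out the
    // per-vertex ranks as (id, rank) pairs.
    val graph = Graph.textFile(sc, "hdfs:///data/edges.tsv", a => 1.0F)
    val ranks = Analytics.pagerank(graph, numIter = 10)
      .vertices.map { case Vertex(id, rank) => (id, rank) }
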
diff --git a/graph/src/main/scala/spark/graph/BagelTest.scala b/graph/src/main/scala/spark/graph/BagelTest.scala
new file mode 100644
index 0000000000..eee53bd6f6
--- /dev/null
+++ b/graph/src/main/scala/spark/graph/BagelTest.scala
@@ -0,0 +1,71 @@
+package spark.graph
+
+import spark._
+import spark.SparkContext._
+import spark.bagel.Bagel
+import spark.bagel.examples._
+
+
+object BagelTest {
+
+ def main(args: Array[String]) {
+ val host = args(0)
+ val taskType = args(1)
+ val fname = args(2)
+ val options = args.drop(3).map { arg =>
+ arg.dropWhile(_ == '-').split('=') match {
+ case Array(opt, v) => (opt -> v)
+ case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
+ }
+ }
+
+ System.setProperty("spark.serializer", "spark.KryoSerializer")
+ //System.setProperty("spark.shuffle.compress", "false")
+ System.setProperty("spark.kryo.registrator", "spark.bagel.examples.PRKryoRegistrator")
+
+ var numIter = Int.MaxValue
+ var isDynamic = false
+ var tol: Float = 0.001F
+ var outFname = ""
+ var numVPart = 4
+ var numEPart = 4
+
+ options.foreach{
+ case ("numIter", v) => numIter = v.toInt
+ case ("dynamic", v) => isDynamic = v.toBoolean
+ case ("tol", v) => tol = v.toFloat
+ case ("output", v) => outFname = v
+ case ("numVPart", v) => numVPart = v.toInt
+ case ("numEPart", v) => numEPart = v.toInt
+ case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ }
+
+ val sc = new SparkContext(host, "PageRank(" + fname + ")")
+ val g = Graph.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache()
+ val startTime = System.currentTimeMillis
+
+ val numVertices = g.vertices.count()
+
+ val vertices = g.collectNeighborIds(EdgeDirection.Out).map { case (vid, neighbors) =>
+ (vid.toString, new PRVertex(1.0, neighbors.map(_.toString)))
+ }
+
+ // Do the computation
+ val epsilon = 0.01 / numVertices
+ val messages = sc.parallelize(Array[(String, PRMessage)]())
+ val utils = new PageRankUtils
+ val result =
+ Bagel.run(
+ sc, vertices, messages, combiner = new PRCombiner(),
+ numPartitions = numVPart)(
+ utils.computeWithCombiner(numVertices, epsilon, numIter))
+
+ println("Total rank: " + result.map{ case (id, r) => r.value }.reduce(_+_) )
+ if (!outFname.isEmpty) {
+ println("Saving pageranks of pages to " + outFname)
+ result.map{ case (id, r) => id + "\t" + r.value }.saveAsTextFile(outFname)
+ }
+ println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
+ sc.stop()
+ }
+}
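
For reference, BagelTest expects a Spark master, a task-type string
(parsed but currently unused), an edge-list file, and then --key=value
options matching the foreach above. A hypothetical invocation via the
./run launcher script of this era (all paths and values here are
placeholders):

    ./run spark.graph.BagelTest local[4] pagerank /data/edges.tsv \
      --numIter=20 --numVPart=8 --numEPart=8 --output=/data/ranks
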
diff --git a/graph/src/main/scala/spark/graph/GraphLab.scala b/graph/src/main/scala/spark/graph/GraphLab.scala
index b0efdadce9..4de453663d 100644
--- a/graph/src/main/scala/spark/graph/GraphLab.scala
+++ b/graph/src/main/scala/spark/graph/GraphLab.scala
@@ -6,30 +6,29 @@ import spark.RDD
object GraphLab {
- // def iterateGA[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
- // rawGraph: Graph[VD, ED])(
- // gather: (Vid, EdgeWithVertices[VD, ED]) => A,
- // merge: (A, A) => A,
- // default: A,
- // apply: (Vertex[VD], A) => VD,
- // numIter: Int,
- // gatherDirection: EdgeDirection.EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = {
-
- // var graph = rawGraph.cache()
-
- // var i = 0
- // while (i < numIter) {
-
- // val accUpdates: RDD[(Vid, A)] =
- // graph.mapReduceNeighborhood(gather, merge, default, gatherDirection)
-
- // def applyFunc(v: Vertex[VD], update: Option[A]): VD = { apply(v, update.get) }
- // graph = graph.updateVertices(accUpdates, applyFunc).cache()
-
- // i += 1
- // }
- // graph
- // }
+ def iterateGA2[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
+ gather: (Vid, EdgeWithVertices[VD, ED]) => A,
+ merge: (A, A) => A,
+ default: A,
+ apply: (Vertex[VD], A) => VD,
+ numIter: Int,
+ gatherDirection: EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = {
+
+ var g = graph.cache()
+
+ var i = 0
+ while (i < numIter) {
+
+ val accUpdates: RDD[(Vid, A)] =
+ g.mapReduceNeighborhood(gather, merge, default, gatherDirection)
+
+ def applyFunc(v: Vertex[VD], update: Option[A]): VD = { apply(v, update.get) }
+ g = g.updateVertices(accUpdates, applyFunc).cache()
+
+ i += 1
+ }
+ g
+ }
def iterateGA[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
gatherFunc: (Vid, EdgeWithVertices[VD, ED]) => A,
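
The visible difference between iterateGA2 and iterateGA is the explicit
default accumulator value, which is what lets the inner applyFunc call
update.get unconditionally. A hypothetical call showing the shape of
the new signature: a simple minimum-value propagation over an assumed
Graph[Int, Float] named initGraph (illustrative only, not part of this
commit):

    // Propagate the minimum vertex value along in-edges for five rounds.
    val result = GraphLab.iterateGA2[Int, Float, Int](initGraph)(
      (vid, edge) => edge.src.data,    // gather: the source vertex's value
      (a, b) => math.min(a, b),        // merge: keep the smallest
      Int.MaxValue,                    // default when nothing is gathered
      (v, a) => math.min(v.data, a),   // apply: values only ever decrease
      numIter = 5)
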
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 6e6c72517a..56610e4385 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -27,7 +27,7 @@ object SparkBuild extends Build {
lazy val bagel = Project("bagel", file("bagel"), settings = bagelSettings) dependsOn (core)
- lazy val graph = Project("graph", file("graph"), settings = graphSettings) dependsOn (core)
+ lazy val graph = Project("graph", file("graph"), settings = graphSettings) dependsOn (core, bagel)
lazy val streaming = Project("streaming", file("streaming"), settings = streamingSettings) dependsOn (core)