diff options
-rw-r--r-- | bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala | 6 | ||||
-rw-r--r-- | graph/src/main/scala/spark/graph/Analytics.scala | 16 | ||||
-rw-r--r-- | graph/src/main/scala/spark/graph/BagelTest.scala | 71 | ||||
-rw-r--r-- | graph/src/main/scala/spark/graph/GraphLab.scala | 47 | ||||
-rw-r--r-- | project/SparkBuild.scala | 2 |
5 files changed, 112 insertions, 30 deletions
diff --git a/bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala b/bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala index b97d786ed4..df63000150 100644 --- a/bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala +++ b/bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala @@ -13,16 +13,16 @@ import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream} import com.esotericsoftware.kryo._ class PageRankUtils extends Serializable { - def computeWithCombiner(numVertices: Long, epsilon: Double)( + def computeWithCombiner(numVertices: Long, epsilon: Double, terminateSteps: Int = 10)( self: PRVertex, messageSum: Option[Double], superstep: Int ): (PRVertex, Array[PRMessage]) = { val newValue = messageSum match { case Some(msgSum) if msgSum != 0 => - 0.15 / numVertices + 0.85 * msgSum + 0.15 + 0.85 * msgSum case _ => self.value } - val terminate = superstep >= 10 + val terminate = superstep >= terminateSteps val outbox: Array[PRMessage] = if (!terminate) diff --git a/graph/src/main/scala/spark/graph/Analytics.scala b/graph/src/main/scala/spark/graph/Analytics.scala index 8f2844f34f..4a7449ba4f 100644 --- a/graph/src/main/scala/spark/graph/Analytics.scala +++ b/graph/src/main/scala/spark/graph/Analytics.scala @@ -10,15 +10,27 @@ object Analytics { /** * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD */ + // def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = { + // // Compute the out degree of each vertex + // val pagerankGraph = graph.updateVertices[Int, (Int, Float)](graph.outDegrees, + // (vertex, deg) => (deg.getOrElse(0), 1.0F) + // ) + // GraphLab.iterateGA[(Int, Float), ED, Float](pagerankGraph)( + // (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather + // (a: Float, b: Float) => a + b, // merge + // (vertex, a: Option[Float]) => (vertex.data._1, (0.15F + 0.85F * a.getOrElse(0F))), // apply + // numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) } + // } def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = { // Compute the out degree of each vertex val pagerankGraph = graph.updateVertices[Int, (Int, Float)](graph.outDegrees, (vertex, deg) => (deg.getOrElse(0), 1.0F) ) - GraphLab.iterateGA[(Int, Float), ED, Float](pagerankGraph)( + GraphLab.iterateGA2[(Int, Float), ED, Float](pagerankGraph)( (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather (a: Float, b: Float) => a + b, // merge - (vertex, a: Option[Float]) => (vertex.data._1, (0.15F + 0.85F * a.getOrElse(0F))), // apply + 0.0F, // default + (vertex, a: Float) => (vertex.data._1, (0.15F + 0.85F * a)), // apply numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) } } diff --git a/graph/src/main/scala/spark/graph/BagelTest.scala b/graph/src/main/scala/spark/graph/BagelTest.scala new file mode 100644 index 0000000000..eee53bd6f6 --- /dev/null +++ b/graph/src/main/scala/spark/graph/BagelTest.scala @@ -0,0 +1,71 @@ +package spark.graph + +import spark._ +import spark.SparkContext._ +import spark.bagel.Bagel +import spark.bagel.examples._ + + +object BagelTest { + + def main(args: Array[String]) { + val host = args(0) + val taskType = args(1) + val fname = args(2) + val options = args.drop(3).map { arg => + arg.dropWhile(_ == '-').split('=') match { + case Array(opt, v) => (opt -> v) + case _ => throw new IllegalArgumentException("Invalid argument: " + arg) + } + } + + System.setProperty("spark.serializer", "spark.KryoSerializer") + //System.setProperty("spark.shuffle.compress", "false") + System.setProperty("spark.kryo.registrator", "spark.bagel.examples.PRKryoRegistrator") + + var numIter = Int.MaxValue + var isDynamic = false + var tol:Float = 0.001F + var outFname = "" + var numVPart = 4 + var numEPart = 4 + + options.foreach{ + case ("numIter", v) => numIter = v.toInt + case ("dynamic", v) => isDynamic = v.toBoolean + case ("tol", v) => tol = v.toFloat + case ("output", v) => outFname = v + case ("numVPart", v) => numVPart = v.toInt + case ("numEPart", v) => numEPart = v.toInt + case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + } + + val sc = new SparkContext(host, "PageRank(" + fname + ")") + val g = Graph.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache() + val startTime = System.currentTimeMillis + + val numVertices = g.vertices.count() + + val vertices = g.collectNeighborIds(EdgeDirection.Out).map { case (vid, neighbors) => + (vid.toString, new PRVertex(1.0, neighbors.map(_.toString))) + } + + // Do the computation + val epsilon = 0.01 / numVertices + val messages = sc.parallelize(Array[(String, PRMessage)]()) + val utils = new PageRankUtils + val result = + Bagel.run( + sc, vertices, messages, combiner = new PRCombiner(), + numPartitions = numVPart)( + utils.computeWithCombiner(numVertices, epsilon, numIter)) + + println("Total rank: " + result.map{ case (id, r) => r.value }.reduce(_+_) ) + if (!outFname.isEmpty) { + println("Saving pageranks of pages to " + outFname) + result.map{ case (id, r) => id + "\t" + r.value }.saveAsTextFile(outFname) + } + println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds") + sc.stop() + } +} diff --git a/graph/src/main/scala/spark/graph/GraphLab.scala b/graph/src/main/scala/spark/graph/GraphLab.scala index b0efdadce9..4de453663d 100644 --- a/graph/src/main/scala/spark/graph/GraphLab.scala +++ b/graph/src/main/scala/spark/graph/GraphLab.scala @@ -6,30 +6,29 @@ import spark.RDD object GraphLab { - // def iterateGA[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]( - // rawGraph: Graph[VD, ED])( - // gather: (Vid, EdgeWithVertices[VD, ED]) => A, - // merge: (A, A) => A, - // default: A, - // apply: (Vertex[VD], A) => VD, - // numIter: Int, - // gatherDirection: EdgeDirection.EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = { - - // var graph = rawGraph.cache() - - // var i = 0 - // while (i < numIter) { - - // val accUpdates: RDD[(Vid, A)] = - // graph.mapReduceNeighborhood(gather, merge, default, gatherDirection) - - // def applyFunc(v: Vertex[VD], update: Option[A]): VD = { apply(v, update.get) } - // graph = graph.updateVertices(accUpdates, applyFunc).cache() - - // i += 1 - // } - // graph - // } + def iterateGA2[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])( + gather: (Vid, EdgeWithVertices[VD, ED]) => A, + merge: (A, A) => A, + default: A, + apply: (Vertex[VD], A) => VD, + numIter: Int, + gatherDirection: EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = { + + var g = graph.cache() + + var i = 0 + while (i < numIter) { + + val accUpdates: RDD[(Vid, A)] = + g.mapReduceNeighborhood(gather, merge, default, gatherDirection) + + def applyFunc(v: Vertex[VD], update: Option[A]): VD = { apply(v, update.get) } + g = g.updateVertices(accUpdates, applyFunc).cache() + + i += 1 + } + g + } def iterateGA[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])( gatherFunc: (Vid, EdgeWithVertices[VD, ED]) => A, diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 6e6c72517a..56610e4385 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -27,7 +27,7 @@ object SparkBuild extends Build { lazy val bagel = Project("bagel", file("bagel"), settings = bagelSettings) dependsOn (core) - lazy val graph = Project("graph", file("graph"), settings = graphSettings) dependsOn (core) + lazy val graph = Project("graph", file("graph"), settings = graphSettings) dependsOn (core, bagel) lazy val streaming = Project("streaming", file("streaming"), settings = streamingSettings) dependsOn (core) |