author | Andy Konwinski <andyk@berkeley.edu> | 2012-09-02 23:05:40 -0700 |
---|---|---|
committer | Andy Konwinski <andyk@berkeley.edu> | 2012-09-12 13:03:43 -0700 |
commit | 16da942d66ad3d460889ffcb08ee8c82b1ea7936 (patch) | |
tree | d49349d1376fb070950473658a75a33cf51631e6 /docs/bagel-programming-guide.md | |
parent | a29ac5f9cf3b63cdb0bdd864dc0fea3d3d8db095 (diff) | |
Adding docs directory containing documentation currently on the wiki
which can be compiled via jekyll, using the command `jekyll`. To compile
and run a local webserver to serve the doc as a website, run
`jekyll --server`.
Diffstat (limited to 'docs/bagel-programming-guide.md')
-rw-r--r-- | docs/bagel-programming-guide.md | 144 |
1 files changed, 144 insertions, 0 deletions
diff --git a/docs/bagel-programming-guide.md b/docs/bagel-programming-guide.md
new file mode 100644
index 0000000000..d4d08f8cb1
--- /dev/null
+++ b/docs/bagel-programming-guide.md
@@ -0,0 +1,144 @@
+---
+layout: global
+title: Bagel Programming Guide
+---
+# Bagel Programming Guide
+
+**Bagel** is a Spark implementation of Google's [Pregel](http://portal.acm.org/citation.cfm?id=1807184) graph processing framework. Bagel currently supports basic graph computation, combiners, and aggregators.
+
+In the Pregel programming model, jobs run as a sequence of iterations called _supersteps_. In each superstep, each vertex in the graph runs a user-specified function that can update state associated with the vertex and send messages to other vertices for use in the *next* iteration.
+
+This guide shows the programming model and features of Bagel by walking through an example implementation of PageRank on Bagel.
+
+## Linking with Bagel
+
+To write a Bagel application, you will need to add Spark, its dependencies, and Bagel to your CLASSPATH:
+
+1. Run `sbt/sbt update` to fetch Spark's dependencies, if you haven't already done so.
+2. Run `sbt/sbt assembly` to build Spark and its dependencies into one JAR (`core/target/scala_2.8.1/Spark Core-assembly-0.3-SNAPSHOT.jar`) and Bagel into a second JAR (`bagel/target/scala_2.8.1/Bagel-assembly-0.3-SNAPSHOT.jar`).
+3. Add these two JARs to your CLASSPATH.
+
+## Programming Model
+
+Bagel operates on a graph represented as a [[distributed dataset|Spark Programming Guide]] of (K, V) pairs, where keys are vertex IDs and values are vertices plus their associated state. In each superstep, Bagel runs a user-specified compute function on each vertex that takes as input the current vertex state and a list of messages sent to that vertex during the previous superstep, and returns the new vertex state and a list of outgoing messages.
+
+For example, we can use Bagel to implement PageRank. Here, vertices represent pages, edges represent links between pages, and messages represent shares of PageRank sent to the pages that a particular page links to.
+
+We first extend the default `Vertex` class to store a `Double`
+representing the current PageRank of the vertex, and similarly extend
+the `Message` and `Edge` classes. Note that these need to be marked `@serializable` to allow Spark to transfer them across machines. We also import the Bagel types and implicit conversions.
+
+    import spark.bagel._
+    import spark.bagel.Bagel._
+
+    @serializable class PREdge(val targetId: String) extends Edge
+
+    @serializable class PRVertex(
+      val id: String, val rank: Double, val outEdges: Seq[Edge],
+      val active: Boolean) extends Vertex
+
+    @serializable class PRMessage(
+      val targetId: String, val rankShare: Double) extends Message
+
+Next, we load a sample graph from a text file as a distributed dataset and package it into `PRVertex` objects. We also cache the distributed dataset because Bagel will use it multiple times and we'd like to avoid recomputing it.
+
+    val input = sc.textFile("pagerank_data.txt")
+
+    val numVerts = input.count()
+
+    val verts = input.map(line => {
+      val fields = line.split('\t')
+      val (id, linksStr) = (fields(0), fields(1))
+      val links = linksStr.split(',').map(new PREdge(_))
+      (id, new PRVertex(id, 1.0 / numVerts, links, true))
+    }).cache
+
+We run the Bagel job, passing in `verts`, an empty distributed dataset of messages, and a custom compute function that runs PageRank for 10 iterations.
+
+    val emptyMsgs = sc.parallelize(List[(String, PRMessage)]())
+
+    def compute(self: PRVertex, msgs: Option[Seq[PRMessage]], superstep: Int)
+    : (PRVertex, Iterable[PRMessage]) = {
+      val msgSum = msgs.getOrElse(List()).map(_.rankShare).sum
+      val newRank =
+        if (msgSum != 0)
+          0.15 / numVerts + 0.85 * msgSum
+        else
+          self.rank
+      val halt = superstep >= 10
+      val msgsOut =
+        if (!halt)
+          self.outEdges.map(edge =>
+            new PRMessage(edge.targetId, newRank / self.outEdges.size))
+        else
+          List()
+      (new PRVertex(self.id, newRank, self.outEdges, !halt), msgsOut)
+    }
+
+    val result = Bagel.run(sc, verts, emptyMsgs)()(compute)
+
+Finally, we print the results.
+
+    println(result.map(v => "%s\t%s\n".format(v.id, v.rank)).collect.mkString)
+
+### Combiners
+
+Sending a message to another vertex generally involves expensive communication over the network. For certain algorithms, it's possible to reduce the amount of communication using _combiners_. For example, if the compute function receives integer messages and only uses their sum, it's possible for Bagel to combine multiple messages to the same vertex by summing them.
+
+For combiner support, Bagel can optionally take a set of combiner functions that convert messages to their combined form.
+
+_Example: PageRank with combiners_
+
+### Aggregators
+
+Aggregators perform a reduce across all vertices after each superstep, and provide the result to each vertex in the next superstep.
+
+For aggregator support, Bagel can optionally take an aggregator function that reduces across each vertex.
+
+_Example_
+
+### Operations
+
+Here are the actions and types in the Bagel API. See [Bagel.scala](https://github.com/mesos/spark/blob/master/bagel/src/main/scala/spark/bagel/Bagel.scala) for details.
+
+#### Actions
+
+    # Full form
+    Bagel.run(sc, vertices, messages, combiner, aggregator, partitioner, numSplits)(compute)
+      where compute takes (vertex: V, combinedMessages: Option[C], aggregated: Option[A], superstep: Int)
+      and returns (newVertex: V, outMessages: Array[M])
+    # Abbreviated forms
+    Bagel.run(sc, vertices, messages, combiner, partitioner, numSplits)(compute)
+      where compute takes (vertex: V, combinedMessages: Option[C], superstep: Int)
+      and returns (newVertex: V, outMessages: Array[M])
+    Bagel.run(sc, vertices, messages, combiner, numSplits)(compute)
+      where compute takes (vertex: V, combinedMessages: Option[C], superstep: Int)
+      and returns (newVertex: V, outMessages: Array[M])
+    Bagel.run(sc, vertices, messages, numSplits)(compute)
+      where compute takes (vertex: V, messages: Option[Array[M]], superstep: Int)
+      and returns (newVertex: V, outMessages: Array[M])
+
+#### Types
+
+    trait Combiner[M, C] {
+      def createCombiner(msg: M): C
+      def mergeMsg(combiner: C, msg: M): C
+      def mergeCombiners(a: C, b: C): C
+    }
+
+    trait Aggregator[V, A] {
+      def createAggregator(vert: V): A
+      def mergeAggregators(a: A, b: A): A
+    }
+
+    trait Vertex {
+      def active: Boolean
+    }
+
+    trait Message[K] {
+      def targetId: K
+    }
+
+## Where to Go from Here
+
+Two example jobs, PageRank and shortest path, are included in `bagel/src/main/scala/spark/bagel/examples`. You can run them by passing the class name to the `run` script included in Spark -- for example, `./run spark.bagel.examples.ShortestPath`. Each example program prints usage help when run without any arguments.
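The guide added in this commit describes combiners but leaves the "PageRank with combiners" example as a placeholder. The following is a minimal sketch of how a sum combiner for PageRank shares could look against the `Combiner[M, C]` trait listed under "Types". It is not taken from the commit: `RankMessage`, `RankSumCombiner`, `CombinerSketch`, and its two helper methods are hypothetical names, the trait is copied locally so the sketch compiles without Spark or Bagel on the classpath, and the fold only imitates in plain Scala how a framework might apply the three combiner functions within and across partitions.

```scala
// Local copy of the Combiner trait from the guide's "Types" section, so this
// sketch compiles without Spark or Bagel (assumption: same three methods).
trait Combiner[M, C] {
  def createCombiner(msg: M): C
  def mergeMsg(combiner: C, msg: M): C
  def mergeCombiners(a: C, b: C): C
}

// Hypothetical stand-in for the guide's PRMessage.
case class RankMessage(targetId: String, rankShare: Double)

// Hypothetical combiner: the combined form of a vertex's messages is the sum
// of their rank shares, which is all the PageRank compute function needs.
object RankSumCombiner extends Combiner[RankMessage, Double] {
  def createCombiner(msg: RankMessage): Double = msg.rankShare
  def mergeMsg(combiner: Double, msg: RankMessage): Double = combiner + msg.rankShare
  def mergeCombiners(a: Double, b: Double): Double = a + b
}

object CombinerSketch {
  // Combine the messages of one partition per target vertex, using
  // createCombiner for the first message and mergeMsg for later ones.
  def combinePartition(msgs: Seq[RankMessage]): Map[String, Double] =
    msgs.foldLeft(Map.empty[String, Double]) { (acc, m) =>
      val combined = acc.get(m.targetId) match {
        case Some(c) => RankSumCombiner.mergeMsg(c, m)
        case None    => RankSumCombiner.createCombiner(m)
      }
      acc + (m.targetId -> combined)
    }

  // Merge the partial results of two partitions with mergeCombiners.
  def mergePartitions(a: Map[String, Double], b: Map[String, Double]): Map[String, Double] =
    b.foldLeft(a) { case (acc, (id, c)) =>
      acc + (id -> acc.get(id).map(RankSumCombiner.mergeCombiners(_, c)).getOrElse(c))
    }

  def main(args: Array[String]): Unit = {
    // Two made-up partitions of outgoing messages.
    val p1 = Seq(RankMessage("a", 0.25), RankMessage("b", 0.5), RankMessage("a", 0.25))
    val p2 = Seq(RankMessage("a", 0.5))
    // Vertex "a" ends up with 0.25 + 0.25 + 0.5, vertex "b" with 0.5.
    println(mergePartitions(combinePartition(p1), combinePartition(p2)))
  }
}
```

With a combiner like this, the compute function would receive a single `Option[Double]` per vertex instead of a sequence of messages, matching the `combinedMessages: Option[C]` parameter in the abbreviated `Bagel.run` forms listed in the diff.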