diff options
Diffstat (limited to 'docs/bagel-programming-guide.md')
-rw-r--r-- | docs/bagel-programming-guide.md | 163 |
1 files changed, 90 insertions, 73 deletions
diff --git a/docs/bagel-programming-guide.md b/docs/bagel-programming-guide.md index 3f6ab7df96..b133376a97 100644 --- a/docs/bagel-programming-guide.md +++ b/docs/bagel-programming-guide.md @@ -27,58 +27,66 @@ We first extend the default `Vertex` class to store a `Double` representing the current PageRank of the vertex, and similarly extend the `Message` and `Edge` classes. Note that these need to be marked `@serializable` to allow Spark to transfer them across machines. We also import the Bagel types and implicit conversions. - import spark.bagel._ - import spark.bagel.Bagel._ - - @serializable class PREdge(val targetId: String) extends Edge - - @serializable class PRVertex( - val id: String, val rank: Double, val outEdges: Seq[Edge], - val active: Boolean) extends Vertex - - @serializable class PRMessage( - val targetId: String, val rankShare: Double) extends Message +{% highlight scala %} +import spark.bagel._ +import spark.bagel.Bagel._ + +@serializable class PREdge(val targetId: String) extends Edge + +@serializable class PRVertex( + val id: String, val rank: Double, val outEdges: Seq[Edge], + val active: Boolean) extends Vertex + +@serializable class PRMessage( + val targetId: String, val rankShare: Double) extends Message +{% endhighlight %} Next, we load a sample graph from a text file as a distributed dataset and package it into `PRVertex` objects. We also cache the distributed dataset because Bagel will use it multiple times and we'd like to avoid recomputing it. 
- val input = sc.textFile("pagerank_data.txt") +{% highlight scala %} +val input = sc.textFile("pagerank_data.txt") - val numVerts = input.count() +val numVerts = input.count() - val verts = input.map(line => { - val fields = line.split('\t') - val (id, linksStr) = (fields(0), fields(1)) - val links = linksStr.split(',').map(new PREdge(_)) - (id, new PRVertex(id, 1.0 / numVerts, links, true)) - }).cache +val verts = input.map(line => { + val fields = line.split('\t') + val (id, linksStr) = (fields(0), fields(1)) + val links = linksStr.split(',').map(new PREdge(_)) + (id, new PRVertex(id, 1.0 / numVerts, links, true)) +}).cache +{% endhighlight %} We run the Bagel job, passing in `verts`, an empty distributed dataset of messages, and a custom compute function that runs PageRank for 10 iterations. - val emptyMsgs = sc.parallelize(List[(String, PRMessage)]()) - - def compute(self: PRVertex, msgs: Option[Seq[PRMessage]], superstep: Int) - : (PRVertex, Iterable[PRMessage]) = { - val msgSum = msgs.getOrElse(List()).map(_.rankShare).sum - val newRank = - if (msgSum != 0) - 0.15 / numVerts + 0.85 * msgSum - else - self.rank - val halt = superstep >= 10 - val msgsOut = - if (!halt) - self.outEdges.map(edge => - new PRMessage(edge.targetId, newRank / self.outEdges.size)) - else - List() - (new PRVertex(self.id, newRank, self.outEdges, !halt), msgsOut) - } - - val result = Bagel.run(sc, verts, emptyMsgs)()(compute) +{% highlight scala %} +val emptyMsgs = sc.parallelize(List[(String, PRMessage)]()) + +def compute(self: PRVertex, msgs: Option[Seq[PRMessage]], superstep: Int) +: (PRVertex, Iterable[PRMessage]) = { + val msgSum = msgs.getOrElse(List()).map(_.rankShare).sum + val newRank = + if (msgSum != 0) + 0.15 / numVerts + 0.85 * msgSum + else + self.rank + val halt = superstep >= 10 + val msgsOut = + if (!halt) + self.outEdges.map(edge => + new PRMessage(edge.targetId, newRank / self.outEdges.size)) + else + List() + (new PRVertex(self.id, newRank, self.outEdges, !halt), 
msgsOut) +} + +val result = Bagel.run(sc, verts, emptyMsgs)()(compute) +{% endhighlight %} Finally, we print the results. - println(result.map(v => "%s\t%s\n".format(v.id, v.rank)).collect.mkString) +{% highlight scala %} +println(result.map(v => "%s\t%s\n".format(v.id, v.rank)).collect.mkString) +{% endhighlight %} ### Combiners @@ -102,41 +110,50 @@ Here are the actions and types in the Bagel API. See [Bagel.scala](https://githu #### Actions - # Full form - Bagel.run(sc, vertices, messages, combiner, aggregator, partitioner, numSplits)(compute) - where compute takes (vertex: V, combinedMessages: Option[C], aggregated: Option[A], superstep: Int) - and returns (newVertex: V, outMessages: Array[M]) - # Abbreviated forms - Bagel.run(sc, vertices, messages, combiner, partitioner, numSplits)(compute) - where compute takes (vertex: V, combinedMessages: Option[C], superstep: Int) - and returns (newVertex: V, outMessages: Array[M]) - Bagel.run(sc, vertices, messages, combiner, numSplits)(compute) - where compute takes (vertex: V, combinedMessages: Option[C], superstep: Int) - and returns (newVertex: V, outMessages: Array[M]) - Bagel.run(sc, vertices, messages, numSplits)(compute) - where compute takes (vertex: V, messages: Option[Array[M]], superstep: Int) - and returns (newVertex: V, outMessages: Array[M]) +{% highlight scala %} +/*** Full form ***/ + +Bagel.run(sc, vertices, messages, combiner, aggregator, partitioner, numSplits)(compute) +// where compute takes (vertex: V, combinedMessages: Option[C], aggregated: Option[A], superstep: Int) +// and returns (newVertex: V, outMessages: Array[M]) + +/*** Abbreviated forms ***/ + +Bagel.run(sc, vertices, messages, combiner, partitioner, numSplits)(compute) +// where compute takes (vertex: V, combinedMessages: Option[C], superstep: Int) +// and returns (newVertex: V, outMessages: Array[M]) + +Bagel.run(sc, vertices, messages, combiner, numSplits)(compute) +// where compute takes (vertex: V, combinedMessages: Option[C], 
superstep: Int) +// and returns (newVertex: V, outMessages: Array[M]) + +Bagel.run(sc, vertices, messages, numSplits)(compute) +// where compute takes (vertex: V, messages: Option[Array[M]], superstep: Int) +// and returns (newVertex: V, outMessages: Array[M]) +{% endhighlight %} #### Types - trait Combiner[M, C] { - def createCombiner(msg: M): C - def mergeMsg(combiner: C, msg: M): C - def mergeCombiners(a: C, b: C): C - } - - trait Aggregator[V, A] { - def createAggregator(vert: V): A - def mergeAggregators(a: A, b: A): A - } - - trait Vertex { - def active: Boolean - } - - trait Message[K] { - def targetId: K - } +{% highlight scala %} +trait Combiner[M, C] { + def createCombiner(msg: M): C + def mergeMsg(combiner: C, msg: M): C + def mergeCombiners(a: C, b: C): C +} + +trait Aggregator[V, A] { + def createAggregator(vert: V): A + def mergeAggregators(a: A, b: A): A +} + +trait Vertex { + def active: Boolean +} + +trait Message[K] { + def targetId: K +} +{% endhighlight %} ## Where to Go from Here |