aboutsummaryrefslogtreecommitdiff
path: root/docs/bagel-programming-guide.md
diff options
context:
space:
mode:
author Andy Konwinski <andyk@berkeley.edu> 2012-09-12 23:05:47 -0700
committer Andy Konwinski <andyk@berkeley.edu> 2012-09-12 23:25:07 -0700
commit ca2c999e0fd97a29b20bd3990b6e57d9e0db5d0a (patch)
tree f90eb6c5cd2bc2a342490d305677f90f7e936c0f /docs/bagel-programming-guide.md
parent c4db09ea76802df22f52826e228f9d15c0cf13d9 (diff)
download spark-ca2c999e0fd97a29b20bd3990b6e57d9e0db5d0a.tar.gz
spark-ca2c999e0fd97a29b20bd3990b6e57d9e0db5d0a.tar.bz2
spark-ca2c999e0fd97a29b20bd3990b6e57d9e0db5d0a.zip
Making the link to api scaladocs work and migrating other code snippets
to use pygments syntax highlighting.
Diffstat (limited to 'docs/bagel-programming-guide.md')
-rw-r--r-- docs/bagel-programming-guide.md 163
1 files changed, 90 insertions, 73 deletions
diff --git a/docs/bagel-programming-guide.md b/docs/bagel-programming-guide.md
index 3f6ab7df96..b133376a97 100644
--- a/docs/bagel-programming-guide.md
+++ b/docs/bagel-programming-guide.md
@@ -27,58 +27,66 @@ We first extend the default `Vertex` class to store a `Double`
representing the current PageRank of the vertex, and similarly extend
the `Message` and `Edge` classes. Note that these need to be marked `@serializable` to allow Spark to transfer them across machines. We also import the Bagel types and implicit conversions.
- import spark.bagel._
- import spark.bagel.Bagel._
-
- @serializable class PREdge(val targetId: String) extends Edge
-
- @serializable class PRVertex(
- val id: String, val rank: Double, val outEdges: Seq[Edge],
- val active: Boolean) extends Vertex
-
- @serializable class PRMessage(
- val targetId: String, val rankShare: Double) extends Message
+{% highlight scala %}
+import spark.bagel._
+import spark.bagel.Bagel._
+
+@serializable class PREdge(val targetId: String) extends Edge
+
+@serializable class PRVertex(
+ val id: String, val rank: Double, val outEdges: Seq[Edge],
+ val active: Boolean) extends Vertex
+
+@serializable class PRMessage(
+ val targetId: String, val rankShare: Double) extends Message
+{% endhighlight %}
Next, we load a sample graph from a text file as a distributed dataset and package it into `PRVertex` objects. We also cache the distributed dataset because Bagel will use it multiple times and we'd like to avoid recomputing it.
- val input = sc.textFile("pagerank_data.txt")
+{% highlight scala %}
+val input = sc.textFile("pagerank_data.txt")
- val numVerts = input.count()
+val numVerts = input.count()
- val verts = input.map(line => {
- val fields = line.split('\t')
- val (id, linksStr) = (fields(0), fields(1))
- val links = linksStr.split(',').map(new PREdge(_))
- (id, new PRVertex(id, 1.0 / numVerts, links, true))
- }).cache
+val verts = input.map(line => {
+ val fields = line.split('\t')
+ val (id, linksStr) = (fields(0), fields(1))
+ val links = linksStr.split(',').map(new PREdge(_))
+ (id, new PRVertex(id, 1.0 / numVerts, links, true))
+}).cache
+{% endhighlight %}
We run the Bagel job, passing in `verts`, an empty distributed dataset of messages, and a custom compute function that runs PageRank for 10 iterations.
- val emptyMsgs = sc.parallelize(List[(String, PRMessage)]())
-
- def compute(self: PRVertex, msgs: Option[Seq[PRMessage]], superstep: Int)
- : (PRVertex, Iterable[PRMessage]) = {
- val msgSum = msgs.getOrElse(List()).map(_.rankShare).sum
- val newRank =
- if (msgSum != 0)
- 0.15 / numVerts + 0.85 * msgSum
- else
- self.rank
- val halt = superstep >= 10
- val msgsOut =
- if (!halt)
- self.outEdges.map(edge =>
- new PRMessage(edge.targetId, newRank / self.outEdges.size))
- else
- List()
- (new PRVertex(self.id, newRank, self.outEdges, !halt), msgsOut)
- }
-
- val result = Bagel.run(sc, verts, emptyMsgs)()(compute)
+{% highlight scala %}
+val emptyMsgs = sc.parallelize(List[(String, PRMessage)]())
+
+def compute(self: PRVertex, msgs: Option[Seq[PRMessage]], superstep: Int)
+: (PRVertex, Iterable[PRMessage]) = {
+ val msgSum = msgs.getOrElse(List()).map(_.rankShare).sum
+ val newRank =
+ if (msgSum != 0)
+ 0.15 / numVerts + 0.85 * msgSum
+ else
+ self.rank
+ val halt = superstep >= 10
+ val msgsOut =
+ if (!halt)
+ self.outEdges.map(edge =>
+ new PRMessage(edge.targetId, newRank / self.outEdges.size))
+ else
+ List()
+ (new PRVertex(self.id, newRank, self.outEdges, !halt), msgsOut)
+}
+
+val result = Bagel.run(sc, verts, emptyMsgs)()(compute)
+{% endhighlight %}
Finally, we print the results.
- println(result.map(v => "%s\t%s\n".format(v.id, v.rank)).collect.mkString)
+{% highlight scala %}
+println(result.map(v => "%s\t%s\n".format(v.id, v.rank)).collect.mkString)
+{% endhighlight %}
### Combiners
@@ -102,41 +110,50 @@ Here are the actions and types in the Bagel API. See [Bagel.scala](https://githu
#### Actions
- # Full form
- Bagel.run(sc, vertices, messages, combiner, aggregator, partitioner, numSplits)(compute)
- where compute takes (vertex: V, combinedMessages: Option[C], aggregated: Option[A], superstep: Int)
- and returns (newVertex: V, outMessages: Array[M])
- # Abbreviated forms
- Bagel.run(sc, vertices, messages, combiner, partitioner, numSplits)(compute)
- where compute takes (vertex: V, combinedMessages: Option[C], superstep: Int)
- and returns (newVertex: V, outMessages: Array[M])
- Bagel.run(sc, vertices, messages, combiner, numSplits)(compute)
- where compute takes (vertex: V, combinedMessages: Option[C], superstep: Int)
- and returns (newVertex: V, outMessages: Array[M])
- Bagel.run(sc, vertices, messages, numSplits)(compute)
- where compute takes (vertex: V, messages: Option[Array[M]], superstep: Int)
- and returns (newVertex: V, outMessages: Array[M])
+{% highlight scala %}
+/*** Full form ***/
+
+Bagel.run(sc, vertices, messages, combiner, aggregator, partitioner, numSplits)(compute)
+// where compute takes (vertex: V, combinedMessages: Option[C], aggregated: Option[A], superstep: Int)
+// and returns (newVertex: V, outMessages: Array[M])
+
+/*** Abbreviated forms ***/
+
+Bagel.run(sc, vertices, messages, combiner, partitioner, numSplits)(compute)
+// where compute takes (vertex: V, combinedMessages: Option[C], superstep: Int)
+// and returns (newVertex: V, outMessages: Array[M])
+
+Bagel.run(sc, vertices, messages, combiner, numSplits)(compute)
+// where compute takes (vertex: V, combinedMessages: Option[C], superstep: Int)
+// and returns (newVertex: V, outMessages: Array[M])
+
+Bagel.run(sc, vertices, messages, numSplits)(compute)
+// where compute takes (vertex: V, messages: Option[Array[M]], superstep: Int)
+// and returns (newVertex: V, outMessages: Array[M])
+{% endhighlight %}
#### Types
- trait Combiner[M, C] {
- def createCombiner(msg: M): C
- def mergeMsg(combiner: C, msg: M): C
- def mergeCombiners(a: C, b: C): C
- }
-
- trait Aggregator[V, A] {
- def createAggregator(vert: V): A
- def mergeAggregators(a: A, b: A): A
- }
-
- trait Vertex {
- def active: Boolean
- }
-
- trait Message[K] {
- def targetId: K
- }
+{% highlight scala %}
+trait Combiner[M, C] {
+ def createCombiner(msg: M): C
+ def mergeMsg(combiner: C, msg: M): C
+ def mergeCombiners(a: C, b: C): C
+}
+
+trait Aggregator[V, A] {
+ def createAggregator(vert: V): A
+ def mergeAggregators(a: A, b: A): A
+}
+
+trait Vertex {
+ def active: Boolean
+}
+
+trait Message[K] {
+ def targetId: K
+}
+{% endhighlight %}
## Where to Go from Here