aboutsummaryrefslogtreecommitdiff
path: root/docs/bagel-programming-guide.md
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-10-24 23:21:00 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-10-24 23:21:00 -0700
commit863a55ae42c2b9c0583b77cf37ff13bd2459f82b (patch)
treef4b18ebe461343ffb864dabb6afefcdf88dfafaf /docs/bagel-programming-guide.md
parented71df46cddc9a4f1363b937c10bfa2a928e564c (diff)
parentf63a40fd99bf907c03cd44585fd5979bf21b304d (diff)
downloadspark-863a55ae42c2b9c0583b77cf37ff13bd2459f82b.tar.gz
spark-863a55ae42c2b9c0583b77cf37ff13bd2459f82b.tar.bz2
spark-863a55ae42c2b9c0583b77cf37ff13bd2459f82b.zip
Merge remote-tracking branch 'public/master' into dev
Conflicts: core/src/main/scala/spark/BlockStoreShuffleFetcher.scala core/src/main/scala/spark/KryoSerializer.scala core/src/main/scala/spark/MapOutputTracker.scala core/src/main/scala/spark/RDD.scala core/src/main/scala/spark/SparkContext.scala core/src/main/scala/spark/executor/Executor.scala core/src/main/scala/spark/network/Connection.scala core/src/main/scala/spark/network/ConnectionManagerTest.scala core/src/main/scala/spark/rdd/BlockRDD.scala core/src/main/scala/spark/rdd/NewHadoopRDD.scala core/src/main/scala/spark/scheduler/ShuffleMapTask.scala core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala core/src/main/scala/spark/storage/BlockManager.scala core/src/main/scala/spark/storage/BlockMessage.scala core/src/main/scala/spark/storage/BlockStore.scala core/src/main/scala/spark/storage/StorageLevel.scala core/src/main/scala/spark/util/AkkaUtils.scala project/SparkBuild.scala run
Diffstat (limited to 'docs/bagel-programming-guide.md')
-rw-r--r--docs/bagel-programming-guide.md161
1 files changed, 161 insertions, 0 deletions
diff --git a/docs/bagel-programming-guide.md b/docs/bagel-programming-guide.md
new file mode 100644
index 0000000000..8a0fa42d94
--- /dev/null
+++ b/docs/bagel-programming-guide.md
@@ -0,0 +1,161 @@
+---
+layout: global
+title: Bagel Programming Guide
+---
+
+**Bagel** is a Spark implementation of Google's [Pregel](http://portal.acm.org/citation.cfm?id=1807184) graph processing framework. Bagel currently supports basic graph computation, combiners, and aggregators.
+
+In the Pregel programming model, jobs run as a sequence of iterations called _supersteps_. In each superstep, each vertex in the graph runs a user-specified function that can update state associated with the vertex and send messages to other vertices for use in the *next* iteration.
+
+This guide shows the programming model and features of Bagel by walking through an example implementation of PageRank on Bagel.
+
+## Linking with Bagel
+
+To write a Bagel application, you will need to add Spark, its dependencies, and Bagel to your CLASSPATH:
+
+1. Run `sbt/sbt update` to fetch Spark's dependencies, if you haven't already done so.
+2. Run `sbt/sbt assembly` to build Spark and its dependencies into one JAR (`core/target/spark-core-assembly-{{site.SPARK_VERSION}}.jar`)
+3. Run `sbt/sbt package` build the Bagel JAR (`bagel/target/scala_{{site.SCALA_VERSION}}/spark-bagel_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}.jar`).
+4. Add these two JARs to your CLASSPATH.
+
+## Programming Model
+
+Bagel operates on a graph represented as a [distributed dataset](scala-programming-guide.html) of (K, V) pairs, where keys are vertex IDs and values are vertices plus their associated state. In each superstep, Bagel runs a user-specified compute function on each vertex that takes as input the current vertex state and a list of messages sent to that vertex during the previous superstep, and returns the new vertex state and a list of outgoing messages.
+
+For example, we can use Bagel to implement PageRank. Here, vertices represent pages, edges represent links between pages, and messages represent shares of PageRank sent to the pages that a particular page links to.
+
+We first extend the default `Vertex` class to store a `Double`
+representing the current PageRank of the vertex, and similarly extend
+the `Message` and `Edge` classes. Note that these need to be marked `@serializable` to allow Spark to transfer them across machines. We also import the Bagel types and implicit conversions.
+
+{% highlight scala %}
+import spark.bagel._
+import spark.bagel.Bagel._
+
+@serializable class PREdge(val targetId: String) extends Edge
+
+@serializable class PRVertex(
+ val id: String, val rank: Double, val outEdges: Seq[Edge],
+ val active: Boolean) extends Vertex
+
+@serializable class PRMessage(
+ val targetId: String, val rankShare: Double) extends Message
+{% endhighlight %}
+
+Next, we load a sample graph from a text file as a distributed dataset and package it into `PRVertex` objects. We also cache the distributed dataset because Bagel will use it multiple times and we'd like to avoid recomputing it.
+
+{% highlight scala %}
+val input = sc.textFile("pagerank_data.txt")
+
+val numVerts = input.count()
+
+val verts = input.map(line => {
+ val fields = line.split('\t')
+ val (id, linksStr) = (fields(0), fields(1))
+ val links = linksStr.split(',').map(new PREdge(_))
+ (id, new PRVertex(id, 1.0 / numVerts, links, true))
+}).cache
+{% endhighlight %}
+
+We run the Bagel job, passing in `verts`, an empty distributed dataset of messages, and a custom compute function that runs PageRank for 10 iterations.
+
+{% highlight scala %}
+val emptyMsgs = sc.parallelize(List[(String, PRMessage)]())
+
+def compute(self: PRVertex, msgs: Option[Seq[PRMessage]], superstep: Int)
+: (PRVertex, Iterable[PRMessage]) = {
+ val msgSum = msgs.getOrElse(List()).map(_.rankShare).sum
+ val newRank =
+ if (msgSum != 0)
+ 0.15 / numVerts + 0.85 * msgSum
+ else
+ self.rank
+ val halt = superstep >= 10
+ val msgsOut =
+ if (!halt)
+ self.outEdges.map(edge =>
+ new PRMessage(edge.targetId, newRank / self.outEdges.size))
+ else
+ List()
+ (new PRVertex(self.id, newRank, self.outEdges, !halt), msgsOut)
+}
+{% endhighlight %}
+
+val result = Bagel.run(sc, verts, emptyMsgs)()(compute)
+
+Finally, we print the results.
+
+{% highlight scala %}
+println(result.map(v => "%s\t%s\n".format(v.id, v.rank)).collect.mkString)
+{% endhighlight %}
+
+### Combiners
+
+Sending a message to another vertex generally involves expensive communication over the network. For certain algorithms, it's possible to reduce the amount of communication using _combiners_. For example, if the compute function receives integer messages and only uses their sum, it's possible for Bagel to combine multiple messages to the same vertex by summing them.
+
+For combiner support, Bagel can optionally take a set of combiner functions that convert messages to their combined form.
+
+_Example: PageRank with combiners_
+
+### Aggregators
+
+Aggregators perform a reduce across all vertices after each superstep, and provide the result to each vertex in the next superstep.
+
+For aggregator support, Bagel can optionally take an aggregator function that reduces across each vertex.
+
+_Example_
+
+### Operations
+
+Here are the actions and types in the Bagel API. See [Bagel.scala](https://github.com/mesos/spark/blob/master/bagel/src/main/scala/spark/bagel/Bagel.scala) for details.
+
+#### Actions
+
+{% highlight scala %}
+/*** Full form ***/
+
+Bagel.run(sc, vertices, messages, combiner, aggregator, partitioner, numSplits)(compute)
+// where compute takes (vertex: V, combinedMessages: Option[C], aggregated: Option[A], superstep: Int)
+// and returns (newVertex: V, outMessages: Array[M])
+
+/*** Abbreviated forms ***/
+
+Bagel.run(sc, vertices, messages, combiner, partitioner, numSplits)(compute)
+// where compute takes (vertex: V, combinedMessages: Option[C], superstep: Int)
+// and returns (newVertex: V, outMessages: Array[M])
+
+Bagel.run(sc, vertices, messages, combiner, numSplits)(compute)
+// where compute takes (vertex: V, combinedMessages: Option[C], superstep: Int)
+// and returns (newVertex: V, outMessages: Array[M])
+
+Bagel.run(sc, vertices, messages, numSplits)(compute)
+// where compute takes (vertex: V, messages: Option[Array[M]], superstep: Int)
+// and returns (newVertex: V, outMessages: Array[M])
+{% endhighlight %}
+
+#### Types
+
+{% highlight scala %}
+trait Combiner[M, C] {
+ def createCombiner(msg: M): C
+ def mergeMsg(combiner: C, msg: M): C
+ def mergeCombiners(a: C, b: C): C
+}
+
+trait Aggregator[V, A] {
+ def createAggregator(vert: V): A
+ def mergeAggregators(a: A, b: A): A
+}
+
+trait Vertex {
+ def active: Boolean
+}
+
+trait Message[K] {
+ def targetId: K
+}
+{% endhighlight %}
+
+## Where to Go from Here
+
+Two example jobs, PageRank and shortest path, are included in `bagel/src/main/scala/spark/bagel/examples`. You can run them by passing the class name to the `run` script included in Spark -- for example, `./run spark.bagel.examples.WikipediaPageRank`. Each example program prints usage help when run without any arguments.