<!DOCTYPE html>
<!--[if lt IE 7]> <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]-->
<!--[if IE 7]> <html class="no-js lt-ie9 lt-ie8"> <![endif]-->
<!--[if IE 8]> <html class="no-js lt-ie9"> <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js"> <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
<title>Bagel Programming Guide - Spark 1.0.1 Documentation</title>
<meta name="description" content="">
<link rel="stylesheet" href="css/bootstrap.min.css">
<style>
body {
padding-top: 60px;
padding-bottom: 40px;
}
</style>
<meta name="viewport" content="width=device-width">
<link rel="stylesheet" href="css/bootstrap-responsive.min.css">
<link rel="stylesheet" href="css/main.css">
<script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script>
<link rel="stylesheet" href="css/pygments-default.css">
<!-- Google analytics script -->
<script type="text/javascript">
var _gaq = _gaq || [];
_gaq.push(['_setAccount', 'UA-32518208-1']);
_gaq.push(['_trackPageview']);
(function() {
var ga = document.createElement('script'); ga.type = 'text/javascript'; ga.async = true;
ga.src = ('https:' == document.location.protocol ? 'https://ssl' : 'http://www') + '.google-analytics.com/ga.js';
var s = document.getElementsByTagName('script')[0]; s.parentNode.insertBefore(ga, s);
})();
</script>
</head>
<body>
<!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html -->
<div class="navbar navbar-fixed-top" id="topbar">
<div class="navbar-inner">
<div class="container">
<div class="brand"><a href="index.html">
<img src="img/spark-logo-hd.png" style="height:50px;"/></a><span class="version">1.0.1</span>
</div>
<ul class="nav">
<!--TODO(andyk): Add class="active" attribute to li some how.-->
<li><a href="index.html">Overview</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Programming Guides<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="quick-start.html">Quick Start</a></li>
<li><a href="programming-guide.html">Spark Programming Guide</a></li>
<li class="divider"></li>
<li><a href="streaming-programming-guide.html">Spark Streaming</a></li>
<li><a href="sql-programming-guide.html">Spark SQL</a></li>
<li><a href="mllib-guide.html">MLlib (Machine Learning)</a></li>
<li><a href="graphx-programming-guide.html">GraphX (Graph Processing)</a></li>
<li><a href="bagel-programming-guide.html">Bagel (Pregel on Spark)</a></li>
</ul>
</li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">API Docs<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="api/scala/index.html#org.apache.spark.package">Scaladoc</a></li>
<li><a href="api/java/index.html">Javadoc</a></li>
<li><a href="api/python/index.html">Python API</a></li>
</ul>
</li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="cluster-overview.html">Overview</a></li>
<li><a href="submitting-applications.html">Submitting Applications</a></li>
<li class="divider"></li>
<li><a href="ec2-scripts.html">Amazon EC2</a></li>
<li><a href="spark-standalone.html">Standalone Mode</a></li>
<li><a href="running-on-mesos.html">Mesos</a></li>
<li><a href="running-on-yarn.html">YARN</a></li>
</ul>
</li>
<li class="dropdown">
<a href="api.html" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="configuration.html">Configuration</a></li>
<li><a href="monitoring.html">Monitoring</a></li>
<li><a href="tuning.html">Tuning Guide</a></li>
<li><a href="job-scheduling.html">Job Scheduling</a></li>
<li><a href="security.html">Security</a></li>
<li><a href="hardware-provisioning.html">Hardware Provisioning</a></li>
<li><a href="hadoop-third-party-distributions.html">3<sup>rd</sup>-Party Hadoop Distros</a></li>
<li class="divider"></li>
<li><a href="building-with-maven.html">Building Spark with Maven</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark">Contributing to Spark</a></li>
</ul>
</li>
</ul>
</div>
</div>
</div>
<div class="container" id="content">
<h1 class="title">Bagel Programming Guide</h1>
<p><strong>Bagel will soon be superseded by <a href="graphx-programming-guide.html">GraphX</a>; we recommend that new users try GraphX instead.</strong></p>
<p>Bagel is a Spark implementation of Google’s <a href="http://portal.acm.org/citation.cfm?id=1807184">Pregel</a> graph processing framework. Bagel currently supports basic graph computation, combiners, and aggregators.</p>
<p>In the Pregel programming model, jobs run as a sequence of iterations called <em>supersteps</em>. In each superstep, each vertex in the graph runs a user-specified function that can update state associated with the vertex and send messages to other vertices for use in the <em>next</em> iteration.</p>
<p>This guide shows the programming model and features of Bagel by walking through an example implementation of PageRank on Bagel.</p>
<h1 id="linking-with-bagel">Linking with Bagel</h1>
<p>To use Bagel in your program, add the following SBT or Maven dependency:</p>
<pre><code>groupId = org.apache.spark
artifactId = spark-bagel_2.10
version = 1.0.1
</code></pre>
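<p>In an SBT build, these coordinates would typically be written as one line in <code>build.sbt</code>. This is a minimal sketch that assumes the project's <code>scalaVersion</code> is a 2.10.x release, so that <code>%%</code> resolves to the <code>spark-bagel_2.10</code> artifact listed above:</p>
<pre><code>// build.sbt (sketch; assumes scalaVersion is set to a 2.10.x release)
libraryDependencies += "org.apache.spark" %% "spark-bagel" % "1.0.1"
</code></pre>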
<h1 id="programming-model">Programming Model</h1>
<p>Bagel operates on a graph represented as a <a href="programming-guide.html">distributed dataset</a> of (K, V) pairs, where keys are vertex IDs and values are vertices plus their associated state. In each superstep, Bagel runs a user-specified compute function on each vertex that takes as input the current vertex state and a list of messages sent to that vertex during the previous superstep, and returns the new vertex state and a list of outgoing messages.</p>
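<p>The superstep contract described above can be illustrated without Spark. The following is a single-machine sketch using plain Scala collections, not Bagel's implementation; the names <code>Counter</code>, <code>Ping</code>, <code>compute</code>, and <code>step</code> are hypothetical and exist only for this illustration. Each vertex adds the values it received in the previous superstep to its total and forwards the new total to its neighbor.</p>
<div class="highlight"><pre><code class="scala">// Single-machine sketch of a Pregel-style superstep (not Bagel's actual code).
// Counter, Ping, compute, and step are hypothetical names for illustration only.
case class Counter(id: String, total: Int, neighbor: String)
case class Ping(targetId: String, value: Int)

// User-specified function: current state plus the previous superstep's messages in,
// new state plus outgoing messages out.
def compute(self: Counter, msgs: Seq[Ping]): (Counter, Seq[Ping]) = {
  val newTotal = self.total + msgs.map(_.value).sum
  (self.copy(total = newTotal), Seq(Ping(self.neighbor, newTotal)))
}

// One superstep: deliver messages by target ID, then run compute on every vertex.
def step(verts: Map[String, Counter], msgs: Seq[Ping]): (Map[String, Counter], Seq[Ping]) = {
  val inbox = msgs.groupBy(_.targetId)
  val results = verts.values.toSeq.map(v => compute(v, inbox.getOrElse(v.id, Seq.empty)))
  (results.map(_._1).map(v => v.id -> v).toMap, results.flatMap(_._2))
}

val verts0 = Map("a" -> Counter("a", 1, "b"), "b" -> Counter("b", 2, "a"))
val after0 = step(verts0, Seq.empty)         // superstep 0: no incoming messages yet
val after1 = step(after0._1, after0._2)      // superstep 1: consumes superstep 0's messages
// after1._1: a holds 1 + 2 = 3 and b holds 2 + 1 = 3
</code></pre></div>
<p>Messages produced in one call to <code>step</code> only become visible in the next call, which is the property that separates one superstep from the next.</p>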
<p>For example, we can use Bagel to implement PageRank. Here, vertices represent pages, edges represent links between pages, and messages represent shares of PageRank sent to the pages that a particular page links to.</p>
<p>We first extend the default <code>Vertex</code> class to store a <code>Double</code>
representing the current PageRank of the vertex, and similarly extend
the <code>Message</code> and <code>Edge</code> classes. Note that these need to be marked <code>@serializable</code> to allow Spark to transfer them across machines. We also import the Bagel types and implicit conversions.</p>
<div class="highlight"><pre><code class="scala"><span class="k">import</span> <span class="nn">org.apache.spark.bagel._</span>
<span class="k">import</span> <span class="nn">org.apache.spark.bagel.Bagel._</span>
<span class="nd">@serializable</span> <span class="k">class</span> <span class="nc">PREdge</span><span class="o">(</span><span class="k">val</span> <span class="n">targetId</span><span class="k">:</span> <span class="kt">String</span><span class="o">)</span> <span class="k">extends</span> <span class="nc">Edge</span>
<span class="nd">@serializable</span> <span class="k">class</span> <span class="nc">PRVertex</span><span class="o">(</span>
<span class="k">val</span> <span class="n">id</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="k">val</span> <span class="n">rank</span><span class="k">:</span> <span class="kt">Double</span><span class="o">,</span> <span class="k">val</span> <span class="n">outEdges</span><span class="k">:</span> <span class="kt">Seq</span><span class="o">[</span><span class="kt">Edge</span><span class="o">],</span>
<span class="k">val</span> <span class="n">active</span><span class="k">:</span> <span class="kt">Boolean</span><span class="o">)</span> <span class="k">extends</span> <span class="nc">Vertex</span>
<span class="nd">@serializable</span> <span class="k">class</span> <span class="nc">PRMessage</span><span class="o">(</span>
<span class="k">val</span> <span class="n">targetId</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="k">val</span> <span class="n">rankShare</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)</span> <span class="k">extends</span> <span class="nc">Message</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span>
</code></pre></div>
<p>Next, we load a sample graph from a text file as a distributed dataset and package it into <code>PRVertex</code> objects. We also cache the distributed dataset because Bagel will use it multiple times and we’d like to avoid recomputing it.</p>
<div class="highlight"><pre><code class="scala"><span class="k">val</span> <span class="n">input</span> <span class="k">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">textFile</span><span class="o">(</span><span class="s">"data/pagerank_data.txt"</span><span class="o">)</span>
<span class="k">val</span> <span class="n">numVerts</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">count</span><span class="o">()</span>
<span class="k">val</span> <span class="n">verts</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">line</span> <span class="k">=></span> <span class="o">{</span>
<span class="k">val</span> <span class="n">fields</span> <span class="k">=</span> <span class="n">line</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="sc">'\t'</span><span class="o">)</span>
<span class="k">val</span> <span class="o">(</span><span class="n">id</span><span class="o">,</span> <span class="n">linksStr</span><span class="o">)</span> <span class="k">=</span> <span class="o">(</span><span class="n">fields</span><span class="o">(</span><span class="mi">0</span><span class="o">),</span> <span class="n">fields</span><span class="o">(</span><span class="mi">1</span><span class="o">))</span>
<span class="k">val</span> <span class="n">links</span> <span class="k">=</span> <span class="n">linksStr</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="sc">','</span><span class="o">).</span><span class="n">map</span><span class="o">(</span><span class="k">new</span> <span class="nc">PREdge</span><span class="o">(</span><span class="k">_</span><span class="o">))</span>
<span class="o">(</span><span class="n">id</span><span class="o">,</span> <span class="k">new</span> <span class="nc">PRVertex</span><span class="o">(</span><span class="n">id</span><span class="o">,</span> <span class="mf">1.0</span> <span class="o">/</span> <span class="n">numVerts</span><span class="o">,</span> <span class="n">links</span><span class="o">,</span> <span class="kc">true</span><span class="o">))</span>
<span class="o">}).</span><span class="n">cache</span>
</code></pre></div>
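<p>The parsing step above assumes each input line holds a vertex ID, a tab, and then a comma-separated list of target IDs. As a sketch of that format only, here is the same split logic in plain Scala without Spark; <code>parseLine</code> and the sample lines are hypothetical, and the handling of a line with no links is an assumption added for illustration.</p>
<div class="highlight"><pre><code class="scala">// Plain-Scala sketch of the assumed line format: "vertexId TAB target1,target2,..."
// parseLine and the sample lines are hypothetical.
def parseLine(line: String): (String, Seq[String]) = {
  val fields = line.split('\t')
  val id = fields(0)
  // Assumption: a vertex without outgoing links may have no second field at all.
  val links = if (fields.length > 1) fields(1).split(',').toSeq else Seq.empty[String]
  (id, links)
}

val parsed = parseLine("a\tb,c")   // vertex "a" linking to "b" and "c"
val isolated = parseLine("d")      // vertex "d" with no outgoing links
</code></pre></div>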
<p>We run the Bagel job, passing in <code>verts</code>, an empty distributed dataset of messages, and a custom compute function that runs PageRank for 10 iterations.</p>
<div class="highlight"><pre><code class="scala"><span class="k">val</span> <span class="n">emptyMsgs</span> <span class="k">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">parallelize</span><span class="o">(</span><span class="nc">List</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">PRMessage</span><span class="o">)]())</span>
<span class="k">def</span> <span class="n">compute</span><span class="o">(</span><span class="n">self</span><span class="k">:</span> <span class="kt">PRVertex</span><span class="o">,</span> <span class="n">msgs</span><span class="k">:</span> <span class="kt">Option</span><span class="o">[</span><span class="kt">Seq</span><span class="o">[</span><span class="kt">PRMessage</span><span class="o">]],</span> <span class="n">superstep</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span>
<span class="k">:</span> <span class="o">(</span><span class="kt">PRVertex</span><span class="o">,</span> <span class="kt">Iterable</span><span class="o">[</span><span class="kt">PRMessage</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span>
<span class="k">val</span> <span class="n">msgSum</span> <span class="k">=</span> <span class="n">msgs</span><span class="o">.</span><span class="n">getOrElse</span><span class="o">(</span><span class="nc">List</span><span class="o">()).</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">rankShare</span><span class="o">).</span><span class="n">sum</span>
<span class="k">val</span> <span class="n">newRank</span> <span class="k">=</span>
<span class="k">if</span> <span class="o">(</span><span class="n">msgSum</span> <span class="o">!=</span> <span class="mi">0</span><span class="o">)</span>
<span class="mf">0.15</span> <span class="o">/</span> <span class="n">numVerts</span> <span class="o">+</span> <span class="mf">0.85</span> <span class="o">*</span> <span class="n">msgSum</span>
<span class="k">else</span>
<span class="n">self</span><span class="o">.</span><span class="n">rank</span>
<span class="k">val</span> <span class="n">halt</span> <span class="k">=</span> <span class="n">superstep</span> <span class="o">>=</span> <span class="mi">10</span>
<span class="k">val</span> <span class="n">msgsOut</span> <span class="k">=</span>
<span class="k">if</span> <span class="o">(!</span><span class="n">halt</span><span class="o">)</span>
<span class="n">self</span><span class="o">.</span><span class="n">outEdges</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">edge</span> <span class="k">=></span>
<span class="k">new</span> <span class="nc">PRMessage</span><span class="o">(</span><span class="n">edge</span><span class="o">.</span><span class="n">targetId</span><span class="o">,</span> <span class="n">newRank</span> <span class="o">/</span> <span class="n">self</span><span class="o">.</span><span class="n">outEdges</span><span class="o">.</span><span class="n">size</span><span class="o">))</span>
<span class="k">else</span>
<span class="nc">List</span><span class="o">()</span>
<span class="o">(</span><span class="k">new</span> <span class="nc">PRVertex</span><span class="o">(</span><span class="n">self</span><span class="o">.</span><span class="n">id</span><span class="o">,</span> <span class="n">newRank</span><span class="o">,</span> <span class="n">self</span><span class="o">.</span><span class="n">outEdges</span><span class="o">,</span> <span class="o">!</span><span class="n">halt</span><span class="o">),</span> <span class="n">msgsOut</span><span class="o">)</span>
<span class="o">}</span>
</code></pre></div>
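<div class="highlight"><pre><code class="scala"><span class="k">val</span> <span class="n">result</span> <span class="k">=</span> <span class="nc">Bagel</span><span class="o">.</span><span class="n">run</span><span class="o">(</span><span class="n">sc</span><span class="o">,</span> <span class="n">verts</span><span class="o">,</span> <span class="n">emptyMsgs</span><span class="o">)()(</span><span class="n">compute</span><span class="o">)</span>
</code></pre></div>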
<p>Finally, we print the results.</p>
<div class="highlight"><pre><code class="scala"><span class="n">println</span><span class="o">(</span><span class="n">result</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">v</span> <span class="k">=></span> <span class="s">"%s\t%s\n"</span><span class="o">.</span><span class="n">format</span><span class="o">(</span><span class="n">v</span><span class="o">.</span><span class="n">id</span><span class="o">,</span> <span class="n">v</span><span class="o">.</span><span class="n">rank</span><span class="o">)).</span><span class="n">collect</span><span class="o">.</span><span class="n">mkString</span><span class="o">)</span>
</code></pre></div>
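<p>The rank update used in <code>compute</code> can be checked by hand. The sketch below isolates that formula as a pure function; <code>updatedRank</code> is a hypothetical helper, and the constants 0.15 and 0.85 are taken from the walkthrough code.</p>
<div class="highlight"><pre><code class="scala">// Pure-function sketch of the rank update in compute (updatedRank is a hypothetical helper).
def updatedRank(currentRank: Double, msgSum: Double, numVerts: Long): Double =
  if (msgSum != 0) 0.15 / numVerts + 0.85 * msgSum  // damped sum of incoming rank shares
  else currentRank                                   // no incoming shares: keep the old rank

// With 4 vertices and incoming shares totalling 0.5:
// 0.15 / 4 + 0.85 * 0.5 = 0.0375 + 0.425 = 0.4625 (up to floating-point rounding)
val r = updatedRank(0.25, 0.5, 4L)
</code></pre></div>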
<h2 id="combiners">Combiners</h2>
<p>Sending a message to another vertex generally involves expensive communication over the network. For certain algorithms, it’s possible to reduce the amount of communication using <em>combiners</em>. For example, if the compute function receives integer messages and only uses their sum, it’s possible for Bagel to combine multiple messages to the same vertex by summing them.</p>
<p>To enable combiners, pass Bagel a <code>Combiner</code>: a set of functions that create a combined value from a single message, merge a further message into a combined value, and merge two combined values.</p>
<p><em>Example: PageRank with combiners</em></p>
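<p>As a sketch of how such a combiner might look for PageRank, the block below sums the rank shares addressed to the same vertex. To stay self-contained it declares local stand-ins (<code>Combiner</code>, <code>PRMessage</code>) that mirror the shapes shown in this guide rather than importing Bagel. <code>RankSumCombiner</code> and <code>combine</code> are hypothetical names, and <code>combine</code> only approximates how a runtime could apply the combiner; none of this is Bagel's own code.</p>
<div class="highlight"><pre><code class="scala">// Local stand-ins mirroring the shapes shown in this guide (not imported from Bagel).
trait Combiner[M, C] {
  def createCombiner(msg: M): C
  def mergeMsg(combiner: C, msg: M): C
  def mergeCombiners(a: C, b: C): C
}
case class PRMessage(targetId: String, rankShare: Double)

// The combined form is simply the running sum of rank shares.
object RankSumCombiner extends Combiner[PRMessage, Double] {
  def createCombiner(msg: PRMessage): Double = msg.rankShare
  def mergeMsg(combiner: Double, msg: PRMessage): Double = combiner + msg.rankShare
  def mergeCombiners(a: Double, b: Double): Double = a + b
}

// Hypothetical helper: fold all messages for each target vertex into one combined value.
def combine[M, C](msgs: Seq[(String, M)], c: Combiner[M, C]): Map[String, C] =
  msgs.groupBy(_._1).map { case (target, group) =>
    val values = group.map(_._2)
    target -> values.tail.foldLeft(c.createCombiner(values.head))((acc, m) => c.mergeMsg(acc, m))
  }

// Three messages go in, two of them addressed to vertex "a".
val msgs = Seq(PRMessage("a", 0.25), PRMessage("b", 0.5), PRMessage("a", 0.125))
val combined = combine(msgs.map(m => m.targetId -> m), RankSumCombiner)
// combined holds one value per target: "a" maps to 0.375 and "b" maps to 0.5
</code></pre></div>
<p>Because the PageRank compute function only uses the sum of incoming shares, sending one combined value per target gives the same result as sending every message individually.</p>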
<h2 id="aggregators">Aggregators</h2>
<p>Aggregators perform a reduce across all vertices after each superstep, and provide the result to each vertex in the next superstep.</p>
<p>To enable aggregators, pass Bagel an <code>Aggregator</code>: a pair of functions that map each vertex to a partial result and merge partial results, so that the reduction covers all vertices.</p>
<p><em>Example</em></p>
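<p>As a sketch of the idea, the block below aggregates the total rank held by all vertices. It declares a local stand-in for <code>Aggregator</code> that mirrors the shape shown in this guide rather than importing Bagel. <code>RankedVertex</code>, <code>TotalRank</code>, and <code>aggregate</code> are hypothetical names, and <code>aggregate</code> only approximates how a runtime could apply the aggregator after a superstep.</p>
<div class="highlight"><pre><code class="scala">// Local stand-in mirroring the Aggregator shape shown in this guide (not imported from Bagel).
trait Aggregator[V, A] {
  def createAggregator(vert: V): A
  def mergeAggregators(a: A, b: A): A
}
case class RankedVertex(id: String, rank: Double)  // hypothetical vertex type

// Aggregates the total rank held by all vertices.
object TotalRank extends Aggregator[RankedVertex, Double] {
  def createAggregator(vert: RankedVertex): Double = vert.rank
  def mergeAggregators(a: Double, b: Double): Double = a + b
}

// Hypothetical helper: reduce over every vertex; None when there are no vertices.
def aggregate[V, A](verts: Seq[V], agg: Aggregator[V, A]): Option[A] =
  verts.map(v => agg.createAggregator(v)).reduceOption((a, b) => agg.mergeAggregators(a, b))

val sample = Seq(RankedVertex("a", 0.25), RankedVertex("b", 0.5), RankedVertex("c", 0.25))
val total = aggregate(sample, TotalRank)
// total is Some(1.0); every vertex would see this value in the next superstep
</code></pre></div>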
<h2 id="operations">Operations</h2>
<p>Here are the actions and types in the Bagel API. See <a href="https://github.com/apache/spark/blob/master/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala">Bagel.scala</a> for details.</p>
<h3 id="actions">Actions</h3>
<div class="highlight"><pre><code class="scala"><span class="cm">/*** Full form ***/</span>
<span class="nc">Bagel</span><span class="o">.</span><span class="n">run</span><span class="o">(</span><span class="n">sc</span><span class="o">,</span> <span class="n">vertices</span><span class="o">,</span> <span class="n">messages</span><span class="o">,</span> <span class="n">combiner</span><span class="o">,</span> <span class="n">aggregator</span><span class="o">,</span> <span class="n">partitioner</span><span class="o">,</span> <span class="n">numSplits</span><span class="o">)(</span><span class="n">compute</span><span class="o">)</span>
<span class="c1">// where compute takes (vertex: V, combinedMessages: Option[C], aggregated: Option[A], superstep: Int)</span>
<span class="c1">// and returns (newVertex: V, outMessages: Array[M])</span>
<span class="cm">/*** Abbreviated forms ***/</span>
<span class="nc">Bagel</span><span class="o">.</span><span class="n">run</span><span class="o">(</span><span class="n">sc</span><span class="o">,</span> <span class="n">vertices</span><span class="o">,</span> <span class="n">messages</span><span class="o">,</span> <span class="n">combiner</span><span class="o">,</span> <span class="n">partitioner</span><span class="o">,</span> <span class="n">numSplits</span><span class="o">)(</span><span class="n">compute</span><span class="o">)</span>
<span class="c1">// where compute takes (vertex: V, combinedMessages: Option[C], superstep: Int)</span>
<span class="c1">// and returns (newVertex: V, outMessages: Array[M])</span>
<span class="nc">Bagel</span><span class="o">.</span><span class="n">run</span><span class="o">(</span><span class="n">sc</span><span class="o">,</span> <span class="n">vertices</span><span class="o">,</span> <span class="n">messages</span><span class="o">,</span> <span class="n">combiner</span><span class="o">,</span> <span class="n">numSplits</span><span class="o">)(</span><span class="n">compute</span><span class="o">)</span>
<span class="c1">// where compute takes (vertex: V, combinedMessages: Option[C], superstep: Int)</span>
<span class="c1">// and returns (newVertex: V, outMessages: Array[M])</span>
<span class="nc">Bagel</span><span class="o">.</span><span class="n">run</span><span class="o">(</span><span class="n">sc</span><span class="o">,</span> <span class="n">vertices</span><span class="o">,</span> <span class="n">messages</span><span class="o">,</span> <span class="n">numSplits</span><span class="o">)(</span><span class="n">compute</span><span class="o">)</span>
<span class="c1">// where compute takes (vertex: V, messages: Option[Array[M]], superstep: Int)</span>
<span class="c1">// and returns (newVertex: V, outMessages: Array[M])</span>
</code></pre></div>
<h3 id="types">Types</h3>
<div class="highlight"><pre><code class="scala"><span class="k">trait</span> <span class="nc">Combiner</span><span class="o">[</span><span class="kt">M</span>, <span class="kt">C</span><span class="o">]</span> <span class="o">{</span>
<span class="k">def</span> <span class="n">createCombiner</span><span class="o">(</span><span class="n">msg</span><span class="k">:</span> <span class="kt">M</span><span class="o">)</span><span class="k">:</span> <span class="kt">C</span>
<span class="k">def</span> <span class="n">mergeMsg</span><span class="o">(</span><span class="n">combiner</span><span class="k">:</span> <span class="kt">C</span><span class="o">,</span> <span class="n">msg</span><span class="k">:</span> <span class="kt">M</span><span class="o">)</span><span class="k">:</span> <span class="kt">C</span>
<span class="k">def</span> <span class="n">mergeCombiners</span><span class="o">(</span><span class="n">a</span><span class="k">:</span> <span class="kt">C</span><span class="o">,</span> <span class="n">b</span><span class="k">:</span> <span class="kt">C</span><span class="o">)</span><span class="k">:</span> <span class="kt">C</span>
<span class="o">}</span>
<span class="k">trait</span> <span class="nc">Aggregator</span><span class="o">[</span><span class="kt">V</span>, <span class="kt">A</span><span class="o">]</span> <span class="o">{</span>
<span class="k">def</span> <span class="n">createAggregator</span><span class="o">(</span><span class="n">vert</span><span class="k">:</span> <span class="kt">V</span><span class="o">)</span><span class="k">:</span> <span class="kt">A</span>
<span class="k">def</span> <span class="n">mergeAggregators</span><span class="o">(</span><span class="n">a</span><span class="k">:</span> <span class="kt">A</span><span class="o">,</span> <span class="n">b</span><span class="k">:</span> <span class="kt">A</span><span class="o">)</span><span class="k">:</span> <span class="kt">A</span>
<span class="o">}</span>
<span class="k">trait</span> <span class="nc">Vertex</span> <span class="o">{</span>
<span class="k">def</span> <span class="n">active</span><span class="k">:</span> <span class="kt">Boolean</span>
<span class="o">}</span>
<span class="k">trait</span> <span class="nc">Message</span><span class="o">[</span><span class="kt">K</span><span class="o">]</span> <span class="o">{</span>
<span class="k">def</span> <span class="n">targetId</span><span class="k">:</span> <span class="kt">K</span>
<span class="o">}</span>
</code></pre></div>
<h1 id="where-to-go-from-here">Where to Go from Here</h1>
<p>Two example jobs, PageRank and shortest path, are included in <code>examples/src/main/scala/org/apache/spark/examples/bagel</code>. You can run them by passing the class name to the <code>bin/run-example</code> script included in Spark; e.g.:</p>
<pre><code>./bin/run-example org.apache.spark.examples.bagel.WikipediaPageRank
</code></pre>
<p>Each example program prints usage help when run without any arguments.</p>
</div> <!-- /container -->
<script src="js/vendor/jquery-1.8.0.min.js"></script>
<script src="js/vendor/bootstrap.min.js"></script>
<script src="js/main.js"></script>
<!-- MathJax Section -->
<script type="text/x-mathjax-config">
MathJax.Hub.Config({
TeX: { equationNumbers: { autoNumber: "AMS" } }
});
</script>
<script>
// Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS.
// We could use "//cdn.mathjax...", but that won't support "file://".
(function(d, script) {
script = d.createElement('script');
script.type = 'text/javascript';
script.async = true;
script.onload = function(){
MathJax.Hub.Config({
tex2jax: {
inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ],
displayMath: [ ["$$","$$"], ["\\[", "\\]"] ],
processEscapes: true,
skipTags: ['script', 'noscript', 'style', 'textarea', 'pre']
}
});
};
script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') +
'cdn.mathjax.org/mathjax/latest/MathJax.js?config=TeX-AMS-MML_HTMLorMML';
d.getElementsByTagName('head')[0].appendChild(script);
}(document));
</script>
</body>
</html>