summaryrefslogblamecommitdiff
path: root/site/docs/1.0.0/bagel-programming-guide.html
blob: 171c813565ce7ea3f07b1c1b061ef546a118c248 (plain) (tree)




























                                                                              












                                                                                                                             









































































































































































































































































                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               
<!DOCTYPE html>
<!--[if lt IE 7]>      <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]-->
<!--[if IE 7]>         <html class="no-js lt-ie9 lt-ie8"> <![endif]-->
<!--[if IE 8]>         <html class="no-js lt-ie9"> <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js"> <!--<![endif]-->
    <head>
        <meta charset="utf-8">
        <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
        <title>Bagel Programming Guide - Spark 1.0.0 Documentation</title>
        <meta name="description" content="">

        

        <link rel="stylesheet" href="css/bootstrap.min.css">
        <style>
            body {
                padding-top: 60px;
                padding-bottom: 40px;
            }
        </style>
        <meta name="viewport" content="width=device-width">
        <link rel="stylesheet" href="css/bootstrap-responsive.min.css">
        <link rel="stylesheet" href="css/main.css">

        <script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script>

        <link rel="stylesheet" href="css/pygments-default.css">

        
        <!-- Google analytics script -->
        <script type="text/javascript">
          // Google Analytics (ga.js) bootstrap: queue the account id and a
          // page-view command, then inject the ga.js script element.
          var _gaq = _gaq || [];
          _gaq.push(['_setAccount', 'UA-32518208-1']);
          _gaq.push(['_trackPageview']);

          (function() {
            // Pick the host prefix that matches the protocol the page was served over.
            var isSecure = 'https:' == document.location.protocol;
            var tracker = document.createElement('script');
            tracker.type = 'text/javascript';
            tracker.async = true;
            tracker.src = (isSecure ? 'https://ssl' : 'http://www') + '.google-analytics.com/ga.js';
            // Insert the tracker ahead of the first script element in the document.
            var firstScript = document.getElementsByTagName('script')[0];
            firstScript.parentNode.insertBefore(tracker, firstScript);
          })();
        </script>
        

    </head>
    <body>
        <!--[if lt IE 7]>
            <p class="chromeframe">You are using an outdated browser. <a href="http://browsehappy.com/">Upgrade your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p>
        <![endif]-->

        <!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html -->

        <div class="navbar navbar-fixed-top" id="topbar">
            <div class="navbar-inner">
                <div class="container">
                    <div class="brand"><a href="index.html">
                      <img src="img/spark-logo-hd.png" alt="Spark logo" style="height:50px;"/></a><span class="version">1.0.0</span>
                    </div>
                    <ul class="nav">
                        <!--TODO(andyk): Add class="active" attribute to li somehow.-->
                        <li><a href="index.html">Overview</a></li>

                        <li class="dropdown">
                            <a href="#" class="dropdown-toggle" data-toggle="dropdown">Programming Guides<b class="caret"></b></a>
                            <ul class="dropdown-menu">
                                <li><a href="quick-start.html">Quick Start</a></li>
                                <li><a href="programming-guide.html">Spark Programming Guide</a></li>
                                <li class="divider"></li>
                                <li><a href="streaming-programming-guide.html">Spark Streaming</a></li>
                                <li><a href="sql-programming-guide.html">Spark SQL</a></li>
                                <li><a href="mllib-guide.html">MLlib (Machine Learning)</a></li>
                                <li><a href="graphx-programming-guide.html">GraphX (Graph Processing)</a></li>
                                <li><a href="bagel-programming-guide.html">Bagel (Pregel on Spark)</a></li>
                            </ul>
                        </li>

                        <li class="dropdown">
                            <a href="#" class="dropdown-toggle" data-toggle="dropdown">API Docs<b class="caret"></b></a>
                            <ul class="dropdown-menu">
                                <li><a href="api/scala/index.html#org.apache.spark.package">Scaladoc</a></li>
                                <li><a href="api/java/index.html">Javadoc</a></li>
                                <li><a href="api/python/index.html">Python API</a></li>
                            </ul>
                        </li>

                        <li class="dropdown">
                            <a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a>
                            <ul class="dropdown-menu">
                                <li><a href="cluster-overview.html">Overview</a></li>
                                <li><a href="submitting-applications.html">Submitting Applications</a></li>
                                <li class="divider"></li>
                                <li><a href="ec2-scripts.html">Amazon EC2</a></li>
                                <li><a href="spark-standalone.html">Standalone Mode</a></li>
                                <li><a href="running-on-mesos.html">Mesos</a></li>
                                <li><a href="running-on-yarn.html">YARN</a></li>
                            </ul>
                        </li>

                        <li class="dropdown">
                            <a href="api.html" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a>
                            <ul class="dropdown-menu">
                                <li><a href="configuration.html">Configuration</a></li>
                                <li><a href="monitoring.html">Monitoring</a></li>
                                <li><a href="tuning.html">Tuning Guide</a></li>
                                <li><a href="job-scheduling.html">Job Scheduling</a></li>
                                <li><a href="security.html">Security</a></li>
                                <li><a href="hardware-provisioning.html">Hardware Provisioning</a></li>
                                <li><a href="hadoop-third-party-distributions.html">3<sup>rd</sup>-Party Hadoop Distros</a></li>
                                <li class="divider"></li>
                                <li><a href="building-with-maven.html">Building Spark with Maven</a></li>
                                <li><a href="https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark">Contributing to Spark</a></li>
                            </ul>
                        </li>
                    </ul>
                    <!--<p class="navbar-text pull-right"><span class="version-text">v1.0.0</span></p>-->
                </div>
            </div>
        </div>

        <div class="container" id="content">
          
            <h1 class="title">Bagel Programming Guide</h1>
          

          <p><strong>Bagel will soon be superseded by <a href="graphx-programming-guide.html">GraphX</a>; we recommend that new users try GraphX instead.</strong></p>

<p>Bagel is a Spark implementation of Google&#8217;s <a href="http://portal.acm.org/citation.cfm?id=1807184">Pregel</a> graph processing framework. Bagel currently supports basic graph computation, combiners, and aggregators.</p>

<p>In the Pregel programming model, jobs run as a sequence of iterations called <em>supersteps</em>. In each superstep, each vertex in the graph runs a user-specified function that can update state associated with the vertex and send messages to other vertices for use in the <em>next</em> iteration.</p>

<p>This guide shows the programming model and features of Bagel by walking through an example implementation of PageRank on Bagel.</p>

<h1 id="linking-with-bagel">Linking with Bagel</h1>

<p>To use Bagel in your program, add the following SBT or Maven dependency:</p>

<pre><code>groupId = org.apache.spark
artifactId = spark-bagel_2.10
version = 1.0.0
</code></pre>

<h1 id="programming-model">Programming Model</h1>

<p>Bagel operates on a graph represented as a <a href="programming-guide.html">distributed dataset</a> of (K, V) pairs, where keys are vertex IDs and values are vertices plus their associated state. In each superstep, Bagel runs a user-specified compute function on each vertex that takes as input the current vertex state and a list of messages sent to that vertex during the previous superstep, and returns the new vertex state and a list of outgoing messages.</p>

<p>For example, we can use Bagel to implement PageRank. Here, vertices represent pages, edges represent links between pages, and messages represent shares of PageRank sent to the pages that a particular page links to.</p>

<p>We first extend the default <code>Vertex</code> class to store a <code>Double</code>
representing the current PageRank of the vertex, and similarly extend
the <code>Message</code> and <code>Edge</code> classes. Note that these need to be marked <code>@serializable</code> to allow Spark to transfer them across machines. We also import the Bagel types and implicit conversions.</p>

<div class="highlight"><pre><code class="scala"><span class="k">import</span> <span class="nn">org.apache.spark.bagel._</span>
<span class="k">import</span> <span class="nn">org.apache.spark.bagel.Bagel._</span>

<span class="nd">@serializable</span> <span class="k">class</span> <span class="nc">PREdge</span><span class="o">(</span><span class="k">val</span> <span class="n">targetId</span><span class="k">:</span> <span class="kt">String</span><span class="o">)</span> <span class="k">extends</span> <span class="nc">Edge</span>

<span class="nd">@serializable</span> <span class="k">class</span> <span class="nc">PRVertex</span><span class="o">(</span>
  <span class="k">val</span> <span class="n">id</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="k">val</span> <span class="n">rank</span><span class="k">:</span> <span class="kt">Double</span><span class="o">,</span> <span class="k">val</span> <span class="n">outEdges</span><span class="k">:</span> <span class="kt">Seq</span><span class="o">[</span><span class="kt">Edge</span><span class="o">],</span>
  <span class="k">val</span> <span class="n">active</span><span class="k">:</span> <span class="kt">Boolean</span><span class="o">)</span> <span class="k">extends</span> <span class="nc">Vertex</span>

<span class="nd">@serializable</span> <span class="k">class</span> <span class="nc">PRMessage</span><span class="o">(</span>
  <span class="k">val</span> <span class="n">targetId</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="k">val</span> <span class="n">rankShare</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)</span> <span class="k">extends</span> <span class="nc">Message</span>
</code></pre></div>

<p>Next, we load a sample graph from a text file as a distributed dataset and package it into <code>PRVertex</code> objects. We also cache the distributed dataset because Bagel will use it multiple times and we&#8217;d like to avoid recomputing it.</p>

<div class="highlight"><pre><code class="scala"><span class="k">val</span> <span class="n">input</span> <span class="k">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">textFile</span><span class="o">(</span><span class="s">&quot;data/pagerank_data.txt&quot;</span><span class="o">)</span>

<span class="k">val</span> <span class="n">numVerts</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">count</span><span class="o">()</span>

<span class="k">val</span> <span class="n">verts</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">line</span> <span class="k">=&gt;</span> <span class="o">{</span>
  <span class="k">val</span> <span class="n">fields</span> <span class="k">=</span> <span class="n">line</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="sc">&#39;\t&#39;</span><span class="o">)</span>
  <span class="k">val</span> <span class="o">(</span><span class="n">id</span><span class="o">,</span> <span class="n">linksStr</span><span class="o">)</span> <span class="k">=</span> <span class="o">(</span><span class="n">fields</span><span class="o">(</span><span class="mi">0</span><span class="o">),</span> <span class="n">fields</span><span class="o">(</span><span class="mi">1</span><span class="o">))</span>
    <span class="k">val</span> <span class="n">links</span> <span class="k">=</span> <span class="n">linksStr</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="sc">&#39;,&#39;</span><span class="o">).</span><span class="n">map</span><span class="o">(</span><span class="k">new</span> <span class="nc">PREdge</span><span class="o">(</span><span class="k">_</span><span class="o">))</span>
  <span class="o">(</span><span class="n">id</span><span class="o">,</span> <span class="k">new</span> <span class="nc">PRVertex</span><span class="o">(</span><span class="n">id</span><span class="o">,</span> <span class="mf">1.0</span> <span class="o">/</span> <span class="n">numVerts</span><span class="o">,</span> <span class="n">links</span><span class="o">,</span> <span class="kc">true</span><span class="o">))</span>
<span class="o">}).</span><span class="n">cache</span>
</code></pre></div>

<p>We run the Bagel job, passing in <code>verts</code>, an empty distributed dataset of messages, and a custom compute function that runs PageRank for 10 iterations.</p>

<div class="highlight"><pre><code class="scala"><span class="k">val</span> <span class="n">emptyMsgs</span> <span class="k">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">parallelize</span><span class="o">(</span><span class="nc">List</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">PRMessage</span><span class="o">)]())</span>

<span class="k">def</span> <span class="n">compute</span><span class="o">(</span><span class="n">self</span><span class="k">:</span> <span class="kt">PRVertex</span><span class="o">,</span> <span class="n">msgs</span><span class="k">:</span> <span class="kt">Option</span><span class="o">[</span><span class="kt">Seq</span><span class="o">[</span><span class="kt">PRMessage</span><span class="o">]],</span> <span class="n">superstep</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span>
<span class="k">:</span> <span class="o">(</span><span class="kt">PRVertex</span><span class="o">,</span> <span class="kt">Iterable</span><span class="o">[</span><span class="kt">PRMessage</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span>
  <span class="k">val</span> <span class="n">msgSum</span> <span class="k">=</span> <span class="n">msgs</span><span class="o">.</span><span class="n">getOrElse</span><span class="o">(</span><span class="nc">List</span><span class="o">()).</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">rankShare</span><span class="o">).</span><span class="n">sum</span>
    <span class="k">val</span> <span class="n">newRank</span> <span class="k">=</span>
      <span class="k">if</span> <span class="o">(</span><span class="n">msgSum</span> <span class="o">!=</span> <span class="mi">0</span><span class="o">)</span>
        <span class="mf">0.15</span> <span class="o">/</span> <span class="n">numVerts</span> <span class="o">+</span> <span class="mf">0.85</span> <span class="o">*</span> <span class="n">msgSum</span>
      <span class="k">else</span>
        <span class="n">self</span><span class="o">.</span><span class="n">rank</span>
    <span class="k">val</span> <span class="n">halt</span> <span class="k">=</span> <span class="n">superstep</span> <span class="o">&gt;=</span> <span class="mi">10</span>
    <span class="k">val</span> <span class="n">msgsOut</span> <span class="k">=</span>
      <span class="k">if</span> <span class="o">(!</span><span class="n">halt</span><span class="o">)</span>
        <span class="n">self</span><span class="o">.</span><span class="n">outEdges</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">edge</span> <span class="k">=&gt;</span>
          <span class="k">new</span> <span class="nc">PRMessage</span><span class="o">(</span><span class="n">edge</span><span class="o">.</span><span class="n">targetId</span><span class="o">,</span> <span class="n">newRank</span> <span class="o">/</span> <span class="n">self</span><span class="o">.</span><span class="n">outEdges</span><span class="o">.</span><span class="n">size</span><span class="o">))</span>
      <span class="k">else</span>
        <span class="nc">List</span><span class="o">()</span>
    <span class="o">(</span><span class="k">new</span> <span class="nc">PRVertex</span><span class="o">(</span><span class="n">self</span><span class="o">.</span><span class="n">id</span><span class="o">,</span> <span class="n">newRank</span><span class="o">,</span> <span class="n">self</span><span class="o">.</span><span class="n">outEdges</span><span class="o">,</span> <span class="o">!</span><span class="n">halt</span><span class="o">),</span> <span class="n">msgsOut</span><span class="o">)</span>
<span class="o">}</span>
</code></pre></div>

<pre><code>val result = Bagel.run(sc, verts, emptyMsgs)()(compute)
</code></pre>

<p>Finally, we print the results.</p>

<div class="highlight"><pre><code class="scala"><span class="n">println</span><span class="o">(</span><span class="n">result</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">v</span> <span class="k">=&gt;</span> <span class="s">&quot;%s\t%s\n&quot;</span><span class="o">.</span><span class="n">format</span><span class="o">(</span><span class="n">v</span><span class="o">.</span><span class="n">id</span><span class="o">,</span> <span class="n">v</span><span class="o">.</span><span class="n">rank</span><span class="o">)).</span><span class="n">collect</span><span class="o">.</span><span class="n">mkString</span><span class="o">)</span>
</code></pre></div>

<h2 id="combiners">Combiners</h2>

<p>Sending a message to another vertex generally involves expensive communication over the network. For certain algorithms, it&#8217;s possible to reduce the amount of communication using <em>combiners</em>. For example, if the compute function receives integer messages and only uses their sum, it&#8217;s possible for Bagel to combine multiple messages to the same vertex by summing them.</p>

<p>For combiner support, Bagel can optionally take a set of combiner functions that convert messages to their combined form.</p>

<p><em>Example: PageRank with combiners</em></p>

<h2 id="aggregators">Aggregators</h2>

<p>Aggregators perform a reduce across all vertices after each superstep, and provide the result to each vertex in the next superstep.</p>

<p>For aggregator support, Bagel can optionally take an aggregator function that reduces across all vertices.</p>

<p><em>Example</em></p>

<h2 id="operations">Operations</h2>

<p>Here are the actions and types in the Bagel API. See <a href="https://github.com/apache/spark/blob/master/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala">Bagel.scala</a> for details.</p>

<h3 id="actions">Actions</h3>

<div class="highlight"><pre><code class="scala"><span class="cm">/*** Full form ***/</span>

<span class="nc">Bagel</span><span class="o">.</span><span class="n">run</span><span class="o">(</span><span class="n">sc</span><span class="o">,</span> <span class="n">vertices</span><span class="o">,</span> <span class="n">messages</span><span class="o">,</span> <span class="n">combiner</span><span class="o">,</span> <span class="n">aggregator</span><span class="o">,</span> <span class="n">partitioner</span><span class="o">,</span> <span class="n">numSplits</span><span class="o">)(</span><span class="n">compute</span><span class="o">)</span>
<span class="c1">// where compute takes (vertex: V, combinedMessages: Option[C], aggregated: Option[A], superstep: Int)</span>
<span class="c1">// and returns (newVertex: V, outMessages: Array[M])</span>

<span class="cm">/*** Abbreviated forms ***/</span>

<span class="nc">Bagel</span><span class="o">.</span><span class="n">run</span><span class="o">(</span><span class="n">sc</span><span class="o">,</span> <span class="n">vertices</span><span class="o">,</span> <span class="n">messages</span><span class="o">,</span> <span class="n">combiner</span><span class="o">,</span> <span class="n">partitioner</span><span class="o">,</span> <span class="n">numSplits</span><span class="o">)(</span><span class="n">compute</span><span class="o">)</span>
<span class="c1">// where compute takes (vertex: V, combinedMessages: Option[C], superstep: Int)</span>
<span class="c1">// and returns (newVertex: V, outMessages: Array[M])</span>

<span class="nc">Bagel</span><span class="o">.</span><span class="n">run</span><span class="o">(</span><span class="n">sc</span><span class="o">,</span> <span class="n">vertices</span><span class="o">,</span> <span class="n">messages</span><span class="o">,</span> <span class="n">combiner</span><span class="o">,</span> <span class="n">numSplits</span><span class="o">)(</span><span class="n">compute</span><span class="o">)</span>
<span class="c1">// where compute takes (vertex: V, combinedMessages: Option[C], superstep: Int)</span>
<span class="c1">// and returns (newVertex: V, outMessages: Array[M])</span>

<span class="nc">Bagel</span><span class="o">.</span><span class="n">run</span><span class="o">(</span><span class="n">sc</span><span class="o">,</span> <span class="n">vertices</span><span class="o">,</span> <span class="n">messages</span><span class="o">,</span> <span class="n">numSplits</span><span class="o">)(</span><span class="n">compute</span><span class="o">)</span>
<span class="c1">// where compute takes (vertex: V, messages: Option[Array[M]], superstep: Int)</span>
<span class="c1">// and returns (newVertex: V, outMessages: Array[M])</span>
</code></pre></div>

<h3 id="types">Types</h3>

<div class="highlight"><pre><code class="scala"><span class="k">trait</span> <span class="nc">Combiner</span><span class="o">[</span><span class="kt">M</span>, <span class="kt">C</span><span class="o">]</span> <span class="o">{</span>
  <span class="k">def</span> <span class="n">createCombiner</span><span class="o">(</span><span class="n">msg</span><span class="k">:</span> <span class="kt">M</span><span class="o">)</span><span class="k">:</span> <span class="kt">C</span>
  <span class="k">def</span> <span class="n">mergeMsg</span><span class="o">(</span><span class="n">combiner</span><span class="k">:</span> <span class="kt">C</span><span class="o">,</span> <span class="n">msg</span><span class="k">:</span> <span class="kt">M</span><span class="o">)</span><span class="k">:</span> <span class="kt">C</span>
  <span class="k">def</span> <span class="n">mergeCombiners</span><span class="o">(</span><span class="n">a</span><span class="k">:</span> <span class="kt">C</span><span class="o">,</span> <span class="n">b</span><span class="k">:</span> <span class="kt">C</span><span class="o">)</span><span class="k">:</span> <span class="kt">C</span>
<span class="o">}</span>

<span class="k">trait</span> <span class="nc">Aggregator</span><span class="o">[</span><span class="kt">V</span>, <span class="kt">A</span><span class="o">]</span> <span class="o">{</span>
  <span class="k">def</span> <span class="n">createAggregator</span><span class="o">(</span><span class="n">vert</span><span class="k">:</span> <span class="kt">V</span><span class="o">)</span><span class="k">:</span> <span class="kt">A</span>
  <span class="k">def</span> <span class="n">mergeAggregators</span><span class="o">(</span><span class="n">a</span><span class="k">:</span> <span class="kt">A</span><span class="o">,</span> <span class="n">b</span><span class="k">:</span> <span class="kt">A</span><span class="o">)</span><span class="k">:</span> <span class="kt">A</span>
<span class="o">}</span>

<span class="k">trait</span> <span class="nc">Vertex</span> <span class="o">{</span>
  <span class="k">def</span> <span class="n">active</span><span class="k">:</span> <span class="kt">Boolean</span>
<span class="o">}</span>

<span class="k">trait</span> <span class="nc">Message</span><span class="o">[</span><span class="kt">K</span><span class="o">]</span> <span class="o">{</span>
  <span class="k">def</span> <span class="n">targetId</span><span class="k">:</span> <span class="kt">K</span>
<span class="o">}</span>
</code></pre></div>

<h1 id="where-to-go-from-here">Where to Go from Here</h1>

<p>Two example jobs, PageRank and shortest path, are included in <code>examples/src/main/scala/org/apache/spark/examples/bagel</code>. You can run them by passing the class name to the <code>bin/run-example</code> script included in Spark; e.g.:</p>

<pre><code>./bin/run-example org.apache.spark.examples.bagel.WikipediaPageRank
</code></pre>

<p>Each example program prints usage help when run without any arguments.</p>


        </div> <!-- /container -->

        <script src="js/vendor/jquery-1.8.0.min.js"></script>
        <script src="js/vendor/bootstrap.min.js"></script>
        <script src="js/main.js"></script>

        <!-- MathJax Section -->
        <script type="text/x-mathjax-config">
              // Configure TeX equation numbering with autoNumber set to "AMS".
              MathJax.Hub.Config({
                TeX: {
                  equationNumbers: {
                    autoNumber: "AMS"
                  }
                }
              });
            </script>
        <script type="text/javascript"
         src="http://cdn.mathjax.org/mathjax/latest/MathJax.js?config=TeX-AMS-MML_HTMLorMML"></script>
        <script>
          // Configure the tex2jax preprocessor: the delimiters that mark inline and
          // display math, escape handling, and the tags whose content is not scanned.
          MathJax.Hub.Config({
            tex2jax: {
              // Delimiters are JS string literals: "\\(" is the two-character
              // sequence \( , the same single-backslash form used for the
              // display-math delimiters on the next line.
              inlineMath: [ ["$", "$"], ["\\(","\\)"] ],
              displayMath: [ ["$$","$$"], ["\\[", "\\]"] ],
              processEscapes: true,
              skipTags: ['script', 'noscript', 'style', 'textarea', 'pre']
            }
          });
        </script>
    </body>
</html>