+## Neighborhood Aggregation
+A key part of graph computation is aggregating information about the neighborhood of each vertex.
+For example we might want to know the number of followers each user has or the average age of the
+the followers of each user. Many iterative graph algorithms (e.g., PageRank, Shortest Path, and
+connected components) repeatedly aggregate properties of neighboring vertices (e.g., current
+PageRank Value, shortest path to the source, and smallest reachable vertex id).
+### Map Reduce Triplets (mapReduceTriplets)
<a name="mrTriplets"></a>
+[Graph.mapReduceTriplets]: api/graphx/index.html#mapReduceTriplets[A](mapFunc:org.apache.spark.graphx.EdgeTriplet[VD,ED]=&gt;Iterator[(org.apache.spark.graphx.VertexID,A)],reduceFunc:(A,A)=&gt;A,activeSetOpt:Option[(org.apache.spark.graphx.VertexRDD[_],org.apache.spark.graphx.EdgeDirection)])(implicitevidence$10:scala.reflect.ClassTag[A]):org.apache.spark.graphx.VertexRDD[A]
+These core (heavily optimized) aggregation primitive in GraphX is the
+(`mapReduceTriplets`)[Graph.mapReduceTriplets] operator:
+{% highlight scala %}
+def mapReduceTriplets[A](
+ map: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+ reduce: (A, A) => A)
+ : VertexRDD[A]
+{% endhighlight %}
+The (`mapReduceTriplets`)[Graph.mapReduceTriplets] operator takes a user defined map function which
+is applied to each triplet and can yield *messages* destined to either (none or both) vertices in
+the triplet. We currently only support messages destined to the source or destination vertex of the
+triplet to enable optimized preaggregation. The user defined `reduce` function combines the
+messages destined to each vertex. The `mapReduceTriplets` operator returns a `VertexRDD[A]`
+containing the aggregate message to each vertex. Vertices that do not receive a message are not
+included in the returned `VertexRDD`.
+> Note that `mapReduceTriplets takes an additional optional `activeSet` (see API docs) which
+> restricts the map phase to edges adjacent to the vertices in the provided `VertexRDD`. Restricting
+> computation to triplets adjacent to a subset of the vertices is often necessary in incremental
+> iterative computation and is a key part of the GraphX implementation of Pregel.
+We can use the `mapReduceTriplets` operator to collect information about adjacent vertices. For
+example if we wanted to compute the average age of followers who are older that each user we could
+do the following.
+{% highlight scala %}
+// Graph with age as the vertex property
+val graph: Graph[Double, String] = getFromSomewhereElse()
+// Compute the number of older followers and their total age
+val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)](
+ triplet => { // Map Function
+ if (triplet.srcAttr > triplet.dstAttr) {
+ // Send message to destination vertex containing counter and age
+ Iterator((triplet.dstId, (1, triplet.srcAttr)))
+ } else {
+ // Don't send a message for this triplet
+ Iterator.empty
+ }
+ },
+ // Add counter and age
+ (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
+// Divide total age by number of older followers to get average age of older followers
+val avgAgeOlderFollowers: VertexRDD[Double] =
+ olderFollowers.mapValues { case (count, totalAge) => totalAge / count }
+{% endhighlight %}
+> Note that the `mapReduceTriplets` operation performs optimally when the messages (and their sums)
+> are constant sized (e.g., floats and addition instead of lists and concatenation). More
+> precisely, the result of `mapReduceTriplets` should be sub-linear in the degree of each vertex.
+Because it is often necessary to aggregate information about neighboring vertices we also provide an
+alternative interface defined in [`GraphOps`][GraphOps]:
+{% highlight scala %}
+def aggregateNeighbors[A](
+ map: (VertexID, EdgeTriplet[VD, ED]) => Option[A],
+ reduce: (A, A) => A,
+ edgeDir: EdgeDirection)
+ : VertexRDD[A]
+{% endhighlight %}
+The `aggregateNeighbors` operator is implemented directly on top of `mapReduceTriplets` but allows
+the user to define the logic in a more vertex centric manner. Here the `map` function is provided
+the vertex to which the message is sent as well as one of the edges and returns the optional message
+value. The `edgeDir` determines whether the `map` function is run on `In`, `Out`, or `All` edges
+adjacent to each vertex.
+### Computing Degree Information
+A common aggregation task is computing the degree of each vertex: the number of edges adjacent to
+each vertex. In the context of directed graphs it often necessary to know the in-degree, out-
+degree, and the total degree of each vertex. The [`GraphOps`][GraphOps] class contains a
+collection of operators to compute the degrees of each vertex. For example in the following we
+compute the max in, out, and total degrees:
+{% highlight scala %}
+// Define a reduce operation to compute the highest degree vertex
+def maxReduce(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
+ if (a._2 > b._2) a else b
+// Compute the max degrees
+val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(maxReduce)
+val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(maxReduce)
+val maxDegrees: (VertexId, Int) = graph.degrees.reduce(maxReduce)
+{% endhighlight %}
+### Collecting Neighbors
+In some cases it may be easier to express computation by collecting neighboring vertices and their
+attributes at each vertex. This can be easily accomplished using the `collectNeighborIds` and the
+`collectNeighbors` operators.
+{% highlight scala %}
+def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]] =
+def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexID, VD)] ]
+{% endhighlight %}
+> Note that these operators can be quite costly as they duplicate information and require
+> substantial communication. If possible try expressing the same computation using the
+> `mapReduceTriplets` operator directly.
# Pregel API
<a name="pregel"></a>
+Graphs are inherently recursive data-structures as properties of a vertices depend on properties of
+their neighbors which intern depend on properties of the neighbors of their neighbors. As a
+consequence many important graph algorithms iteratively recompute the properties of each vertex
+until a fixed-point condition is reached. A range of graph-parallel abstractions have been proposed
+to express these iterative algorithms. GraphX exposes a Pregel operator which is a fusion of
+the widely used Pregel and GraphLab abstractions.
+At a high-level the GraphX variant of the Pregel abstraction is a bulk-synchronous parallel
+messaging abstraction constrained to the topology of the graph. The Pregel operator executes in a
+series of super-steps in which vertices receive the sum of their inbound messages from the previous
+super-step, compute a new property value, and then send messages to neighboring vertices in the next
+super-step. Vertices that do not receive a message are skipped within a super-step. The Pregel
+operators terminates iteration and returns the final graph when there are no messages remaining.
+> Note, unlike more standard Pregel implementations, vertices in GraphX can only send messages to
+> neighboring vertices and the message construction is done in parallel using a user defined
+> messaging function. These constraints allow additional optimization within GraphX.
+The following is type signature of the Pregel operator as well as a *sketch* of its implementation
+(note calls to graph.cache have been removed):
+{% highlight scala %}
+def pregel[A]
+ (initialMsg: A,
+ maxIter: Int = Int.MaxValue,
+ activeDir: EdgeDirection = EdgeDirection.Out)
+ (vprog: (VertexID, VD, A) => VD,
+ sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
+ mergeMsg: (A, A) => A)
+ : Graph[VD, ED] = {
+ // Receive the initial message at each vertex
+ var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
+ // compute the messages
+ var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
+ var activeMessages = messages.count()
+ // Loop until no messages remain or maxIterations is achieved
+ var i = 0
+ while (activeMessages > 0 && i < maxIterations) {
+ // Receive the messages: -----------------------------------------------------------------------
+ // Run the vertex program on all vertices that receive messages
+ val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
+ // Merge the new vertex values back into the graph
+ g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
+ // Send Messages: ------------------------------------------------------------------------------
+ // Vertices that didn't receive a message above don't appear in newVerts and therefore don't
+ // get to send messages. More precisely the map phase of mapReduceTriplets is only invoked
+ // on edges in the activeDir of vertices in newVerts
+ messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
+ activeMessages = messages.count()
+ i += 1
+ }
+ g
+{% endhighlight %}
+Notice that Pregel takes two argument lists (i.e., `graph.pregel(list1)(list2)`). The first
+argument list contains configuration parameters including the initial message, the maximum number of
+iterations, and the edge direction in which to send messages (by default along out edges). The
+second argument list contains the user defined functions for receiving messages (the vertex program
+`vprog`), computing messages (`sendMsg`), and combining messages `mergeMsg`.
+We can use the Pregel operator to express computation such single source shortest path in the
+following example.
+{% highlight scala %}
+val graph: Graph[String, Double] // A graph with edge attributes containing distances
+val sourceId: VertexId = 42 // The ultimate source
+// Initialize the graph such that all vertices except the root have distance infinity.
+val initialGraph = graph.mapVertices((id, _) => if (id == shourceId) 0.0 else Double.PositiveInfinity)
+val sssp = initialGraph.pregel(Double.PositiveInfinity)(
+ (id, dist, newDist) => math.min(dist, newDist) // Vertex Program
+ triplet => { // Send Message
+ if(triplet.srcAttr + triplet.attr < triplet.dstAttr) {
+ Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
+ } else {
+ Iterator.empty
+ }
+ },
+ (a,b) => math.min(a,b) // Merge Message
+ )
+{% endhighlight %}
# Graph Builders
<a name="graph_builders"></a>