From c787ff5640ad9d6f6dc3b744d73a1cb0c91eb90a Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Sun, 12 Jan 2014 20:49:41 -0800 Subject: Documenting Pregel API --- docs/graphx-programming-guide.md | 199 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 198 insertions(+), 1 deletion(-) (limited to 'docs/graphx-programming-guide.md') diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md index 22feccb7ad..89759416f4 100644 --- a/docs/graphx-programming-guide.md +++ b/docs/graphx-programming-guide.md @@ -429,12 +429,209 @@ val joinedGraph = graph.joinVertices(uniqueCosts, {% endhighlight %} -## Map Reduce Triplets (mapReduceTriplets) +## Neighborhood Aggregation + +A key part of graph computation is aggregating information about the neighborhood of each vertex. +For example we might want to know the number of followers each user has or the average age of the +the followers of each user. Many iterative graph algorithms (e.g., PageRank, Shortest Path, and +connected components) repeatedly aggregate properties of neighboring vertices (e.g., current +PageRank Value, shortest path to the source, and smallest reachable vertex id). + +### Map Reduce Triplets (mapReduceTriplets) +[Graph.mapReduceTriplets]: api/graphx/index.html#mapReduceTriplets[A](mapFunc:org.apache.spark.graphx.EdgeTriplet[VD,ED]=>Iterator[(org.apache.spark.graphx.VertexID,A)],reduceFunc:(A,A)=>A,activeSetOpt:Option[(org.apache.spark.graphx.VertexRDD[_],org.apache.spark.graphx.EdgeDirection)])(implicitevidence$10:scala.reflect.ClassTag[A]):org.apache.spark.graphx.VertexRDD[A] + +These core (heavily optimized) aggregation primitive in GraphX is the +(`mapReduceTriplets`)[Graph.mapReduceTriplets] operator: + +{% highlight scala %} +def mapReduceTriplets[A]( + map: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)], + reduce: (A, A) => A) + : VertexRDD[A] +{% endhighlight %} + +The (`mapReduceTriplets`)[Graph.mapReduceTriplets] operator takes a user defined map function which +is applied to each triplet and can yield *messages* destined to either (none or both) vertices in +the triplet. We currently only support messages destined to the source or destination vertex of the +triplet to enable optimized preaggregation. The user defined `reduce` function combines the +messages destined to each vertex. The `mapReduceTriplets` operator returns a `VertexRDD[A]` +containing the aggregate message to each vertex. Vertices that do not receive a message are not +included in the returned `VertexRDD`. + +> Note that `mapReduceTriplets takes an additional optional `activeSet` (see API docs) which +> restricts the map phase to edges adjacent to the vertices in the provided `VertexRDD`. Restricting +> computation to triplets adjacent to a subset of the vertices is often necessary in incremental +> iterative computation and is a key part of the GraphX implementation of Pregel. + +We can use the `mapReduceTriplets` operator to collect information about adjacent vertices. For +example if we wanted to compute the average age of followers who are older that each user we could +do the following. + +{% highlight scala %} +// Graph with age as the vertex property +val graph: Graph[Double, String] = getFromSomewhereElse() +// Compute the number of older followers and their total age +val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)]( + triplet => { // Map Function + if (triplet.srcAttr > triplet.dstAttr) { + // Send message to destination vertex containing counter and age + Iterator((triplet.dstId, (1, triplet.srcAttr))) + } else { + // Don't send a message for this triplet + Iterator.empty + } + }, + // Add counter and age + (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function +) +// Divide total age by number of older followers to get average age of older followers +val avgAgeOlderFollowers: VertexRDD[Double] = + olderFollowers.mapValues { case (count, totalAge) => totalAge / count } +{% endhighlight %} + +> Note that the `mapReduceTriplets` operation performs optimally when the messages (and their sums) +> are constant sized (e.g., floats and addition instead of lists and concatenation). More +> precisely, the result of `mapReduceTriplets` should be sub-linear in the degree of each vertex. + +Because it is often necessary to aggregate information about neighboring vertices we also provide an +alternative interface defined in [`GraphOps`][GraphOps]: + +{% highlight scala %} +def aggregateNeighbors[A]( + map: (VertexID, EdgeTriplet[VD, ED]) => Option[A], + reduce: (A, A) => A, + edgeDir: EdgeDirection) + : VertexRDD[A] +{% endhighlight %} + +The `aggregateNeighbors` operator is implemented directly on top of `mapReduceTriplets` but allows +the user to define the logic in a more vertex centric manner. Here the `map` function is provided +the vertex to which the message is sent as well as one of the edges and returns the optional message +value. The `edgeDir` determines whether the `map` function is run on `In`, `Out`, or `All` edges +adjacent to each vertex. + +### Computing Degree Information + +A common aggregation task is computing the degree of each vertex: the number of edges adjacent to +each vertex. In the context of directed graphs it often necessary to know the in-degree, out- +degree, and the total degree of each vertex. The [`GraphOps`][GraphOps] class contains a +collection of operators to compute the degrees of each vertex. For example in the following we +compute the max in, out, and total degrees: + +{% highlight scala %} +// Define a reduce operation to compute the highest degree vertex +def maxReduce(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = { + if (a._2 > b._2) a else b +} +// Compute the max degrees +val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(maxReduce) +val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(maxReduce) +val maxDegrees: (VertexId, Int) = graph.degrees.reduce(maxReduce) +{% endhighlight %} + + +### Collecting Neighbors + +In some cases it may be easier to express computation by collecting neighboring vertices and their +attributes at each vertex. This can be easily accomplished using the `collectNeighborIds` and the +`collectNeighbors` operators. + +{% highlight scala %} +def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]] = +def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexID, VD)] ] +{% endhighlight %} + +> Note that these operators can be quite costly as they duplicate information and require +> substantial communication. If possible try expressing the same computation using the +> `mapReduceTriplets` operator directly. + # Pregel API +Graphs are inherently recursive data-structures as properties of a vertices depend on properties of +their neighbors which intern depend on properties of the neighbors of their neighbors. As a +consequence many important graph algorithms iteratively recompute the properties of each vertex +until a fixed-point condition is reached. A range of graph-parallel abstractions have been proposed +to express these iterative algorithms. GraphX exposes a Pregel operator which is a fusion of +the widely used Pregel and GraphLab abstractions. + +At a high-level the GraphX variant of the Pregel abstraction is a bulk-synchronous parallel +messaging abstraction constrained to the topology of the graph. The Pregel operator executes in a +series of super-steps in which vertices receive the sum of their inbound messages from the previous +super-step, compute a new property value, and then send messages to neighboring vertices in the next +super-step. Vertices that do not receive a message are skipped within a super-step. The Pregel +operators terminates iteration and returns the final graph when there are no messages remaining. + +> Note, unlike more standard Pregel implementations, vertices in GraphX can only send messages to +> neighboring vertices and the message construction is done in parallel using a user defined +> messaging function. These constraints allow additional optimization within GraphX. + +The following is type signature of the Pregel operator as well as a *sketch* of its implementation +(note calls to graph.cache have been removed): + +{% highlight scala %} +def pregel[A] + (initialMsg: A, + maxIter: Int = Int.MaxValue, + activeDir: EdgeDirection = EdgeDirection.Out) + (vprog: (VertexID, VD, A) => VD, + sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)], + mergeMsg: (A, A) => A) + : Graph[VD, ED] = { + // Receive the initial message at each vertex + var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache() + // compute the messages + var messages = g.mapReduceTriplets(sendMsg, mergeMsg) + var activeMessages = messages.count() + // Loop until no messages remain or maxIterations is achieved + var i = 0 + while (activeMessages > 0 && i < maxIterations) { + // Receive the messages: ----------------------------------------------------------------------- + // Run the vertex program on all vertices that receive messages + val newVerts = g.vertices.innerJoin(messages)(vprog).cache() + // Merge the new vertex values back into the graph + g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache() + // Send Messages: ------------------------------------------------------------------------------ + // Vertices that didn't receive a message above don't appear in newVerts and therefore don't + // get to send messages. More precisely the map phase of mapReduceTriplets is only invoked + // on edges in the activeDir of vertices in newVerts + messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache() + activeMessages = messages.count() + i += 1 + } + g +} +{% endhighlight %} + +Notice that Pregel takes two argument lists (i.e., `graph.pregel(list1)(list2)`). The first +argument list contains configuration parameters including the initial message, the maximum number of +iterations, and the edge direction in which to send messages (by default along out edges). The +second argument list contains the user defined functions for receiving messages (the vertex program +`vprog`), computing messages (`sendMsg`), and combining messages `mergeMsg`. + +We can use the Pregel operator to express computation such single source shortest path in the +following example. + +{% highlight scala %} +val graph: Graph[String, Double] // A graph with edge attributes containing distances +val sourceId: VertexId = 42 // The ultimate source +// Initialize the graph such that all vertices except the root have distance infinity. +val initialGraph = graph.mapVertices((id, _) => if (id == shourceId) 0.0 else Double.PositiveInfinity) +val sssp = initialGraph.pregel(Double.PositiveInfinity)( + (id, dist, newDist) => math.min(dist, newDist) // Vertex Program + triplet => { // Send Message + if(triplet.srcAttr + triplet.attr < triplet.dstAttr) { + Iterator((triplet.dstId, triplet.srcAttr + triplet.attr)) + } else { + Iterator.empty + } + }, + (a,b) => math.min(a,b) // Merge Message + ) +{% endhighlight %} + # Graph Builders -- cgit v1.2.3