diff options
Diffstat (limited to 'docs/graphx-programming-guide.md')
-rw-r--r-- | docs/graphx-programming-guide.md | 56 |
1 files changed, 22 insertions, 34 deletions
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md index 002ba0cf73..c82c3d7358 100644 --- a/docs/graphx-programming-guide.md +++ b/docs/graphx-programming-guide.md @@ -478,24 +478,26 @@ def mapReduceTriplets[A]( The [`mapReduceTriplets`][Graph.mapReduceTriplets] operator takes a user defined map function which is applied to each triplet and can yield *messages* destined to either (none or both) vertices in -the triplet. We currently only support messages destined to the source or destination vertex of the -triplet to enable optimized preaggregation. The user defined `reduce` function combines the +the triplet. To facilitate optimized pre-aggregation, we currently only support messages destined +to the source or destination vertex of the triplet. The user defined `reduce` function combines the messages destined to each vertex. The `mapReduceTriplets` operator returns a `VertexRDD[A]` -containing the aggregate message to each vertex. Vertices that do not receive a message are not -included in the returned `VertexRDD`. +containing the aggregate message (of type `A`) destined to each vertex. Vertices that do not +receive a message are not included in the returned `VertexRDD`. -> Note that `mapReduceTriplets takes an additional optional `activeSet` (see API docs) which +> Note that `mapReduceTriplets` takes an additional optional `activeSet` (see API docs) which > restricts the map phase to edges adjacent to the vertices in the provided `VertexRDD`. Restricting > computation to triplets adjacent to a subset of the vertices is often necessary in incremental > iterative computation and is a key part of the GraphX implementation of Pregel. -We can use the `mapReduceTriplets` operator to collect information about adjacent vertices. For -example if we wanted to compute the average age of followers who are older that each user we could -do the following. +In the following example we use the `mapReduceTriplets` operator to compute the average age of the +more senior followers of each user. {% highlight scala %} -// Graph with age as the vertex property -val graph: Graph[Double, String] = getFromSomewhereElse() +// Import Random graph generation library +import org.apache.spark.graphx.util.GraphGenerators +// Create a graph with "age" as the vertex property. Here we use a random graph for simplicity. +val graph: Graph[Double, Int] = + GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble ) // Compute the number of older followers and their total age val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)]( triplet => { // Map Function @@ -511,30 +513,16 @@ val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Dou (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function ) // Divide total age by number of older followers to get average age of older followers -val avgAgeOlderFollowers: VertexRDD[Double] = - olderFollowers.mapValues { case (count, totalAge) => totalAge / count } +val avgAgeOfOlderFollowers: VertexRDD[Double] = + olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } ) +// Display the results +avgAgeOfOlderFollowers.collect.foreach(println(_)) {% endhighlight %} > Note that the `mapReduceTriplets` operation performs optimally when the messages (and their sums) > are constant sized (e.g., floats and addition instead of lists and concatenation). More -> precisely, the result of `mapReduceTriplets` should be sub-linear in the degree of each vertex. - -Because it is often necessary to aggregate information about neighboring vertices we also provide an -alternative interface defined in [`GraphOps`][GraphOps]: - -{% highlight scala %} -def aggregateNeighbors[A]( - map: (VertexID, EdgeTriplet[VD, ED]) => Option[A], - reduce: (A, A) => A, - edgeDir: EdgeDirection) - : VertexRDD[A] -{% endhighlight %} - -The `aggregateNeighbors` operator is implemented directly on top of `mapReduceTriplets` but allows -the user to define the logic in a more vertex centric manner. Here the `map` function is provided -the vertex to which the message is sent as well as one of the edges and returns the optional message -value. The `edgeDir` determines whether the `map` function is run on `In`, `Out`, or `All` edges -adjacent to each vertex. +> precisely, the result of `mapReduceTriplets` should ideally be sub-linear in the degree of each +> vertex. ### Computing Degree Information @@ -546,13 +534,13 @@ compute the max in, out, and total degrees: {% highlight scala %} // Define a reduce operation to compute the highest degree vertex -def maxReduce(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = { +def max(a: (VertexID, Int), b: (VertexID, Int)): (VertexID, Int) = { if (a._2 > b._2) a else b } // Compute the max degrees -val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(maxReduce) -val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(maxReduce) -val maxDegrees: (VertexId, Int) = graph.degrees.reduce(maxReduce) +val maxInDegree: (VertexID, Int) = graph.inDegrees.reduce(max) +val maxOutDegree: (VertexID, Int) = graph.outDegrees.reduce(max) +val maxDegrees: (VertexID, Int) = graph.degrees.reduce(max) {% endhighlight %} |