path: root/docs/graphx-programming-guide.md
author: Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com>, 2014-01-13 17:15:21 -0800
committer: Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com>, 2014-01-13 17:18:31 -0800
commit: cfe4a29dcb516ceae5f243ac3b5d0c3a505d7f5a
tree: 2ec2c1b4a67548048775c6b98df43e55dda92f32 /docs/graphx-programming-guide.md
parent: ae4b75d94a4a0f2545e6d90d6f9b8f162bf70ded
Improvements in example code for the programming guide as well as adding serialization support for GraphImpl to address issues with failed closure capture.
Diffstat (limited to 'docs/graphx-programming-guide.md')
-rw-r--r-- docs/graphx-programming-guide.md 39
1 files changed, 22 insertions, 17 deletions
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index e6afd092be..c82c3d7358 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -478,24 +478,26 @@ def mapReduceTriplets[A](
The [`mapReduceTriplets`][Graph.mapReduceTriplets] operator takes a user defined map function which
is applied to each triplet and can yield *messages* destined to either (none or both) vertices in
-the triplet. We currently only support messages destined to the source or destination vertex of the
-triplet to enable optimized preaggregation. The user defined `reduce` function combines the
+the triplet. To facilitate optimized pre-aggregation, we currently only support messages destined
+to the source or destination vertex of the triplet. The user defined `reduce` function combines the
messages destined to each vertex. The `mapReduceTriplets` operator returns a `VertexRDD[A]`
-containing the aggregate message to each vertex. Vertices that do not receive a message are not
-included in the returned `VertexRDD`.
+containing the aggregate message (of type `A`) destined to each vertex. Vertices that do not
+receive a message are not included in the returned `VertexRDD`.
-> Note that `mapReduceTriplets takes an additional optional `activeSet` (see API docs) which
+> Note that `mapReduceTriplets` takes an additional optional `activeSet` (see API docs) which
> restricts the map phase to edges adjacent to the vertices in the provided `VertexRDD`. Restricting
> computation to triplets adjacent to a subset of the vertices is often necessary in incremental
> iterative computation and is a key part of the GraphX implementation of Pregel.
-We can use the `mapReduceTriplets` operator to collect information about adjacent vertices. For
-example if we wanted to compute the average age of followers who are older that each user we could
-do the following.
+In the following example we use the `mapReduceTriplets` operator to compute the average age of the
+more senior followers of each user.
{% highlight scala %}
-// Graph with age as the vertex property
-val graph: Graph[Double, String] = getFromSomewhereElse()
+// Import Random graph generation library
+import org.apache.spark.graphx.util.GraphGenerators
+// Create a graph with "age" as the vertex property. Here we use a random graph for simplicity.
+val graph: Graph[Double, Int] =
+ GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)](
triplet => { // Map Function
@@ -511,13 +513,16 @@ val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Dou
(a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
// Divide total age by number of older followers to get average age of older followers
-val avgAgeOlderFollowers: VertexRDD[Double] =
- olderFollowers.mapValues { case (count, totalAge) => totalAge / count }
+val avgAgeOfOlderFollowers: VertexRDD[Double] =
+ olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } )
+// Display the results
+avgAgeOfOlderFollowers.collect.foreach(println(_))
{% endhighlight %}
> Note that the `mapReduceTriplets` operation performs optimally when the messages (and their sums)
> are constant sized (e.g., floats and addition instead of lists and concatenation). More
-> precisely, the result of `mapReduceTriplets` should be sub-linear in the degree of each vertex.
+> precisely, the result of `mapReduceTriplets` should ideally be sub-linear in the degree of each
+> vertex.
### Computing Degree Information
@@ -529,13 +534,13 @@ compute the max in, out, and total degrees:
{% highlight scala %}
// Define a reduce operation to compute the highest degree vertex
-def maxReduce(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
+def max(a: (VertexID, Int), b: (VertexID, Int)): (VertexID, Int) = {
if (a._2 > b._2) a else b
}
// Compute the max degrees
-val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(maxReduce)
-val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(maxReduce)
-val maxDegrees: (VertexId, Int) = graph.degrees.reduce(maxReduce)
+val maxInDegree: (VertexID, Int) = graph.inDegrees.reduce(max)
+val maxOutDegree: (VertexID, Int) = graph.outDegrees.reduce(max)
+val maxDegrees: (VertexID, Int) = graph.degrees.reduce(max)
{% endhighlight %}
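
For readers checking the semantics described in the first hunk, the following is a minimal plain-Scala sketch of the map-then-reduce pattern, using ordinary collections instead of Spark. It is not the GraphX implementation: `Triplet`, `mapReduceTripletsLocal`, and `avgAgeOfOlderFollowers` are hypothetical names introduced here. The body of the map function is cut off by the hunk boundary in the diff, so the sketch assumes that a follower (the source vertex) sends its age to the followed user (the destination vertex) only when the follower is older.

```scala
// Plain-Scala model of the map/reduce-over-triplets pattern (no Spark required).
// All names in this object are hypothetical and exist only for illustration.
object OlderFollowersSketch {
  type VertexId = Long

  // An edge together with the "age" attribute of both endpoints.
  final case class Triplet(srcId: VertexId, srcAge: Double, dstId: VertexId, dstAge: Double)

  // Apply `mapFunc` to every triplet, then combine all messages addressed to
  // the same vertex with `reduceFunc`. Vertices that receive no message do
  // not appear in the result, matching the behaviour described in the guide.
  def mapReduceTripletsLocal[A](
      triplets: Seq[Triplet],
      mapFunc: Triplet => Iterator[(VertexId, A)],
      reduceFunc: (A, A) => A): Map[VertexId, A] =
    triplets
      .flatMap(t => mapFunc(t))
      .groupBy(_._1)
      .map { case (id, msgs) => id -> msgs.map(_._2).reduce(reduceFunc) }

  // `follows` holds (follower, followed) pairs; `ages` maps each vertex to its age.
  def avgAgeOfOlderFollowers(
      ages: Map[VertexId, Double],
      follows: Seq[(VertexId, VertexId)]): Map[VertexId, Double] = {
    val triplets = follows.map { case (src, dst) => Triplet(src, ages(src), dst, ages(dst)) }
    // Assumed map function: message = (1, follower age), sent only by older followers.
    val olderFollowers = mapReduceTripletsLocal[(Int, Double)](
      triplets,
      t => if (t.srcAge > t.dstAge) Iterator((t.dstId, (1, t.srcAge))) else Iterator.empty,
      (a, b) => (a._1 + b._1, a._2 + b._2)) // constant-size messages: a count and a sum
    // Divide total age by the number of older followers.
    olderFollowers.map { case (id, (count, totalAge)) => id -> totalAge / count }
  }
}
```

With ages `1 -> 30`, `2 -> 40`, `3 -> 50`, `4 -> 20` and follows `(2,1)`, `(3,1)`, `(4,1)`, `(1,2)`, only vertex 1 appears in the result, with an average of 45.0. Vertex 2 is absent because its single follower is younger, so no message is sent to it.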