-rw-r--r--  docs/graphx-programming-guide.md                                     39
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala    3
2 files changed, 25 insertions, 17 deletions
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index e6afd092be..c82c3d7358 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -478,24 +478,26 @@ def mapReduceTriplets[A](
The [`mapReduceTriplets`][Graph.mapReduceTriplets] operator takes a user defined map function which
is applied to each triplet and can yield *messages* destined to either (none or both) vertices in
-the triplet. We currently only support messages destined to the source or destination vertex of the
-triplet to enable optimized preaggregation. The user defined `reduce` function combines the
+the triplet. To facilitate optimized pre-aggregation, we currently only support messages destined
+to the source or destination vertex of the triplet. The user defined `reduce` function combines the
messages destined to each vertex. The `mapReduceTriplets` operator returns a `VertexRDD[A]`
-containing the aggregate message to each vertex. Vertices that do not receive a message are not
-included in the returned `VertexRDD`.
+containing the aggregate message (of type `A`) destined to each vertex. Vertices that do not
+receive a message are not included in the returned `VertexRDD`.
-> Note that `mapReduceTriplets takes an additional optional `activeSet` (see API docs) which
+> Note that `mapReduceTriplets` takes an additional optional `activeSet` (see API docs) which
> restricts the map phase to edges adjacent to the vertices in the provided `VertexRDD`. Restricting
> computation to triplets adjacent to a subset of the vertices is often necessary in incremental
> iterative computation and is a key part of the GraphX implementation of Pregel.
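As a hedged sketch of the `activeSet` variant described in the note above (not part of this commit): the parameter name `activeSetOpt`, its `Option[(VertexRDD[_], EdgeDirection)]` type, and `EdgeDirection.In` are assumptions based on this era's API, and `graph` is the one constructed in the example below.

{% highlight scala %}
// Hypothetical sketch: count in-edges, but only over edges pointing to
// vertices in the active set. Parameter name and types are assumed; see
// the API docs.
val active: VertexRDD[Double] = graph.vertices.filter { case (id, attr) => attr > 50.0 }
val inEdgeCounts: VertexRDD[Int] = graph.mapReduceTriplets[Int](
  triplet => Iterator((triplet.dstId, 1)), // Map: one message per in-edge
  (a, b) => a + b,                         // Reduce: total messages received
  activeSetOpt = Some((active, EdgeDirection.In))
)
{% endhighlight %}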
-We can use the `mapReduceTriplets` operator to collect information about adjacent vertices. For
-example if we wanted to compute the average age of followers who are older that each user we could
-do the following.
+In the following example we use the `mapReduceTriplets` operator to compute the average age of the
+more senior followers of each user.
{% highlight scala %}
-// Graph with age as the vertex property
-val graph: Graph[Double, String] = getFromSomewhereElse()
+// Import random graph generation library
+import org.apache.spark.graphx.util.GraphGenerators
+// Create a graph with "age" as the vertex property. Here we use a random graph for simplicity.
+val graph: Graph[Double, Int] =
+ GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)](
triplet => { // Map Function
@@ -511,13 +513,16 @@ val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Dou
(a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
// Divide total age by number of older followers to get average age of older followers
-val avgAgeOlderFollowers: VertexRDD[Double] =
- olderFollowers.mapValues { case (count, totalAge) => totalAge / count }
+val avgAgeOfOlderFollowers: VertexRDD[Double] =
+ olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } )
+// Display the results
+avgAgeOfOlderFollowers.collect.foreach(println(_))
{% endhighlight %}
> Note that the `mapReduceTriplets` operation performs optimally when the messages (and their sums)
> are constant sized (e.g., floats and addition instead of lists and concatenation). More
-> precisely, the result of `mapReduceTriplets` should be sub-linear in the degree of each vertex.
+> precisely, the result of `mapReduceTriplets` should ideally be sub-linear in the degree of each
+> vertex.
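To make the note above concrete, here is a hedged contrast (not from the guide): a reduce function that concatenates lists yields aggregated messages that grow with vertex degree, while a running `(count, sum)` pair stays constant sized. The types assume the `graph: Graph[Double, Int]` defined earlier, and `VertexID` follows this era's naming.

{% highlight scala %}
// Anti-pattern: the aggregated message grows linearly with in-degree
val neighborLists: VertexRDD[List[VertexID]] =
  graph.mapReduceTriplets[List[VertexID]](
    triplet => Iterator((triplet.dstId, List(triplet.srcId))),
    (a, b) => a ++ b)
// Preferred: a constant-sized running (count, sum) pair
val countAndSum: VertexRDD[(Int, Double)] =
  graph.mapReduceTriplets[(Int, Double)](
    triplet => Iterator((triplet.dstId, (1, triplet.srcAttr))),
    (a, b) => (a._1 + b._1, a._2 + b._2))
{% endhighlight %}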
### Computing Degree Information
@@ -529,13 +534,13 @@ compute the max in, out, and total degrees:
{% highlight scala %}
// Define a reduce operation to compute the highest degree vertex
-def maxReduce(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
+def max(a: (VertexID, Int), b: (VertexID, Int)): (VertexID, Int) = {
if (a._2 > b._2) a else b
}
// Compute the max degrees
-val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(maxReduce)
-val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(maxReduce)
-val maxDegrees: (VertexId, Int) = graph.degrees.reduce(maxReduce)
+val maxInDegree: (VertexID, Int) = graph.inDegrees.reduce(max)
+val maxOutDegree: (VertexID, Int) = graph.outDegrees.reduce(max)
+val maxDegrees: (VertexID, Int) = graph.degrees.reduce(max)
{% endhighlight %}
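For reference, a self-contained version of the average-age example above. The diff hunks elide the body of the map function, so the body shown here is a plausible reconstruction consistent with the surrounding code, not text taken from this commit:

{% highlight scala %}
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators

val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)](
  triplet => { // Map Function (reconstructed body, an assumption)
    if (triplet.srcAttr > triplet.dstAttr) {
      // Send a (counter, age) message to the destination vertex
      Iterator((triplet.dstId, (1, triplet.srcAttr)))
    } else {
      // Don't send a message for this triplet
      Iterator.empty
    }
  },
  (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function: add counters and ages
)
val avgAgeOfOlderFollowers: VertexRDD[Double] =
  olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } )
avgAgeOfOlderFollowers.collect.foreach(println(_))
{% endhighlight %}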
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index c21f8935d9..916eb9763c 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -32,6 +32,9 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
@transient val replicatedVertexView: ReplicatedVertexView[VD])
extends Graph[VD, ED] with Serializable {
+  /** Default constructor is provided to support serialization */
+ protected def this() = this(null, null, null, null)
+
/** Return a RDD that brings edges together with their source and destination vertices. */
@transient override val triplets: RDD[EdgeTriplet[VD, ED]] = {
val vdTag = classTag[VD]
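The no-arg constructor added to `GraphImpl` above follows a common pattern: some serializers (for example, Kryo under its default instantiation strategy) create an instance through a no-arg constructor and then populate its fields. A minimal standalone sketch of the same pattern, using an illustrative class that is not part of Spark:

{% highlight scala %}
// Illustrative class, not Spark code: the public API requires arguments,
// while the protected no-arg constructor exists only for serializers that
// instantiate first and fill in fields afterwards.
class Box[T] protected (@transient val items: Seq[T], val label: String)
  extends Serializable {

  /** Default constructor is provided to support serialization */
  protected def this() = this(null, null)
}
{% endhighlight %}

Keeping the extra constructor `protected` leaves it out of the public API while still satisfying the serializer.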