path: root/docs/graphx-programming-guide.md
author     Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com>    2014-11-19 16:53:33 -0800
committer  Reynold Xin <rxin@databricks.com>                   2014-11-19 16:53:33 -0800
commit     377b06820934cab6d67f3a9182528c7f417a7d98 (patch)
tree       435ad8ddc8691b30f5653650dacfdc1a97658526 /docs/graphx-programming-guide.md
parent     04d462f648aba7b18fc293b7189b86af70e421bc (diff)
Updating GraphX programming guide and documentation
This pull request revises the programming guide to reflect changes in the GraphX API as well as
the deprecated mapReduceTriplets operator.

Author: Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com>

Closes #3359 from jegonzal/GraphXProgrammingGuide and squashes the following commits:

4421964 [Joseph E. Gonzalez] updating documentation for graphx
Diffstat (limited to 'docs/graphx-programming-guide.md')
-rw-r--r--  docs/graphx-programming-guide.md | 360
1 file changed, 216 insertions(+), 144 deletions(-)
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index fdb9f98e21..28bb981751 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -6,6 +6,47 @@ title: GraphX Programming Guide
* This will become a table of contents (this text will be scraped).
{:toc}
+<!-- All the documentation links -->
+
+[EdgeRDD]: api/scala/index.html#org.apache.spark.graphx.EdgeRDD
+[Edge]: api/scala/index.html#org.apache.spark.graphx.Edge
+[EdgeTriplet]: api/scala/index.html#org.apache.spark.graphx.EdgeTriplet
+[Graph]: api/scala/index.html#org.apache.spark.graphx.Graph
+[GraphOps]: api/scala/index.html#org.apache.spark.graphx.GraphOps
+[Graph.mapVertices]: api/scala/index.html#org.apache.spark.graphx.Graph@mapVertices[VD2]((VertexId,VD)⇒VD2)(ClassTag[VD2]):Graph[VD2,ED]
+[Graph.reverse]: api/scala/index.html#org.apache.spark.graphx.Graph@reverse:Graph[VD,ED]
+[Graph.subgraph]: api/scala/index.html#org.apache.spark.graphx.Graph@subgraph((EdgeTriplet[VD,ED])⇒Boolean,(VertexId,VD)⇒Boolean):Graph[VD,ED]
+[Graph.mask]: api/scala/index.html#org.apache.spark.graphx.Graph@mask[VD2,ED2](Graph[VD2,ED2])(ClassTag[VD2],ClassTag[ED2]):Graph[VD,ED]
+[Graph.groupEdges]: api/scala/index.html#org.apache.spark.graphx.Graph@groupEdges((ED,ED)⇒ED):Graph[VD,ED]
+[GraphOps.joinVertices]: api/scala/index.html#org.apache.spark.graphx.GraphOps@joinVertices[U](RDD[(VertexId,U)])((VertexId,VD,U)⇒VD)(ClassTag[U]):Graph[VD,ED]
+[Graph.outerJoinVertices]: api/scala/index.html#org.apache.spark.graphx.Graph@outerJoinVertices[U,VD2](RDD[(VertexId,U)])((VertexId,VD,Option[U])⇒VD2)(ClassTag[U],ClassTag[VD2]):Graph[VD2,ED]
+[Graph.aggregateMessages]: api/scala/index.html#org.apache.spark.graphx.Graph@aggregateMessages[A]((EdgeContext[VD,ED,A])⇒Unit,(A,A)⇒A,TripletFields)(ClassTag[A]):VertexRDD[A]
+[EdgeContext]: api/scala/index.html#org.apache.spark.graphx.EdgeContext
+[Graph.mapReduceTriplets]: api/scala/index.html#org.apache.spark.graphx.Graph@mapReduceTriplets[A](mapFunc:org.apache.spark.graphx.EdgeTriplet[VD,ED]=&gt;Iterator[(org.apache.spark.graphx.VertexId,A)],reduceFunc:(A,A)=&gt;A,activeSetOpt:Option[(org.apache.spark.graphx.VertexRDD[_],org.apache.spark.graphx.EdgeDirection)])(implicitevidence$10:scala.reflect.ClassTag[A]):org.apache.spark.graphx.VertexRDD[A]
+[GraphOps.collectNeighborIds]: api/scala/index.html#org.apache.spark.graphx.GraphOps@collectNeighborIds(EdgeDirection):VertexRDD[Array[VertexId]]
+[GraphOps.collectNeighbors]: api/scala/index.html#org.apache.spark.graphx.GraphOps@collectNeighbors(EdgeDirection):VertexRDD[Array[(VertexId,VD)]]
+[RDD Persistence]: programming-guide.html#rdd-persistence
+[Graph.cache]: api/scala/index.html#org.apache.spark.graphx.Graph@cache():Graph[VD,ED]
+[GraphOps.pregel]: api/scala/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexId,VD,A)⇒VD,(EdgeTriplet[VD,ED])⇒Iterator[(VertexId,A)],(A,A)⇒A)(ClassTag[A]):Graph[VD,ED]
+[PartitionStrategy]: api/scala/index.html#org.apache.spark.graphx.PartitionStrategy$
+[GraphLoader.edgeListFile]: api/scala/index.html#org.apache.spark.graphx.GraphLoader$@edgeListFile(SparkContext,String,Boolean,Int):Graph[Int,Int]
+[Graph.apply]: api/scala/index.html#org.apache.spark.graphx.Graph$@apply[VD,ED](RDD[(VertexId,VD)],RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED]
+[Graph.fromEdgeTuples]: api/scala/index.html#org.apache.spark.graphx.Graph$@fromEdgeTuples[VD](RDD[(VertexId,VertexId)],VD,Option[PartitionStrategy])(ClassTag[VD]):Graph[VD,Int]
+[Graph.fromEdges]: api/scala/index.html#org.apache.spark.graphx.Graph$@fromEdges[VD,ED](RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED]
+[PartitionStrategy]: api/scala/index.html#org.apache.spark.graphx.PartitionStrategy
+[Graph.partitionBy]: api/scala/index.html#org.apache.spark.graphx.Graph$@partitionBy(partitionStrategy:org.apache.spark.graphx.PartitionStrategy):org.apache.spark.graphx.Graph[VD,ED]
+[PageRank]: api/scala/index.html#org.apache.spark.graphx.lib.PageRank$
+[ConnectedComponents]: api/scala/index.html#org.apache.spark.graphx.lib.ConnectedComponents$
+[TriangleCount]: api/scala/index.html#org.apache.spark.graphx.lib.TriangleCount$
+[Graph.partitionBy]: api/scala/index.html#org.apache.spark.graphx.Graph@partitionBy(PartitionStrategy):Graph[VD,ED]
+[EdgeContext.sendToSrc]: api/scala/index.html#org.apache.spark.graphx.EdgeContext@sendToSrc(msg:A):Unit
+[EdgeContext.sendToDst]: api/scala/index.html#org.apache.spark.graphx.EdgeContext@sendToDst(msg:A):Unit
+[TripletFields]: api/java/org/apache/spark/graphx/TripletFields.html
+[TripletFields.All]: api/java/org/apache/spark/graphx/TripletFields.html#All
+[TripletFields.None]: api/java/org/apache/spark/graphx/TripletFields.html#None
+[TripletFields.Src]: api/java/org/apache/spark/graphx/TripletFields.html#Src
+[TripletFields.Dst]: api/java/org/apache/spark/graphx/TripletFields.html#Dst
+
<p style="text-align: center;">
<img src="img/graphx_logo.png"
title="GraphX Logo"
@@ -16,18 +57,17 @@ title: GraphX Programming Guide
# Overview
-GraphX is the new (alpha) Spark API for graphs and graph-parallel computation. At a high-level,
+GraphX is the new (alpha) Spark API for graphs and graph-parallel computation. At a high level,
GraphX extends the Spark [RDD](api/scala/index.html#org.apache.spark.rdd.RDD) by introducing the
[Resilient Distributed Property Graph](#property_graph): a directed multigraph with properties
attached to each vertex and edge. To support graph computation, GraphX exposes a set of fundamental
operators (e.g., [subgraph](#structural_operators), [joinVertices](#join_operators), and
-[mapReduceTriplets](#mrTriplets)) as well as an optimized variant of the [Pregel](#pregel) API. In
+[aggregateMessages](#aggregateMessages)) as well as an optimized variant of the [Pregel](#pregel) API. In
addition, GraphX includes a growing collection of graph [algorithms](#graph_algorithms) and
[builders](#graph_builders) to simplify graph analytics tasks.
-**GraphX is currently an alpha component. While we will minimize API changes, some APIs may change in future releases.**
-## Background on Graph-Parallel Computation
+## Motivation
From social networks to language modeling, the growing scale and importance of
graph data has driven the development of numerous new *graph-parallel* systems
@@ -59,9 +99,8 @@ many different table and graph views.
<!-- Images are downsized intentionally to improve quality on retina displays -->
</p>
-As a consequence, it is often necessary to be able to move between table and graph views of the same
-physical data and to leverage the properties of each view to easily and efficiently express
-computation. However, existing graph analytics pipelines must compose graph-parallel and data-
+As a consequence, it is often necessary to be able to move between table and graph views.
+However, existing graph analytics pipelines must compose graph-parallel and data-
parallel systems, leading to extensive data movement and duplication and a complicated programming
model.
@@ -78,7 +117,7 @@ system with a single composable API. The GraphX API enables users to view data b
as collections (i.e., RDDs) without data movement or duplication. By incorporating recent advances
in graph-parallel systems, GraphX is able to optimize the execution of graph operations.
-## GraphX Replaces the Spark Bagel API
+<!-- ## GraphX Replaces the Spark Bagel API
Prior to the release of GraphX, graph computation in Spark was expressed using Bagel, an
implementation of Pregel. GraphX improves upon Bagel by exposing a richer property graph API, a
@@ -87,12 +126,23 @@ and reduce memory overhead. While we plan to eventually deprecate Bagel, we wil
support the [Bagel API](api/scala/index.html#org.apache.spark.bagel.package) and
[Bagel programming guide](bagel-programming-guide.html). However, we encourage Bagel users to
explore the new GraphX API and comment on issues that may complicate the transition from Bagel.
+ -->
-## Migrating from Spark 0.9.1
+## Migrating from Spark 1.1
-GraphX in Spark {{site.SPARK_VERSION}} contains one user-facing interface change from Spark 0.9.1. [`EdgeRDD`][EdgeRDD] may now store adjacent vertex attributes to construct the triplets, so it has gained a type parameter. The edges of a graph of type `Graph[VD, ED]` are of type `EdgeRDD[ED, VD]` rather than `EdgeRDD[ED]`.
+GraphX in Spark {{site.SPARK_VERSION}} contains a few user-facing API changes:
-[EdgeRDD]: api/scala/index.html#org.apache.spark.graphx.EdgeRDD
+1. To improve performance we have introduced a new version of
+[`mapReduceTriplets`][Graph.mapReduceTriplets] called
+[`aggregateMessages`][Graph.aggregateMessages], which collects the messages previously returned by
+[`mapReduceTriplets`][Graph.mapReduceTriplets] through a callback ([`EdgeContext`][EdgeContext])
+rather than by return value.
+We are deprecating [`mapReduceTriplets`][Graph.mapReduceTriplets] and encourage users to consult
+the [transition guide](#mrTripletsTransition).
+
+2. In Spark 1.0 and 1.1, the type signature of [`EdgeRDD`][EdgeRDD] switched from
+`EdgeRDD[ED]` to `EdgeRDD[ED, VD]` to enable some caching optimizations. We have since discovered
+a more elegant solution and have restored the type signature to the more natural `EdgeRDD[ED]` type.
# Getting Started
@@ -108,9 +158,10 @@ import org.apache.spark.rdd.RDD
If you are not using the Spark shell you will also need a `SparkContext`. To learn more about
getting started with Spark refer to the [Spark Quick Start Guide](quick-start.html).
-# The Property Graph
<a name="property_graph"></a>
+# The Property Graph
+
The [property graph](api/scala/index.html#org.apache.spark.graphx.Graph) is a directed multigraph
with user defined objects attached to each vertex and edge. A directed multigraph is a directed
graph with potentially multiple parallel edges sharing the same source and destination vertex. The
@@ -123,7 +174,7 @@ identifiers.
The property graph is parameterized over the vertex (`VD`) and edge (`ED`) types. These
are the types of the objects associated with each vertex and edge respectively.
-> GraphX optimizes the representation of vertex and edge types when they are plain old data-types
+> GraphX optimizes the representation of vertex and edge types when they are plain old data types
> (e.g., int, double, etc...) reducing the in memory footprint by storing them in specialized
> arrays.
@@ -142,8 +193,8 @@ var graph: Graph[VertexProperty, String] = null
Like RDDs, property graphs are immutable, distributed, and fault-tolerant. Changes to the values or
structure of the graph are accomplished by producing a new graph with the desired changes. Note
that substantial parts of the original graph (i.e., unaffected structure, attributes, and indicies)
-are reused in the new graph reducing the cost of this inherently functional data-structure. The
-graph is partitioned across the executors using a range of vertex-partitioning heuristics. As with
+are reused in the new graph reducing the cost of this inherently functional data structure. The
+graph is partitioned across the executors using a range of vertex partitioning heuristics. As with
RDDs, each partition of the graph can be recreated on a different machine in the event of a failure.
Logically the property graph corresponds to a pair of typed collections (RDDs) encoding the
@@ -153,12 +204,12 @@ the vertices and edges of the graph:
{% highlight scala %}
class Graph[VD, ED] {
val vertices: VertexRDD[VD]
- val edges: EdgeRDD[ED, VD]
+ val edges: EdgeRDD[ED]
}
{% endhighlight %}
-The classes `VertexRDD[VD]` and `EdgeRDD[ED, VD]` extend and are optimized versions of `RDD[(VertexID,
-VD)]` and `RDD[Edge[ED]]` respectively. Both `VertexRDD[VD]` and `EdgeRDD[ED, VD]` provide additional
+The classes `VertexRDD[VD]` and `EdgeRDD[ED]` extend and are optimized versions of `RDD[(VertexID,
+VD)]` and `RDD[Edge[ED]]` respectively. Both `VertexRDD[VD]` and `EdgeRDD[ED]` provide additional
functionality built around graph computation and leverage internal optimizations. We discuss the
`VertexRDD` and `EdgeRDD` API in greater detail in the section on [vertex and edge
RDDs](#vertex_and_edge_rdds) but for now they can be thought of as simply RDDs of the form:
@@ -211,7 +262,6 @@ In the above example we make use of the [`Edge`][Edge] case class. Edges have a
`dstId` corresponding to the source and destination vertex identifiers. In addition, the `Edge`
class has an `attr` member which stores the edge property.
-[Edge]: api/scala/index.html#org.apache.spark.graphx.Edge
We can deconstruct a graph into the respective vertex and edge views by using the `graph.vertices`
and `graph.edges` members respectively.
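For example, a minimal sketch (assuming the `Graph[(String, String), String]` of users and
relationships built earlier, where the vertex property is a (name, occupation) pair) might use
these views as follows:

{% highlight scala %}
// Count all users which are postdocs
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
// Count all the edges where srcId > dstId
graph.edges.filter(e => e.srcId > e.dstId).count
{% endhighlight %}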
@@ -237,7 +287,6 @@ The triplet view logically joins the vertex and edge properties yielding an
`RDD[EdgeTriplet[VD, ED]]` containing instances of the [`EdgeTriplet`][EdgeTriplet] class. This
*join* can be expressed in the following SQL expression:
-[EdgeTriplet]: api/scala/index.html#org.apache.spark.graphx.EdgeTriplet
{% highlight sql %}
SELECT src.id, dst.id, src.attr, e.attr, dst.attr
@@ -278,9 +327,6 @@ core operators are defined in [`GraphOps`][GraphOps]. However, thanks to Scala
operators in `GraphOps` are automatically available as members of `Graph`. For example, we can
compute the in-degree of each vertex (defined in `GraphOps`) by the following:
-[Graph]: api/scala/index.html#org.apache.spark.graphx.Graph
-[GraphOps]: api/scala/index.html#org.apache.spark.graphx.GraphOps
-
{% highlight scala %}
val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
@@ -310,7 +356,7 @@ class Graph[VD, ED] {
val degrees: VertexRDD[Int]
// Views of the graph as collections =============================================================
val vertices: VertexRDD[VD]
- val edges: EdgeRDD[ED, VD]
+ val edges: EdgeRDD[ED]
val triplets: RDD[EdgeTriplet[VD, ED]]
// Functions for caching graphs ==================================================================
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
@@ -341,10 +387,10 @@ class Graph[VD, ED] {
// Aggregate information about adjacent triplets =================================================
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]]
- def mapReduceTriplets[A: ClassTag](
- mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
- reduceFunc: (A, A) => A,
- activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None)
+ def aggregateMessages[Msg: ClassTag](
+ sendMsg: EdgeContext[VD, ED, Msg] => Unit,
+ mergeMsg: (Msg, Msg) => Msg,
+ tripletFields: TripletFields = TripletFields.All)
-    : VertexRDD[A]
+    : VertexRDD[Msg]
// Iterative graph-parallel computation ==========================================================
def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
@@ -363,8 +409,7 @@ class Graph[VD, ED] {
## Property Operators
-In direct analogy to the RDD `map` operator, the property
-graph contains the following:
+Like the RDD `map` operator, the property graph contains the following:
{% highlight scala %}
class Graph[VD, ED] {
@@ -377,7 +422,7 @@ class Graph[VD, ED] {
Each of these operators yields a new graph with the vertex or edge properties modified by the user
defined `map` function.
-> Note that in all cases the graph structure is unaffected. This is a key feature of these operators
+> Note that in each case the graph structure is unaffected. This is a key feature of these operators
> which allows the resulting graph to reuse the structural indices of the original graph. The
> following snippets are logically equivalent, but the first one does not preserve the structural
> indices and would not benefit from the GraphX system optimizations:
@@ -390,14 +435,13 @@ val newGraph = Graph(newVertices, graph.edges)
val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))
{% endhighlight %}
-[Graph.mapVertices]: api/scala/index.html#org.apache.spark.graphx.Graph@mapVertices[VD2]((VertexId,VD)⇒VD2)(ClassTag[VD2]):Graph[VD2,ED]
These operators are often used to initialize the graph for a particular computation or project away
-unnecessary properties. For example, given a graph with the out-degrees as the vertex properties
+unnecessary properties. For example, given a graph with the out degrees as the vertex properties
(we describe how to construct such a graph later), we initialize it for PageRank:
{% highlight scala %}
-// Given a graph where the vertex property is the out-degree
+// Given a graph where the vertex property is the out degree
val inputGraph: Graph[Int, String] =
graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))
// Construct a graph where each edge contains the weight
@@ -406,9 +450,10 @@ val outputGraph: Graph[Double, Double] =
inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)
{% endhighlight %}
-## Structural Operators
<a name="structural_operators"></a>
+## Structural Operators
+
Currently GraphX supports only a simple set of commonly used structural operators and we expect to
add more in the future. The following is a list of the basic structural operators.
@@ -425,9 +470,8 @@ class Graph[VD, ED] {
The [`reverse`][Graph.reverse] operator returns a new graph with all the edge directions reversed.
This can be useful when, for example, trying to compute the inverse PageRank. Because the reverse
operation does not modify vertex or edge properties or change the number of edges, it can be
-implemented efficiently without data-movement or duplication.
+implemented efficiently without data movement or duplication.
-[Graph.reverse]: api/scala/index.html#org.apache.spark.graphx.Graph@reverse:Graph[VD,ED]
The [`subgraph`][Graph.subgraph] operator takes vertex and edge predicates and returns the graph
containing only the vertices that satisfy the vertex predicate (evaluate to true) and edges that
@@ -435,7 +479,6 @@ satisfy the edge predicate *and connect vertices that satisfy the vertex predica
operator can be used in number of situations to restrict the graph to the vertices and edges of
interest or eliminate broken links. For example in the following code we remove broken links:
-[Graph.subgraph]: api/scala/index.html#org.apache.spark.graphx.Graph@subgraph((EdgeTriplet[VD,ED])⇒Boolean,(VertexId,VD)⇒Boolean):Graph[VD,ED]
{% highlight scala %}
// Create an RDD for the vertices
@@ -469,13 +512,12 @@ validGraph.triplets.map(
> Note in the above example only the vertex predicate is provided. The `subgraph` operator defaults
> to `true` if the vertex or edge predicates are not provided.
-The [`mask`][Graph.mask] operator also constructs a subgraph by returning a graph that contains the
+The [`mask`][Graph.mask] operator constructs a subgraph by returning a graph that contains the
vertices and edges that are also found in the input graph. This can be used in conjunction with the
`subgraph` operator to restrict a graph based on the properties in another related graph. For
example, we might run connected components using the graph with missing vertices and then restrict
the answer to the valid subgraph.
-[Graph.mask]: api/scala/index.html#org.apache.spark.graphx.Graph@mask[VD2,ED2](Graph[VD2,ED2])(ClassTag[VD2],ClassTag[ED2]):Graph[VD,ED]
{% highlight scala %}
// Run Connected Components
@@ -490,10 +532,9 @@ The [`groupEdges`][Graph.groupEdges] operator merges parallel edges (i.e., dupli
pairs of vertices) in the multigraph. In many numerical applications, parallel edges can be *added*
(their weights combined) into a single edge thereby reducing the size of the graph.
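As a rough sketch (assuming a hypothetical `weightedGraph: Graph[Int, Double]` whose edge
attribute is a numeric weight), parallel edges can be summed into a single edge; note that
`groupEdges` requires identical edges to be co-located, so `partitionBy` is called first:

{% highlight scala %}
// Co-locate identical edges, then merge parallel edges by summing their weights
val mergedGraph: Graph[Int, Double] = weightedGraph
  .partitionBy(PartitionStrategy.EdgePartition2D)
  .groupEdges((a, b) => a + b)
{% endhighlight %}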
-[Graph.groupEdges]: api/scala/index.html#org.apache.spark.graphx.Graph@groupEdges((ED,ED)⇒ED):Graph[VD,ED]
+<a name="join_operators"></a>
## Join Operators
-<a name="join_operators"></a>
In many cases it is necessary to join data from external collections (RDDs) with graphs. For
example, we might have extra user properties that we want to merge with an existing graph or we
@@ -514,10 +555,8 @@ returns a new graph with the vertex properties obtained by applying the user def
to the result of the joined vertices. Vertices without a matching value in the RDD retain their
original value.
-[GraphOps.joinVertices]: api/scala/index.html#org.apache.spark.graphx.GraphOps@joinVertices[U](RDD[(VertexId,U)])((VertexId,VD,U)⇒VD)(ClassTag[U]):Graph[VD,ED]
-
-> Note that if the RDD contains more than one value for a given vertex only one will be used. It
-> is therefore recommended that the input RDD be first made unique using the following which will
+> Note that if the RDD contains more than one value for a given vertex only one will be used. It
+> is therefore recommended that the input RDD be made unique using the following which will
> also *pre-index* the resulting values to substantially accelerate the subsequent join.
> {% highlight scala %}
val nonUniqueCosts: RDD[(VertexID, Double)]
@@ -533,8 +572,6 @@ property type. Because not all vertices may have a matching value in the input
function takes an `Option` type. For example, we can setup a graph for PageRank by initializing
vertex properties with their `outDegree`.
-[Graph.outerJoinVertices]: api/scala/index.html#org.apache.spark.graphx.Graph@outerJoinVertices[U,VD2](RDD[(VertexId,U)])((VertexId,VD,Option[U])⇒VD2)(ClassTag[U],ClassTag[VD2]):Graph[VD2,ED]
-
{% highlight scala %}
val outDegrees: VertexRDD[Int] = graph.outDegrees
@@ -555,65 +592,76 @@ val joinedGraph = graph.joinVertices(uniqueCosts,
(id: VertexID, oldCost: Double, extraCost: Double) => oldCost + extraCost)
{% endhighlight %}
+>
+
+<a name="neighborhood-aggregation"></a>
## Neighborhood Aggregation
-A key part of graph computation is aggregating information about the neighborhood of each vertex.
-For example we might want to know the number of followers each user has or the average age of the
+A key step in many graph analytics tasks is aggregating information about the neighborhood of each
+vertex.
+For example, we might want to know the number of followers each user has or the average age of
the followers of each user. Many iterative graph algorithms (e.g., PageRank, Shortest Path, and
connected components) repeatedly aggregate properties of neighboring vertices (e.g., current
PageRank Value, shortest path to the source, and smallest reachable vertex id).
-### Map Reduce Triplets (mapReduceTriplets)
-<a name="mrTriplets"></a>
+> To improve performance, the primary aggregation operator changed from
+`graph.mapReduceTriplets` to the new `graph.aggregateMessages`. While the changes in the API are
+relatively small, we provide a transition guide below.
-[Graph.mapReduceTriplets]: api/scala/index.html#org.apache.spark.graphx.Graph@mapReduceTriplets[A](mapFunc:org.apache.spark.graphx.EdgeTriplet[VD,ED]=&gt;Iterator[(org.apache.spark.graphx.VertexId,A)],reduceFunc:(A,A)=&gt;A,activeSetOpt:Option[(org.apache.spark.graphx.VertexRDD[_],org.apache.spark.graphx.EdgeDirection)])(implicitevidence$10:scala.reflect.ClassTag[A]):org.apache.spark.graphx.VertexRDD[A]
+<a name="aggregateMessages"></a>
-The core (heavily optimized) aggregation primitive in GraphX is the
-[`mapReduceTriplets`][Graph.mapReduceTriplets] operator:
+### Aggregate Messages (aggregateMessages)
+
+The core aggregation operation in GraphX is [`aggregateMessages`][Graph.aggregateMessages].
+This operator applies a user defined `sendMsg` function to each <i>edge triplet</i> in the graph
+and then uses the `mergeMsg` function to aggregate those messages at their destination vertex.
{% highlight scala %}
class Graph[VD, ED] {
- def mapReduceTriplets[A](
- map: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
- reduce: (A, A) => A)
- : VertexRDD[A]
+ def aggregateMessages[Msg: ClassTag](
+ sendMsg: EdgeContext[VD, ED, Msg] => Unit,
+ mergeMsg: (Msg, Msg) => Msg,
+ tripletFields: TripletFields = TripletFields.All)
+ : VertexRDD[Msg]
}
{% endhighlight %}
-The [`mapReduceTriplets`][Graph.mapReduceTriplets] operator takes a user defined map function which
-is applied to each triplet and can yield *messages* destined to either (none or both) vertices in
-the triplet. To facilitate optimized pre-aggregation, we currently only support messages destined
-to the source or destination vertex of the triplet. The user defined `reduce` function combines the
-messages destined to each vertex. The `mapReduceTriplets` operator returns a `VertexRDD[A]`
-containing the aggregate message (of type `A`) destined to each vertex. Vertices that do not
+The user defined `sendMsg` function takes an [`EdgeContext`][EdgeContext], which exposes the
+source and destination attributes along with the edge attribute and functions
+([`sendToSrc`][EdgeContext.sendToSrc] and [`sendToDst`][EdgeContext.sendToDst]) to send
+messages to the source and destination vertices. Think of `sendMsg` as the <i>map</i>
+function in map-reduce.
+The user defined `mergeMsg` function takes two messages destined to the same vertex and
+yields a single message. Think of `mergeMsg` as the <i>reduce</i> function in map-reduce.
+The [`aggregateMessages`][Graph.aggregateMessages] operator returns a `VertexRDD[Msg]`
+containing the aggregate message (of type `Msg`) destined to each vertex. Vertices that did not
receive a message are not included in the returned `VertexRDD`.
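As a quick illustration (a sketch that works for any property graph `graph`), counting each
vertex's in-degree needs only a constant-sized message and a sum, so the `sendMsg`/`mergeMsg`
pair is tiny:

{% highlight scala %}
// Send the message 1 along every edge and sum the messages at each destination vertex
// (equivalent to graph.inDegrees)
val inDeg: VertexRDD[Int] =
  graph.aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _)
{% endhighlight %}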
-<blockquote>
-
-<p>Note that <code>mapReduceTriplets</code> takes an additional optional <code>activeSet</code>
-(not shown above see API docs for details) which restricts the map phase to edges adjacent to the
-vertices in the provided <code>VertexRDD</code>: </p>
-
-{% highlight scala %}
- activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None
-{% endhighlight %}
-
-<p>The EdgeDirection specifies which edges adjacent to the vertex set are included in the map
-phase. If the direction is <code>In</code>, then the user defined <code>map</code> function will
-only be run only on edges with the destination vertex in the active set. If the direction is
-<code>Out</code>, then the <code>map</code> function will only be run only on edges originating from
-vertices in the active set. If the direction is <code>Either</code>, then the <code>map</code>
-function will be run only on edges with <i>either</i> vertex in the active set. If the direction is
-<code>Both</code>, then the <code>map</code> function will be run only on edges with both vertices
-in the active set. The active set must be derived from the set of vertices in the graph.
-Restricting computation to triplets adjacent to a subset of the vertices is often necessary in
-incremental iterative computation and is a key part of the GraphX implementation of Pregel. </p>
-
-</blockquote>
-
-In the following example we use the `mapReduceTriplets` operator to compute the average age of the
-more senior followers of each user.
+<!--
+> An [`EdgeContext`][EdgeContext] is provided in place of a [`EdgeTriplet`][EdgeTriplet] to
+expose the additional ([`sendToSrc`][EdgeContext.sendToSrc],
+and [`sendToDst`][EdgeContext.sendToDst]) which GraphX uses to optimize message routing.
+ -->
+
+In addition, [`aggregateMessages`][Graph.aggregateMessages] takes an optional
+`tripletFields` argument which indicates what data is accessed in the [`EdgeContext`][EdgeContext]
+(e.g., the source vertex attribute but not the destination vertex attribute).
+The possible options for `tripletFields` are defined in [`TripletFields`][TripletFields] and
+the default value is [`TripletFields.All`][TripletFields.All], which indicates that the user
+defined `sendMsg` function may access any of the fields in the [`EdgeContext`][EdgeContext].
+The `tripletFields` argument can be used to notify GraphX that only part of the
+[`EdgeContext`][EdgeContext] will be needed, allowing GraphX to select an optimized join strategy.
+For example, if we are computing the average age of the followers of each user we would only
+require the source field, and so we would use [`TripletFields.Src`][TripletFields.Src] to indicate
+that we only require the source field.
+
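As a sketch (assuming a `Graph[Double, Int]` whose vertex property is each user's age, as in the
example below), computing the average age of *all* followers reads only the source attribute, so
`TripletFields.Src` can be passed:

{% highlight scala %}
// The sendMsg function only reads ctx.srcAttr, so TripletFields.Src lets GraphX
// avoid shipping destination vertex attributes to the edge partitions
val followerAges: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
  ctx => ctx.sendToDst((1, ctx.srcAttr)),
  (a, b) => (a._1 + b._1, a._2 + b._2),
  TripletFields.Src)
val avgFollowerAge: VertexRDD[Double] =
  followerAges.mapValues((id, p) => p._2 / p._1)
{% endhighlight %}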
+> In earlier versions of GraphX we used bytecode inspection to infer the
+[`TripletFields`][TripletFields]; however, we found bytecode inspection to be
+slightly unreliable and instead opted for more explicit user control.
+
+In the following example we use the [`aggregateMessages`][Graph.aggregateMessages] operator to
+compute the average age of the more senior followers of each user.
{% highlight scala %}
// Import random graph generation library
@@ -622,14 +670,11 @@ import org.apache.spark.graphx.util.GraphGenerators
val graph: Graph[Double, Int] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
// Compute the number of older followers and their total age
-val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)](
+val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
triplet => { // Map Function
if (triplet.srcAttr > triplet.dstAttr) {
// Send message to destination vertex containing counter and age
- Iterator((triplet.dstId, (1, triplet.srcAttr)))
- } else {
- // Don't send a message for this triplet
- Iterator.empty
+      triplet.sendToDst((1, triplet.srcAttr))
}
},
// Add counter and age
@@ -642,10 +687,57 @@ val avgAgeOfOlderFollowers: VertexRDD[Double] =
avgAgeOfOlderFollowers.collect.foreach(println(_))
{% endhighlight %}
-> Note that the `mapReduceTriplets` operation performs optimally when the messages (and the sums of
-> messages) are constant sized (e.g., floats and addition instead of lists and concatenation). More
-> precisely, the result of `mapReduceTriplets` should ideally be sub-linear in the degree of each
-> vertex.
+> The `aggregateMessages` operation performs optimally when the messages (and the sums of
+> messages) are constant sized (e.g., floats and addition instead of lists and concatenation).
+
+<a name="mrTripletsTransition"></a>
+
+### Map Reduce Triplets Transition Guide (Legacy)
+
+In earlier versions of GraphX, neighborhood aggregation was accomplished using the
+[`mapReduceTriplets`][Graph.mapReduceTriplets] operator:
+
+{% highlight scala %}
+class Graph[VD, ED] {
+ def mapReduceTriplets[Msg](
+ map: EdgeTriplet[VD, ED] => Iterator[(VertexId, Msg)],
+ reduce: (Msg, Msg) => Msg)
+ : VertexRDD[Msg]
+}
+{% endhighlight %}
+
+The [`mapReduceTriplets`][Graph.mapReduceTriplets] operator takes a user defined map function which
+is applied to each triplet and can yield *messages* which are aggregated using the user defined
+`reduce` function.
+However, we found the use of the returned iterator to be expensive, and it inhibited our ability to
+apply additional optimizations (e.g., local vertex renumbering).
+In [`aggregateMessages`][Graph.aggregateMessages] we introduced the [`EdgeContext`][EdgeContext], which exposes
+the triplet fields as well as functions to explicitly send messages to the source and destination vertices.
+Furthermore, we removed bytecode inspection and instead require the user to indicate what fields
+in the triplet are actually required.
+
+The following code block using `mapReduceTriplets`:
+
+{% highlight scala %}
+val graph: Graph[Int, Float] = ...
+def msgFun(triplet: EdgeTriplet[Int, Float]): Iterator[(VertexId, String)] = {
+  Iterator((triplet.dstId, "Hi"))
+}
+def reduceFun(a: String, b: String): String = a + " " + b
+val result = graph.mapReduceTriplets[String](msgFun, reduceFun)
+{% endhighlight %}
+
+can be rewritten using `aggregateMessages` as:
+
+{% highlight scala %}
+val graph: Graph[Int, Float] = ...
+def msgFun(triplet: EdgeContext[Int, Float, String]) {
+ triplet.sendToDst("Hi")
+}
+def reduceFun(a: String, b: String): String = a + " " + b
+val result = graph.aggregateMessages[String](msgFun, reduceFun)
+{% endhighlight %}
+
### Computing Degree Information
@@ -673,10 +765,6 @@ attributes at each vertex. This can be easily accomplished using the
[`collectNeighborIds`][GraphOps.collectNeighborIds] and the
[`collectNeighbors`][GraphOps.collectNeighbors] operators.
-[GraphOps.collectNeighborIds]: api/scala/index.html#org.apache.spark.graphx.GraphOps@collectNeighborIds(EdgeDirection):VertexRDD[Array[VertexId]]
-[GraphOps.collectNeighbors]: api/scala/index.html#org.apache.spark.graphx.GraphOps@collectNeighbors(EdgeDirection):VertexRDD[Array[(VertexId,VD)]]
-
-
{% highlight scala %}
class GraphOps[VD, ED] {
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
@@ -684,36 +772,35 @@ class GraphOps[VD, ED] {
}
{% endhighlight %}
-> Note that these operators can be quite costly as they duplicate information and require
+> These operators can be quite costly as they duplicate information and require
> substantial communication. If possible try expressing the same computation using the
-> `mapReduceTriplets` operator directly.
+> [`aggregateMessages`][Graph.aggregateMessages] operator directly.
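A rough sketch of that suggestion (for neighbors in either direction, on an arbitrary property
graph `graph`): collect neighbor ids directly with `aggregateMessages`, passing
`TripletFields.None` since only vertex ids are read:

{% highlight scala %}
// Gather the ids on both sides of every edge; TripletFields.None signals that
// no vertex or edge attributes are needed
val neighborIds: VertexRDD[Array[VertexId]] =
  graph.aggregateMessages[Array[VertexId]](
    ctx => { ctx.sendToDst(Array(ctx.srcId)); ctx.sendToSrc(Array(ctx.dstId)) },
    _ ++ _,
    TripletFields.None)
{% endhighlight %}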
## Caching and Uncaching
In Spark, RDDs are not persisted in memory by default. To avoid recomputation, they must be explicitly cached when using them multiple times (see the [Spark Programming Guide][RDD Persistence]). Graphs in GraphX behave the same way. **When using a graph multiple times, make sure to call [`Graph.cache()`][Graph.cache] on it first.**
-[RDD Persistence]: programming-guide.html#rdd-persistence
-[Graph.cache]: api/scala/index.html#org.apache.spark.graphx.Graph@cache():Graph[VD,ED]
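For example (a minimal sketch using the follower dataset described later in this guide):

{% highlight scala %}
// Cache a graph that several downstream queries will reuse
val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt").cache()
println(graph.numEdges)    // the first use computes and caches the graph
println(graph.numVertices) // later uses read from the cache
{% endhighlight %}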
In iterative computations, *uncaching* may also be necessary for best performance. By default, cached RDDs and graphs will remain in memory until memory pressure forces them to be evicted in LRU order. For iterative computation, intermediate results from previous iterations will fill up the cache. Though they will eventually be evicted, the unnecessary data stored in memory will slow down garbage collection. It would be more efficient to uncache intermediate results as soon as they are no longer necessary. This involves materializing (caching and forcing) a graph or RDD every iteration, uncaching all other datasets, and only using the materialized dataset in future iterations. However, because graphs are composed of multiple RDDs, it can be difficult to unpersist them correctly. **For iterative computation we recommend using the Pregel API, which correctly unpersists intermediate results.**
-# Pregel API
<a name="pregel"></a>
-Graphs are inherently recursive data-structures as properties of vertices depend on properties of
+# Pregel API
+
+Graphs are inherently recursive data structures as properties of vertices depend on properties of
their neighbors which in turn depend on properties of *their* neighbors. As a
consequence many important graph algorithms iteratively recompute the properties of each vertex
until a fixed-point condition is reached. A range of graph-parallel abstractions have been proposed
to express these iterative algorithms. GraphX exposes a Pregel-like operator which is a fusion of
the widely used Pregel and GraphLab abstractions.
-At a high-level the Pregel operator in GraphX is a bulk-synchronous parallel messaging abstraction
-*constrained to the topology of the graph*. The Pregel operator executes in a series of super-steps
-in which vertices receive the *sum* of their inbound messages from the previous super- step, compute
+At a high level the Pregel operator in GraphX is a bulk-synchronous parallel messaging abstraction
+*constrained to the topology of the graph*. The Pregel operator executes in a series of super steps
+in which vertices receive the *sum* of their inbound messages from the previous super step, compute
a new value for the vertex property, and then send messages to neighboring vertices in the next
-super-step. Unlike Pregel and instead more like GraphLab messages are computed in parallel as a
+super step. Unlike Pregel, and more like GraphLab, messages are computed in parallel as a
function of the edge triplet and the message computation has access to both the source and
-destination vertex attributes. Vertices that do not receive a message are skipped within a super-
+destination vertex attributes. Vertices that do not receive a message are skipped within a super
step. The Pregel operator terminates iteration and returns the final graph when there are no
messages remaining.
@@ -724,8 +811,6 @@ messages remaining.
The following is the type signature of the [Pregel operator][GraphOps.pregel] as well as a *sketch*
of its implementation (note calls to graph.cache have been removed):
-[GraphOps.pregel]: api/scala/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexId,VD,A)⇒VD,(EdgeTriplet[VD,ED])⇒Iterator[(VertexId,A)],(A,A)⇒A)(ClassTag[A]):Graph[VD,ED]
-
{% highlight scala %}
class GraphOps[VD, ED] {
def pregel[A]
@@ -795,9 +880,10 @@ val sssp = initialGraph.pregel(Double.PositiveInfinity)(
println(sssp.vertices.collect.mkString("\n"))
{% endhighlight %}
-# Graph Builders
<a name="graph_builders"></a>
+# Graph Builders
+
GraphX provides several ways of building a graph from a collection of vertices and edges in an RDD or on disk. None of the graph builders repartitions the graph's edges by default; instead, edges are left in their default partitions (such as their original blocks in HDFS). [`Graph.groupEdges`][Graph.groupEdges] requires the graph to be repartitioned because it assumes identical edges will be colocated on the same partition, so you must call [`Graph.partitionBy`][Graph.partitionBy] before calling `groupEdges`.
{% highlight scala %}
@@ -848,18 +934,12 @@ object Graph {
[`Graph.fromEdgeTuples`][Graph.fromEdgeTuples] allows creating a graph from only an RDD of edge tuples, assigning the edges the value 1, and automatically creating any vertices mentioned by edges and assigning them the default value. It also supports deduplicating the edges; to deduplicate, pass `Some` of a [`PartitionStrategy`][PartitionStrategy] as the `uniqueEdges` parameter (for example, `uniqueEdges = Some(PartitionStrategy.RandomVertexCut)`). A partition strategy is necessary to colocate identical edges on the same partition so they can be deduplicated.
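A minimal sketch (the edge tuples here are made-up illustrative data; with `uniqueEdges` set, the
attributes of duplicate edges, each initially 1, are summed so the result counts the duplicates):

{% highlight scala %}
// Build a graph from raw (srcId, dstId) pairs, merging duplicate edges
val rawEdges: RDD[(VertexId, VertexId)] =
  sc.parallelize(Array((1L, 2L), (2L, 3L), (1L, 2L)))
val dedupedGraph: Graph[Int, Int] =
  Graph.fromEdgeTuples(rawEdges, defaultValue = 1,
    uniqueEdges = Some(PartitionStrategy.RandomVertexCut))
{% endhighlight %}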
-[PartitionStrategy]: api/scala/index.html#org.apache.spark.graphx.PartitionStrategy$
-
-[GraphLoader.edgeListFile]: api/scala/index.html#org.apache.spark.graphx.GraphLoader$@edgeListFile(SparkContext,String,Boolean,Int):Graph[Int,Int]
-[Graph.apply]: api/scala/index.html#org.apache.spark.graphx.Graph$@apply[VD,ED](RDD[(VertexId,VD)],RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED]
-[Graph.fromEdgeTuples]: api/scala/index.html#org.apache.spark.graphx.Graph$@fromEdgeTuples[VD](RDD[(VertexId,VertexId)],VD,Option[PartitionStrategy])(ClassTag[VD]):Graph[VD,Int]
-[Graph.fromEdges]: api/scala/index.html#org.apache.spark.graphx.Graph$@fromEdges[VD,ED](RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED]
+<a name="vertex_and_edge_rdds"></a>
# Vertex and Edge RDDs
-<a name="vertex_and_edge_rdds"></a>
GraphX exposes `RDD` views of the vertices and edges stored within the graph. However, because
-GraphX maintains the vertices and edges in optimized data-structures and these data-structures
+GraphX maintains the vertices and edges in optimized data structures and these data structures
provide additional functionality, the vertices and edges are returned as `VertexRDD` and `EdgeRDD`
respectively. In this section we review some of the additional useful functionality in these types.
@@ -870,7 +950,7 @@ The `VertexRDD[A]` extends `RDD[(VertexID, A)]` and adds the additional constrai
attribute of type `A`. Internally, this is achieved by storing the vertex attributes in a reusable
hash-map data-structure. As a consequence if two `VertexRDD`s are derived from the same base
`VertexRDD` (e.g., by `filter` or `mapValues`) they can be joined in constant time without hash
-evaluations. To leverage this indexed data-structure, the `VertexRDD` exposes the following
+evaluations. To leverage this indexed data structure, the `VertexRDD` exposes the following
additional functionality:
{% highlight scala %}
@@ -893,7 +973,7 @@ class VertexRDD[VD] extends RDD[(VertexID, VD)] {
Notice, for example, how the `filter` operator returns an `VertexRDD`. Filter is actually
implemented using a `BitSet` thereby reusing the index and preserving the ability to do fast joins
with other `VertexRDD`s. Likewise, the `mapValues` operators do not allow the `map` function to
-change the `VertexID` thereby enabling the same `HashMap` data-structures to be reused. Both the
+change the `VertexID` thereby enabling the same `HashMap` data structures to be reused. Both the
`leftJoin` and `innerJoin` are able to identify when joining two `VertexRDD`s derived from the same
`HashMap` and implement the join by linear scan rather than costly point lookups.
@@ -916,21 +996,19 @@ val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
## EdgeRDDs
-The `EdgeRDD[ED, VD]`, which extends `RDD[Edge[ED]]` organizes the edges in blocks partitioned using one
+The `EdgeRDD[ED]`, which extends `RDD[Edge[ED]]`, organizes the edges in blocks partitioned using one
of the various partitioning strategies defined in [`PartitionStrategy`][PartitionStrategy]. Within
each partition, edge attributes and adjacency structure, are stored separately enabling maximum
reuse when changing attribute values.
-[PartitionStrategy]: api/scala/index.html#org.apache.spark.graphx.PartitionStrategy
-
The three additional functions exposed by the `EdgeRDD` are:
{% highlight scala %}
// Transform the edge attributes while preserving the structure
-def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2, VD]
+def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
// Reverse the edges reusing both attributes and structure
-def reverse: EdgeRDD[ED, VD]
+def reverse: EdgeRDD[ED]
// Join two `EdgeRDD`s partitioned using the same partitioning strategy.
-def innerJoin[ED2, ED3](other: EdgeRDD[ED2, VD])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD]
+def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
{% endhighlight %}
In most applications we have found that operations on the `EdgeRDD` are accomplished through the
@@ -960,7 +1038,6 @@ the [`Graph.partitionBy`][Graph.partitionBy] operator. The default partitioning
the initial partitioning of the edges as provided on graph construction. However, users can easily
switch to 2D-partitioning or other heuristics included in GraphX.
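For example (a one-line sketch, assuming an already constructed `graph`):

{% highlight scala %}
// Repartition the edges using the 2D edge-partitioning heuristic
val partitionedGraph = graph.partitionBy(PartitionStrategy.EdgePartition2D)
{% endhighlight %}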
-[Graph.partitionBy]: api/scala/index.html#org.apache.spark.graphx.Graph$@partitionBy(partitionStrategy:org.apache.spark.graphx.PartitionStrategy):org.apache.spark.graphx.Graph[VD,ED]
<p style="text-align: center;">
<img src="img/vertex_routing_edge_tables.png"
@@ -975,24 +1052,24 @@ efficiently joining vertex attributes with the edges. Because real-world graphs
edges than vertices, we move vertex attributes to the edges. Because not all partitions will
contain edges adjacent to all vertices we internally maintain a routing table which identifies where
to broadcast vertices when implementing the join required for operations like `triplets` and
-`mapReduceTriplets`.
+`aggregateMessages`.
-# Graph Algorithms
<a name="graph_algorithms"></a>
+# Graph Algorithms
+
GraphX includes a set of graph algorithms to simplify analytics tasks. The algorithms are contained in the `org.apache.spark.graphx.lib` package and can be accessed directly as methods on `Graph` via [`GraphOps`][GraphOps]. This section describes the algorithms and how they are used.
-## PageRank
<a name="pagerank"></a>
+## PageRank
+
PageRank measures the importance of each vertex in a graph, assuming an edge from *u* to *v* represents an endorsement of *v*'s importance by *u*. For example, if a Twitter user is followed by many others, the user will be ranked highly.
GraphX comes with static and dynamic implementations of PageRank as methods on the [`PageRank` object][PageRank]. Static PageRank runs for a fixed number of iterations, while dynamic PageRank runs until the ranks converge (i.e., stop changing by more than a specified tolerance). [`GraphOps`][GraphOps] allows calling these algorithms directly as methods on `Graph`.
GraphX also includes an example social network dataset that we can run PageRank on. A set of users is given in `graphx/data/users.txt`, and a set of relationships between users is given in `graphx/data/followers.txt`. We compute the PageRank of each user as follows:
-[PageRank]: api/scala/index.html#org.apache.spark.graphx.lib.PageRank$
-
{% highlight scala %}
// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
@@ -1014,8 +1091,6 @@ println(ranksByUsername.collect().mkString("\n"))
The connected components algorithm labels each connected component of the graph with the ID of its lowest-numbered vertex. For example, in a social network, connected components can approximate clusters. GraphX contains an implementation of the algorithm in the [`ConnectedComponents` object][ConnectedComponents], and we compute the connected components of the example social network dataset from the [PageRank section](#pagerank) as follows:
-[ConnectedComponents]: api/scala/index.html#org.apache.spark.graphx.lib.ConnectedComponents$
-
{% highlight scala %}
// Load the graph as in the PageRank example
val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
@@ -1037,9 +1112,6 @@ println(ccByUsername.collect().mkString("\n"))
A vertex is part of a triangle when it has two adjacent vertices with an edge between them. GraphX implements a triangle counting algorithm in the [`TriangleCount` object][TriangleCount] that determines the number of triangles passing through each vertex, providing a measure of clustering. We compute the triangle count of the social network dataset from the [PageRank section](#pagerank). *Note that `TriangleCount` requires the edges to be in canonical orientation (`srcId < dstId`) and the graph to be partitioned using [`Graph.partitionBy`][Graph.partitionBy].*
-[TriangleCount]: api/scala/index.html#org.apache.spark.graphx.lib.TriangleCount$
-[Graph.partitionBy]: api/scala/index.html#org.apache.spark.graphx.Graph@partitionBy(PartitionStrategy):Graph[VD,ED]
-
{% highlight scala %}
// Load the edges in canonical order and partition the graph for triangle count
val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt", true).partitionBy(PartitionStrategy.RandomVertexCut)