path: root/docs/graphx-programming-guide.md
author    Ankur Dave <ankurdave@gmail.com>    2014-01-13 21:55:35 -0800
committer Ankur Dave <ankurdave@gmail.com>    2014-01-13 21:55:35 -0800
commit    6f6f8c928ce493357d4d32e46971c5e401682ea8 (patch)
tree      01fda55c13b53591700704889a148a9ed0c259b7 /docs/graphx-programming-guide.md
parent    67795dbbfb3857e9677e3104b8bd6fd2cd5633a9 (diff)
Wrap methods in the appropriate class/object declaration
Diffstat (limited to 'docs/graphx-programming-guide.md')
-rw-r--r--    docs/graphx-programming-guide.md    149
1 file changed, 85 insertions(+), 64 deletions(-)
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index aadeb38960..29d397c371 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -256,7 +256,7 @@ compute the in-degree of each vertex (defined in `GraphOps`) by the following:
{% highlight scala %}
val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
-val indDegrees: VertexRDD[Int] = graph.inDegrees
+val inDegrees: VertexRDD[Int] = graph.inDegrees
{% endhighlight %}
The reason for differentiating between core graph operations and [`GraphOps`][GraphOps] is to be
@@ -270,9 +270,11 @@ In direct analogy to the RDD `map` operator, the property
graph contains the following:
{% highlight scala %}
-def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]
-def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
-def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
+class Graph[VD, ED] {
+ def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]
+ def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
+ def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
+}
{% endhighlight %}
Each of these operators yields a new graph with the vertex or edge properties modified by the user
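As a usage sketch (assuming, as in the guide's earlier examples, a hypothetical property graph `users: Graph[(String, String), String]` whose vertex attribute is a (username, occupation) pair, with `import org.apache.spark.graphx._` in scope), the three operators might be applied like this:
{% highlight scala %}
// Hypothetical input graph; assumed to be constructed elsewhere
val users: Graph[(String, String), String]
// mapVertices: keep only the username, producing a Graph[String, String]
val names: Graph[String, String] = users.mapVertices { (id, attr) => attr._1 }
// mapEdges: replace each edge attribute with its length
val lengths: Graph[(String, String), Int] = users.mapEdges(e => e.attr.length)
// mapTriplets: annotate each edge with the usernames of its endpoints
val annotated: Graph[(String, String), String] =
  users.mapTriplets(t => t.srcAttr._1 + " -> " + t.dstAttr._1)
{% endhighlight %}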
@@ -314,11 +316,13 @@ Currently GraphX supports only a simple set of commonly used structural operator
add more in the future. The following is a list of the basic structural operators.
{% highlight scala %}
-def reverse: Graph[VD, ED]
-def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
- vpred: (VertexID, VD) => Boolean): Graph[VD, ED]
-def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
-def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
+class Graph[VD, ED] {
+ def reverse: Graph[VD, ED]
+ def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
+ vpred: (VertexID, VD) => Boolean): Graph[VD, ED]
+ def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
+ def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
+}
{% endhighlight %}
The [`reverse`][Graph.reverse] operator returns a new graph with all the edge directions reversed.
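A minimal sketch of `subgraph` and `mask`, assuming a hypothetical graph whose vertex attribute is a (name, occupation) pair and where some vertices are marked "Missing":
{% highlight scala %}
// Hypothetical input graph; assumed to be constructed elsewhere
val graph: Graph[(String, String), String]
// Keep only vertices whose occupation is not "Missing"; their incident edges are dropped too
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// Run connected components on the full graph, then restrict the answer to the valid subgraph
val validCCGraph = graph.connectedComponents().mask(validGraph)
{% endhighlight %}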
@@ -400,10 +404,12 @@ might want to pull vertex properties from one graph into another. These tasks c
using the *join* operators. Below we list the key join operators:
{% highlight scala %}
-def joinVertices[U](table: RDD[(VertexID, U)])(map: (VertexID, VD, U) => VD)
- : Graph[VD, ED]
-def outerJoinVertices[U, VD2](table: RDD[(VertexID, U)])(map: (VertexID, VD, Option[U]) => VD2)
- : Graph[VD2, ED]
+class Graph[VD, ED] {
+ def joinVertices[U](table: RDD[(VertexID, U)])(map: (VertexID, VD, U) => VD)
+ : Graph[VD, ED]
+ def outerJoinVertices[U, VD2](table: RDD[(VertexID, U)])(map: (VertexID, VD, Option[U]) => VD2)
+ : Graph[VD2, ED]
+}
{% endhighlight %}
The [`joinVertices`][GraphOps.joinVertices] operator joins the vertices with the input RDD and
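To make this concrete, a minimal sketch of `outerJoinVertices`, assuming a hypothetical `graph: Graph[Double, String]`; each vertex's out-degree is joined in, defaulting to 0 for vertices with no outgoing edges:
{% highlight scala %}
// Hypothetical input graph; assumed to be constructed elsewhere
val graph: Graph[Double, String]
val outDegrees: VertexRDD[Int] = graph.outDegrees
// Replace each vertex attribute with its out-degree (0 if the vertex has no entry in outDegrees)
val degreeGraph: Graph[Int, String] = graph.outerJoinVertices(outDegrees) {
  (id, oldAttr, outDegOpt) => outDegOpt.getOrElse(0)
}
{% endhighlight %}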
@@ -470,10 +476,12 @@ The core (heavily optimized) aggregation primitive in GraphX is the
[`mapReduceTriplets`][Graph.mapReduceTriplets] operator:
{% highlight scala %}
-def mapReduceTriplets[A](
- map: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
- reduce: (A, A) => A)
- : VertexRDD[A]
+class Graph[VD, ED] {
+ def mapReduceTriplets[A](
+ map: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+ reduce: (A, A) => A)
+ : VertexRDD[A]
+}
{% endhighlight %}
The [`mapReduceTriplets`][Graph.mapReduceTriplets] operator takes a user defined map function which
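For example, a minimal sketch that recomputes in-degrees with `mapReduceTriplets`, assuming a hypothetical `graph: Graph[Double, String]`:
{% highlight scala %}
// Hypothetical input graph; assumed to be constructed elsewhere
val graph: Graph[Double, String]
val inDeg: VertexRDD[Int] = graph.mapReduceTriplets[Int](
  // Map: send the value 1 to the destination vertex of every edge
  triplet => Iterator((triplet.dstId, 1)),
  // Reduce: sum the messages arriving at each vertex
  (a, b) => a + b)
{% endhighlight %}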
@@ -564,12 +572,19 @@ val maxDegrees: (VertexID, Int) = graph.degrees.reduce(max)
### Collecting Neighbors
In some cases it may be easier to express computation by collecting neighboring vertices and their
-attributes at each vertex. This can be easily accomplished using the `collectNeighborIds` and the
-`collectNeighbors` operators.
+attributes at each vertex. This can be easily accomplished using the
+[`collectNeighborIds`][GraphOps.collectNeighborIds] and the
+[`collectNeighbors`][GraphOps.collectNeighbors] operators.
+
+[GraphOps.collectNeighborIds]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighborIds(EdgeDirection):VertexRDD[Array[VertexID]]
+[GraphOps.collectNeighbors]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighbors(EdgeDirection):VertexRDD[Array[(VertexID,VD)]]
+
{% highlight scala %}
-def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]] =
-def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexID, VD)] ]
+class GraphOps[VD, ED] {
+ def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]]
+ def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexID, VD)] ]
+}
{% endhighlight %}
> Note that these operators can be quite costly as they duplicate information and require
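A short sketch of `collectNeighborIds`, assuming a hypothetical `graph: Graph[Double, String]`:
{% highlight scala %}
// Hypothetical input graph; assumed to be constructed elsewhere
val graph: Graph[Double, String]
// Gather, for each vertex, the ids of its in-neighbors
val inNbrIds: VertexRDD[Array[VertexID]] = graph.collectNeighborIds(EdgeDirection.In)
{% endhighlight %}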
@@ -600,40 +615,44 @@ messages remaining.
> neighboring vertices and the message construction is done in parallel using a user defined
> messaging function. These constraints allow additional optimization within GraphX.
-The following is type signature of the Pregel operator as well as a *sketch* of its implementation
-(note calls to graph.cache have been removed):
+The following is the type signature of the [Pregel operator][GraphOps.pregel] as well as a *sketch*
+of its implementation (note calls to graph.cache have been removed):
+
+[GraphOps.pregel]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexID,VD,A)⇒VD,(EdgeTriplet[VD,ED])⇒Iterator[(VertexID,A)],(A,A)⇒A)(ClassTag[A]):Graph[VD,ED]
{% highlight scala %}
-def pregel[A]
- (initialMsg: A,
- maxIter: Int = Int.MaxValue,
- activeDir: EdgeDirection = EdgeDirection.Out)
- (vprog: (VertexID, VD, A) => VD,
- sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
- mergeMsg: (A, A) => A)
- : Graph[VD, ED] = {
- // Receive the initial message at each vertex
- var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
- // compute the messages
- var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
- var activeMessages = messages.count()
- // Loop until no messages remain or maxIterations is achieved
- var i = 0
- while (activeMessages > 0 && i < maxIterations) {
- // Receive the messages: -----------------------------------------------------------------------
- // Run the vertex program on all vertices that receive messages
- val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
- // Merge the new vertex values back into the graph
- g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
- // Send Messages: ------------------------------------------------------------------------------
- // Vertices that didn't receive a message above don't appear in newVerts and therefore don't
- // get to send messages. More precisely the map phase of mapReduceTriplets is only invoked
- // on edges in the activeDir of vertices in newVerts
- messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
- activeMessages = messages.count()
- i += 1
+class GraphOps[VD, ED] {
+ def pregel[A]
+ (initialMsg: A,
+ maxIter: Int = Int.MaxValue,
+ activeDir: EdgeDirection = EdgeDirection.Out)
+ (vprog: (VertexID, VD, A) => VD,
+ sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+ mergeMsg: (A, A) => A)
+ : Graph[VD, ED] = {
+ // Receive the initial message at each vertex
+ var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
+ // compute the messages
+ var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
+ var activeMessages = messages.count()
+ // Loop until no messages remain or maxIterations is achieved
+ var i = 0
+ while (activeMessages > 0 && i < maxIterations) {
+ // Receive the messages: -----------------------------------------------------------------------
+ // Run the vertex program on all vertices that receive messages
+ val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
+ // Merge the new vertex values back into the graph
+ g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
+ // Send Messages: ------------------------------------------------------------------------------
+ // Vertices that didn't receive a message above don't appear in newVerts and therefore don't
+ // get to send messages. More precisely the map phase of mapReduceTriplets is only invoked
+ // on edges in the activeDir of vertices in newVerts
+ messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
+ activeMessages = messages.count()
+ i += 1
+ }
+ g
}
- g
}
{% endhighlight %}
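As an illustration of the operator in use, here is a minimal single-source shortest-paths sketch, assuming a hypothetical `graph: Graph[Double, Double]` with edge weights and an arbitrarily chosen source vertex id:
{% highlight scala %}
// Hypothetical input graph and source vertex; assumed to be constructed elsewhere
val graph: Graph[Double, Double]
val sourceId: VertexID = 42
// Initialize distances: 0.0 at the source, infinity everywhere else
val initialGraph = graph.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  // Vertex program: keep the smaller of the current and incoming distance
  (id, dist, newDist) => math.min(dist, newDist),
  // Send message: relax an edge if it improves the destination's distance
  triplet => {
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  // Merge messages: keep the smaller distance
  (a, b) => math.min(a, b))
{% endhighlight %}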
@@ -749,18 +768,20 @@ time without hash evaluations. To leverage this indexed data-structure, the `Ver
following additional functionality:
{% highlight scala %}
-// Filter the vertex set but preserves the internal index
-def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD]
-// Transform the values without changing the ids (preserves the internal index)
-def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
-def mapValues[VD2](map: (VertexID, VD) => VD2): VertexRDD[VD2]
-// Remove vertices from this set that appear in the other set
-def diff(other: VertexRDD[VD]): VertexRDD[VD]
-// Join operators that take advantage of the internal indexing to accelerate joins (substantially)
-def leftJoin[VD2, VD3](other: RDD[(VertexID, VD2)])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3]
-def innerJoin[U, VD2](other: RDD[(VertexID, U)])(f: (VertexID, VD, U) => VD2): VertexRDD[VD2]
-// Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
-def aggregateUsingIndex[VD2](other: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
+class VertexRDD[VD] {
+ // Filter the vertex set but preserves the internal index
+ def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD]
+ // Transform the values without changing the ids (preserves the internal index)
+ def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
+ def mapValues[VD2](map: (VertexID, VD) => VD2): VertexRDD[VD2]
+ // Remove vertices from this set that appear in the other set
+ def diff(other: VertexRDD[VD]): VertexRDD[VD]
+ // Join operators that take advantage of the internal indexing to accelerate joins (substantially)
+ def leftJoin[VD2, VD3](other: RDD[(VertexID, VD2)])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3]
+ def innerJoin[U, VD2](other: RDD[(VertexID, U)])(f: (VertexID, VD, U) => VD2): VertexRDD[VD2]
+ // Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
+ def aggregateUsingIndex[VD2](other: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
+}
{% endhighlight %}
Notice, for example, how the `filter` operator returns a `VertexRDD`. Filter is actually
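A minimal sketch of the indexed join and aggregation operators, assuming `sc` is an existing SparkContext and the usual Spark and GraphX imports (`org.apache.spark.rdd.RDD`, `org.apache.spark.graphx._`) are in scope:
{% highlight scala %}
// Build a vertex set with ids 0-99, each carrying the value 1
val setA: VertexRDD[Int] = VertexRDD(sc.parallelize((0L until 100L).map(id => (id, 1))))
// A plain RDD with two entries per id, to be aggregated using setA's index
val rddB: RDD[(VertexID, Double)] =
  sc.parallelize((0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0))))
// Behaves like reduceByKey on rddB but reuses setA's internal index
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
// Indexed join between the two vertex sets
val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
{% endhighlight %}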