From ee8931d2c6503716de640d6d1249c515e1fd85d3 Mon Sep 17 00:00:00 2001
From: "Joseph E. Gonzalez"
Date: Mon, 13 Jan 2014 19:30:25 -0800
Subject: Finished documenting vertexrdd.

---
 docs/graphx-programming-guide.md | 53 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 53 insertions(+)

(limited to 'docs/graphx-programming-guide.md')

diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index 77d807874f..76de26c7cd 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -683,7 +683,60 @@ val sssp = initialGraph.pregel(Double.PositiveInfinity)(
 
 # Vertex and Edge RDDs
 
+GraphX exposes `RDD` views of the vertices and edges stored within the graph. However, because
+GraphX maintains the vertices and edges in optimized data-structures and these data-structures
+provide additional functionality, the vertices and edges are returned as `VertexRDD` and `EdgeRDD`
+respectively. In this section we review some of the additional useful functionality in these types.
 
+## VertexRDDs
+
+The `VertexRDD[A]` extends the more traditional `RDD[(VertexID, A)]` but adds the additional
+constraint that each `VertexID` occurs only *once*. Moreover, `VertexRDD[A]` represents a *set* of
+vertices each with an attribute of type `A`. Internally, this is achieved by storing the vertex
+attributes in a reusable hash-map data-structure. As a consequence, if two `VertexRDD`s are derived
+from the same base `VertexRDD` (e.g., by `filter` or `mapValues`) they can be joined in constant
+time without hash evaluations. To leverage this indexed data-structure, the `VertexRDD` exposes the
+following additional functionality:
+
+{% highlight scala %}
+// Filter the vertex set but preserve the internal index
+def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD]
+// Transform the values without changing the ids (preserves the internal index)
+def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
+def mapValues[VD2](map: (VertexID, VD) => VD2): VertexRDD[VD2]
+// Remove vertices from this set that appear in the other set
+def diff(other: VertexRDD[VD]): VertexRDD[VD]
+// Join operators that take advantage of the internal indexing to accelerate joins (substantially)
+def leftJoin[VD2, VD3](other: RDD[(VertexID, VD2)])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3]
+def innerJoin[U, VD2](other: RDD[(VertexID, U)])(f: (VertexID, VD, U) => VD2): VertexRDD[VD2]
+// Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
+def aggregateUsingIndex[VD2](other: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
+{% endhighlight %}
+
+Notice, for example, how the `filter` operator returns a `VertexRDD`. Filter is actually
+implemented using a `BitSet` thereby reusing the index and preserving the ability to do fast joins
+with other `VertexRDD`s. Likewise, the `mapValues` operators do not allow the `map` function to
+change the `VertexID` thereby enabling the same `HashMap` data-structures to be reused. Both the
+`leftJoin` and `innerJoin` are able to identify when joining two `VertexRDD`s derived from the same
+`HashMap` and implement the join by linear scan rather than costly point lookups.
+
+The `aggregateUsingIndex` operator can be slightly confusing but is also useful for efficient
+construction of a new `VertexRDD` from an `RDD[(VertexID, A)]`. Conceptually, if I have constructed
+a `VertexRDD[B]` over a set of vertices, *which is a super-set* of the vertices in some
+`RDD[(VertexID, A)]` then I can reuse the index to both aggregate and then subsequently index the
+RDD. For example:
+
+{% highlight scala %}
+val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
+val rddB: RDD[(VertexID, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
+// There should be 200 entries in rddB
+rddB.count
+val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
+// There should be 100 entries in setB
+setB.count
+// Joining A and B should now be fast!
+val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
+{% endhighlight %}
 
 # Optimized Representation
 
-- 
cgit v1.2.3
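
Reader's note on the patch above: the added text attributes the fast joins to a shared hash index plus a `BitSet` mask. The following is a minimal sketch of that idea using plain Scala collections, with no Spark dependency. It is not the GraphX implementation; `ToyVertexSet`, `fromPairs`, and the slot layout are hypothetical names chosen purely for illustration, and the sketch assumes the input ids are unique.

```scala
import scala.collection.immutable.BitSet

// Hypothetical toy model of an indexed vertex set (NOT the GraphX code).
final case class ToyVertexSet[A](
    index: Map[Long, Int], // vertex id -> slot
    ids: Vector[Long],     // slot -> vertex id
    values: Vector[A],     // slot -> attribute
    mask: BitSet) {        // slots that are currently present

  // Filtering only shrinks the mask, so the index object is shared with the result.
  def filter(pred: (Long, A) => Boolean): ToyVertexSet[A] =
    ToyVertexSet(index, ids, values, mask.filter(slot => pred(ids(slot), values(slot))))

  // Values change but ids cannot, so the same index is reused.
  def mapValues[B](f: A => B): ToyVertexSet[B] =
    ToyVertexSet(index, ids, values.map(f), mask)

  def innerJoin[B, C](other: ToyVertexSet[B])(f: (Long, A, B) => C): Map[Long, C] =
    if (index eq other.index) {
      // Same index object: intersect the masks and scan slots, no hash lookups.
      (mask & other.mask).iterator.map { slot =>
        ids(slot) -> f(ids(slot), values(slot), other.values(slot))
      }.toMap
    } else {
      // Unrelated indexes: fall back to one hash lookup per present vertex.
      mask.iterator.flatMap { slot =>
        val id = ids(slot)
        other.index.get(id)
          .filter(s => other.mask(s))
          .map(s => id -> f(id, values(slot), other.values(s)))
      }.toMap
    }
}

object ToyVertexSet {
  // Builds the index once; assumes every id in `pairs` is unique.
  def fromPairs[A](pairs: Seq[(Long, A)]): ToyVertexSet[A] = {
    val ids = pairs.map(_._1).toVector
    ToyVertexSet(ids.zipWithIndex.toMap, ids, pairs.map(_._2).toVector, BitSet(ids.indices: _*))
  }
}

object Demo extends App {
  val base    = ToyVertexSet.fromPairs((0L until 10L).map(id => (id, id.toInt)))
  val evens   = base.filter((id, _) => id % 2 == 0) // shares base.index
  val doubled = base.mapValues(_ * 2.0)             // shares base.index
  // Takes the shared-index branch; each even id maps to id + 2 * id.
  val joined  = evens.innerJoin(doubled)((_, a, b) => a + b)
  println(joined.toSeq.sortBy(_._1))
}
```

In this toy model the reference check `index eq other.index` plays the role of "derived from the same base `VertexRDD`": both `filter` and `mapValues` pass the index through untouched, which is what lets the join degrade to a slot-by-slot scan.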