aboutsummaryrefslogtreecommitdiff
path: root/docs/graphx-programming-guide.md
diff options
context:
space:
mode:
authorJoseph E. Gonzalez <joseph.e.gonzalez@gmail.com>2014-01-13 19:30:25 -0800
committerJoseph E. Gonzalez <joseph.e.gonzalez@gmail.com>2014-01-13 19:30:35 -0800
commitee8931d2c6503716de640d6d1249c515e1fd85d3 (patch)
tree08d361fb2d8a486c4f045213c904b20d12eef0dd /docs/graphx-programming-guide.md
parent0fbc0b056179ec85b86d7ad6ddcd379bf1bf194d (diff)
downloadspark-ee8931d2c6503716de640d6d1249c515e1fd85d3.tar.gz
spark-ee8931d2c6503716de640d6d1249c515e1fd85d3.tar.bz2
spark-ee8931d2c6503716de640d6d1249c515e1fd85d3.zip
Finished documenting vertexrdd.
Diffstat (limited to 'docs/graphx-programming-guide.md')
-rw-r--r--docs/graphx-programming-guide.md53
1 files changed, 53 insertions, 0 deletions
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index 77d807874f..76de26c7cd 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -683,7 +683,60 @@ val sssp = initialGraph.pregel(Double.PositiveInfinity)(
# Vertex and Edge RDDs
<a name="vertex_and_edge_rdds"></a>
+GraphX exposes `RDD` views of the vertices and edges stored within the graph. However, because
+GraphX maintains the vertices and edges in optimized data-structures and these data-structures
+provide additional functionality, the vertices and edges are returned as `VertexRDD` and `EdgeRDD`
+respectively. In this section we review some of the additional useful functionality in these types.
+## VertexRDDs
+
+The `VertexRDD[A]` extends the more traditional `RDD[(VertexId, A)]` but adds the additional
+constraint that each `VertexId` occurs only *once*. Moreover, `VertexRDD[A]` represents a *set* of
+vertices each with an attribute of type `A`. Internally, this is achieved by storing the vertex
+attributes in a reusable hash-map data-structure. As a consequence if two `VertexRDD`s are derived
+from the same base `VertexRDD` (e.g., by `filter` or `mapValues`) they can be joined in constant
+time without hash evaluations. To leverage this indexed data-structure, the `VertexRDD` exposes the
+following additional functionality:
+
+{% highlight scala %}
+// Filter the vertex set but preserves the internal index
+def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD]
+// Transform the values without changing the ids (preserves the internal index)
+def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
+def mapValues[VD2](map: (VertexID, VD) => VD2): VertexRDD[VD2]
+// Remove vertices from this set that appear in the other set
+def diff(other: VertexRDD[VD]): VertexRDD[VD]
+// Join operators that take advantage of the internal indexing to accelerate joins (substantially)
+def leftJoin[VD2, VD3](other: RDD[(VertexID, VD2)])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3]
+def innerJoin[U, VD2](other: RDD[(VertexID, U)])(f: (VertexID, VD, U) => VD2): VertexRDD[VD2]
+// Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
+def aggregateUsingIndex[VD2](other: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
+{% endhighlight %}
+
+Notice, for example, how the `filter` operator returns an `VertexRDD`. Filter is actually
+implemented using a `BitSet` thereby reusing the index and preserving the ability to do fast joins
+with other `VertexRDD`s. Likewise, the `mapValues` operators do not allow the `map` function to
+change the `VertexId` thereby enabling the same `HashMap` data-structures to be reused. Both the
+`leftJoin` and `innerJoin` are able to identify when joining two `VertexRDD`s derived from the same
+`HashMap` and implement the join by linear scan rather than costly point lookups.
+
+The `aggregateUsingIndex` operator can be slightly confusing but is also useful for efficient
+construction of a new `VertexRDD` from an `RDD[(VertexId, A)]`. Conceptually, if I have constructed
+a `VertexRDD[B]` over a set of vertices, *which is a super-set* of the vertices in some
+`RDD[(VertexId, A)]` then I can reuse the index to both aggregate and then subsequently index the
+RDD. For example:
+
+{% highlight scala %}
+val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
+val rddB: RDD[(VertexID, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
+// There should be 200 entries in rddB
+rddB.count
+val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
+// There should be 100 entries in setB
+setB.count
+// Joining A and B should now be fast!
+val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
+{% endhighlight %}
# Optimized Representation