path: root/docs/graphx-programming-guide.md
author      WeichenXu <WeichenXu123@outlook.com>    2016-07-02 16:29:00 +0100
committer   Sean Owen <sowen@cloudera.com>          2016-07-02 16:29:00 +0100
commit      0bd7cd18bc4d535b0c4499913f6747b3f6315ac2 (patch)
tree        6c966df99d61193b0c5a34af9f0185d942143721 /docs/graphx-programming-guide.md
parent      192d1f9cf3463d050b87422939448f2acf86acc9 (diff)
[SPARK-16345][DOCUMENTATION][EXAMPLES][GRAPHX] Extract GraphX programming guide example snippets from source files instead of hard-coding them
## What changes were proposed in this pull request?

I extracted 6 example programs from the GraphX programming guide and replaced them with `include_example` labels. The 6 example programs are:

- AggregateMessagesExample.scala
- SSSPExample.scala
- TriangleCountingExample.scala
- ConnectedComponentsExample.scala
- ComprehensiveExample.scala
- PageRankExample.scala

All of the example code can be run using `bin/run-example graphx.EXAMPLE_NAME`.

## How was this patch tested?

Manual.

Author: WeichenXu <WeichenXu123@outlook.com>

Closes #14015 from WeichenXu123/graphx_example_plugin.
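The `include_example` tag pulls a snippet out of the named file under `examples/src/main/`, so the guide and the runnable example can no longer drift apart. Below is a hedged sketch of how one of the six extracted files is plausibly structured: the package, object wrapper, context setup, and `$example on$`/`$example off$` markers are assumptions based on the pattern other Spark example programs follow, while the snippet body is the code this patch removes from the guide.

{% highlight scala %}
package org.apache.spark.examples.graphx

import org.apache.spark.{SparkConf, SparkContext}
// $example on$
import org.apache.spark.graphx.{Graph, VertexRDD}
import org.apache.spark.graphx.util.GraphGenerators
// $example off$

object AggregateMessagesExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("AggregateMessagesExample")
    val sc = new SparkContext(conf)

    // Only the code between the markers is rendered into the guide.
    // $example on$
    // Create a graph with "age" as the vertex property; a random graph keeps it simple.
    val graph: Graph[Double, Int] =
      GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices((id, _) => id.toDouble)
    // For each user, count the older followers and sum their ages.
    val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
      triplet => { // Map Function
        if (triplet.srcAttr > triplet.dstAttr) {
          // Send a (counter, age) message to the destination vertex
          triplet.sendToDst((1, triplet.srcAttr))
        }
      },
      (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function: add counters and ages
    )
    // Divide total age by the number of older followers to get the average.
    val avgAgeOfOlderFollowers: VertexRDD[Double] =
      olderFollowers.mapValues((id, value) => value match {
        case (count, totalAge) => totalAge / count
      })
    // Display the results
    avgAgeOfOlderFollowers.collect.foreach(println(_))
    // $example off$

    sc.stop()
  }
}
{% endhighlight %}

If the file follows this layout, `bin/run-example graphx.AggregateMessagesExample` runs it end to end while the guide renders only the marked region.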
Diffstat (limited to 'docs/graphx-programming-guide.md')
-rw-r--r--  docs/graphx-programming-guide.md | 133
1 file changed, 6 insertions(+), 127 deletions(-)
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index e376b6638e..2e9966c0a2 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -603,29 +603,7 @@ slightly unreliable and instead opted for more explicit user control.
In the following example we use the [`aggregateMessages`][Graph.aggregateMessages] operator to
compute the average age of the more senior followers of each user.
-{% highlight scala %}
-// Import random graph generation library
-import org.apache.spark.graphx.util.GraphGenerators
-// Create a graph with "age" as the vertex property. Here we use a random graph for simplicity.
-val graph: Graph[Double, Int] =
- GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
-// Compute the number of older followers and their total age
-val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
- triplet => { // Map Function
- if (triplet.srcAttr > triplet.dstAttr) {
- // Send message to destination vertex containing counter and age
- triplet.sendToDst(1, triplet.srcAttr)
- }
- },
- // Add counter and age
- (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
-)
-// Divide total age by number of older followers to get average age of older followers
-val avgAgeOfOlderFollowers: VertexRDD[Double] =
- olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } )
-// Display the results
-avgAgeOfOlderFollowers.collect.foreach(println(_))
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala %}
> The `aggregateMessages` operation performs optimally when the messages (and the sums of
> messages) are constant sized (e.g., floats and addition instead of lists and concatenation).
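As a hedged illustration of this note, reusing the `graph: Graph[Double, Int]` from the removed snippet: the pair accumulator keeps every message and partial sum at a fixed size, while the list variant below it produces messages whose size grows with the number of senders.

{% highlight scala %}
// Constant-sized messages: a fixed (count, sum) pair merged with addition.
val constantSized: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
  t => t.sendToDst((1, t.srcAttr)),
  (a, b) => (a._1 + b._1, a._2 + b._2)) // constant-time merge
// Unbounded messages: per-sender lists concatenated together. The merge cost
// and the message size grow with the number of senders, which the note advises against.
val unbounded: VertexRDD[List[Double]] = graph.aggregateMessages[List[Double]](
  t => t.sendToDst(List(t.srcAttr)),
  (a, b) => a ++ b)
{% endhighlight %}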
@@ -793,29 +771,7 @@ second argument list contains the user defined functions for receiving messages
We can use the Pregel operator to express computation such as single source
shortest path in the following example.
-{% highlight scala %}
-import org.apache.spark.graphx._
-// Import random graph generation library
-import org.apache.spark.graphx.util.GraphGenerators
-// A graph with edge attributes containing distances
-val graph: Graph[Long, Double] =
- GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
-val sourceId: VertexId = 42 // The ultimate source
-// Initialize the graph such that all vertices except the root have distance infinity.
-val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
-val sssp = initialGraph.pregel(Double.PositiveInfinity)(
- (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
- triplet => { // Send Message
- if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
- Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
- } else {
- Iterator.empty
- }
- },
- (a,b) => math.min(a,b) // Merge Message
- )
-println(sssp.vertices.collect.mkString("\n"))
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/SSSPExample.scala %}
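For reference, here is a hedged sketch of the same SSSP call with the first argument list written out in full. The optional parameters shown (`maxIterations`, `activeDirection`) are assumptions about the Pregel API rather than part of the removed snippet; `initialGraph` is the graph initialized above.

{% highlight scala %}
val sssp = initialGraph.pregel(
    Double.PositiveInfinity,              // initial message delivered to every vertex
    maxIterations = Int.MaxValue,         // keep iterating until no messages remain
    activeDirection = EdgeDirection.Out)( // only send along out-edges of updated vertices
  (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
  triplet => { // Send Message: relax the edge if it shortens the path
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a, b) => math.min(a, b)) // Merge Message: keep the shorter distance
{% endhighlight %}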
<a name="graph_builders"></a>
@@ -1009,64 +965,19 @@ GraphX comes with static and dynamic implementations of PageRank as methods on t
GraphX also includes an example social network dataset that we can run PageRank on. A set of users is given in `data/graphx/users.txt`, and a set of relationships between users is given in `data/graphx/followers.txt`. We compute the PageRank of each user as follows:
-{% highlight scala %}
-// Load the edges as a graph
-val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
-// Run PageRank
-val ranks = graph.pageRank(0.0001).vertices
-// Join the ranks with the usernames
-val users = sc.textFile("data/graphx/users.txt").map { line =>
- val fields = line.split(",")
- (fields(0).toLong, fields(1))
-}
-val ranksByUsername = users.join(ranks).map {
- case (id, (username, rank)) => (username, rank)
-}
-// Print the result
-println(ranksByUsername.collect().mkString("\n"))
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/PageRankExample.scala %}
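The removed snippet uses the dynamic variant. As a hedged sketch of the distinction drawn above, the static implementation is reachable through `staticPageRank`, which runs a fixed number of iterations instead of iterating until the ranks converge to a tolerance:

{% highlight scala %}
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Dynamic: iterate until each vertex's rank moves less than the tolerance.
val dynamicRanks = graph.pageRank(0.0001).vertices
// Static: run exactly numIter iterations, converged or not.
val staticRanks = graph.staticPageRank(numIter = 20).vertices
{% endhighlight %}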
## Connected Components
The connected components algorithm labels each connected component of the graph with the ID of its lowest-numbered vertex. For example, in a social network, connected components can approximate clusters. GraphX contains an implementation of the algorithm in the [`ConnectedComponents` object][ConnectedComponents], and we compute the connected components of the example social network dataset from the [PageRank section](#pagerank) as follows:
-{% highlight scala %}
-// Load the graph as in the PageRank example
-val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
-// Find the connected components
-val cc = graph.connectedComponents().vertices
-// Join the connected components with the usernames
-val users = sc.textFile("data/graphx/users.txt").map { line =>
- val fields = line.split(",")
- (fields(0).toLong, fields(1))
-}
-val ccByUsername = users.join(cc).map {
- case (id, (username, cc)) => (username, cc)
-}
-// Print the result
-println(ccByUsername.collect().mkString("\n"))
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala %}
## Triangle Counting
A vertex is part of a triangle when it has two adjacent vertices with an edge between them. GraphX implements a triangle counting algorithm in the [`TriangleCount` object][TriangleCount] that determines the number of triangles passing through each vertex, providing a measure of clustering. We compute the triangle count of the social network dataset from the [PageRank section](#pagerank). *Note that `TriangleCount` requires the edges to be in canonical orientation (`srcId < dstId`) and the graph to be partitioned using [`Graph.partitionBy`][Graph.partitionBy].*
-{% highlight scala %}
-// Load the edges in canonical order and partition the graph for triangle count
-val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true).partitionBy(PartitionStrategy.RandomVertexCut)
-// Find the triangle count for each vertex
-val triCounts = graph.triangleCount().vertices
-// Join the triangle counts with the usernames
-val users = sc.textFile("data/graphx/users.txt").map { line =>
- val fields = line.split(",")
- (fields(0).toLong, fields(1))
-}
-val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
- (username, tc)
-}
-// Print the result
-println(triCountByUsername.collect().mkString("\n"))
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala %}
# Examples
@@ -1076,36 +987,4 @@ to important relationships and users, run page-rank on the sub-graph, and
then finally return attributes associated with the top users. I can do
all of this in just a few lines with GraphX:
-{% highlight scala %}
-// Connect to the Spark cluster
-val sc = new SparkContext("spark://master.amplab.org", "research")
-
-// Load my user data and parse into tuples of user id and attribute list
-val users = (sc.textFile("data/graphx/users.txt")
- .map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
-
-// Parse the edge data which is already in userId -> userId format
-val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
-
-// Attach the user attributes
-val graph = followerGraph.outerJoinVertices(users) {
- case (uid, deg, Some(attrList)) => attrList
- // Some users may not have attributes so we set them as empty
- case (uid, deg, None) => Array.empty[String]
-}
-
-// Restrict the graph to users with usernames and names
-val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
-
-// Compute the PageRank
-val pagerankGraph = subgraph.pageRank(0.001)
-
-// Get the attributes of the top pagerank users
-val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
- case (uid, attrList, Some(pr)) => (pr, attrList.toList)
- case (uid, attrList, None) => (0.0, attrList.toList)
-}
-
-println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
-
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala %}
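As the commit message notes, each of the six extracted programs is runnable on its own, e.g. `bin/run-example graphx.ComprehensiveExample` from the Spark root, so every snippet the guide now includes is exercised as real code.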