diff options
Diffstat (limited to 'docs/graphx-programming-guide.md')
-rw-r--r-- | docs/graphx-programming-guide.md | 133 |
1 files changed, 6 insertions, 127 deletions
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md index e376b6638e..2e9966c0a2 100644 --- a/docs/graphx-programming-guide.md +++ b/docs/graphx-programming-guide.md @@ -603,29 +603,7 @@ slightly unreliable and instead opted for more explicit user control. In the following example we use the [`aggregateMessages`][Graph.aggregateMessages] operator to compute the average age of the more senior followers of each user. -{% highlight scala %} -// Import random graph generation library -import org.apache.spark.graphx.util.GraphGenerators -// Create a graph with "age" as the vertex property. Here we use a random graph for simplicity. -val graph: Graph[Double, Int] = - GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble ) -// Compute the number of older followers and their total age -val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)]( - triplet => { // Map Function - if (triplet.srcAttr > triplet.dstAttr) { - // Send message to destination vertex containing counter and age - triplet.sendToDst(1, triplet.srcAttr) - } - }, - // Add counter and age - (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function -) -// Divide total age by number of older followers to get average age of older followers -val avgAgeOfOlderFollowers: VertexRDD[Double] = - olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } ) -// Display the results -avgAgeOfOlderFollowers.collect.foreach(println(_)) -{% endhighlight %} +{% include_example scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala %} > The `aggregateMessages` operation performs optimally when the messages (and the sums of > messages) are constant sized (e.g., floats and addition instead of lists and concatenation). @@ -793,29 +771,7 @@ second argument list contains the user defined functions for receiving messages We can use the Pregel operator to express computation such as single source shortest path in the following example. -{% highlight scala %} -import org.apache.spark.graphx._ -// Import random graph generation library -import org.apache.spark.graphx.util.GraphGenerators -// A graph with edge attributes containing distances -val graph: Graph[Long, Double] = - GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble) -val sourceId: VertexId = 42 // The ultimate source -// Initialize the graph such that all vertices except the root have distance infinity. -val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity) -val sssp = initialGraph.pregel(Double.PositiveInfinity)( - (id, dist, newDist) => math.min(dist, newDist), // Vertex Program - triplet => { // Send Message - if (triplet.srcAttr + triplet.attr < triplet.dstAttr) { - Iterator((triplet.dstId, triplet.srcAttr + triplet.attr)) - } else { - Iterator.empty - } - }, - (a,b) => math.min(a,b) // Merge Message - ) -println(sssp.vertices.collect.mkString("\n")) -{% endhighlight %} +{% include_example scala/org/apache/spark/examples/graphx/SSSPExample.scala %} <a name="graph_builders"></a> @@ -1009,64 +965,19 @@ GraphX comes with static and dynamic implementations of PageRank as methods on t GraphX also includes an example social network dataset that we can run PageRank on. A set of users is given in `data/graphx/users.txt`, and a set of relationships between users is given in `data/graphx/followers.txt`. We compute the PageRank of each user as follows: -{% highlight scala %} -// Load the edges as a graph -val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt") -// Run PageRank -val ranks = graph.pageRank(0.0001).vertices -// Join the ranks with the usernames -val users = sc.textFile("data/graphx/users.txt").map { line => - val fields = line.split(",") - (fields(0).toLong, fields(1)) -} -val ranksByUsername = users.join(ranks).map { - case (id, (username, rank)) => (username, rank) -} -// Print the result -println(ranksByUsername.collect().mkString("\n")) -{% endhighlight %} +{% include_example scala/org/apache/spark/examples/graphx/PageRankExample.scala %} ## Connected Components The connected components algorithm labels each connected component of the graph with the ID of its lowest-numbered vertex. For example, in a social network, connected components can approximate clusters. GraphX contains an implementation of the algorithm in the [`ConnectedComponents` object][ConnectedComponents], and we compute the connected components of the example social network dataset from the [PageRank section](#pagerank) as follows: -{% highlight scala %} -// Load the graph as in the PageRank example -val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt") -// Find the connected components -val cc = graph.connectedComponents().vertices -// Join the connected components with the usernames -val users = sc.textFile("data/graphx/users.txt").map { line => - val fields = line.split(",") - (fields(0).toLong, fields(1)) -} -val ccByUsername = users.join(cc).map { - case (id, (username, cc)) => (username, cc) -} -// Print the result -println(ccByUsername.collect().mkString("\n")) -{% endhighlight %} +{% include_example scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala %} ## Triangle Counting A vertex is part of a triangle when it has two adjacent vertices with an edge between them. GraphX implements a triangle counting algorithm in the [`TriangleCount` object][TriangleCount] that determines the number of triangles passing through each vertex, providing a measure of clustering. We compute the triangle count of the social network dataset from the [PageRank section](#pagerank). *Note that `TriangleCount` requires the edges to be in canonical orientation (`srcId < dstId`) and the graph to be partitioned using [`Graph.partitionBy`][Graph.partitionBy].* -{% highlight scala %} -// Load the edges in canonical order and partition the graph for triangle count -val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true).partitionBy(PartitionStrategy.RandomVertexCut) -// Find the triangle count for each vertex -val triCounts = graph.triangleCount().vertices -// Join the triangle counts with the usernames -val users = sc.textFile("data/graphx/users.txt").map { line => - val fields = line.split(",") - (fields(0).toLong, fields(1)) -} -val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) => - (username, tc) -} -// Print the result -println(triCountByUsername.collect().mkString("\n")) -{% endhighlight %} +{% include_example scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala %} # Examples @@ -1076,36 +987,4 @@ to important relationships and users, run page-rank on the sub-graph, and then finally return attributes associated with the top users. I can do all of this in just a few lines with GraphX: -{% highlight scala %} -// Connect to the Spark cluster -val sc = new SparkContext("spark://master.amplab.org", "research") - -// Load my user data and parse into tuples of user id and attribute list -val users = (sc.textFile("data/graphx/users.txt") - .map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) )) - -// Parse the edge data which is already in userId -> userId format -val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt") - -// Attach the user attributes -val graph = followerGraph.outerJoinVertices(users) { - case (uid, deg, Some(attrList)) => attrList - // Some users may not have attributes so we set them as empty - case (uid, deg, None) => Array.empty[String] -} - -// Restrict the graph to users with usernames and names -val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2) - -// Compute the PageRank -val pagerankGraph = subgraph.pageRank(0.001) - -// Get the attributes of the top pagerank users -val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) { - case (uid, attrList, Some(pr)) => (pr, attrList.toList) - case (uid, attrList, None) => (0.0, attrList.toList) -} - -println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n")) - -{% endhighlight %} +{% include_example scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala %} |