aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWeichenXu <WeichenXu123@outlook.com>2016-07-02 16:29:00 +0100
committerSean Owen <sowen@cloudera.com>2016-07-02 16:29:00 +0100
commit0bd7cd18bc4d535b0c4499913f6747b3f6315ac2 (patch)
tree6c966df99d61193b0c5a34af9f0185d942143721
parent192d1f9cf3463d050b87422939448f2acf86acc9 (diff)
downloadspark-0bd7cd18bc4d535b0c4499913f6747b3f6315ac2.tar.gz
spark-0bd7cd18bc4d535b0c4499913f6747b3f6315ac2.tar.bz2
spark-0bd7cd18bc4d535b0c4499913f6747b3f6315ac2.zip
[SPARK-16345][DOCUMENTATION][EXAMPLES][GRAPHX] Extract graphx programming guide example snippets from source files instead of hard code them
## What changes were proposed in this pull request? I extract 6 example programs from GraphX programming guide and replace them with `include_example` label. The 6 example programs are: - AggregateMessagesExample.scala - SSSPExample.scala - TriangleCountingExample.scala - ConnectedComponentsExample.scala - ComprehensiveExample.scala - PageRankExample.scala All the example code can run using `bin/run-example graphx.EXAMPLE_NAME` ## How was this patch tested? Manual. Author: WeichenXu <WeichenXu123@outlook.com> Closes #14015 from WeichenXu123/graphx_example_plugin.
-rw-r--r--docs/graphx-programming-guide.md133
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala72
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala80
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala68
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala61
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/graphx/SSSPExample.scala69
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala70
7 files changed, 426 insertions, 127 deletions
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index e376b6638e..2e9966c0a2 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -603,29 +603,7 @@ slightly unreliable and instead opted for more explicit user control.
In the following example we use the [`aggregateMessages`][Graph.aggregateMessages] operator to
compute the average age of the more senior followers of each user.
-{% highlight scala %}
-// Import random graph generation library
-import org.apache.spark.graphx.util.GraphGenerators
-// Create a graph with "age" as the vertex property. Here we use a random graph for simplicity.
-val graph: Graph[Double, Int] =
- GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
-// Compute the number of older followers and their total age
-val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
- triplet => { // Map Function
- if (triplet.srcAttr > triplet.dstAttr) {
- // Send message to destination vertex containing counter and age
- triplet.sendToDst(1, triplet.srcAttr)
- }
- },
- // Add counter and age
- (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
-)
-// Divide total age by number of older followers to get average age of older followers
-val avgAgeOfOlderFollowers: VertexRDD[Double] =
- olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } )
-// Display the results
-avgAgeOfOlderFollowers.collect.foreach(println(_))
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala %}
> The `aggregateMessages` operation performs optimally when the messages (and the sums of
> messages) are constant sized (e.g., floats and addition instead of lists and concatenation).
@@ -793,29 +771,7 @@ second argument list contains the user defined functions for receiving messages
We can use the Pregel operator to express computation such as single source
shortest path in the following example.
-{% highlight scala %}
-import org.apache.spark.graphx._
-// Import random graph generation library
-import org.apache.spark.graphx.util.GraphGenerators
-// A graph with edge attributes containing distances
-val graph: Graph[Long, Double] =
- GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
-val sourceId: VertexId = 42 // The ultimate source
-// Initialize the graph such that all vertices except the root have distance infinity.
-val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
-val sssp = initialGraph.pregel(Double.PositiveInfinity)(
- (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
- triplet => { // Send Message
- if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
- Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
- } else {
- Iterator.empty
- }
- },
- (a,b) => math.min(a,b) // Merge Message
- )
-println(sssp.vertices.collect.mkString("\n"))
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/SSSPExample.scala %}
<a name="graph_builders"></a>
@@ -1009,64 +965,19 @@ GraphX comes with static and dynamic implementations of PageRank as methods on t
GraphX also includes an example social network dataset that we can run PageRank on. A set of users is given in `data/graphx/users.txt`, and a set of relationships between users is given in `data/graphx/followers.txt`. We compute the PageRank of each user as follows:
-{% highlight scala %}
-// Load the edges as a graph
-val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
-// Run PageRank
-val ranks = graph.pageRank(0.0001).vertices
-// Join the ranks with the usernames
-val users = sc.textFile("data/graphx/users.txt").map { line =>
- val fields = line.split(",")
- (fields(0).toLong, fields(1))
-}
-val ranksByUsername = users.join(ranks).map {
- case (id, (username, rank)) => (username, rank)
-}
-// Print the result
-println(ranksByUsername.collect().mkString("\n"))
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/PageRankExample.scala %}
## Connected Components
The connected components algorithm labels each connected component of the graph with the ID of its lowest-numbered vertex. For example, in a social network, connected components can approximate clusters. GraphX contains an implementation of the algorithm in the [`ConnectedComponents` object][ConnectedComponents], and we compute the connected components of the example social network dataset from the [PageRank section](#pagerank) as follows:
-{% highlight scala %}
-// Load the graph as in the PageRank example
-val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
-// Find the connected components
-val cc = graph.connectedComponents().vertices
-// Join the connected components with the usernames
-val users = sc.textFile("data/graphx/users.txt").map { line =>
- val fields = line.split(",")
- (fields(0).toLong, fields(1))
-}
-val ccByUsername = users.join(cc).map {
- case (id, (username, cc)) => (username, cc)
-}
-// Print the result
-println(ccByUsername.collect().mkString("\n"))
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala %}
## Triangle Counting
A vertex is part of a triangle when it has two adjacent vertices with an edge between them. GraphX implements a triangle counting algorithm in the [`TriangleCount` object][TriangleCount] that determines the number of triangles passing through each vertex, providing a measure of clustering. We compute the triangle count of the social network dataset from the [PageRank section](#pagerank). *Note that `TriangleCount` requires the edges to be in canonical orientation (`srcId < dstId`) and the graph to be partitioned using [`Graph.partitionBy`][Graph.partitionBy].*
-{% highlight scala %}
-// Load the edges in canonical order and partition the graph for triangle count
-val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true).partitionBy(PartitionStrategy.RandomVertexCut)
-// Find the triangle count for each vertex
-val triCounts = graph.triangleCount().vertices
-// Join the triangle counts with the usernames
-val users = sc.textFile("data/graphx/users.txt").map { line =>
- val fields = line.split(",")
- (fields(0).toLong, fields(1))
-}
-val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
- (username, tc)
-}
-// Print the result
-println(triCountByUsername.collect().mkString("\n"))
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala %}
# Examples
@@ -1076,36 +987,4 @@ to important relationships and users, run page-rank on the sub-graph, and
then finally return attributes associated with the top users. I can do
all of this in just a few lines with GraphX:
-{% highlight scala %}
-// Connect to the Spark cluster
-val sc = new SparkContext("spark://master.amplab.org", "research")
-
-// Load my user data and parse into tuples of user id and attribute list
-val users = (sc.textFile("data/graphx/users.txt")
- .map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
-
-// Parse the edge data which is already in userId -> userId format
-val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
-
-// Attach the user attributes
-val graph = followerGraph.outerJoinVertices(users) {
- case (uid, deg, Some(attrList)) => attrList
- // Some users may not have attributes so we set them as empty
- case (uid, deg, None) => Array.empty[String]
-}
-
-// Restrict the graph to users with usernames and names
-val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
-
-// Compute the PageRank
-val pagerankGraph = subgraph.pageRank(0.001)
-
-// Get the attributes of the top pagerank users
-val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
- case (uid, attrList, Some(pr)) => (pr, attrList.toList)
- case (uid, attrList, None) => (0.0, attrList.toList)
-}
-
-println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
-
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala %}
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala
new file mode 100644
index 0000000000..8f8262db37
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+// $example on$
+import org.apache.spark.graphx.{Graph, VertexRDD}
+import org.apache.spark.graphx.util.GraphGenerators
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+/**
+ * An example use the [`aggregateMessages`][Graph.aggregateMessages] operator to
+ * compute the average age of the more senior followers of each user
+ * Run with
+ * {{{
+ * bin/run-example graphx.AggregateMessagesExample
+ * }}}
+ */
+object AggregateMessagesExample {
+
+ def main(args: Array[String]): Unit = {
+ // Creates a SparkSession.
+ val spark = SparkSession
+ .builder
+ .appName(s"${this.getClass.getSimpleName}")
+ .getOrCreate()
+ val sc = spark.sparkContext
+
+ // $example on$
+ // Create a graph with "age" as the vertex property.
+ // Here we use a random graph for simplicity.
+ val graph: Graph[Double, Int] =
+ GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
+ // Compute the number of older followers and their total age
+ val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
+ triplet => { // Map Function
+ if (triplet.srcAttr > triplet.dstAttr) {
+ // Send message to destination vertex containing counter and age
+ triplet.sendToDst(1, triplet.srcAttr)
+ }
+ },
+ // Add counter and age
+ (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
+ )
+ // Divide total age by number of older followers to get average age of older followers
+ val avgAgeOfOlderFollowers: VertexRDD[Double] =
+ olderFollowers.mapValues( (id, value) =>
+ value match { case (count, totalAge) => totalAge / count } )
+ // Display the results
+ avgAgeOfOlderFollowers.collect.foreach(println(_))
+ // $example off$
+
+ spark.stop()
+ }
+}
+// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala
new file mode 100644
index 0000000000..6598863bd2
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+// $example on$
+import org.apache.spark.graphx.GraphLoader
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Suppose I want to build a graph from some text files, restrict the graph
+ * to important relationships and users, run page-rank on the sub-graph, and
+ * then finally return attributes associated with the top users.
+ * This example do all of this in just a few lines with GraphX.
+ *
+ * Run with
+ * {{{
+ * bin/run-example graphx.ComprehensiveExample
+ * }}}
+ */
+object ComprehensiveExample {
+
+ def main(args: Array[String]): Unit = {
+ // Creates a SparkSession.
+ val spark = SparkSession
+ .builder
+ .appName(s"${this.getClass.getSimpleName}")
+ .getOrCreate()
+ val sc = spark.sparkContext
+
+ // $example on$
+ // Load my user data and parse into tuples of user id and attribute list
+ val users = (sc.textFile("data/graphx/users.txt")
+ .map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
+
+ // Parse the edge data which is already in userId -> userId format
+ val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
+
+ // Attach the user attributes
+ val graph = followerGraph.outerJoinVertices(users) {
+ case (uid, deg, Some(attrList)) => attrList
+ // Some users may not have attributes so we set them as empty
+ case (uid, deg, None) => Array.empty[String]
+ }
+
+ // Restrict the graph to users with usernames and names
+ val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
+
+ // Compute the PageRank
+ val pagerankGraph = subgraph.pageRank(0.001)
+
+ // Get the attributes of the top pagerank users
+ val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
+ case (uid, attrList, Some(pr)) => (pr, attrList.toList)
+ case (uid, attrList, None) => (0.0, attrList.toList)
+ }
+
+ println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
+ // $example off$
+
+ spark.stop()
+ }
+}
+// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala
new file mode 100644
index 0000000000..5377ddb359
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+// $example on$
+import org.apache.spark.graphx.GraphLoader
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+/**
+ * A connected components algorithm example.
+ * The connected components algorithm labels each connected component of the graph
+ * with the ID of its lowest-numbered vertex.
+ * For example, in a social network, connected components can approximate clusters.
+ * GraphX contains an implementation of the algorithm in the
+ * [`ConnectedComponents` object][ConnectedComponents],
+ * and we compute the connected components of the example social network dataset.
+ *
+ * Run with
+ * {{{
+ * bin/run-example graphx.ConnectedComponentsExample
+ * }}}
+ */
+object ConnectedComponentsExample {
+ def main(args: Array[String]): Unit = {
+ // Creates a SparkSession.
+ val spark = SparkSession
+ .builder
+ .appName(s"${this.getClass.getSimpleName}")
+ .getOrCreate()
+ val sc = spark.sparkContext
+
+ // $example on$
+ // Load the graph as in the PageRank example
+ val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
+ // Find the connected components
+ val cc = graph.connectedComponents().vertices
+ // Join the connected components with the usernames
+ val users = sc.textFile("data/graphx/users.txt").map { line =>
+ val fields = line.split(",")
+ (fields(0).toLong, fields(1))
+ }
+ val ccByUsername = users.join(cc).map {
+ case (id, (username, cc)) => (username, cc)
+ }
+ // Print the result
+ println(ccByUsername.collect().mkString("\n"))
+ // $example off$
+ spark.stop()
+ }
+}
+// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala
new file mode 100644
index 0000000000..9e9affca07
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+// $example on$
+import org.apache.spark.graphx.GraphLoader
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+/**
+ * A PageRank example on social network dataset
+ * Run with
+ * {{{
+ * bin/run-example graphx.PageRankExample
+ * }}}
+ */
+object PageRankExample {
+ def main(args: Array[String]): Unit = {
+ // Creates a SparkSession.
+ val spark = SparkSession
+ .builder
+ .appName(s"${this.getClass.getSimpleName}")
+ .getOrCreate()
+ val sc = spark.sparkContext
+
+ // $example on$
+ // Load the edges as a graph
+ val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
+ // Run PageRank
+ val ranks = graph.pageRank(0.0001).vertices
+ // Join the ranks with the usernames
+ val users = sc.textFile("data/graphx/users.txt").map { line =>
+ val fields = line.split(",")
+ (fields(0).toLong, fields(1))
+ }
+ val ranksByUsername = users.join(ranks).map {
+ case (id, (username, rank)) => (username, rank)
+ }
+ // Print the result
+ println(ranksByUsername.collect().mkString("\n"))
+ // $example off$
+ spark.stop()
+ }
+}
+// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/SSSPExample.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/SSSPExample.scala
new file mode 100644
index 0000000000..5e8b19671d
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/SSSPExample.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+// $example on$
+import org.apache.spark.graphx.{Graph, VertexId}
+import org.apache.spark.graphx.util.GraphGenerators
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+/**
+ * An example use the Pregel operator to express computation
+ * such as single source shortest path
+ * Run with
+ * {{{
+ * bin/run-example graphx.SSSPExample
+ * }}}
+ */
+object SSSPExample {
+ def main(args: Array[String]): Unit = {
+ // Creates a SparkSession.
+ val spark = SparkSession
+ .builder
+ .appName(s"${this.getClass.getSimpleName}")
+ .getOrCreate()
+ val sc = spark.sparkContext
+
+ // $example on$
+ // A graph with edge attributes containing distances
+ val graph: Graph[Long, Double] =
+ GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
+ val sourceId: VertexId = 42 // The ultimate source
+ // Initialize the graph such that all vertices except the root have distance infinity.
+ val initialGraph = graph.mapVertices((id, _) =>
+ if (id == sourceId) 0.0 else Double.PositiveInfinity)
+ val sssp = initialGraph.pregel(Double.PositiveInfinity)(
+ (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
+ triplet => { // Send Message
+ if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
+ Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
+ } else {
+ Iterator.empty
+ }
+ },
+ (a, b) => math.min(a, b) // Merge Message
+ )
+ println(sssp.vertices.collect.mkString("\n"))
+ // $example off$
+
+ spark.stop()
+ }
+}
+// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala
new file mode 100644
index 0000000000..b9bff69086
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+// $example on$
+import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+/**
+ * A vertex is part of a triangle when it has two adjacent vertices with an edge between them.
+ * GraphX implements a triangle counting algorithm in the [`TriangleCount` object][TriangleCount]
+ * that determines the number of triangles passing through each vertex,
+ * providing a measure of clustering.
+ * We compute the triangle count of the social network dataset.
+ *
+ * Note that `TriangleCount` requires the edges to be in canonical orientation (`srcId < dstId`)
+ * and the graph to be partitioned using [`Graph.partitionBy`][Graph.partitionBy].
+ *
+ * Run with
+ * {{{
+ * bin/run-example graphx.TriangleCountingExample
+ * }}}
+ */
+object TriangleCountingExample {
+ def main(args: Array[String]): Unit = {
+ // Creates a SparkSession.
+ val spark = SparkSession
+ .builder
+ .appName(s"${this.getClass.getSimpleName}")
+ .getOrCreate()
+ val sc = spark.sparkContext
+
+ // $example on$
+ // Load the edges in canonical order and partition the graph for triangle count
+ val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true)
+ .partitionBy(PartitionStrategy.RandomVertexCut)
+ // Find the triangle count for each vertex
+ val triCounts = graph.triangleCount().vertices
+ // Join the triangle counts with the usernames
+ val users = sc.textFile("data/graphx/users.txt").map { line =>
+ val fields = line.split(",")
+ (fields(0).toLong, fields(1))
+ }
+ val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
+ (username, tc)
+ }
+ // Print the result
+ println(triCountByUsername.collect().mkString("\n"))
+ // $example off$
+ spark.stop()
+ }
+}
+// scalastyle:on println