aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorWeichenXu <WeichenXu123@outlook.com>2016-07-02 16:29:00 +0100
committerSean Owen <sowen@cloudera.com>2016-07-02 16:29:00 +0100
commit0bd7cd18bc4d535b0c4499913f6747b3f6315ac2 (patch)
tree6c966df99d61193b0c5a34af9f0185d942143721 /examples
parent192d1f9cf3463d050b87422939448f2acf86acc9 (diff)
downloadspark-0bd7cd18bc4d535b0c4499913f6747b3f6315ac2.tar.gz
spark-0bd7cd18bc4d535b0c4499913f6747b3f6315ac2.tar.bz2
spark-0bd7cd18bc4d535b0c4499913f6747b3f6315ac2.zip
[SPARK-16345][DOCUMENTATION][EXAMPLES][GRAPHX] Extract graphx programming guide example snippets from source files instead of hard code them
## What changes were proposed in this pull request? I extract 6 example programs from GraphX programming guide and replace them with `include_example` label. The 6 example programs are: - AggregateMessagesExample.scala - SSSPExample.scala - TriangleCountingExample.scala - ConnectedComponentsExample.scala - ComprehensiveExample.scala - PageRankExample.scala All the example code can run using `bin/run-example graphx.EXAMPLE_NAME` ## How was this patch tested? Manual. Author: WeichenXu <WeichenXu123@outlook.com> Closes #14015 from WeichenXu123/graphx_example_plugin.
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala72
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala80
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala68
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala61
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/graphx/SSSPExample.scala69
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala70
6 files changed, 420 insertions, 0 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala
new file mode 100644
index 0000000000..8f8262db37
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+// $example on$
+import org.apache.spark.graphx.{Graph, VertexRDD}
+import org.apache.spark.graphx.util.GraphGenerators
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+/**
+ * An example use the [`aggregateMessages`][Graph.aggregateMessages] operator to
+ * compute the average age of the more senior followers of each user
+ * Run with
+ * {{{
+ * bin/run-example graphx.AggregateMessagesExample
+ * }}}
+ */
+object AggregateMessagesExample {
+
+ def main(args: Array[String]): Unit = {
+ // Creates a SparkSession.
+ val spark = SparkSession
+ .builder
+ .appName(s"${this.getClass.getSimpleName}")
+ .getOrCreate()
+ val sc = spark.sparkContext
+
+ // $example on$
+ // Create a graph with "age" as the vertex property.
+ // Here we use a random graph for simplicity.
+ val graph: Graph[Double, Int] =
+ GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
+ // Compute the number of older followers and their total age
+ val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
+ triplet => { // Map Function
+ if (triplet.srcAttr > triplet.dstAttr) {
+ // Send message to destination vertex containing counter and age
+ triplet.sendToDst(1, triplet.srcAttr)
+ }
+ },
+ // Add counter and age
+ (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
+ )
+ // Divide total age by number of older followers to get average age of older followers
+ val avgAgeOfOlderFollowers: VertexRDD[Double] =
+ olderFollowers.mapValues( (id, value) =>
+ value match { case (count, totalAge) => totalAge / count } )
+ // Display the results
+ avgAgeOfOlderFollowers.collect.foreach(println(_))
+ // $example off$
+
+ spark.stop()
+ }
+}
+// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala
new file mode 100644
index 0000000000..6598863bd2
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+// $example on$
+import org.apache.spark.graphx.GraphLoader
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Suppose I want to build a graph from some text files, restrict the graph
+ * to important relationships and users, run page-rank on the sub-graph, and
+ * then finally return attributes associated with the top users.
+ * This example do all of this in just a few lines with GraphX.
+ *
+ * Run with
+ * {{{
+ * bin/run-example graphx.ComprehensiveExample
+ * }}}
+ */
+object ComprehensiveExample {
+
+ def main(args: Array[String]): Unit = {
+ // Creates a SparkSession.
+ val spark = SparkSession
+ .builder
+ .appName(s"${this.getClass.getSimpleName}")
+ .getOrCreate()
+ val sc = spark.sparkContext
+
+ // $example on$
+ // Load my user data and parse into tuples of user id and attribute list
+ val users = (sc.textFile("data/graphx/users.txt")
+ .map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
+
+ // Parse the edge data which is already in userId -> userId format
+ val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
+
+ // Attach the user attributes
+ val graph = followerGraph.outerJoinVertices(users) {
+ case (uid, deg, Some(attrList)) => attrList
+ // Some users may not have attributes so we set them as empty
+ case (uid, deg, None) => Array.empty[String]
+ }
+
+ // Restrict the graph to users with usernames and names
+ val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
+
+ // Compute the PageRank
+ val pagerankGraph = subgraph.pageRank(0.001)
+
+ // Get the attributes of the top pagerank users
+ val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
+ case (uid, attrList, Some(pr)) => (pr, attrList.toList)
+ case (uid, attrList, None) => (0.0, attrList.toList)
+ }
+
+ println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
+ // $example off$
+
+ spark.stop()
+ }
+}
+// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala
new file mode 100644
index 0000000000..5377ddb359
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+// $example on$
+import org.apache.spark.graphx.GraphLoader
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+/**
+ * A connected components algorithm example.
+ * The connected components algorithm labels each connected component of the graph
+ * with the ID of its lowest-numbered vertex.
+ * For example, in a social network, connected components can approximate clusters.
+ * GraphX contains an implementation of the algorithm in the
+ * [`ConnectedComponents` object][ConnectedComponents],
+ * and we compute the connected components of the example social network dataset.
+ *
+ * Run with
+ * {{{
+ * bin/run-example graphx.ConnectedComponentsExample
+ * }}}
+ */
+object ConnectedComponentsExample {
+ def main(args: Array[String]): Unit = {
+ // Creates a SparkSession.
+ val spark = SparkSession
+ .builder
+ .appName(s"${this.getClass.getSimpleName}")
+ .getOrCreate()
+ val sc = spark.sparkContext
+
+ // $example on$
+ // Load the graph as in the PageRank example
+ val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
+ // Find the connected components
+ val cc = graph.connectedComponents().vertices
+ // Join the connected components with the usernames
+ val users = sc.textFile("data/graphx/users.txt").map { line =>
+ val fields = line.split(",")
+ (fields(0).toLong, fields(1))
+ }
+ val ccByUsername = users.join(cc).map {
+ case (id, (username, cc)) => (username, cc)
+ }
+ // Print the result
+ println(ccByUsername.collect().mkString("\n"))
+ // $example off$
+ spark.stop()
+ }
+}
+// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala
new file mode 100644
index 0000000000..9e9affca07
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+// $example on$
+import org.apache.spark.graphx.GraphLoader
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+/**
+ * A PageRank example on social network dataset
+ * Run with
+ * {{{
+ * bin/run-example graphx.PageRankExample
+ * }}}
+ */
+object PageRankExample {
+ def main(args: Array[String]): Unit = {
+ // Creates a SparkSession.
+ val spark = SparkSession
+ .builder
+ .appName(s"${this.getClass.getSimpleName}")
+ .getOrCreate()
+ val sc = spark.sparkContext
+
+ // $example on$
+ // Load the edges as a graph
+ val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
+ // Run PageRank
+ val ranks = graph.pageRank(0.0001).vertices
+ // Join the ranks with the usernames
+ val users = sc.textFile("data/graphx/users.txt").map { line =>
+ val fields = line.split(",")
+ (fields(0).toLong, fields(1))
+ }
+ val ranksByUsername = users.join(ranks).map {
+ case (id, (username, rank)) => (username, rank)
+ }
+ // Print the result
+ println(ranksByUsername.collect().mkString("\n"))
+ // $example off$
+ spark.stop()
+ }
+}
+// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/SSSPExample.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/SSSPExample.scala
new file mode 100644
index 0000000000..5e8b19671d
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/SSSPExample.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+// $example on$
+import org.apache.spark.graphx.{Graph, VertexId}
+import org.apache.spark.graphx.util.GraphGenerators
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+/**
+ * An example use the Pregel operator to express computation
+ * such as single source shortest path
+ * Run with
+ * {{{
+ * bin/run-example graphx.SSSPExample
+ * }}}
+ */
+object SSSPExample {
+ def main(args: Array[String]): Unit = {
+ // Creates a SparkSession.
+ val spark = SparkSession
+ .builder
+ .appName(s"${this.getClass.getSimpleName}")
+ .getOrCreate()
+ val sc = spark.sparkContext
+
+ // $example on$
+ // A graph with edge attributes containing distances
+ val graph: Graph[Long, Double] =
+ GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
+ val sourceId: VertexId = 42 // The ultimate source
+ // Initialize the graph such that all vertices except the root have distance infinity.
+ val initialGraph = graph.mapVertices((id, _) =>
+ if (id == sourceId) 0.0 else Double.PositiveInfinity)
+ val sssp = initialGraph.pregel(Double.PositiveInfinity)(
+ (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
+ triplet => { // Send Message
+ if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
+ Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
+ } else {
+ Iterator.empty
+ }
+ },
+ (a, b) => math.min(a, b) // Merge Message
+ )
+ println(sssp.vertices.collect.mkString("\n"))
+ // $example off$
+
+ spark.stop()
+ }
+}
+// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala
new file mode 100644
index 0000000000..b9bff69086
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+// $example on$
+import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+/**
+ * A vertex is part of a triangle when it has two adjacent vertices with an edge between them.
+ * GraphX implements a triangle counting algorithm in the [`TriangleCount` object][TriangleCount]
+ * that determines the number of triangles passing through each vertex,
+ * providing a measure of clustering.
+ * We compute the triangle count of the social network dataset.
+ *
+ * Note that `TriangleCount` requires the edges to be in canonical orientation (`srcId < dstId`)
+ * and the graph to be partitioned using [`Graph.partitionBy`][Graph.partitionBy].
+ *
+ * Run with
+ * {{{
+ * bin/run-example graphx.TriangleCountingExample
+ * }}}
+ */
+object TriangleCountingExample {
+ def main(args: Array[String]): Unit = {
+ // Creates a SparkSession.
+ val spark = SparkSession
+ .builder
+ .appName(s"${this.getClass.getSimpleName}")
+ .getOrCreate()
+ val sc = spark.sparkContext
+
+ // $example on$
+ // Load the edges in canonical order and partition the graph for triangle count
+ val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true)
+ .partitionBy(PartitionStrategy.RandomVertexCut)
+ // Find the triangle count for each vertex
+ val triCounts = graph.triangleCount().vertices
+ // Join the triangle counts with the usernames
+ val users = sc.textFile("data/graphx/users.txt").map { line =>
+ val fields = line.split(",")
+ (fields(0).toLong, fields(1))
+ }
+ val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
+ (username, tc)
+ }
+ // Print the result
+ println(triCountByUsername.collect().mkString("\n"))
+ // $example off$
+ spark.stop()
+ }
+}
+// scalastyle:on println