diff options
author | WeichenXu <WeichenXu123@outlook.com> | 2016-07-02 16:29:00 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-07-02 16:29:00 +0100 |
commit | 0bd7cd18bc4d535b0c4499913f6747b3f6315ac2 (patch) | |
tree | 6c966df99d61193b0c5a34af9f0185d942143721 /examples/src/main/scala | |
parent | 192d1f9cf3463d050b87422939448f2acf86acc9 (diff) | |
download | spark-0bd7cd18bc4d535b0c4499913f6747b3f6315ac2.tar.gz spark-0bd7cd18bc4d535b0c4499913f6747b3f6315ac2.tar.bz2 spark-0bd7cd18bc4d535b0c4499913f6747b3f6315ac2.zip |
[SPARK-16345][DOCUMENTATION][EXAMPLES][GRAPHX] Extract graphx programming guide example snippets from source files instead of hard code them
## What changes were proposed in this pull request?
I extract 6 example programs from GraphX programming guide and replace them with
`include_example` label.
The 6 example programs are:
- AggregateMessagesExample.scala
- SSSPExample.scala
- TriangleCountingExample.scala
- ConnectedComponentsExample.scala
- ComprehensiveExample.scala
- PageRankExample.scala
All the example code can run using
`bin/run-example graphx.EXAMPLE_NAME`
## How was this patch tested?
Manual.
Author: WeichenXu <WeichenXu123@outlook.com>
Closes #14015 from WeichenXu123/graphx_example_plugin.
Diffstat (limited to 'examples/src/main/scala')
6 files changed, 420 insertions, 0 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala new file mode 100644 index 0000000000..8f8262db37 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println +package org.apache.spark.examples.graphx + +// $example on$ +import org.apache.spark.graphx.{Graph, VertexRDD} +import org.apache.spark.graphx.util.GraphGenerators +// $example off$ +import org.apache.spark.sql.SparkSession + +/** + * An example use the [`aggregateMessages`][Graph.aggregateMessages] operator to + * compute the average age of the more senior followers of each user + * Run with + * {{{ + * bin/run-example graphx.AggregateMessagesExample + * }}} + */ +object AggregateMessagesExample { + + def main(args: Array[String]): Unit = { + // Creates a SparkSession. + val spark = SparkSession + .builder + .appName(s"${this.getClass.getSimpleName}") + .getOrCreate() + val sc = spark.sparkContext + + // $example on$ + // Create a graph with "age" as the vertex property. + // Here we use a random graph for simplicity. + val graph: Graph[Double, Int] = + GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble ) + // Compute the number of older followers and their total age + val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)]( + triplet => { // Map Function + if (triplet.srcAttr > triplet.dstAttr) { + // Send message to destination vertex containing counter and age + triplet.sendToDst(1, triplet.srcAttr) + } + }, + // Add counter and age + (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function + ) + // Divide total age by number of older followers to get average age of older followers + val avgAgeOfOlderFollowers: VertexRDD[Double] = + olderFollowers.mapValues( (id, value) => + value match { case (count, totalAge) => totalAge / count } ) + // Display the results + avgAgeOfOlderFollowers.collect.foreach(println(_)) + // $example off$ + + spark.stop() + } +} +// scalastyle:on println diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala new file mode 100644 index 0000000000..6598863bd2 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println +package org.apache.spark.examples.graphx + +// $example on$ +import org.apache.spark.graphx.GraphLoader +// $example off$ +import org.apache.spark.sql.SparkSession + +/** + * Suppose I want to build a graph from some text files, restrict the graph + * to important relationships and users, run page-rank on the sub-graph, and + * then finally return attributes associated with the top users. + * This example do all of this in just a few lines with GraphX. + * + * Run with + * {{{ + * bin/run-example graphx.ComprehensiveExample + * }}} + */ +object ComprehensiveExample { + + def main(args: Array[String]): Unit = { + // Creates a SparkSession. + val spark = SparkSession + .builder + .appName(s"${this.getClass.getSimpleName}") + .getOrCreate() + val sc = spark.sparkContext + + // $example on$ + // Load my user data and parse into tuples of user id and attribute list + val users = (sc.textFile("data/graphx/users.txt") + .map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) )) + + // Parse the edge data which is already in userId -> userId format + val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt") + + // Attach the user attributes + val graph = followerGraph.outerJoinVertices(users) { + case (uid, deg, Some(attrList)) => attrList + // Some users may not have attributes so we set them as empty + case (uid, deg, None) => Array.empty[String] + } + + // Restrict the graph to users with usernames and names + val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2) + + // Compute the PageRank + val pagerankGraph = subgraph.pageRank(0.001) + + // Get the attributes of the top pagerank users + val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) { + case (uid, attrList, Some(pr)) => (pr, attrList.toList) + case (uid, attrList, None) => (0.0, attrList.toList) + } + + println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n")) + // $example off$ + + spark.stop() + } +} +// scalastyle:on println diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala new file mode 100644 index 0000000000..5377ddb359 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println +package org.apache.spark.examples.graphx + +// $example on$ +import org.apache.spark.graphx.GraphLoader +// $example off$ +import org.apache.spark.sql.SparkSession + +/** + * A connected components algorithm example. + * The connected components algorithm labels each connected component of the graph + * with the ID of its lowest-numbered vertex. + * For example, in a social network, connected components can approximate clusters. + * GraphX contains an implementation of the algorithm in the + * [`ConnectedComponents` object][ConnectedComponents], + * and we compute the connected components of the example social network dataset. + * + * Run with + * {{{ + * bin/run-example graphx.ConnectedComponentsExample + * }}} + */ +object ConnectedComponentsExample { + def main(args: Array[String]): Unit = { + // Creates a SparkSession. + val spark = SparkSession + .builder + .appName(s"${this.getClass.getSimpleName}") + .getOrCreate() + val sc = spark.sparkContext + + // $example on$ + // Load the graph as in the PageRank example + val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt") + // Find the connected components + val cc = graph.connectedComponents().vertices + // Join the connected components with the usernames + val users = sc.textFile("data/graphx/users.txt").map { line => + val fields = line.split(",") + (fields(0).toLong, fields(1)) + } + val ccByUsername = users.join(cc).map { + case (id, (username, cc)) => (username, cc) + } + // Print the result + println(ccByUsername.collect().mkString("\n")) + // $example off$ + spark.stop() + } +} +// scalastyle:on println diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala new file mode 100644 index 0000000000..9e9affca07 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println +package org.apache.spark.examples.graphx + +// $example on$ +import org.apache.spark.graphx.GraphLoader +// $example off$ +import org.apache.spark.sql.SparkSession + +/** + * A PageRank example on social network dataset + * Run with + * {{{ + * bin/run-example graphx.PageRankExample + * }}} + */ +object PageRankExample { + def main(args: Array[String]): Unit = { + // Creates a SparkSession. + val spark = SparkSession + .builder + .appName(s"${this.getClass.getSimpleName}") + .getOrCreate() + val sc = spark.sparkContext + + // $example on$ + // Load the edges as a graph + val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt") + // Run PageRank + val ranks = graph.pageRank(0.0001).vertices + // Join the ranks with the usernames + val users = sc.textFile("data/graphx/users.txt").map { line => + val fields = line.split(",") + (fields(0).toLong, fields(1)) + } + val ranksByUsername = users.join(ranks).map { + case (id, (username, rank)) => (username, rank) + } + // Print the result + println(ranksByUsername.collect().mkString("\n")) + // $example off$ + spark.stop() + } +} +// scalastyle:on println diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/SSSPExample.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/SSSPExample.scala new file mode 100644 index 0000000000..5e8b19671d --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/SSSPExample.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println +package org.apache.spark.examples.graphx + +// $example on$ +import org.apache.spark.graphx.{Graph, VertexId} +import org.apache.spark.graphx.util.GraphGenerators +// $example off$ +import org.apache.spark.sql.SparkSession + +/** + * An example use the Pregel operator to express computation + * such as single source shortest path + * Run with + * {{{ + * bin/run-example graphx.SSSPExample + * }}} + */ +object SSSPExample { + def main(args: Array[String]): Unit = { + // Creates a SparkSession. + val spark = SparkSession + .builder + .appName(s"${this.getClass.getSimpleName}") + .getOrCreate() + val sc = spark.sparkContext + + // $example on$ + // A graph with edge attributes containing distances + val graph: Graph[Long, Double] = + GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble) + val sourceId: VertexId = 42 // The ultimate source + // Initialize the graph such that all vertices except the root have distance infinity. + val initialGraph = graph.mapVertices((id, _) => + if (id == sourceId) 0.0 else Double.PositiveInfinity) + val sssp = initialGraph.pregel(Double.PositiveInfinity)( + (id, dist, newDist) => math.min(dist, newDist), // Vertex Program + triplet => { // Send Message + if (triplet.srcAttr + triplet.attr < triplet.dstAttr) { + Iterator((triplet.dstId, triplet.srcAttr + triplet.attr)) + } else { + Iterator.empty + } + }, + (a, b) => math.min(a, b) // Merge Message + ) + println(sssp.vertices.collect.mkString("\n")) + // $example off$ + + spark.stop() + } +} +// scalastyle:on println diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala new file mode 100644 index 0000000000..b9bff69086 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println +package org.apache.spark.examples.graphx + +// $example on$ +import org.apache.spark.graphx.{GraphLoader, PartitionStrategy} +// $example off$ +import org.apache.spark.sql.SparkSession + +/** + * A vertex is part of a triangle when it has two adjacent vertices with an edge between them. + * GraphX implements a triangle counting algorithm in the [`TriangleCount` object][TriangleCount] + * that determines the number of triangles passing through each vertex, + * providing a measure of clustering. + * We compute the triangle count of the social network dataset. + * + * Note that `TriangleCount` requires the edges to be in canonical orientation (`srcId < dstId`) + * and the graph to be partitioned using [`Graph.partitionBy`][Graph.partitionBy]. + * + * Run with + * {{{ + * bin/run-example graphx.TriangleCountingExample + * }}} + */ +object TriangleCountingExample { + def main(args: Array[String]): Unit = { + // Creates a SparkSession. + val spark = SparkSession + .builder + .appName(s"${this.getClass.getSimpleName}") + .getOrCreate() + val sc = spark.sparkContext + + // $example on$ + // Load the edges in canonical order and partition the graph for triangle count + val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true) + .partitionBy(PartitionStrategy.RandomVertexCut) + // Find the triangle count for each vertex + val triCounts = graph.triangleCount().vertices + // Join the triangle counts with the usernames + val users = sc.textFile("data/graphx/users.txt").map { line => + val fields = line.split(",") + (fields(0).toLong, fields(1)) + } + val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) => + (username, tc) + } + // Print the result + println(triCountByUsername.collect().mkString("\n")) + // $example off$ + spark.stop() + } +} +// scalastyle:on println |