aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2014-09-12 14:08:38 -0700
committerReynold Xin <rxin@apache.org>2014-09-12 14:08:38 -0700
commit15a564598fe63003652b1e24527c432080b5976c (patch)
treef22e278781547cd00715814b556b7143ec5a8d55
parenteae81b0bfdf3159be90f507a03853800aec1874a (diff)
downloadspark-15a564598fe63003652b1e24527c432080b5976c.tar.gz
spark-15a564598fe63003652b1e24527c432080b5976c.tar.bz2
spark-15a564598fe63003652b1e24527c432080b5976c.zip
[SPARK-3427] [GraphX] Avoid active vertex tracking in static PageRank
GraphX's current implementation of static (fixed iteration count) PageRank uses the Pregel API. This unnecessarily tracks active vertices, even though in static PageRank all vertices are always active. Active vertex tracking incurs the following costs: 1. A shuffle per iteration to ship the active sets to the edge partitions. 2. A hash table creation per iteration at each partition to index the active sets for lookup. 3. A hash lookup per edge to check whether the source vertex is active. I reimplemented static PageRank using the lower-level GraphX API instead of the Pregel API. In benchmarks on a 16-node m2.4xlarge cluster, this provided a 23% speedup (from 514 s to 397 s, mean over 3 trials) for 10 iterations of PageRank on a synthetic graph with 10M vertices and 1.27B edges. Author: Ankur Dave <ankurdave@gmail.com> Closes #2308 from ankurdave/SPARK-3427 and squashes the following commits: 449996a [Ankur Dave] Avoid unnecessary active vertex tracking in static PageRank
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala45
1 files changed, 29 insertions, 16 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index 614555a054..257e2f3a36 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -79,30 +79,43 @@ object PageRank extends Logging {
def run[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] =
{
- // Initialize the pagerankGraph with each edge attribute having
+ // Initialize the PageRank graph with each edge attribute having
// weight 1/outDegree and each vertex with attribute 1.0.
- val pagerankGraph: Graph[Double, Double] = graph
+ var rankGraph: Graph[Double, Double] = graph
// Associate the degree with each vertex
.outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
// Set the weight on the edges based on the degree
.mapTriplets( e => 1.0 / e.srcAttr )
// Set the vertex attributes to the initial pagerank values
- .mapVertices( (id, attr) => 1.0 )
- .cache()
+ .mapVertices( (id, attr) => resetProb )
- // Define the three functions needed to implement PageRank in the GraphX
- // version of Pregel
- def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
- resetProb + (1.0 - resetProb) * msgSum
- def sendMessage(edge: EdgeTriplet[Double, Double]) =
- Iterator((edge.dstId, edge.srcAttr * edge.attr))
- def messageCombiner(a: Double, b: Double): Double = a + b
- // The initial message received by all vertices in PageRank
- val initialMessage = 0.0
+ var iteration = 0
+ var prevRankGraph: Graph[Double, Double] = null
+ while (iteration < numIter) {
+ rankGraph.cache()
- // Execute pregel for a fixed number of iterations.
- Pregel(pagerankGraph, initialMessage, numIter, activeDirection = EdgeDirection.Out)(
- vertexProgram, sendMessage, messageCombiner)
+ // Compute the outgoing rank contributions of each vertex, perform local preaggregation, and
+ // do the final aggregation at the receiving vertices. Requires a shuffle for aggregation.
+ val rankUpdates = rankGraph.mapReduceTriplets[Double](
+ e => Iterator((e.dstId, e.srcAttr * e.attr)), _ + _)
+
+ // Apply the final rank updates to get the new ranks, using join to preserve ranks of vertices
+ // that didn't receive a message. Requires a shuffle for broadcasting updated ranks to the
+ // edge partitions.
+ prevRankGraph = rankGraph
+ rankGraph = rankGraph.joinVertices(rankUpdates) {
+ (id, oldRank, msgSum) => resetProb + (1.0 - resetProb) * msgSum
+ }.cache()
+
+ rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
+ logInfo(s"PageRank finished iteration $iteration.")
+ prevRankGraph.vertices.unpersist(false)
+ prevRankGraph.edges.unpersist(false)
+
+ iteration += 1
+ }
+
+ rankGraph
}
/**