aboutsummaryrefslogtreecommitdiff
path: root/graphx/src/test
diff options
context:
space:
mode:
authorAndrew Ray <ray.andrew@gmail.com>2016-12-15 23:32:10 -0800
committerAnkur Dave <ankurdave@gmail.com>2016-12-15 23:32:10 -0800
commit78062b8521bb02900baeec31992d697fa677f122 (patch)
tree74f04bb87417c6e10280fd839295295ddf1f0f90 /graphx/src/test
parent172a52f5d31337d90155feb7072381e8d5712288 (diff)
downloadspark-78062b8521bb02900baeec31992d697fa677f122.tar.gz
spark-78062b8521bb02900baeec31992d697fa677f122.tar.bz2
spark-78062b8521bb02900baeec31992d697fa677f122.zip
[SPARK-18845][GRAPHX] PageRank has incorrect initialization value that leads to slow convergence
## What changes were proposed in this pull request? Change the initial value in all PageRank implementations to be `1.0` instead of `resetProb` (default `0.15`) and use `outerJoinVertices` instead of `joinVertices` so that source vertices get updated in each iteration. This seems to have been introduced a long time ago in https://github.com/apache/spark/commit/15a564598fe63003652b1e24527c432080b5976c#diff-b2bf3f97dcd2f19d61c921836159cda9L90 With the exception of graphs with sinks (which currently give incorrect results see SPARK-18847) this gives faster convergence as the sum of ranks is already correct (sum of ranks should be number of vertices). Convergence comparision benchmark for small graph: http://imgur.com/a/HkkZf Code for benchmark: https://gist.github.com/aray/a7de1f3801a810f8b1fa00c271a1fefd ## How was this patch tested? (corrected) existing unit tests and additional test that verifies against result of igraph and NetworkX on a loop with a source. Author: Andrew Ray <ray.andrew@gmail.com> Closes #16271 from aray/pagerank-initial-value.
Diffstat (limited to 'graphx/src/test')
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala34
1 files changed, 30 insertions, 4 deletions
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
index b6305c8d00..6afbb5a959 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
@@ -41,7 +41,7 @@ object GridPageRank {
}
}
// compute the pagerank
- var pr = Array.fill(nRows * nCols)(resetProb)
+ var pr = Array.fill(nRows * nCols)(1.0)
for (iter <- 0 until nIter) {
val oldPr = pr
pr = new Array[Double](nRows * nCols)
@@ -70,10 +70,10 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
val resetProb = 0.15
val errorTol = 1.0e-5
- val staticRanks1 = starGraph.staticPageRank(numIter = 1, resetProb).vertices
- val staticRanks2 = starGraph.staticPageRank(numIter = 2, resetProb).vertices.cache()
+ val staticRanks1 = starGraph.staticPageRank(numIter = 2, resetProb).vertices
+ val staticRanks2 = starGraph.staticPageRank(numIter = 3, resetProb).vertices.cache()
- // Static PageRank should only take 2 iterations to converge
+ // Static PageRank should only take 3 iterations to converge
val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) =>
if (pr1 != pr2) 1 else 0
}.map { case (vid, test) => test }.sum()
@@ -203,4 +203,30 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
assert(compareRanks(staticRanks, parallelStaticRanks) < errorTol)
}
}
+
+ test("Loop with source PageRank") {
+ withSpark { sc =>
+ val edges = sc.parallelize((1L, 2L) :: (2L, 3L) :: (3L, 4L) :: (4L, 2L) :: Nil)
+ val g = Graph.fromEdgeTuples(edges, 1)
+ val resetProb = 0.15
+ val tol = 0.0001
+ val numIter = 50
+ val errorTol = 1.0e-5
+
+ val staticRanks = g.staticPageRank(numIter, resetProb).vertices
+ val dynamicRanks = g.pageRank(tol, resetProb).vertices
+ assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
+
+ // Computed in igraph 1.0 w/ R bindings:
+ // > page_rank(graph_from_literal( A -+ B -+ C -+ D -+ B))
+ // Alternatively in NetworkX 1.11:
+ // > nx.pagerank(nx.DiGraph([(1,2),(2,3),(3,4),(4,2)]))
+ // We multiply by the number of vertices to account for difference in normalization
+ val igraphPR = Seq(0.0375000, 0.3326045, 0.3202138, 0.3096817).map(_ * 4)
+ val ranks = VertexRDD(sc.parallelize(1L to 4L zip igraphPR))
+ assert(compareRanks(staticRanks, ranks) < errorTol)
+ assert(compareRanks(dynamicRanks, ranks) < errorTol)
+
+ }
+ }
}