diff options
author | Dan McClary <dan.mcclary@gmail.com> | 2015-05-01 11:55:43 -0700 |
---|---|---|
committer | Joseph Gonzalez <joseph.e.gonzalez@gmail.com> | 2015-05-01 11:55:43 -0700 |
commit | 7d427222dca4807ec55e8d9a7de6ffe861cd0d24 (patch) | |
tree | 319025199d7a2009c9bb27250baa27d93b60f1c5 /graphx/src/test | |
parent | 27de6fef6a852c0801b5d55d0e69096878bd7909 (diff) | |
download | spark-7d427222dca4807ec55e8d9a7de6ffe861cd0d24.tar.gz spark-7d427222dca4807ec55e8d9a7de6ffe861cd0d24.tar.bz2 spark-7d427222dca4807ec55e8d9a7de6ffe861cd0d24.zip |
[SPARK-5854] personalized page rank
Here's a modification to PageRank which does personalized PageRank. The approach is basically similar to that outlined by Bahmani et al. from 2010 (http://arxiv.org/pdf/1006.2880.pdf).
I'm sure this needs tuning up or other considerations, so let me know how I can improve this.
Author: Dan McClary <dan.mcclary@gmail.com>
Author: dwmclary <dan.mcclary@gmail.com>
Closes #4774 from dwmclary/SPARK-5854-Personalized-PageRank and squashes the following commits:
8b907db [dwmclary] fixed scalastyle errors in PageRankSuite
2c20e5d [dwmclary] merged with upstream master
d6cebac [dwmclary] updated as per style requests
7d00c23 [Dan McClary] fixed line overrun in personalizedVertexPageRank
d711677 [Dan McClary] updated vertexProgram to restore binary compatibility for inner method
bb8d507 [Dan McClary] Merge branch 'master' of https://github.com/apache/spark into SPARK-5854-Personalized-PageRank
fba0edd [Dan McClary] fixed silly mistakes
de51be2 [Dan McClary] cleaned up whitespace between comments and methods
0c30d0c [Dan McClary] updated to maintain binary compatibility
aaf0b4b [Dan McClary] Merge branch 'master' of https://github.com/apache/spark into SPARK-5854-Personalized-PageRank
76773f6 [Dan McClary] Merge branch 'master' of https://github.com/apache/spark into SPARK-5854-Personalized-PageRank
44ada8e [Dan McClary] updated tolerance on chain PPR
1ffed95 [Dan McClary] updated tolerance on chain PPR
b67ac69 [Dan McClary] updated tolerance on chain PPR
a560942 [Dan McClary] rolled PPR into pregel code for PageRank
6dc2c29 [Dan McClary] initial implementation of personalized page rank
Diffstat (limited to 'graphx/src/test')
-rw-r--r-- | graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala | 47 |
1 files changed, 47 insertions, 0 deletions
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala index 95804b07b1..3f3c9dfd7b 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala @@ -92,6 +92,36 @@ class PageRankSuite extends FunSuite with LocalSparkContext { } } // end of test Star PageRank + test("Star PersonalPageRank") { + withSpark { sc => + val nVertices = 100 + val starGraph = GraphGenerators.starGraph(sc, nVertices).cache() + val resetProb = 0.15 + val errorTol = 1.0e-5 + + val staticRanks1 = starGraph.staticPersonalizedPageRank(0,numIter = 1, resetProb).vertices + val staticRanks2 = starGraph.staticPersonalizedPageRank(0,numIter = 2, resetProb) + .vertices.cache() + + // Static PageRank should only take 2 iterations to converge + val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) => + if (pr1 != pr2) 1 else 0 + }.map { case (vid, test) => test }.sum + assert(notMatching === 0) + + val staticErrors = staticRanks2.map { case (vid, pr) => + val correct = (vid > 0 && pr == resetProb) || + (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * + (nVertices - 1)) )) < 1.0E-5) + if (!correct) 1 else 0 + } + assert(staticErrors.sum === 0) + + val dynamicRanks = starGraph.personalizedPageRank(0,0, resetProb).vertices.cache() + assert(compareRanks(staticRanks2, dynamicRanks) < errorTol) + } + } // end of test Star PageRank + test("Grid PageRank") { withSpark { sc => val rows = 10 @@ -128,4 +158,21 @@ class PageRankSuite extends FunSuite with LocalSparkContext { assert(compareRanks(staticRanks, dynamicRanks) < errorTol) } } + + test("Chain PersonalizedPageRank") { + withSpark { sc => + val chain1 = (0 until 9).map(x => (x, x + 1) ) + val rawEdges = sc.parallelize(chain1, 1).map { case (s,d) => (s.toLong, d.toLong) } + val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache() + val resetProb = 0.15 + val tol = 0.0001 + val numIter = 10 + val errorTol = 1.0e-1 + + val staticRanks = chain.staticPersonalizedPageRank(4, numIter, resetProb).vertices + val dynamicRanks = chain.personalizedPageRank(4, tol, resetProb).vertices + + assert(compareRanks(staticRanks, dynamicRanks) < errorTol) + } + } } |