aboutsummaryrefslogtreecommitdiff
path: root/graphx
diff options
context:
space:
mode:
authorYves Raimond <yraimond@netflix.com>2015-11-02 20:35:59 -0800
committerDB Tsai <dbt@netflix.com>2015-11-02 20:35:59 -0800
commitefaa4721b511a1d29229facde6457a6dcda18966 (patch)
treef372bed44292f954b0a2bc2f47053abd56b2dc4d /graphx
parent9cb5c731dadff9539126362827a258d6b65754bb (diff)
downloadspark-efaa4721b511a1d29229facde6457a6dcda18966.tar.gz
spark-efaa4721b511a1d29229facde6457a6dcda18966.tar.bz2
spark-efaa4721b511a1d29229facde6457a6dcda18966.zip
[SPARK-11432][GRAPHX] Personalized PageRank shouldn't use uniform initialization
Changes the personalized pagerank initialization to be non-uniform. Author: Yves Raimond <yraimond@netflix.com> Closes #9386 from moustaki/personalized-pagerank-init.
Diffstat (limited to 'graphx')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala29
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala13
2 files changed, 27 insertions, 15 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index 8c0a461e99..52b237fc15 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -104,18 +104,23 @@ object PageRank extends Logging {
graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15,
srcId: Option[VertexId] = None): Graph[Double, Double] =
{
+ val personalized = srcId isDefined
+ val src: VertexId = srcId.getOrElse(-1L)
+
// Initialize the PageRank graph with each edge attribute having
- // weight 1/outDegree and each vertex with attribute 1.0.
+ // weight 1/outDegree and each vertex with attribute resetProb.
+ // When running personalized pagerank, only the source vertex
+ // has an attribute resetProb. All others are set to 0.
var rankGraph: Graph[Double, Double] = graph
// Associate the degree with each vertex
.outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
// Set the weight on the edges based on the degree
.mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src )
// Set the vertex attributes to the initial pagerank values
- .mapVertices( (id, attr) => resetProb )
+ .mapVertices { (id, attr) =>
+ if (!(id != src && personalized)) resetProb else 0.0
+ }
- val personalized = srcId isDefined
- val src: VertexId = srcId.getOrElse(-1L)
def delta(u: VertexId, v: VertexId): Double = { if (u == v) 1.0 else 0.0 }
var iteration = 0
@@ -192,6 +197,9 @@ object PageRank extends Logging {
graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15,
srcId: Option[VertexId] = None): Graph[Double, Double] =
{
+ val personalized = srcId.isDefined
+ val src: VertexId = srcId.getOrElse(-1L)
+
// Initialize the pagerankGraph with each edge attribute
// having weight 1/outDegree and each vertex with attribute 1.0.
val pagerankGraph: Graph[(Double, Double), Double] = graph
@@ -202,13 +210,11 @@ object PageRank extends Logging {
// Set the weight on the edges based on the degree
.mapTriplets( e => 1.0 / e.srcAttr )
// Set the vertex attributes to (initalPR, delta = 0)
- .mapVertices( (id, attr) => (0.0, 0.0) )
+ .mapVertices { (id, attr) =>
+ if (id == src) (resetProb, Double.NegativeInfinity) else (0.0, 0.0)
+ }
.cache()
- val personalized = srcId.isDefined
- val src: VertexId = srcId.getOrElse(-1L)
-
-
// Define the three functions needed to implement PageRank in the GraphX
// version of Pregel
def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = {
@@ -225,7 +231,8 @@ object PageRank extends Logging {
teleport = oldPR*delta
val newPR = teleport + (1.0 - resetProb) * msgSum
- (newPR, newPR - oldPR)
+ val newDelta = if (lastDelta == Double.NegativeInfinity) newPR else newPR - oldPR
+ (newPR, newDelta)
}
def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
@@ -239,7 +246,7 @@ object PageRank extends Logging {
def messageCombiner(a: Double, b: Double): Double = a + b
// The initial message received by all vertices in PageRank
- val initialMessage = resetProb / (1.0 - resetProb)
+ val initialMessage = if (personalized) 0.0 else resetProb / (1.0 - resetProb)
// Execute a dynamic version of Pregel.
val vp = if (personalized) {
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
index 45f1e30110..bdff31446f 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
@@ -109,17 +109,22 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
assert(notMatching === 0)
val staticErrors = staticRanks2.map { case (vid, pr) =>
- val correct = (vid > 0 && pr == resetProb) ||
- (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb *
- (nVertices - 1)) )) < 1.0E-5)
+ val correct = (vid > 0 && pr == 0.0) ||
+ (vid == 0 && pr == resetProb)
if (!correct) 1 else 0
}
assert(staticErrors.sum === 0)
val dynamicRanks = starGraph.personalizedPageRank(0, 0, resetProb).vertices.cache()
assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
+
+ // We have one outbound edge from 1 to 0
+ val otherStaticRanks2 = starGraph.staticPersonalizedPageRank(1, numIter = 2, resetProb)
+ .vertices.cache()
+ val otherDynamicRanks = starGraph.personalizedPageRank(1, 0, resetProb).vertices.cache()
+ assert(compareRanks(otherDynamicRanks, otherStaticRanks2) < errorTol)
}
- } // end of test Star PageRank
+ } // end of test Star PersonalPageRank
test("Grid PageRank") {
withSpark { sc =>