diff options
author | Yves Raimond <yraimond@netflix.com> | 2015-11-02 20:35:59 -0800 |
---|---|---|
committer | DB Tsai <dbt@netflix.com> | 2015-11-02 20:35:59 -0800 |
commit | efaa4721b511a1d29229facde6457a6dcda18966 (patch) | |
tree | f372bed44292f954b0a2bc2f47053abd56b2dc4d /graphx/src/main/scala | |
parent | 9cb5c731dadff9539126362827a258d6b65754bb (diff) | |
download | spark-efaa4721b511a1d29229facde6457a6dcda18966.tar.gz spark-efaa4721b511a1d29229facde6457a6dcda18966.tar.bz2 spark-efaa4721b511a1d29229facde6457a6dcda18966.zip |
[SPARK-11432][GRAPHX] Personalized PageRank shouldn't use uniform initialization
Changes the personalized pagerank initialization to be non-uniform.
Author: Yves Raimond <yraimond@netflix.com>
Closes #9386 from moustaki/personalized-pagerank-init.
Diffstat (limited to 'graphx/src/main/scala')
-rw-r--r-- | graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala | 29 |
1 files changed, 18 insertions, 11 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index 8c0a461e99..52b237fc15 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -104,18 +104,23 @@ object PageRank extends Logging { graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15, srcId: Option[VertexId] = None): Graph[Double, Double] = { + val personalized = srcId isDefined + val src: VertexId = srcId.getOrElse(-1L) + // Initialize the PageRank graph with each edge attribute having - // weight 1/outDegree and each vertex with attribute 1.0. + // weight 1/outDegree and each vertex with attribute resetProb. + // When running personalized pagerank, only the source vertex + // has an attribute resetProb. All others are set to 0. var rankGraph: Graph[Double, Double] = graph // Associate the degree with each vertex .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) } // Set the weight on the edges based on the degree .mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src ) // Set the vertex attributes to the initial pagerank values - .mapVertices( (id, attr) => resetProb ) + .mapVertices { (id, attr) => + if (!(id != src && personalized)) resetProb else 0.0 + } - val personalized = srcId isDefined - val src: VertexId = srcId.getOrElse(-1L) def delta(u: VertexId, v: VertexId): Double = { if (u == v) 1.0 else 0.0 } var iteration = 0 @@ -192,6 +197,9 @@ object PageRank extends Logging { graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15, srcId: Option[VertexId] = None): Graph[Double, Double] = { + val personalized = srcId.isDefined + val src: VertexId = srcId.getOrElse(-1L) + // Initialize the pagerankGraph with each edge attribute // having weight 1/outDegree and each vertex with attribute 1.0. val pagerankGraph: Graph[(Double, Double), Double] = graph @@ -202,13 +210,11 @@ object PageRank extends Logging { // Set the weight on the edges based on the degree .mapTriplets( e => 1.0 / e.srcAttr ) // Set the vertex attributes to (initalPR, delta = 0) - .mapVertices( (id, attr) => (0.0, 0.0) ) + .mapVertices { (id, attr) => + if (id == src) (resetProb, Double.NegativeInfinity) else (0.0, 0.0) + } .cache() - val personalized = srcId.isDefined - val src: VertexId = srcId.getOrElse(-1L) - - // Define the three functions needed to implement PageRank in the GraphX // version of Pregel def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = { @@ -225,7 +231,8 @@ object PageRank extends Logging { teleport = oldPR*delta val newPR = teleport + (1.0 - resetProb) * msgSum - (newPR, newPR - oldPR) + val newDelta = if (lastDelta == Double.NegativeInfinity) newPR else newPR - oldPR + (newPR, newDelta) } def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = { @@ -239,7 +246,7 @@ object PageRank extends Logging { def messageCombiner(a: Double, b: Double): Double = a + b // The initial message received by all vertices in PageRank - val initialMessage = resetProb / (1.0 - resetProb) + val initialMessage = if (personalized) 0.0 else resetProb / (1.0 - resetProb) // Execute a dynamic version of Pregel. val vp = if (personalized) { |