aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Ray <ray.andrew@gmail.com>2017-03-17 14:23:07 -0700
committerReynold Xin <rxin@databricks.com>2017-03-17 14:23:07 -0700
commitbfdeea5c68f963ce60d48d0aa4a4c8c582169950 (patch)
treef9ec401b96edd0cfc5f7ad36d3c65b57ea7aeabe
parent376d782164437573880f0ad58cecae1cb5f212f2 (diff)
downloadspark-bfdeea5c68f963ce60d48d0aa4a4c8c582169950.tar.gz
spark-bfdeea5c68f963ce60d48d0aa4a4c8c582169950.tar.bz2
spark-bfdeea5c68f963ce60d48d0aa4a4c8c582169950.zip
[SPARK-18847][GRAPHX] PageRank gives incorrect results for graphs with sinks
## What changes were proposed in this pull request? Graphs with sinks (vertices with no outgoing edges) don't have the expected rank sum of n (or 1 for personalized). We fix this by normalizing to the expected sum at the end of each implementation. Additionally this fixes the dynamic version of personal pagerank which gave incorrect answers that were not detected by existing unit tests. ## How was this patch tested? Revamped existing and additional unit tests with reference values (and reproduction code) from igraph and NetworkX. Note that for comparison on personal pagerank we use the arpack algorithm in igraph as prpack (the current default) redistributes rank to all vertices uniformly instead of just to the personalization source. We could take the alternate convention (redistribute rank to all vertices uniformly) but that would involve more extensive changes to the algorithms (the dynamic version would no longer be able to use Pregel). Author: Andrew Ray <ray.andrew@gmail.com> Closes #16483 from aray/pagerank-sink2.
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala45
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala158
2 files changed, 144 insertions, 59 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index 37b6e45359..13b2b57719 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -162,7 +162,8 @@ object PageRank extends Logging {
iteration += 1
}
- rankGraph
+ // SPARK-18847 If the graph has sinks (vertices with no outgoing edges) correct the sum of ranks
+ normalizeRankSum(rankGraph, personalized)
}
/**
@@ -179,7 +180,8 @@ object PageRank extends Logging {
* @param resetProb The random reset probability
* @param sources The list of sources to compute personalized pagerank from
* @return the graph with vertex attributes
- * containing the pagerank relative to all starting nodes (as a sparse vector) and
+ * containing the pagerank relative to all starting nodes (as a sparse vector
+ * indexed by the position of nodes in the sources list) and
* edge attributes the normalized edge weight
*/
def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
@@ -194,6 +196,8 @@ object PageRank extends Logging {
// TODO if one sources vertex id is outside of the int range
// we won't be able to store its activations in a sparse vector
+ require(sources.max <= Int.MaxValue.toLong,
+ s"This implementation currently only works for source vertex ids at most ${Int.MaxValue}")
val zero = Vectors.sparse(sources.size, List()).asBreeze
val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) =>
val v = Vectors.sparse(sources.size, Array(i), Array(1.0)).asBreeze
@@ -245,8 +249,10 @@ object PageRank extends Logging {
i += 1
}
+ // SPARK-18847 If the graph has sinks (vertices with no outgoing edges) correct the sum of ranks
+ val rankSums = rankGraph.vertices.values.fold(zero)(_ :+ _)
rankGraph.mapVertices { (vid, attr) =>
- Vectors.fromBreeze(attr)
+ Vectors.fromBreeze(attr :/ rankSums)
}
}
@@ -307,7 +313,7 @@ object PageRank extends Logging {
.mapTriplets( e => 1.0 / e.srcAttr )
// Set the vertex attributes to (initialPR, delta = 0)
.mapVertices { (id, attr) =>
- if (id == src) (1.0, Double.NegativeInfinity) else (0.0, 0.0)
+ if (id == src) (0.0, Double.NegativeInfinity) else (0.0, 0.0)
}
.cache()
@@ -322,13 +328,12 @@ object PageRank extends Logging {
def personalizedVertexProgram(id: VertexId, attr: (Double, Double),
msgSum: Double): (Double, Double) = {
val (oldPR, lastDelta) = attr
- var teleport = oldPR
- val delta = if (src==id) resetProb else 0.0
- teleport = oldPR*delta
-
- val newPR = teleport + (1.0 - resetProb) * msgSum
- val newDelta = if (lastDelta == Double.NegativeInfinity) newPR else newPR - oldPR
- (newPR, newDelta)
+ val newPR = if (lastDelta == Double.NegativeInfinity) {
+ 1.0
+ } else {
+ oldPR + (1.0 - resetProb) * msgSum
+ }
+ (newPR, newPR - oldPR)
}
def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
@@ -353,9 +358,23 @@ object PageRank extends Logging {
vertexProgram(id, attr, msgSum)
}
- Pregel(pagerankGraph, initialMessage, activeDirection = EdgeDirection.Out)(
+ val rankGraph = Pregel(pagerankGraph, initialMessage, activeDirection = EdgeDirection.Out)(
vp, sendMessage, messageCombiner)
.mapVertices((vid, attr) => attr._1)
- } // end of deltaPageRank
+ // SPARK-18847 If the graph has sinks (vertices with no outgoing edges) correct the sum of ranks
+ normalizeRankSum(rankGraph, personalized)
+ }
+
+ // Normalizes the sum of ranks to n (or 1 if personalized)
+ private def normalizeRankSum(rankGraph: Graph[Double, Double], personalized: Boolean) = {
+ val rankSum = rankGraph.vertices.values.sum()
+ if (personalized) {
+ rankGraph.mapVertices((id, rank) => rank / rankSum)
+ } else {
+ val numVertices = rankGraph.numVertices
+ val correctionFactor = numVertices.toDouble / rankSum
+ rankGraph.mapVertices((id, rank) => rank * correctionFactor)
+ }
+ }
}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
index 6afbb5a959..9779553ce8 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
@@ -50,7 +50,8 @@ object GridPageRank {
inNbrs(ind).map( nbr => oldPr(nbr) / outDegree(nbr)).sum
}
}
- (0L until (nRows * nCols)).zip(pr)
+ val prSum = pr.sum
+ (0L until (nRows * nCols)).zip(pr.map(_ * pr.length / prSum))
}
}
@@ -68,26 +69,34 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
val nVertices = 100
val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
val resetProb = 0.15
+ val tol = 0.0001
+ val numIter = 2
val errorTol = 1.0e-5
- val staticRanks1 = starGraph.staticPageRank(numIter = 2, resetProb).vertices
- val staticRanks2 = starGraph.staticPageRank(numIter = 3, resetProb).vertices.cache()
+ val staticRanks = starGraph.staticPageRank(numIter, resetProb).vertices.cache()
+ val staticRanks2 = starGraph.staticPageRank(numIter + 1, resetProb).vertices
- // Static PageRank should only take 3 iterations to converge
- val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) =>
+ // Static PageRank should only take 2 iterations to converge
+ val notMatching = staticRanks.innerZipJoin(staticRanks2) { (vid, pr1, pr2) =>
if (pr1 != pr2) 1 else 0
}.map { case (vid, test) => test }.sum()
assert(notMatching === 0)
- val staticErrors = staticRanks2.map { case (vid, pr) =>
- val p = math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) ))
- val correct = (vid > 0 && pr == resetProb) || (vid == 0L && p < 1.0E-5)
- if (!correct) 1 else 0
- }
- assert(staticErrors.sum === 0)
+ val dynamicRanks = starGraph.pageRank(tol, resetProb).vertices.cache()
+ assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
+
+ // Computed in igraph 1.0 w/ R bindings:
+ // > page_rank(make_star(100, mode = "in"))
+ // Alternatively in NetworkX 1.11:
+ // > nx.pagerank(nx.DiGraph([(x, 0) for x in range(1,100)]))
+ // We multiply by the number of vertices to account for difference in normalization
+ val centerRank = 0.462394787 * nVertices
+ val othersRank = 0.005430356 * nVertices
+ val igraphPR = centerRank +: Seq.fill(nVertices - 1)(othersRank)
+ val ranks = VertexRDD(sc.parallelize(0L until nVertices zip igraphPR))
+ assert(compareRanks(staticRanks, ranks) < errorTol)
+ assert(compareRanks(dynamicRanks, ranks) < errorTol)
- val dynamicRanks = starGraph.pageRank(0, resetProb).vertices.cache()
- assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
}
} // end of test Star PageRank
@@ -96,51 +105,62 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
val nVertices = 100
val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
val resetProb = 0.15
+ val tol = 0.0001
+ val numIter = 2
val errorTol = 1.0e-5
- val staticRanks1 = starGraph.staticPersonalizedPageRank(0, numIter = 1, resetProb).vertices
- val staticRanks2 = starGraph.staticPersonalizedPageRank(0, numIter = 2, resetProb)
- .vertices.cache()
-
- // Static PageRank should only take 2 iterations to converge
- val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) =>
- if (pr1 != pr2) 1 else 0
- }.map { case (vid, test) => test }.sum
- assert(notMatching === 0)
+ val staticRanks = starGraph.staticPersonalizedPageRank(0, numIter, resetProb).vertices.cache()
- val staticErrors = staticRanks2.map { case (vid, pr) =>
- val correct = (vid > 0 && pr == 0.0) ||
- (vid == 0 && pr == resetProb)
- if (!correct) 1 else 0
- }
- assert(staticErrors.sum === 0)
-
- val dynamicRanks = starGraph.personalizedPageRank(0, 0, resetProb).vertices.cache()
- assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
+ val dynamicRanks = starGraph.personalizedPageRank(0, tol, resetProb).vertices.cache()
+ assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
- val parallelStaticRanks1 = starGraph
- .staticParallelPersonalizedPageRank(Array(0), 1, resetProb).mapVertices {
+ val parallelStaticRanks = starGraph
+ .staticParallelPersonalizedPageRank(Array(0), numIter, resetProb).mapVertices {
case (vertexId, vector) => vector(0)
}.vertices.cache()
- assert(compareRanks(staticRanks1, parallelStaticRanks1) < errorTol)
+ assert(compareRanks(staticRanks, parallelStaticRanks) < errorTol)
+
+ // Computed in igraph 1.0 w/ R bindings:
+ // > page_rank(make_star(100, mode = "in"), personalized = c(1, rep(0, 99)), algo = "arpack")
+ // NOTE: We use the arpack algorithm as prpack (the default) redistributes rank to all
+ // vertices uniformly instead of just to the personalization source.
+ // Alternatively in NetworkX 1.11:
+ // > nx.pagerank(nx.DiGraph([(x, 0) for x in range(1,100)]),
+ // personalization=dict([(x, 1 if x == 0 else 0) for x in range(0,100)]))
+ // We multiply by the number of vertices to account for difference in normalization
+ val igraphPR0 = 1.0 +: Seq.fill(nVertices - 1)(0.0)
+ val ranks0 = VertexRDD(sc.parallelize(0L until nVertices zip igraphPR0))
+ assert(compareRanks(staticRanks, ranks0) < errorTol)
+ assert(compareRanks(dynamicRanks, ranks0) < errorTol)
- val parallelStaticRanks2 = starGraph
- .staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices {
- case (vertexId, vector) => vector(0)
- }.vertices.cache()
- assert(compareRanks(staticRanks2, parallelStaticRanks2) < errorTol)
// We have one outbound edge from 1 to 0
- val otherStaticRanks2 = starGraph.staticPersonalizedPageRank(1, numIter = 2, resetProb)
+ val otherStaticRanks = starGraph.staticPersonalizedPageRank(1, numIter, resetProb)
.vertices.cache()
- val otherDynamicRanks = starGraph.personalizedPageRank(1, 0, resetProb).vertices.cache()
- val otherParallelStaticRanks2 = starGraph
- .staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices {
+ val otherDynamicRanks = starGraph.personalizedPageRank(1, tol, resetProb).vertices.cache()
+ val otherParallelStaticRanks = starGraph
+ .staticParallelPersonalizedPageRank(Array(0, 1), numIter, resetProb).mapVertices {
case (vertexId, vector) => vector(1)
}.vertices.cache()
- assert(compareRanks(otherDynamicRanks, otherStaticRanks2) < errorTol)
- assert(compareRanks(otherStaticRanks2, otherParallelStaticRanks2) < errorTol)
- assert(compareRanks(otherDynamicRanks, otherParallelStaticRanks2) < errorTol)
+ assert(compareRanks(otherDynamicRanks, otherStaticRanks) < errorTol)
+ assert(compareRanks(otherStaticRanks, otherParallelStaticRanks) < errorTol)
+ assert(compareRanks(otherDynamicRanks, otherParallelStaticRanks) < errorTol)
+
+ // Computed in igraph 1.0 w/ R bindings:
+ // > page_rank(make_star(100, mode = "in"),
+ // personalized = c(0, 1, rep(0, 98)), algo = "arpack")
+ // NOTE: We use the arpack algorithm as prpack (the default) redistributes rank to all
+ // vertices uniformly instead of just to the personalization source.
+ // Alternatively in NetworkX 1.11:
+ // > nx.pagerank(nx.DiGraph([(x, 0) for x in range(1,100)]),
+ // personalization=dict([(x, 1 if x == 1 else 0) for x in range(0,100)]))
+ val centerRank = 0.4594595
+ val sourceRank = 0.5405405
+ val igraphPR1 = centerRank +: sourceRank +: Seq.fill(nVertices - 2)(0.0)
+ val ranks1 = VertexRDD(sc.parallelize(0L until nVertices zip igraphPR1))
+ assert(compareRanks(otherStaticRanks, ranks1) < errorTol)
+ assert(compareRanks(otherDynamicRanks, ranks1) < errorTol)
+ assert(compareRanks(otherParallelStaticRanks, ranks1) < errorTol)
}
} // end of test Star PersonalPageRank
@@ -229,4 +249,50 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
}
}
+
+ test("Loop with sink PageRank") {
+ withSpark { sc =>
+ val edges = sc.parallelize((1L, 2L) :: (2L, 3L) :: (3L, 1L) :: (1L, 4L) :: Nil)
+ val g = Graph.fromEdgeTuples(edges, 1)
+ val resetProb = 0.15
+ val tol = 0.0001
+ val numIter = 20
+ val errorTol = 1.0e-5
+
+ val staticRanks = g.staticPageRank(numIter, resetProb).vertices.cache()
+ val dynamicRanks = g.pageRank(tol, resetProb).vertices.cache()
+
+ assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
+
+ // Computed in igraph 1.0 w/ R bindings:
+ // > page_rank(graph_from_literal( A -+ B -+ C -+ A -+ D))
+ // Alternatively in NetworkX 1.11:
+ // > nx.pagerank(nx.DiGraph([(1,2),(2,3),(3,1),(1,4)]))
+ // We multiply by the number of vertices to account for difference in normalization
+ val igraphPR = Seq(0.3078534, 0.2137622, 0.2646223, 0.2137622).map(_ * 4)
+ val ranks = VertexRDD(sc.parallelize(1L to 4L zip igraphPR))
+ assert(compareRanks(staticRanks, ranks) < errorTol)
+ assert(compareRanks(dynamicRanks, ranks) < errorTol)
+
+ val p1staticRanks = g.staticPersonalizedPageRank(1, numIter, resetProb).vertices.cache()
+ val p1dynamicRanks = g.personalizedPageRank(1, tol, resetProb).vertices.cache()
+ val p1parallelDynamicRanks =
+ g.staticParallelPersonalizedPageRank(Array(1, 2, 3, 4), numIter, resetProb)
+ .vertices.mapValues(v => v(0)).cache()
+
+ // Computed in igraph 1.0 w/ R bindings:
+ // > page_rank(graph_from_literal( A -+ B -+ C -+ A -+ D), personalized = c(1, 0, 0, 0),
+ // algo = "arpack")
+ // NOTE: We use the arpack algorithm as prpack (the default) redistributes rank to all
+ // vertices uniformly instead of just to the personalization source.
+ // Alternatively in NetworkX 1.11:
+ // > nx.pagerank(nx.DiGraph([(1,2),(2,3),(3,1),(1,4)]), personalization={1:1, 2:0, 3:0, 4:0})
+ val igraphPR2 = Seq(0.4522329, 0.1921990, 0.1633691, 0.1921990)
+ val ranks2 = VertexRDD(sc.parallelize(1L to 4L zip igraphPR2))
+ assert(compareRanks(p1staticRanks, ranks2) < errorTol)
+ assert(compareRanks(p1dynamicRanks, ranks2) < errorTol)
+ assert(compareRanks(p1parallelDynamicRanks, ranks2) < errorTol)
+
+ }
+ }
}