aboutsummaryrefslogtreecommitdiff
path: root/graphx
diff options
context:
space:
mode:
authorDan McClary <dan.mcclary@gmail.com>2015-05-01 11:55:43 -0700
committerJoseph Gonzalez <joseph.e.gonzalez@gmail.com>2015-05-01 11:55:43 -0700
commit7d427222dca4807ec55e8d9a7de6ffe861cd0d24 (patch)
tree319025199d7a2009c9bb27250baa27d93b60f1c5 /graphx
parent27de6fef6a852c0801b5d55d0e69096878bd7909 (diff)
downloadspark-7d427222dca4807ec55e8d9a7de6ffe861cd0d24.tar.gz
spark-7d427222dca4807ec55e8d9a7de6ffe861cd0d24.tar.bz2
spark-7d427222dca4807ec55e8d9a7de6ffe861cd0d24.zip
[SPARK-5854] personalized page rank
Here's a modification to PageRank which does personalized PageRank. The approach is basically similar to that outlined by Bahmani et al. from 2010 (http://arxiv.org/pdf/1006.2880.pdf). I'm sure this needs tuning up or other considerations, so let me know how I can improve this. Author: Dan McClary <dan.mcclary@gmail.com> Author: dwmclary <dan.mcclary@gmail.com> Closes #4774 from dwmclary/SPARK-5854-Personalized-PageRank and squashes the following commits: 8b907db [dwmclary] fixed scalastyle errors in PageRankSuite 2c20e5d [dwmclary] merged with upstream master d6cebac [dwmclary] updated as per style requests 7d00c23 [Dan McClary] fixed line overrun in personalizedVertexPageRank d711677 [Dan McClary] updated vertexProgram to restore binary compatibility for inner method bb8d507 [Dan McClary] Merge branch 'master' of https://github.com/apache/spark into SPARK-5854-Personalized-PageRank fba0edd [Dan McClary] fixed silly mistakes de51be2 [Dan McClary] cleaned up whitespace between comments and methods 0c30d0c [Dan McClary] updated to maintain binary compatibility aaf0b4b [Dan McClary] Merge branch 'master' of https://github.com/apache/spark into SPARK-5854-Personalized-PageRank 76773f6 [Dan McClary] Merge branch 'master' of https://github.com/apache/spark into SPARK-5854-Personalized-PageRank 44ada8e [Dan McClary] updated tolerance on chain PPR 1ffed95 [Dan McClary] updated tolerance on chain PPR b67ac69 [Dan McClary] updated tolerance on chain PPR a560942 [Dan McClary] rolled PPR into pregel code for PageRank 6dc2c29 [Dan McClary] initial implementation of personalized page rank
Diffstat (limited to 'graphx')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala25
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala93
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala47
3 files changed, 159 insertions, 6 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 86f611d55a..7edd627b20 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -372,6 +372,31 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
PageRank.runUntilConvergence(graph, tol, resetProb)
}
+
+ /**
+  * Run a dynamic (convergence-based) personalized PageRank whose random walks are
+  * started relative to a single source vertex.
+  *
+  * @param src the source vertex used to personalize the ranking
+  * @param tol the tolerance allowed at convergence (smaller => more accurate)
+  * @param resetProb the random reset probability (alpha)
+  *
+  * @return a graph whose vertex attributes contain the PageRank and whose edge
+  *         attributes contain the normalized edge weight
+  *
+  * @see [[org.apache.spark.graphx.lib.PageRank$#runUntilConvergenceWithOptions]]
+  */
+ def personalizedPageRank(
+     src: VertexId,
+     tol: Double,
+     resetProb: Double = 0.15): Graph[Double, Double] = {
+   val source: Option[VertexId] = Some(src)
+   PageRank.runUntilConvergenceWithOptions(graph, tol, resetProb, source)
+ }
+
+ /**
+  * Run personalized PageRank for a fixed number of iterations, with every iteration
+  * originating at the source vertex.
+  *
+  * @param src the source vertex used to personalize the ranking
+  * @param numIter the number of iterations of PageRank to run
+  * @param resetProb the random reset probability (alpha)
+  *
+  * @return a graph whose vertex attributes contain the PageRank and whose edge
+  *         attributes contain the normalized edge weight
+  *
+  * @see [[org.apache.spark.graphx.lib.PageRank$#runWithOptions]]
+  */
+ def staticPersonalizedPageRank(
+     src: VertexId,
+     numIter: Int,
+     resetProb: Double = 0.15): Graph[Double, Double] = {
+   val source: Option[VertexId] = Some(src)
+   PageRank.runWithOptions(graph, numIter, resetProb, source)
+ }
+
/**
* Run PageRank for a fixed number of iterations returning a graph with vertex attributes
* containing the PageRank and edge attributes the normalized edge weight.
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index 042e366a29..bc974b2f04 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -18,6 +18,7 @@
package org.apache.spark.graphx.lib
import scala.reflect.ClassTag
+import scala.language.postfixOps
import org.apache.spark.Logging
import org.apache.spark.graphx._
@@ -60,6 +61,7 @@ import org.apache.spark.graphx._
*/
object PageRank extends Logging {
+
/**
* Run PageRank for a fixed number of iterations returning a graph
* with vertex attributes containing the PageRank and edge
@@ -74,10 +76,33 @@ object PageRank extends Logging {
*
* @return the graph containing with each vertex containing the PageRank and each edge
* containing the normalized weight.
+ */
+ def run[VD: ClassTag, ED: ClassTag](
+     graph: Graph[VD, ED],
+     numIter: Int,
+     resetProb: Double = 0.15): Graph[Double, Double] = {
+   // Plain (non-personalized) PageRank: delegate with no source vertex.
+   runWithOptions(graph, numIter, resetProb, srcId = None)
+ }
+
+ /**
+ * Run PageRank for a fixed number of iterations returning a graph
+ * with vertex attributes containing the PageRank and edge
+ * attributes the normalized edge weight.
+ *
+ * @tparam VD the original vertex attribute (not used)
+ * @tparam ED the original edge attribute (not used)
+ *
+ * @param graph the graph on which to compute PageRank
+ * @param numIter the number of iterations of PageRank to run
+ * @param resetProb the random reset probability (alpha)
+ * @param srcId the source vertex for a Personalized Page Rank (optional)
+ *
+ * @return the graph with each vertex containing the PageRank and each edge
+ * containing the normalized weight.
*
*/
- def run[VD: ClassTag, ED: ClassTag](
- graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] =
+ def runWithOptions[VD: ClassTag, ED: ClassTag](
+ graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15,
+ srcId: Option[VertexId] = None): Graph[Double, Double] =
{
// Initialize the PageRank graph with each edge attribute having
// weight 1/outDegree and each vertex with attribute 1.0.
@@ -89,6 +114,10 @@ object PageRank extends Logging {
// Set the vertex attributes to the initial pagerank values
.mapVertices( (id, attr) => resetProb )
+ val personalized = srcId isDefined
+ val src: VertexId = srcId.getOrElse(-1L)
+ def delta(u: VertexId, v: VertexId):Double = { if (u == v) 1.0 else 0.0 }
+
var iteration = 0
var prevRankGraph: Graph[Double, Double] = null
while (iteration < numIter) {
@@ -103,8 +132,14 @@ object PageRank extends Logging {
// that didn't receive a message. Requires a shuffle for broadcasting updated ranks to the
// edge partitions.
prevRankGraph = rankGraph
+ val rPrb = if (personalized) {
+ (src: VertexId ,id: VertexId) => resetProb * delta(src,id)
+ } else {
+ (src: VertexId, id: VertexId) => resetProb
+ }
+
rankGraph = rankGraph.joinVertices(rankUpdates) {
- (id, oldRank, msgSum) => resetProb + (1.0 - resetProb) * msgSum
+ (id, oldRank, msgSum) => rPrb(src,id) + (1.0 - resetProb) * msgSum
}.cache()
rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
@@ -133,7 +168,29 @@ object PageRank extends Logging {
* containing the normalized weight.
*/
def runUntilConvergence[VD: ClassTag, ED: ClassTag](
- graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double] =
+ graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double] =
+ {
+ runUntilConvergenceWithOptions(graph, tol, resetProb)
+ }
+
+ /**
+ * Run a dynamic version of PageRank returning a graph with vertex attributes containing the
+ * PageRank and edge attributes containing the normalized edge weight.
+ *
+ * @tparam VD the original vertex attribute (not used)
+ * @tparam ED the original edge attribute (not used)
+ *
+ * @param graph the graph on which to compute PageRank
+ * @param tol the tolerance allowed at convergence (smaller => more accurate).
+ * @param resetProb the random reset probability (alpha)
+ * @param srcId the source vertex for a Personalized Page Rank (optional)
+ *
+ * @return the graph with each vertex containing the PageRank and each edge
+ * containing the normalized weight.
+ */
+ def runUntilConvergenceWithOptions[VD: ClassTag, ED: ClassTag](
+ graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15,
+ srcId: Option[VertexId] = None): Graph[Double, Double] =
{
// Initialize the pagerankGraph with each edge attribute
// having weight 1/outDegree and each vertex with attribute 1.0.
@@ -148,6 +205,10 @@ object PageRank extends Logging {
.mapVertices( (id, attr) => (0.0, 0.0) )
.cache()
+ val personalized = srcId.isDefined
+ val src: VertexId = srcId.getOrElse(-1L)
+
+
// Define the three functions needed to implement PageRank in the GraphX
// version of Pregel
def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = {
@@ -156,7 +217,18 @@ object PageRank extends Logging {
(newPR, newPR - oldPR)
}
- def sendMessage(edge: EdgeTriplet[(Double, Double), Double]): Iterator[(VertexId, Double)] = {
+ // Vertex program used when a source vertex is supplied. Only the source vertex
+ // carries its previous rank forward as the teleport term; for every other vertex
+ // the teleport term is 0.0. Returns (new rank, change in rank).
+ // NOTE(review): the fixed-iteration variant uses resetProb * delta as the teleport
+ // term, whereas this one uses oldPR * delta -- confirm the difference is intended.
+ def personalizedVertexProgram(id: VertexId, attr: (Double, Double),
+     msgSum: Double): (Double, Double) = {
+   // The second component (the previous delta) is not needed here.
+   val (oldPR, _) = attr
+   // 1.0 for the source vertex, 0.0 for all others.
+   val delta = if (src == id) 1.0 else 0.0
+   val teleport = oldPR * delta
+   val newPR = teleport + (1.0 - resetProb) * msgSum
+   (newPR, newPR - oldPR)
+ }
+
+ def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
if (edge.srcAttr._2 > tol) {
Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
} else {
@@ -170,8 +242,17 @@ object PageRank extends Logging {
val initialMessage = resetProb / (1.0 - resetProb)
// Execute a dynamic version of Pregel.
+ val vp = if (personalized) {
+ (id: VertexId, attr: (Double, Double),msgSum: Double) =>
+ personalizedVertexProgram(id, attr, msgSum)
+ } else {
+ (id: VertexId, attr: (Double, Double), msgSum: Double) =>
+ vertexProgram(id, attr, msgSum)
+ }
+
Pregel(pagerankGraph, initialMessage, activeDirection = EdgeDirection.Out)(
- vertexProgram, sendMessage, messageCombiner)
+ vp, sendMessage, messageCombiner)
.mapVertices((vid, attr) => attr._1)
} // end of deltaPageRank
+
}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
index 95804b07b1..3f3c9dfd7b 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
@@ -92,6 +92,36 @@ class PageRankSuite extends FunSuite with LocalSparkContext {
}
} // end of test Star PageRank
+ test("Star PersonalPageRank") {
+   withSpark { sc =>
+     val nVertices = 100
+     val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
+     val resetProb = 0.15
+     val errorTol = 1.0e-5
+
+     val staticRanks1 = starGraph.staticPersonalizedPageRank(0, numIter = 1, resetProb).vertices
+     val staticRanks2 = starGraph.staticPersonalizedPageRank(0, numIter = 2, resetProb)
+       .vertices.cache()
+
+     // The ranks after one and after two iterations must be identical on the star graph
+     val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) =>
+       if (pr1 != pr2) 1 else 0
+     }.map { case (vid, test) => test }.sum
+     assert(notMatching === 0)
+
+     // Expected rank of the hub (vertex 0); all other vertices keep resetProb
+     val hubRank = resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1))
+     val staticErrors = staticRanks2.map { case (vid, pr) =>
+       val correct = (vid > 0 && pr == resetProb) ||
+         (vid == 0 && math.abs(pr - hubRank) < errorTol)
+       if (!correct) 1 else 0
+     }
+     assert(staticErrors.sum === 0)
+
+     // A tolerance of 0 is passed to the dynamic version
+     val dynamicRanks = starGraph.personalizedPageRank(0, tol = 0.0, resetProb = resetProb)
+       .vertices.cache()
+     assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
+   }
+ } // end of test Star PersonalPageRank
+
test("Grid PageRank") {
withSpark { sc =>
val rows = 10
@@ -128,4 +158,21 @@ class PageRankSuite extends FunSuite with LocalSparkContext {
assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
}
}
+
+ test("Chain PersonalizedPageRank") {
+   withSpark { sc =>
+     // Directed chain 0 -> 1 -> ... -> 9, held in a single partition
+     val edgePairs = (0 until 9).map(i => (i, i + 1))
+     val edgeRdd = sc.parallelize(edgePairs, 1)
+       .map { case (from, to) => (from.toLong, to.toLong) }
+     val chainGraph = Graph.fromEdgeTuples(edgeRdd, 1.0).cache()
+     val resetProb = 0.15
+     val tol = 0.0001
+     val numIter = 10
+     val errorTol = 1.0e-1
+
+     // Fixed-iteration and convergence-based runs, both personalized on vertex 4
+     val fixedIterRanks = chainGraph.staticPersonalizedPageRank(4, numIter, resetProb).vertices
+     val convergedRanks = chainGraph.personalizedPageRank(4, tol, resetProb).vertices
+
+     assert(compareRanks(fixedIterRanks, convergedRanks) < errorTol)
+   }
+ }
}