From 2216319f485ca2d00a946c4478dedc8a0e1c6053 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Sun, 12 Jan 2014 21:26:37 -0800 Subject: adding Pregel as an operator in GraphOps and cleaning up documentation of GraphOps --- .../scala/org/apache/spark/graphx/GraphOps.scala | 92 +++++++++++++++++----- .../scala/org/apache/spark/graphx/Pregel.scala | 4 +- 2 files changed, 74 insertions(+), 22 deletions(-) (limited to 'graphx') diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 0121cb1449..4fdff29f5a 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -6,10 +6,9 @@ import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ import org.apache.spark.SparkException - /** * Contains additional functionality for [[Graph]]. All operations are expressed in terms of the - * efficient GraphX API. This class is implicitly constructed for each Graph object. + * efficient GraphX API. This class is implicitly constructed for each Graph object. * * @tparam VD the vertex attribute type * @tparam ED the edge attribute type @@ -19,32 +18,27 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { /** The number of edges in the graph. */ lazy val numEdges: Long = graph.edges.count() - /** The number of vertices in the graph. */ lazy val numVertices: Long = graph.vertices.count() - /** * The in-degree of each vertex in the graph. * @note Vertices with no in-edges are not returned in the resulting RDD. */ lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In) - /** * The out-degree of each vertex in the graph. * @note Vertices with no out-edges are not returned in the resulting RDD. */ lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out) - /** * The degree of each vertex in the graph. * @note Vertices with no edges are not returned in the resulting RDD. */ lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Both) - /** * Computes the neighboring vertex degrees. * @@ -76,10 +70,10 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { * age for each user: * * {{{ - * val graph: Graph[Int,Int] = loadGraph() + * val graph: Graph[Int,Int] = GraphLoader.edgeListFile(sc, "webgraph") * val averageFollowerAge: RDD[(Int, Int)] = * graph.aggregateNeighbors[(Int,Double)]( - * (vid, edge) => (edge.otherVertex(vid).data, 1), + * (vid, edge) => Some((edge.otherVertex(vid).data, 1)), * (a, b) => (a._1 + b._1, a._2 + b._2), * -1, * EdgeDirection.In) @@ -111,11 +105,9 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { case (Some(srcA), Some(dstA)) => Iterator((et.srcId, srcA), (et.dstId, dstA)) } } - graph.mapReduceTriplets(mf, reduceFunc) } // end of aggregateNeighbors - /** * Collect the neighbor vertex ids for each vertex. * @@ -147,7 +139,6 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { } } // end of collectNeighborIds - /** * Collect the neighbor vertex attributes for each vertex. * @@ -173,7 +164,6 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { } } // end of collectNeighbor - /** * Join the vertices with an RDD and then apply a function from the * the vertex and RDD entry to a new vertex value. The input table @@ -188,17 +178,14 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { * corresponding entry in the table otherwise the old vertex value * is used. * - * @note for small tables this function can be much more efficient - * than leftJoinVertices - * * @example This function is used to update the vertices with new * values based on external data. For example we could add the out * degree to each vertex record * * {{{ - * val rawGraph: Graph[Int,()] = Graph.textFile("webgraph") + * val rawGraph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "webgraph") * .mapVertices(v => 0) - * val outDeg: RDD[(Int, Int)] = rawGraph.outDegrees() + * val outDeg: RDD[(Int, Int)] = rawGraph.outDegrees * val graph = rawGraph.leftJoinVertices[Int,Int](outDeg, * (v, deg) => deg ) * }}} @@ -219,8 +206,10 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { * Filter the graph by computing some values to filter on, and applying the predicates. * * @param preprocess a function to compute new vertex and edge data before filtering - * @param epred edge pred to filter on after preprocess, see more details under Graph#subgraph - * @param vpred vertex pred to filter on after prerocess, see more details under Graph#subgraph + * @param epred edge pred to filter on after preprocess, see more details under + * [[org.apache.spark.graphx.Graph#subgraph]] + * @param vpred vertex pred to filter on after prerocess, see more details under + * [[org.apache.spark.graphx.Graph#subgraph]] * @tparam VD2 vertex type the vpred operates on * @tparam ED2 edge type the epred operates on * @return a subgraph of the orginal graph, with its data unchanged @@ -246,4 +235,67 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { vpred: (VertexID, VD2) => Boolean = (v:VertexID, d:VD2) => true): Graph[VD, ED] = { graph.mask(preprocess(graph).subgraph(epred, vpred)) } + + /** + * Execute a Pregel-like iterative vertex-parallel abstraction. The + * user-defined vertex-program `vprog` is executed in parallel on + * each vertex receiving any inbound messages and computing a new + * value for the vertex. The `sendMsg` function is then invoked on + * all out-edges and is used to compute an optional message to the + * destination vertex. The `mergeMsg` function is a commutative + * associative function used to combine messages destined to the + * same vertex. + * + * On the first iteration all vertices receive the `initialMsg` and + * on subsequent iterations if a vertex does not receive a message + * then the vertex-program is not invoked. + * + * This function iterates until there are no remaining messages, or + * for `maxIterations` iterations. + * + * @tparam VD the vertex data type + * @tparam ED the edge data type + * @tparam A the Pregel message type + * + * @param graph the input graph. + * + * @param initialMsg the message each vertex will receive at the on + * the first iteration + * + * @param maxIterations the maximum number of iterations to run for + * + * @param activeDirection the direction of edges incident to a vertex that received a message in + * the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only + * out-edges of vertices that received a message in the previous round will run. + * + * @param vprog the user-defined vertex program which runs on each + * vertex and receives the inbound message and computes a new vertex + * value. On the first iteration the vertex program is invoked on + * all vertices and is passed the default message. On subsequent + * iterations the vertex program is only invoked on those vertices + * that receive messages. + * + * @param sendMsg a user supplied function that is applied to out + * edges of vertices that received messages in the current + * iteration + * + * @param mergeMsg a user supplied function that takes two incoming + * messages of type A and merges them into a single message of type + * A. ''This function must be commutative and associative and + * ideally the size of A should not increase.'' + * + * @return the resulting graph at the end of the computation + * + */ + def pregel[A: ClassTag]( + initialMsg: A, + maxIterations: Int = Int.MaxValue, + activeDirection: EdgeDirection = EdgeDirection.Out)( + vprog: (VertexID, VD, A) => VD, + sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)], + mergeMsg: (A, A) => A) + : Graph[VD, ED] = { + Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg) + } + } // end of GraphOps diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index 57b087213f..83e28d0ab2 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -25,8 +25,8 @@ import scala.reflect.ClassTag * * def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double = * resetProb + (1.0 - resetProb) * msgSum - * def sendMessage(id: VertexID, edge: EdgeTriplet[Double, Double]): Option[Double] = - * Some(edge.srcAttr * edge.attr) + * def sendMessage(id: VertexID, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] = + * Iterator((edge.dstId, edge.srcAttr * edge.attr)) * def messageCombiner(a: Double, b: Double): Double = a + b * val initialMessage = 0.0 * // Execute Pregel for a fixed number of iterations. -- cgit v1.2.3