aboutsummaryrefslogtreecommitdiff
path: root/graphx
diff options
context:
space:
mode:
authorJoseph E. Gonzalez <joseph.e.gonzalez@gmail.com>2014-01-12 21:26:37 -0800
committerJoseph E. Gonzalez <joseph.e.gonzalez@gmail.com>2014-01-12 21:26:37 -0800
commit2216319f485ca2d00a946c4478dedc8a0e1c6053 (patch)
treec3571104877b63cb8821a6557c9225939cc56943 /graphx
parentc787ff5640ad9d6f6dc3b744d73a1cb0c91eb90a (diff)
downloadspark-2216319f485ca2d00a946c4478dedc8a0e1c6053.tar.gz
spark-2216319f485ca2d00a946c4478dedc8a0e1c6053.tar.bz2
spark-2216319f485ca2d00a946c4478dedc8a0e1c6053.zip
adding Pregel as an operator in GraphOps and cleaning up documentation of GraphOps
Diffstat (limited to 'graphx')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala92
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala4
2 files changed, 74 insertions, 22 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 0121cb1449..4fdff29f5a 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -6,10 +6,9 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.SparkException
-
/**
* Contains additional functionality for [[Graph]]. All operations are expressed in terms of the
- * efficient GraphX API. This class is implicitly constructed for each Graph object.
+ * efficient GraphX API. This class is implicitly constructed for each Graph object.
*
* @tparam VD the vertex attribute type
* @tparam ED the edge attribute type
@@ -19,32 +18,27 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
/** The number of edges in the graph. */
lazy val numEdges: Long = graph.edges.count()
-
/** The number of vertices in the graph. */
lazy val numVertices: Long = graph.vertices.count()
-
/**
* The in-degree of each vertex in the graph.
* @note Vertices with no in-edges are not returned in the resulting RDD.
*/
lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In)
-
/**
* The out-degree of each vertex in the graph.
* @note Vertices with no out-edges are not returned in the resulting RDD.
*/
lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out)
-
/**
* The degree of each vertex in the graph.
* @note Vertices with no edges are not returned in the resulting RDD.
*/
lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Both)
-
/**
* Computes the neighboring vertex degrees.
*
@@ -76,10 +70,10 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
* age for each user:
*
* {{{
- * val graph: Graph[Int,Int] = loadGraph()
+ * val graph: Graph[Int,Int] = GraphLoader.edgeListFile(sc, "webgraph")
* val averageFollowerAge: RDD[(Int, Int)] =
* graph.aggregateNeighbors[(Int,Double)](
- * (vid, edge) => (edge.otherVertex(vid).data, 1),
+ * (vid, edge) => Some((edge.otherVertex(vid).data, 1)),
* (a, b) => (a._1 + b._1, a._2 + b._2),
* -1,
* EdgeDirection.In)
@@ -111,11 +105,9 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
case (Some(srcA), Some(dstA)) => Iterator((et.srcId, srcA), (et.dstId, dstA))
}
}
-
graph.mapReduceTriplets(mf, reduceFunc)
} // end of aggregateNeighbors
-
/**
* Collect the neighbor vertex ids for each vertex.
*
@@ -147,7 +139,6 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
}
} // end of collectNeighborIds
-
/**
* Collect the neighbor vertex attributes for each vertex.
*
@@ -173,7 +164,6 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
}
} // end of collectNeighbor
-
/**
* Join the vertices with an RDD and then apply a function from the
* the vertex and RDD entry to a new vertex value. The input table
@@ -188,17 +178,14 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
* corresponding entry in the table otherwise the old vertex value
* is used.
*
- * @note for small tables this function can be much more efficient
- * than leftJoinVertices
- *
* @example This function is used to update the vertices with new
* values based on external data. For example we could add the out
* degree to each vertex record
*
* {{{
- * val rawGraph: Graph[Int,()] = Graph.textFile("webgraph")
+ * val rawGraph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "webgraph")
* .mapVertices(v => 0)
- * val outDeg: RDD[(Int, Int)] = rawGraph.outDegrees()
+ * val outDeg: RDD[(Int, Int)] = rawGraph.outDegrees
* val graph = rawGraph.leftJoinVertices[Int,Int](outDeg,
* (v, deg) => deg )
* }}}
@@ -219,8 +206,10 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
* Filter the graph by computing some values to filter on, and applying the predicates.
*
* @param preprocess a function to compute new vertex and edge data before filtering
- * @param epred edge pred to filter on after preprocess, see more details under Graph#subgraph
- * @param vpred vertex pred to filter on after prerocess, see more details under Graph#subgraph
+ * @param epred edge pred to filter on after preprocess, see more details under
+ * [[org.apache.spark.graphx.Graph#subgraph]]
+ * @param vpred vertex pred to filter on after prerocess, see more details under
+ * [[org.apache.spark.graphx.Graph#subgraph]]
* @tparam VD2 vertex type the vpred operates on
* @tparam ED2 edge type the epred operates on
* @return a subgraph of the orginal graph, with its data unchanged
@@ -246,4 +235,67 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
vpred: (VertexID, VD2) => Boolean = (v:VertexID, d:VD2) => true): Graph[VD, ED] = {
graph.mask(preprocess(graph).subgraph(epred, vpred))
}
+
+ /**
+ * Execute a Pregel-like iterative vertex-parallel abstraction. The
+ * user-defined vertex-program `vprog` is executed in parallel on
+ * each vertex receiving any inbound messages and computing a new
+ * value for the vertex. The `sendMsg` function is then invoked on
+ * all out-edges and is used to compute an optional message to the
+ * destination vertex. The `mergeMsg` function is a commutative
+ * associative function used to combine messages destined to the
+ * same vertex.
+ *
+ * On the first iteration all vertices receive the `initialMsg` and
+ * on subsequent iterations if a vertex does not receive a message
+ * then the vertex-program is not invoked.
+ *
+ * This function iterates until there are no remaining messages, or
+ * for `maxIterations` iterations.
+ *
+ * @tparam VD the vertex data type
+ * @tparam ED the edge data type
+ * @tparam A the Pregel message type
+ *
+ * @param graph the input graph.
+ *
+ * @param initialMsg the message each vertex will receive at the on
+ * the first iteration
+ *
+ * @param maxIterations the maximum number of iterations to run for
+ *
+ * @param activeDirection the direction of edges incident to a vertex that received a message in
+ * the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only
+ * out-edges of vertices that received a message in the previous round will run.
+ *
+ * @param vprog the user-defined vertex program which runs on each
+ * vertex and receives the inbound message and computes a new vertex
+ * value. On the first iteration the vertex program is invoked on
+ * all vertices and is passed the default message. On subsequent
+ * iterations the vertex program is only invoked on those vertices
+ * that receive messages.
+ *
+ * @param sendMsg a user supplied function that is applied to out
+ * edges of vertices that received messages in the current
+ * iteration
+ *
+ * @param mergeMsg a user supplied function that takes two incoming
+ * messages of type A and merges them into a single message of type
+ * A. ''This function must be commutative and associative and
+ * ideally the size of A should not increase.''
+ *
+ * @return the resulting graph at the end of the computation
+ *
+ */
+ def pregel[A: ClassTag](
+ initialMsg: A,
+ maxIterations: Int = Int.MaxValue,
+ activeDirection: EdgeDirection = EdgeDirection.Out)(
+ vprog: (VertexID, VD, A) => VD,
+ sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
+ mergeMsg: (A, A) => A)
+ : Graph[VD, ED] = {
+ Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
+ }
+
} // end of GraphOps
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index 57b087213f..83e28d0ab2 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -25,8 +25,8 @@ import scala.reflect.ClassTag
*
* def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double =
* resetProb + (1.0 - resetProb) * msgSum
- * def sendMessage(id: VertexID, edge: EdgeTriplet[Double, Double]): Option[Double] =
- * Some(edge.srcAttr * edge.attr)
+ * def sendMessage(id: VertexID, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
+ * Iterator((edge.dstId, edge.srcAttr * edge.attr))
* def messageCombiner(a: Double, b: Double): Double = a + b
* val initialMessage = 0.0
* // Execute Pregel for a fixed number of iterations.