about | summary | refs | log | tree | commit | diff
path: root/graphx
diff options
context:
space:
mode:
author Ankur Dave <ankurdave@gmail.com> 2014-01-11 11:49:21 -0800
committer Ankur Dave <ankurdave@gmail.com> 2014-01-11 11:49:35 -0800
commit 732333d78e46ee23025d81ca9fbe6d1e13e9f253 (patch)
tree 1c22029afcc479e6497a92e94e34f7cb111b3d2e /graphx
parent 0b5c49ebad9dfb69074e2638c05a07b5ab94e13a (diff)
download spark-732333d78e46ee23025d81ca9fbe6d1e13e9f253.tar.gz
spark-732333d78e46ee23025d81ca9fbe6d1e13e9f253.tar.bz2
spark-732333d78e46ee23025d81ca9fbe6d1e13e9f253.zip
Remove GraphLab
Diffstat (limited to 'graphx')
-rw-r--r-- graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala 138
-rw-r--r-- graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala 9
-rw-r--r-- graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala 50
3 files changed, 34 insertions, 163 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala
deleted file mode 100644
index 2f828ad807..0000000000
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala
+++ /dev/null
@@ -1,138 +0,0 @@
-package org.apache.spark.graphx
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.Logging
-import scala.collection.JavaConversions._
-import org.apache.spark.rdd.RDD
-
-/**
- * Implements the GraphLab gather-apply-scatter API.
- */
-object GraphLab extends Logging {
-
- /**
- * Executes the GraphLab Gather-Apply-Scatter API.
- *
- * @param graph the graph on which to execute the GraphLab API
- * @param gatherFunc executed on each edge triplet
- * adjacent to a vertex. Returns an accumulator which
- * is then merged using the merge function.
- * @param mergeFunc an accumulative associative operation on the result of
- * the gather type.
- * @param applyFunc takes a vertex and the final result of the merge operations
- * on the adjacent edges and returns a new vertex value.
- * @param scatterFunc executed after the apply function. Takes
- * a triplet and signals whether the neighboring vertex program
- * must be recomputed.
- * @param startVertices a predicate to determine which vertices to start the computation on.
- * These will be the active vertices in the first iteration.
- * @param numIter the maximum number of iterations to run
- * @param gatherDirection the direction of edges to consider during the gather phase
- * @param scatterDirection the direction of edges to consider during the scatter phase
- *
- * @tparam VD the graph vertex attribute type
- * @tparam ED the graph edge attribute type
- * @tparam A the type accumulated during the gather phase
- * @return the resulting graph after the algorithm converges
- *
- * @note Unlike [[Pregel]], this implementation of [[GraphLab]] does not unpersist RDDs from
- * previous iterations. As a result, long-running iterative GraphLab programs will eventually fill
- * the Spark cache. Though Spark will evict RDDs from old iterations eventually, garbage
- * collection will take longer than necessary since it must examine the entire cache. This will be
- * fixed in a future update.
- */
- def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
- (graph: Graph[VD, ED], numIter: Int,
- gatherDirection: EdgeDirection = EdgeDirection.In,
- scatterDirection: EdgeDirection = EdgeDirection.Out)
- (gatherFunc: (VertexID, EdgeTriplet[VD, ED]) => A,
- mergeFunc: (A, A) => A,
- applyFunc: (VertexID, VD, Option[A]) => VD,
- scatterFunc: (VertexID, EdgeTriplet[VD, ED]) => Boolean,
- startVertices: (VertexID, VD) => Boolean = (vid: VertexID, data: VD) => true)
- : Graph[VD, ED] = {
-
-
- // Add an active attribute to all vertices to track convergence.
- var activeGraph: Graph[(Boolean, VD), ED] = graph.mapVertices {
- case (id, data) => (startVertices(id, data), data)
- }.cache()
-
- // The gather function wrapper strips the active attribute and
- // only invokes the gather function on active vertices
- def gather(vid: VertexID, e: EdgeTriplet[(Boolean, VD), ED]): Option[A] = {
- if (e.vertexAttr(vid)._1) {
- val edgeTriplet = new EdgeTriplet[VD,ED]
- edgeTriplet.set(e)
- edgeTriplet.srcAttr = e.srcAttr._2
- edgeTriplet.dstAttr = e.dstAttr._2
- Some(gatherFunc(vid, edgeTriplet))
- } else {
- None
- }
- }
-
- // The apply function wrapper strips the vertex of the active attribute
- // and only invokes the apply function on active vertices
- def apply(vid: VertexID, data: (Boolean, VD), accum: Option[A]): (Boolean, VD) = {
- val (active, vData) = data
- if (active) (true, applyFunc(vid, vData, accum))
- else (false, vData)
- }
-
- // The scatter function wrapper strips the vertex of the active attribute
- // and only invokes the scatter function on active vertices
- def scatter(rawVertexID: VertexID, e: EdgeTriplet[(Boolean, VD), ED]): Option[Boolean] = {
- val vid = e.otherVertexId(rawVertexID)
- if (e.vertexAttr(vid)._1) {
- val edgeTriplet = new EdgeTriplet[VD,ED]
- edgeTriplet.set(e)
- edgeTriplet.srcAttr = e.srcAttr._2
- edgeTriplet.dstAttr = e.dstAttr._2
- Some(scatterFunc(vid, edgeTriplet))
- } else {
- None
- }
- }
-
- // Used to set the active status of vertices for the next round
- def applyActive(
- vid: VertexID, data: (Boolean, VD), newActiveOpt: Option[Boolean]): (Boolean, VD) = {
- val (prevActive, vData) = data
- (newActiveOpt.getOrElse(false), vData)
- }
-
- // Main Loop ---------------------------------------------------------------------
- var i = 0
- var numActive = activeGraph.numVertices
- var prevActiveGraph: Graph[(Boolean, VD), ED] = null
- while (i < numIter && numActive > 0) {
-
- // Gather
- val gathered: RDD[(VertexID, A)] =
- activeGraph.aggregateNeighbors(gather, mergeFunc, gatherDirection)
-
- // Apply
- val applied = activeGraph.outerJoinVertices(gathered)(apply).cache()
-
- // Scatter is basically a gather in the opposite direction so we reverse the edge direction
- val scattered: RDD[(VertexID, Boolean)] =
- applied.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse)
-
- prevActiveGraph = activeGraph
- activeGraph = applied.outerJoinVertices(scattered)(applyActive).cache()
-
- // Calculate the number of active vertices.
- numActive = activeGraph.vertices.map{
- case (vid, data) => if (data._1) 1 else 0
- }.reduce(_ + _)
- logInfo("Number active vertices: " + numActive)
-
- i += 1
- }
-
- // Remove the active attribute from the vertex data before returning the graph
- activeGraph.mapVertices{case (vid, data) => data._2 }
- }
-}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index 2e6453484c..57b087213f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -65,6 +65,10 @@ object Pregel {
*
* @param maxIterations the maximum number of iterations to run for
*
+ * @param activeDirection the direction of edges incident to a vertex that received a message in
+ * the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out` (the
+ * default), `sendMsg` runs only on out-edges of vertices that received a message in that round.
+ *
* @param vprog the user-defined vertex program which runs on each
* vertex and receives the inbound message and computes a new vertex
* value. On the first iteration the vertex program is invoked on
@@ -85,7 +89,8 @@ object Pregel {
*
*/
def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
- (graph: Graph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue)(
+ (graph: Graph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue,
+ activeDirection: EdgeDirection = EdgeDirection.Out)(
vprog: (VertexID, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
mergeMsg: (A, A) => A)
@@ -110,7 +115,7 @@ object Pregel {
// Send new messages. Vertices that didn't get any messages don't appear in newVerts, so don't
// get to send messages. We must cache messages so it can be materialized on the next line,
// allowing us to uncache the previous iteration.
- messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, EdgeDirection.Out))).cache()
+ messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDirection))).cache()
// The call to count() materializes `messages`, `newVerts`, and the vertices of `g`. This
// hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
// vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
index 9bd227309a..43c4b9cf2d 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
@@ -53,34 +53,38 @@ object StronglyConnectedComponents {
// collect min of all my neighbor's scc values, update if it's smaller than mine
// then notify any neighbors with scc values larger than mine
- sccWorkGraph = GraphLab[(VertexID, Boolean), ED, VertexID](sccWorkGraph, Integer.MAX_VALUE)(
- (vid, e) => e.otherVertexAttr(vid)._1,
- (vid1, vid2) => math.min(vid1, vid2),
- (vid, scc, optScc) =>
- (math.min(scc._1, optScc.getOrElse(scc._1)), scc._2),
- (vid, e) => e.vertexAttr(vid)._1 < e.otherVertexAttr(vid)._1
- )
+ sccWorkGraph = Pregel[(VertexID, Boolean), ED, VertexID](sccWorkGraph, Long.MaxValue)(
+   // adopt the smallest color seen so far; the "final" flag (_2) is left untouched
+   (vid, myScc, neighborScc) => (math.min(myScc._1, neighborScc), myScc._2),
+   e => {
+     // forward a color only to a destination that still holds a larger one; gating on
+     // vertex ids instead would strand a small color sitting on a large-id vertex
+     if (e.srcAttr._1 < e.dstAttr._1) Iterator((e.dstId, e.srcAttr._1))
+     else Iterator()
+   },
+   (vid1, vid2) => math.min(vid1, vid2))
// start at root of SCCs. Traverse values in reverse, notify all my neighbors
// do not propagate if colors do not match!
- sccWorkGraph = GraphLab[(VertexID, Boolean), ED, Boolean](
- sccWorkGraph,
- Integer.MAX_VALUE,
- EdgeDirection.Out,
- EdgeDirection.In
- )(
+ sccWorkGraph = Pregel[(VertexID, Boolean), ED, Boolean](
+ sccWorkGraph, false, activeDirection = EdgeDirection.In)(
// vertex is final if it is the root of a color
// or it has the same color as a neighbor that is final
- (vid, e) => (vid == e.vertexAttr(vid)._1) || (e.vertexAttr(vid)._1 == e.otherVertexAttr(vid)._1),
- (final1, final2) => final1 || final2,
- (vid, scc, optFinal) =>
- (scc._1, scc._2 || optFinal.getOrElse(false)),
- // activate neighbor if they are not final, you are, and you have the same color
- (vid, e) => e.vertexAttr(vid)._2 &&
- !e.otherVertexAttr(vid)._2 && (e.vertexAttr(vid)._1 == e.otherVertexAttr(vid)._1),
- // start at root of colors
- (vid, data) => vid == data._1
- )
+ (vid, myScc, existsSameColorFinalNeighbor) => {
+ val isColorRoot = vid == myScc._1
+ (myScc._1, myScc._2 || isColorRoot || existsSameColorFinalNeighbor)
+ },
+ // activate neighbor if they are not final, you are, and you have the same color
+ e => {
+ val sameColor = e.dstAttr._1 == e.srcAttr._1
+ val onlyDstIsFinal = e.dstAttr._2 && !e.srcAttr._2
+ if (sameColor && onlyDstIsFinal) {
+ Iterator((e.srcId, e.dstAttr._2))
+ } else {
+ Iterator()
+ }
+ },
+ (final1, final2) => final1 || final2)
}
sccGraph
}