From 732333d78e46ee23025d81ca9fbe6d1e13e9f253 Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Sat, 11 Jan 2014 11:49:21 -0800
Subject: Remove GraphLab

---
 .../scala/org/apache/spark/graphx/GraphLab.scala   | 138 ---------------------
 .../scala/org/apache/spark/graphx/Pregel.scala     |   9 +-
 .../graphx/lib/StronglyConnectedComponents.scala   |  50 ++++----
 3 files changed, 34 insertions(+), 163 deletions(-)
 delete mode 100644 graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala

(limited to 'graphx')

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala
deleted file mode 100644
index 2f828ad807..0000000000
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala
+++ /dev/null
@@ -1,138 +0,0 @@
-package org.apache.spark.graphx
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.Logging
-import scala.collection.JavaConversions._
-import org.apache.spark.rdd.RDD
-
-/**
- * Implements the GraphLab gather-apply-scatter API.
- */
-object GraphLab extends Logging {
-
-  /**
-   * Executes the GraphLab Gather-Apply-Scatter API.
-   *
-   * @param graph the graph on which to execute the GraphLab API
-   * @param gatherFunc executed on each edge triplet
-   * adjacent to a vertex. Returns an accumulator which
-   * is then merged using the merge function.
-   * @param mergeFunc an accumulative associative operation on the result of
-   * the gather type.
-   * @param applyFunc takes a vertex and the final result of the merge operations
-   * on the adjacent edges and returns a new vertex value.
-   * @param scatterFunc executed after the apply function. Takes
-   * a triplet and signals whether the neighboring vertex program
-   * must be recomputed.
-   * @param startVertices a predicate to determine which vertices to start the computation on.
-   * These will be the active vertices in the first iteration.
-   * @param numIter the maximum number of iterations to run
-   * @param gatherDirection the direction of edges to consider during the gather phase
-   * @param scatterDirection the direction of edges to consider during the scatter phase
-   *
-   * @tparam VD the graph vertex attribute type
-   * @tparam ED the graph edge attribute type
-   * @tparam A the type accumulated during the gather phase
-   * @return the resulting graph after the algorithm converges
-   *
-   * @note Unlike [[Pregel]], this implementation of [[GraphLab]] does not unpersist RDDs from
-   * previous iterations. As a result, long-running iterative GraphLab programs will eventually fill
-   * the Spark cache. Though Spark will evict RDDs from old iterations eventually, garbage
-   * collection will take longer than necessary since it must examine the entire cache. This will be
-   * fixed in a future update.
-   */
-  def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
-    (graph: Graph[VD, ED], numIter: Int,
-     gatherDirection: EdgeDirection = EdgeDirection.In,
-     scatterDirection: EdgeDirection = EdgeDirection.Out)
-    (gatherFunc: (VertexID, EdgeTriplet[VD, ED]) => A,
-     mergeFunc: (A, A) => A,
-     applyFunc: (VertexID, VD, Option[A]) => VD,
-     scatterFunc: (VertexID, EdgeTriplet[VD, ED]) => Boolean,
-     startVertices: (VertexID, VD) => Boolean = (vid: VertexID, data: VD) => true)
-    : Graph[VD, ED] = {
-
-
-    // Add an active attribute to all vertices to track convergence.
-    var activeGraph: Graph[(Boolean, VD), ED] = graph.mapVertices {
-      case (id, data) => (startVertices(id, data), data)
-    }.cache()
-
-    // The gather function wrapper strips the active attribute and
-    // only invokes the gather function on active vertices
-    def gather(vid: VertexID, e: EdgeTriplet[(Boolean, VD), ED]): Option[A] = {
-      if (e.vertexAttr(vid)._1) {
-        val edgeTriplet = new EdgeTriplet[VD,ED]
-        edgeTriplet.set(e)
-        edgeTriplet.srcAttr = e.srcAttr._2
-        edgeTriplet.dstAttr = e.dstAttr._2
-        Some(gatherFunc(vid, edgeTriplet))
-      } else {
-        None
-      }
-    }
-
-    // The apply function wrapper strips the vertex of the active attribute
-    // and only invokes the apply function on active vertices
-    def apply(vid: VertexID, data: (Boolean, VD), accum: Option[A]): (Boolean, VD) = {
-      val (active, vData) = data
-      if (active) (true, applyFunc(vid, vData, accum))
-      else (false, vData)
-    }
-
-    // The scatter function wrapper strips the vertex of the active attribute
-    // and only invokes the scatter function on active vertices
-    def scatter(rawVertexID: VertexID, e: EdgeTriplet[(Boolean, VD), ED]): Option[Boolean] = {
-      val vid = e.otherVertexId(rawVertexID)
-      if (e.vertexAttr(vid)._1) {
-        val edgeTriplet = new EdgeTriplet[VD,ED]
-        edgeTriplet.set(e)
-        edgeTriplet.srcAttr = e.srcAttr._2
-        edgeTriplet.dstAttr = e.dstAttr._2
-        Some(scatterFunc(vid, edgeTriplet))
-      } else {
-        None
-      }
-    }
-
-    // Used to set the active status of vertices for the next round
-    def applyActive(
-        vid: VertexID, data: (Boolean, VD), newActiveOpt: Option[Boolean]): (Boolean, VD) = {
-      val (prevActive, vData) = data
-      (newActiveOpt.getOrElse(false), vData)
-    }
-
-    // Main Loop ---------------------------------------------------------------------
-    var i = 0
-    var numActive = activeGraph.numVertices
-    var prevActiveGraph: Graph[(Boolean, VD), ED] = null
-    while (i < numIter && numActive > 0) {
-
-      // Gather
-      val gathered: RDD[(VertexID, A)] =
-        activeGraph.aggregateNeighbors(gather, mergeFunc, gatherDirection)
-
-      // Apply
-      val applied = activeGraph.outerJoinVertices(gathered)(apply).cache()
-
-      // Scatter is basically a gather in the opposite direction so we reverse the edge direction
-      val scattered: RDD[(VertexID, Boolean)] =
-        applied.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse)
-
-      prevActiveGraph = activeGraph
-      activeGraph = applied.outerJoinVertices(scattered)(applyActive).cache()
-
-      // Calculate the number of active vertices.
-      numActive = activeGraph.vertices.map{
-        case (vid, data) => if (data._1) 1 else 0
-      }.reduce(_ + _)
-      logInfo("Number active vertices: " + numActive)
-
-      i += 1
-    }
-
-    // Remove the active attribute from the vertex data before returning the graph
-    activeGraph.mapVertices{case (vid, data) => data._2 }
-  }
-}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index 2e6453484c..57b087213f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -65,6 +65,10 @@ object Pregel {
    *
    * @param maxIterations the maximum number of iterations to run for
    *
+   * @param activeDirection the direction of edges incident to a vertex that received a message in
+   * the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only
+   * out-edges of vertices that received a message in the previous round will run.
+   *
    * @param vprog the user-defined vertex program which runs on each
    * vertex and receives the inbound message and computes a new vertex
    * value. On the first iteration the vertex program is invoked on
@@ -85,7 +89,8 @@ object Pregel {
    *
    */
   def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
-    (graph: Graph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue)(
+    (graph: Graph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue,
+     activeDirection: EdgeDirection = EdgeDirection.Out)(
      vprog: (VertexID, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
      mergeMsg: (A, A) => A)
@@ -110,7 +115,7 @@ object Pregel {
       // Send new messages. Vertices that didn't get any messages don't appear in newVerts, so don't
       // get to send messages. We must cache messages so it can be materialized on the next line,
       // allowing us to uncache the previous iteration.
-      messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, EdgeDirection.Out))).cache()
+      messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDirection))).cache()
       // The call to count() materializes `messages`, `newVerts`, and the vertices of `g`. This
       // hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
       // vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
index 9bd227309a..43c4b9cf2d 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
@@ -53,34 +53,38 @@ object StronglyConnectedComponents {
 
       // collect min of all my neighbor's scc values, update if it's smaller than mine
       // then notify any neighbors with scc values larger than mine
-      sccWorkGraph = GraphLab[(VertexID, Boolean), ED, VertexID](sccWorkGraph, Integer.MAX_VALUE)(
-        (vid, e) => e.otherVertexAttr(vid)._1,
-        (vid1, vid2) => math.min(vid1, vid2),
-        (vid, scc, optScc) =>
-          (math.min(scc._1, optScc.getOrElse(scc._1)), scc._2),
-        (vid, e) => e.vertexAttr(vid)._1 < e.otherVertexAttr(vid)._1
-      )
+      sccWorkGraph = Pregel[(VertexID, Boolean), ED, VertexID](sccWorkGraph, Long.MaxValue)(
+        (vid, myScc, neighborScc) => (math.min(myScc._1, neighborScc), myScc._2),
+        e => {
+          if (e.srcId < e.dstId) {
+            Iterator((e.dstId, e.srcAttr._1))
+          } else {
+            Iterator()
+          }
+        },
+        (vid1, vid2) => math.min(vid1, vid2))
 
       // start at root of SCCs. Traverse values in reverse, notify all my neighbors
       // do not propagate if colors do not match!
-      sccWorkGraph = GraphLab[(VertexID, Boolean), ED, Boolean](
-        sccWorkGraph,
-        Integer.MAX_VALUE,
-        EdgeDirection.Out,
-        EdgeDirection.In
-      )(
+      sccWorkGraph = Pregel[(VertexID, Boolean), ED, Boolean](
+        sccWorkGraph, false, activeDirection = EdgeDirection.In)(
         // vertex is final if it is the root of a color
         // or it has the same color as a neighbor that is final
-        (vid, e) => (vid == e.vertexAttr(vid)._1) || (e.vertexAttr(vid)._1 == e.otherVertexAttr(vid)._1),
-        (final1, final2) => final1 || final2,
-        (vid, scc, optFinal) =>
-          (scc._1, scc._2 || optFinal.getOrElse(false)),
-        // activate neighbor if they are not final, you are, and you have the same color
-        (vid, e) => e.vertexAttr(vid)._2 &&
-          !e.otherVertexAttr(vid)._2 && (e.vertexAttr(vid)._1 == e.otherVertexAttr(vid)._1),
-        // start at root of colors
-        (vid, data) => vid == data._1
-      )
+        (vid, myScc, existsSameColorFinalNeighbor) => {
+          val isColorRoot = vid == myScc._1
+          (myScc._1, myScc._2 || isColorRoot || existsSameColorFinalNeighbor)
+        },
+        // activate neighbor if they are not final, you are, and you have the same color
+        e => {
+          val sameColor = e.dstAttr._1 == e.srcAttr._1
+          val onlyDstIsFinal = e.dstAttr._2 && !e.srcAttr._2
+          if (sameColor && onlyDstIsFinal) {
+            Iterator((e.srcId, e.dstAttr._2))
+          } else {
+            Iterator()
+          }
+        },
+        (final1, final2) => final1 || final2)
     }
     sccGraph
   }
--
cgit v1.2.3
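
For reference, a single GraphLab-style gather/apply round can still be expressed with the operators this patch leaves in place: mapReduceTriplets plays the role of gather plus merge, and outerJoinVertices plays the role of apply. The helper below is a minimal sketch, not code from this commit; it assumes gathering over in-edges only and omits the per-vertex activity tracking and convergence loop that GraphLab.apply provided.

import scala.reflect.ClassTag
import org.apache.spark.graphx._

object GatherApplyStep {
  // One gather/apply round: compute an accumulator A over each vertex's in-edges,
  // combine accumulators with mergeFunc, then apply the result to the vertex value.
  def gatherApply[VD: ClassTag, ED: ClassTag, A: ClassTag](
      graph: Graph[VD, ED],
      gatherFunc: EdgeTriplet[VD, ED] => A,
      mergeFunc: (A, A) => A,
      applyFunc: (VertexID, VD, Option[A]) => VD): Graph[VD, ED] = {
    // "Gather": every triplet sends an accumulator to its destination vertex;
    // accumulators destined for the same vertex are combined with mergeFunc.
    val gathered: VertexRDD[A] =
      graph.mapReduceTriplets(et => Iterator((et.dstId, gatherFunc(et))), mergeFunc)
    // "Apply": vertices with no in-edges receive None, mirroring GraphLab's Option[A] accumulator.
    graph.outerJoinVertices(gathered)(applyFunc)
  }
}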
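
The new activeDirection parameter generalizes the hard-coded EdgeDirection.Out that Pregel previously passed to mapReduceTriplets, which is what lets the second phase of StronglyConnectedComponents restrict sendMsg to in-edges. Below is a minimal sketch of calling the updated signature; the local SparkContext and the small hand-built graph are illustrative assumptions, not part of this patch. It propagates the smaller endpoint ID forward along edges until no vertex can be lowered further.

import org.apache.spark.SparkContext
import org.apache.spark.graphx._

object PregelActiveDirectionExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "PregelActiveDirectionExample")

    // A small directed graph: a 3-cycle 1 -> 2 -> 3 -> 1 plus an edge 4 -> 5.
    // Each vertex starts out labeled with its own ID.
    val vertices = sc.parallelize(Seq((1L, 1L), (2L, 2L), (3L, 3L), (4L, 4L), (5L, 5L)))
    val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 1L, 1), Edge(4L, 5L, 1)))
    val graph: Graph[Long, Int] = Graph(vertices, edges)

    // Propagate the minimum ID along out-edges. Because activeDirection is
    // EdgeDirection.Out, after the first round sendMsg only runs on out-edges of
    // vertices that received a message in the previous round.
    val labeled = Pregel(graph, initialMsg = Long.MaxValue, activeDirection = EdgeDirection.Out)(
      (id, attr, msg) => math.min(attr, msg),
      e => if (e.srcAttr < e.dstAttr) Iterator((e.dstId, e.srcAttr)) else Iterator.empty,
      (a, b) => math.min(a, b))

    // Vertices 1, 2, 3 end up labeled 1; vertex 4 keeps 4; vertex 5 ends up labeled 4.
    labeled.vertices.collect().foreach(println)
    sc.stop()
  }
}

Passing EdgeDirection.In instead restricts sendMsg to in-edges of freshly messaged vertices, which is exactly how the rewritten second Pregel call above walks "final" labels backwards through each color.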