From 1bd5cefcae2769d48ad5ef4b8197193371c754da Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Mon, 13 Jan 2014 16:15:10 -0800 Subject: Remove aggregateNeighbors --- docs/graphx-programming-guide.md | 17 ------ .../scala/org/apache/spark/graphx/GraphOps.scala | 64 ++-------------------- .../org/apache/spark/graphx/GraphOpsSuite.scala | 26 --------- 3 files changed, 5 insertions(+), 102 deletions(-) diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md index 002ba0cf73..e6afd092be 100644 --- a/docs/graphx-programming-guide.md +++ b/docs/graphx-programming-guide.md @@ -519,23 +519,6 @@ val avgAgeOlderFollowers: VertexRDD[Double] = > are constant sized (e.g., floats and addition instead of lists and concatenation). More > precisely, the result of `mapReduceTriplets` should be sub-linear in the degree of each vertex. -Because it is often necessary to aggregate information about neighboring vertices we also provide an -alternative interface defined in [`GraphOps`][GraphOps]: - -{% highlight scala %} -def aggregateNeighbors[A]( - map: (VertexID, EdgeTriplet[VD, ED]) => Option[A], - reduce: (A, A) => A, - edgeDir: EdgeDirection) - : VertexRDD[A] -{% endhighlight %} - -The `aggregateNeighbors` operator is implemented directly on top of `mapReduceTriplets` but allows -the user to define the logic in a more vertex centric manner. Here the `map` function is provided -the vertex to which the message is sent as well as one of the edges and returns the optional message -value. The `edgeDir` determines whether the `map` function is run on `In`, `Out`, or `All` edges -adjacent to each vertex. - ### Computing Degree Information A common aggregation task is computing the degree of each vertex: the number of edges adjacent to diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index a0a40e2d9a..578eb331c1 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -55,60 +55,6 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { } } - /** - * Computes a statistic for the neighborhood of each vertex. - * - * @param mapFunc the function applied to each edge adjacent to each vertex. The mapFunc can - * optionally return `None`, in which case it does not contribute to the final sum. - * @param reduceFunc the function used to merge the results of each map operation - * @param direction the direction of edges to consider (e.g., In, Out, Both). - * @tparam A the aggregation type - * - * @return an RDD containing tuples of vertex identifiers and - * their resulting value. Vertices with no neighbors will not appear in the RDD. 
- * - * @example We can use this function to compute the average follower - * age for each user: - * - * {{{ - * val graph: Graph[Int,Int] = GraphLoader.edgeListFile(sc, "webgraph") - * val averageFollowerAge: RDD[(Int, Int)] = - * graph.aggregateNeighbors[(Int,Double)]( - * (vid, edge) => Some((edge.otherVertex(vid).data, 1)), - * (a, b) => (a._1 + b._1, a._2 + b._2), - * -1, - * EdgeDirection.In) - * .mapValues{ case (sum,followers) => sum.toDouble / followers} - * }}} - */ - def aggregateNeighbors[A: ClassTag]( - mapFunc: (VertexID, EdgeTriplet[VD, ED]) => Option[A], - reduceFunc: (A, A) => A, - dir: EdgeDirection) - : VertexRDD[A] = { - // Define a new map function over edge triplets - val mf = (et: EdgeTriplet[VD,ED]) => { - // Compute the message to the dst vertex - val dst = - if (dir == EdgeDirection.In || dir == EdgeDirection.Both) { - mapFunc(et.dstId, et) - } else { Option.empty[A] } - // Compute the message to the source vertex - val src = - if (dir == EdgeDirection.Out || dir == EdgeDirection.Both) { - mapFunc(et.srcId, et) - } else { Option.empty[A] } - // construct the return array - (src, dst) match { - case (None, None) => Iterator.empty - case (Some(srcA),None) => Iterator((et.srcId, srcA)) - case (None, Some(dstA)) => Iterator((et.dstId, dstA)) - case (Some(srcA), Some(dstA)) => Iterator((et.srcId, srcA), (et.dstId, dstA)) - } - } - graph.mapReduceTriplets(mf, reduceFunc) - } // end of aggregateNeighbors - /** * Collect the neighbor vertex ids for each vertex. * @@ -152,11 +98,11 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { * * @return the vertex set of neighboring vertex attributes for each vertex */ - def collectNeighbors(edgeDirection: EdgeDirection) : - VertexRDD[ Array[(VertexID, VD)] ] = { - val nbrs = graph.aggregateNeighbors[Array[(VertexID,VD)]]( - (vid, edge) => - Some(Array( (edge.otherVertexId(vid), edge.otherVertexAttr(vid)) )), + def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]] = { + val nbrs = graph.mapReduceTriplets[Array[(VertexID,VD)]]( + edge => Iterator( + (edge.srcId, Array((edge.dstId, edge.dstAttr))), + (edge.dstId, Array((edge.srcId, edge.srcAttr)))), (a, b) => a ++ b, edgeDirection) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala index cd3c0bbd30..7a901409d5 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala @@ -8,32 +8,6 @@ import org.scalatest.FunSuite class GraphOpsSuite extends FunSuite with LocalSparkContext { - test("aggregateNeighbors") { - withSpark { sc => - val n = 3 - val star = - Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID))), 1) - - val indegrees = star.aggregateNeighbors( - (vid, edge) => Some(1), - (a: Int, b: Int) => a + b, - EdgeDirection.In) - assert(indegrees.collect().toSet === (1 to n).map(x => (x, 1)).toSet) - - val outdegrees = star.aggregateNeighbors( - (vid, edge) => Some(1), - (a: Int, b: Int) => a + b, - EdgeDirection.Out) - assert(outdegrees.collect().toSet === Set((0, n))) - - val noVertexValues = star.aggregateNeighbors[Int]( - (vid: VertexID, edge: EdgeTriplet[Int, Int]) => None, - (a: Int, b: Int) => throw new Exception("reduceFunc called unexpectedly"), - EdgeDirection.In) - assert(noVertexValues.collect().toSet === Set.empty[(VertexID, Int)]) - } - } - test("joinVertices") { withSpark { sc => val vertices = -- cgit v1.2.3 
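For reference, the aggregations exercised by the removed `aggregateNeighbors` test can be written directly against `mapReduceTriplets`, which is the pattern the rest of this series relies on. A minimal sketch, assuming a running `SparkContext` named `sc` (this snippet is illustrative and not part of the patch above):

    import org.apache.spark.graphx._

    // Star graph from the removed test: vertex 0 points to vertices 1 through n.
    val n = 3
    val star = Graph.fromEdgeTuples(
      sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID))), 1)

    // The old mapFunc (vid, edge) => Some(1) with EdgeDirection.In becomes a
    // message to the destination vertex; the reduceFunc is unchanged.
    val indegrees: VertexRDD[Int] =
      star.mapReduceTriplets[Int](et => Iterator((et.dstId, 1)), _ + _)

    // With EdgeDirection.Out it becomes a message to the source vertex.
    val outdegrees: VertexRDD[Int] =
      star.mapReduceTriplets[Int](et => Iterator((et.srcId, 1)), _ + _)

As with `aggregateNeighbors`, vertices that receive no messages are absent from the returned `VertexRDD`.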
From ae4b75d94a4a0f2545e6d90d6f9b8f162bf70ded Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Mon, 13 Jan 2014 16:48:11 -0800 Subject: Add EdgeDirection.Either and use it to fix CC bug The bug was due to a misunderstanding of the activeSetOpt parameter to Graph.mapReduceTriplets. Passing EdgeDirection.Both causes mapReduceTriplets to run only on edges with *both* vertices in the active set. This commit adds EdgeDirection.Either, which causes mapReduceTriplets to run on edges with *either* vertex in the active set. This is what connected components needed. --- .../org/apache/spark/graphx/EdgeDirection.scala | 8 +++-- .../main/scala/org/apache/spark/graphx/Graph.scala | 9 +++-- .../scala/org/apache/spark/graphx/GraphOps.scala | 33 ++++++++++------- .../scala/org/apache/spark/graphx/Pregel.scala | 7 ++-- .../org/apache/spark/graphx/impl/GraphImpl.scala | 4 +++ .../spark/graphx/lib/ConnectedComponents.scala | 41 +++++++--------------- .../org/apache/spark/graphx/lib/PageRank.scala | 5 +-- .../graphx/lib/StronglyConnectedComponents.scala | 3 +- .../apache/spark/graphx/lib/TriangleCount.scala | 2 +- .../org/apache/spark/graphx/GraphOpsSuite.scala | 2 +- .../org/apache/spark/graphx/PregelSuite.scala | 2 +- .../graphx/lib/ConnectedComponentsSuite.scala | 2 +- 12 files changed, 64 insertions(+), 54 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala index 9d37f6513f..5b58a61bbd 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala @@ -6,11 +6,12 @@ package org.apache.spark.graphx class EdgeDirection private (private val name: String) extends Serializable { /** * Reverse the direction of an edge. An in becomes out, - * out becomes in and both remains both. + * out becomes in and both and either remain the same. */ def reverse: EdgeDirection = this match { case EdgeDirection.In => EdgeDirection.Out case EdgeDirection.Out => EdgeDirection.In + case EdgeDirection.Either => EdgeDirection.Either case EdgeDirection.Both => EdgeDirection.Both } @@ -32,6 +33,9 @@ object EdgeDirection { /** Edges originating from a vertex. */ final val Out = new EdgeDirection("Out") - /** All edges adjacent to a vertex. */ + /** Edges originating from *or* arriving at a vertex of interest. */ + final val Either = new EdgeDirection("Either") + + /** Edges originating from *and* arriving at a vertex of interest. */ final val Both = new EdgeDirection("Both") } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index 7d4f0de3d6..49705fdf5d 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -274,9 +274,12 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] { * of the map phase * * @param activeSetOpt optionally, a set of "active" vertices and a direction of edges to consider - * when running `mapFunc`. For example, if the direction is Out, `mapFunc` will only be run on - * edges originating from vertices in the active set. The active set must have the same index as - * the graph's vertices. + * when running `mapFunc`. If the direction is `In`, `mapFunc` will only be run on edges with + * destination in the active set. If the direction is `Out`, `mapFunc` will only be run on edges + * originating from vertices in the active set. 
If the direction is `Either`, `mapFunc` will be + * run on edges with *either* vertex in the active set. If the direction is `Both`, `mapFunc` will + * be run on edges with *both* vertices in the active set. The active set must have the same index + * as the graph's vertices. * * @example We can use this function to compute the in-degree of each * vertex diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 578eb331c1..66d5180020 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -38,7 +38,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { * The degree of each vertex in the graph. * @note Vertices with no edges are not returned in the resulting RDD. */ - lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Both) + lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Either) /** * Computes the neighboring vertex degrees. @@ -50,7 +50,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { graph.mapReduceTriplets(et => Iterator((et.dstId,1)), _ + _) } else if (edgeDirection == EdgeDirection.Out) { graph.mapReduceTriplets(et => Iterator((et.srcId,1)), _ + _) - } else { // EdgeDirection.both + } else { // EdgeDirection.Either graph.mapReduceTriplets(et => Iterator((et.srcId,1), (et.dstId,1)), _ + _) } } @@ -65,7 +65,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { */ def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]] = { val nbrs = - if (edgeDirection == EdgeDirection.Both) { + if (edgeDirection == EdgeDirection.Either) { graph.mapReduceTriplets[Array[VertexID]]( mapFunc = et => Iterator((et.srcId, Array(et.dstId)), (et.dstId, Array(et.srcId))), reduceFunc = _ ++ _ @@ -79,7 +79,8 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { mapFunc = et => Iterator((et.dstId, Array(et.srcId))), reduceFunc = _ ++ _) } else { - throw new SparkException("It doesn't make sense to collect neighbor ids without a direction.") + throw new SparkException("It doesn't make sense to collect neighbor ids without a " + + "direction. (EdgeDirection.Both is not supported; use EdgeDirection.Either instead.)") } graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) => nbrsOpt.getOrElse(Array.empty[VertexID]) @@ -100,11 +101,19 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { */ def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]] = { val nbrs = graph.mapReduceTriplets[Array[(VertexID,VD)]]( - edge => Iterator( - (edge.srcId, Array((edge.dstId, edge.dstAttr))), - (edge.dstId, Array((edge.srcId, edge.srcAttr)))), - (a, b) => a ++ b, - edgeDirection) + edge => { + val msgToSrc = (edge.srcId, Array((edge.dstId, edge.dstAttr))) + val msgToDst = (edge.dstId, Array((edge.srcId, edge.srcAttr))) + edgeDirection match { + case EdgeDirection.Either => Iterator(msgToSrc, msgToDst) + case EdgeDirection.In => Iterator(msgToDst) + case EdgeDirection.Out => Iterator(msgToSrc) + case EdgeDirection.Both => + throw new SparkException("collectNeighbors does not support EdgeDirection.Both. 
Use" + + "EdgeDirection.Either instead.") + } + }, + (a, b) => a ++ b) graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) => nbrsOpt.getOrElse(Array.empty[(VertexID, VD)]) @@ -237,7 +246,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { def pregel[A: ClassTag]( initialMsg: A, maxIterations: Int = Int.MaxValue, - activeDirection: EdgeDirection = EdgeDirection.Out)( + activeDirection: EdgeDirection = EdgeDirection.Either)( vprog: (VertexID, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)], mergeMsg: (A, A) => A) @@ -271,8 +280,8 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { * * @see [[org.apache.spark.graphx.lib.ConnectedComponents]] */ - def connectedComponents(undirected: Boolean = true): Graph[VertexID, ED] = { - ConnectedComponents.run(graph, undirected) + def connectedComponents(): Graph[VertexID, ED] = { + ConnectedComponents.run(graph) } /** diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index 83e28d0ab2..75b44ddac9 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -67,7 +67,10 @@ object Pregel { * * @param activeDirection the direction of edges incident to a vertex that received a message in * the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only - * out-edges of vertices that received a message in the previous round will run. + * out-edges of vertices that received a message in the previous round will run. The default is + * `EdgeDirection.Either`, which will run `sendMsg` on edges where either side received a message + * in the previous round. If this is `EdgeDirection.Both`, `sendMsg` will only run on edges where + * *both* vertices received a message. * * @param vprog the user-defined vertex program which runs on each * vertex and receives the inbound message and computes a new vertex @@ -90,7 +93,7 @@ object Pregel { */ def apply[VD: ClassTag, ED: ClassTag, A: ClassTag] (graph: Graph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue, - activeDirection: EdgeDirection = EdgeDirection.Out)( + activeDirection: EdgeDirection = EdgeDirection.Either)( vprog: (VertexID, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)], mergeMsg: (A, A) => A) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 6a2abc71cc..c21f8935d9 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -275,6 +275,10 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( } else { edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId)) } + case Some(EdgeDirection.Either) => + // TODO: Because we only have a clustered index on the source vertex ID, we can't filter + // the index here. Instead we have to scan all edges and then do the filter. 
+ edgePartition.iterator.filter(e => vPart.isActive(e.srcId) || vPart.isActive(e.dstId)) case Some(EdgeDirection.Out) => if (activeFraction < 0.8) { edgePartition.indexIterator(srcVertexID => vPart.isActive(srcVertexID)) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala index d078d2acdb..d057c933d7 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala @@ -19,37 +19,22 @@ object ConnectedComponents { * @return a graph with vertex attributes containing the smallest vertex in each * connected component */ - def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], undirected: Boolean = true): + def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexID, ED] = { val ccGraph = graph.mapVertices { case (vid, _) => vid } - if (undirected) { - def sendMessage(edge: EdgeTriplet[VertexID, ED]) = { - if (edge.srcAttr < edge.dstAttr) { - Iterator((edge.dstId, edge.srcAttr)) - } else if (edge.srcAttr > edge.dstAttr) { - Iterator((edge.srcId, edge.dstAttr)) - } else { - Iterator.empty - } + def sendMessage(edge: EdgeTriplet[VertexID, ED]) = { + if (edge.srcAttr < edge.dstAttr) { + Iterator((edge.dstId, edge.srcAttr)) + } else if (edge.srcAttr > edge.dstAttr) { + Iterator((edge.srcId, edge.dstAttr)) + } else { + Iterator.empty } - val initialMessage = Long.MaxValue - Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Both)( - vprog = (id, attr, msg) => math.min(attr, msg), - sendMsg = sendMessage, - mergeMsg = (a, b) => math.min(a, b)) - } else { - def sendMessage(edge: EdgeTriplet[VertexID, ED]) = { - if (edge.srcAttr < edge.dstAttr) { - Iterator((edge.dstId, edge.srcAttr)) - } else { - Iterator.empty - } - } - val initialMessage = Long.MaxValue - Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Out)( - vprog = (id, attr, msg) => math.min(attr, msg), - sendMsg = sendMessage, - mergeMsg = (a, b) => math.min(a, b)) } + val initialMessage = Long.MaxValue + Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)( + vprog = (id, attr, msg) => math.min(attr, msg), + sendMsg = sendMessage, + mergeMsg = (a, b) => math.min(a, b)) } // end of connectedComponents } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index cf95267e77..6ced2462eb 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -77,7 +77,7 @@ object PageRank extends Logging { val initialMessage = 0.0 // Execute pregel for a fixed number of iterations. - Pregel(pagerankGraph, initialMessage, numIter)( + Pregel(pagerankGraph, initialMessage, numIter, activeDirection = EdgeDirection.Out)( vertexProgram, sendMessage, messageCombiner) } @@ -153,7 +153,8 @@ object PageRank extends Logging { val initialMessage = resetProb / (1.0 - resetProb) // Execute a dynamic version of Pregel. 
- Pregel(pagerankGraph, initialMessage)(vertexProgram, sendMessage, messageCombiner) + Pregel(pagerankGraph, initialMessage, activeDirection = EdgeDirection.Out)( + vertexProgram, sendMessage, messageCombiner) .mapVertices((vid, attr) => attr._1) } // end of deltaPageRank diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala index 43c4b9cf2d..edffbcc5ac 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala @@ -53,7 +53,8 @@ object StronglyConnectedComponents { // collect min of all my neighbor's scc values, update if it's smaller than mine // then notify any neighbors with scc values larger than mine - sccWorkGraph = Pregel[(VertexID, Boolean), ED, VertexID](sccWorkGraph, Long.MaxValue)( + sccWorkGraph = Pregel[(VertexID, Boolean), ED, VertexID]( + sccWorkGraph, Long.MaxValue, activeDirection = EdgeDirection.Out)( (vid, myScc, neighborScc) => (math.min(myScc._1, neighborScc), myScc._2), e => { if (e.srcId < e.dstId) { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala index 58da9e3aed..d3e22b176c 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala @@ -28,7 +28,7 @@ object TriangleCount { // Construct set representations of the neighborhoods val nbrSets: VertexRDD[VertexSet] = - g.collectNeighborIds(EdgeDirection.Both).mapValues { (vid, nbrs) => + g.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) => val set = new VertexSet(4) var i = 0 while (i < nbrs.size) { diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala index 7a901409d5..280f50e39a 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala @@ -28,7 +28,7 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext { val chain = (0 until 100).map(x => (x, (x+1)%100) ) val rawEdges = sc.parallelize(chain, 3).map { case (s,d) => (s.toLong, d.toLong) } val graph = Graph.fromEdgeTuples(rawEdges, 1.0).cache() - val nbrs = graph.collectNeighborIds(EdgeDirection.Both).cache() + val nbrs = graph.collectNeighborIds(EdgeDirection.Either).cache() assert(nbrs.count === chain.size) assert(graph.numVertices === nbrs.count) nbrs.collect.foreach { case (vid, nbrs) => assert(nbrs.size === 2) } diff --git a/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala index 1ff3d75633..bceff11b8e 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala @@ -32,7 +32,7 @@ class PregelSuite extends FunSuite with LocalSparkContext { Set((1: VertexID, 1)) ++ (2 to n).map(x => (x: VertexID, 0)).toSet) val result = Pregel(chainWithSeed, 0)( (vid, attr, msg) => math.max(msg, attr), - et => Iterator((et.dstId, et.srcAttr)), + et => if (et.dstAttr != et.srcAttr) Iterator((et.dstId, et.srcAttr)) else Iterator.empty, (a: Int, b: Int) => math.max(a, b)) assert(result.vertices.collect.toSet === chain.vertices.mapValues { (vid, attr) => attr + 1 
}.collect.toSet)
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
index 86da8f1b46..27c8705bca 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
@@ -102,7 +102,7 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
       val defaultUser = ("John Doe", "Missing")
       // Build the initial Graph
       val graph = Graph(users, relationships, defaultUser)
-      val ccGraph = graph.connectedComponents(undirected = true)
+      val ccGraph = graph.connectedComponents()
       val vertices = ccGraph.vertices.collect
       for ( (id, cc) <- vertices ) {
         assert(cc == 0)
-- 
cgit v1.2.3


From cfe4a29dcb516ceae5f243ac3b5d0c3a505d7f5a Mon Sep 17 00:00:00 2001
From: "Joseph E. Gonzalez" 
Date: Mon, 13 Jan 2014 17:15:21 -0800
Subject: Improve example code for the programming guide and add serialization
 support for GraphImpl to address issues with failed closure capture.

---
 docs/graphx-programming-guide.md                   | 39 ++++++++++++----------
 .../org/apache/spark/graphx/impl/GraphImpl.scala   |  3 ++
 2 files changed, 25 insertions(+), 17 deletions(-)

diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index e6afd092be..c82c3d7358 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -478,24 +478,26 @@ def mapReduceTriplets[A](
 
 The [`mapReduceTriplets`][Graph.mapReduceTriplets] operator takes a user defined map function which
 is applied to each triplet and can yield *messages* destined to either (none or both) vertices in
-the triplet. We currently only support messages destined to the source or destination vertex of the
-triplet to enable optimized preaggregation. The user defined `reduce` function combines the
+the triplet. To facilitate optimized pre-aggregation, we currently only support messages destined
+to the source or destination vertex of the triplet. The user-defined `reduce` function combines the
 messages destined to each vertex. The `mapReduceTriplets` operator returns a `VertexRDD[A]`
-containing the aggregate message to each vertex. Vertices that do not receive a message are not
-included in the returned `VertexRDD`.
+containing the aggregate message (of type `A`) destined to each vertex. Vertices that do not
+receive a message are not included in the returned `VertexRDD`.
 
-> Note that `mapReduceTriplets takes an additional optional `activeSet` (see API docs) which
+> Note that `mapReduceTriplets` takes an additional optional `activeSet` (see API docs) which
 > restricts the map phase to edges adjacent to the vertices in the provided `VertexRDD`. Restricting
 > computation to triplets adjacent to a subset of the vertices is often necessary in incremental
 > iterative computation and is a key part of the GraphX implementation of Pregel.
 
-We can use the `mapReduceTriplets` operator to collect information about adjacent vertices. For
-example if we wanted to compute the average age of followers who are older that each user we could
-do the following.
+In the following example we use the `mapReduceTriplets` operator to compute the average age of the
+more senior followers of each user.
{% highlight scala %}
-// Graph with age as the vertex property
-val graph: Graph[Double, String] = getFromSomewhereElse()
+// Import random graph generation library
+import org.apache.spark.graphx.util.GraphGenerators
+// Create a graph with "age" as the vertex property. Here we use a random graph for simplicity.
+val graph: Graph[Double, Int] =
+  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
 // Compute the number of older followers and their total age
 val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)](
   triplet => { // Map Function
     if (triplet.srcAttr > triplet.dstAttr) {
       // Send message to destination vertex containing counter and age
       Iterator((triplet.dstId, (1, triplet.srcAttr)))
     } else {
       // Send no message for this triplet
       Iterator.empty
     }
   },
   // Add counter and age
   (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
 )
 // Divide total age by number of older followers to get average age of older followers
-val avgAgeOlderFollowers: VertexRDD[Double] =
-  olderFollowers.mapValues { case (count, totalAge) => totalAge / count }
+val avgAgeOfOlderFollowers: VertexRDD[Double] =
+  olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } )
+// Display the results
+avgAgeOfOlderFollowers.collect.foreach(println(_))
 {% endhighlight %}
 
 > Note that the `mapReduceTriplets` operation performs optimally when the messages (and their sums)
 > are constant sized (e.g., floats and addition instead of lists and concatenation). More
-> precisely, the result of `mapReduceTriplets` should be sub-linear in the degree of each vertex.
+> precisely, the result of `mapReduceTriplets` should ideally be sub-linear in the degree of each
+> vertex.
 
 ### Computing Degree Information
 
@@ -529,13 +534,13 @@ compute the max in, out, and total degrees:
 
 {% highlight scala %}
 // Define a reduce operation to compute the highest degree vertex
-def maxReduce(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
+def max(a: (VertexID, Int), b: (VertexID, Int)): (VertexID, Int) = {
   if (a._2 > b._2) a else b
 }
 // Compute the max degrees
-val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(maxReduce)
-val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(maxReduce)
-val maxDegrees: (VertexId, Int) = graph.degrees.reduce(maxReduce)
+val maxInDegree: (VertexID, Int) = graph.inDegrees.reduce(max)
+val maxOutDegree: (VertexID, Int) = graph.outDegrees.reduce(max)
+val maxDegrees: (VertexID, Int) = graph.degrees.reduce(max)
 {% endhighlight %}
 
 
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index c21f8935d9..916eb9763c 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -32,6 +32,9 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     @transient val replicatedVertexView: ReplicatedVertexView[VD])
   extends Graph[VD, ED] with Serializable {
 
+  /** Default constructor is provided to support serialization */
+  protected def this() = this(null, null, null, null)
+
   /** Return a RDD that brings edges together with their source and destination vertices. */
   @transient override val triplets: RDD[EdgeTriplet[VD, ED]] = {
     val vdTag = classTag[VD]
-- 
cgit v1.2.3
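As a closing summary of the active-set semantics documented in the second patch, the sketch below condenses the per-direction filters from `GraphImpl` into one place. The `runsOn` helper and the `isActive` predicate are hypothetical names introduced here for illustration:

    import org.apache.spark.graphx._

    // Whether mapReduceTriplets would invoke mapFunc on edge e, given an
    // activeSetOpt direction and a membership test for the active vertex set.
    def runsOn(e: Edge[_], dir: EdgeDirection, isActive: VertexID => Boolean): Boolean =
      dir match {
        case EdgeDirection.In     => isActive(e.dstId)                       // destination is active
        case EdgeDirection.Out    => isActive(e.srcId)                       // source is active
        case EdgeDirection.Either => isActive(e.srcId) || isActive(e.dstId)  // either endpoint is active
        case EdgeDirection.Both   => isActive(e.srcId) && isActive(e.dstId)  // both endpoints are active
      }

Connected components needs `Either`: a message must cross an edge as soon as one endpoint learns a smaller component id, which is exactly the case that `Both` skipped.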