From 1bd5cefcae2769d48ad5ef4b8197193371c754da Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Mon, 13 Jan 2014 16:15:10 -0800 Subject: Remove aggregateNeighbors --- .../scala/org/apache/spark/graphx/GraphOps.scala | 64 ++-------------------- .../org/apache/spark/graphx/GraphOpsSuite.scala | 26 --------- 2 files changed, 5 insertions(+), 85 deletions(-) (limited to 'graphx') diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index a0a40e2d9a..578eb331c1 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -55,60 +55,6 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { } } - /** - * Computes a statistic for the neighborhood of each vertex. - * - * @param mapFunc the function applied to each edge adjacent to each vertex. The mapFunc can - * optionally return `None`, in which case it does not contribute to the final sum. - * @param reduceFunc the function used to merge the results of each map operation - * @param direction the direction of edges to consider (e.g., In, Out, Both). - * @tparam A the aggregation type - * - * @return an RDD containing tuples of vertex identifiers and - * their resulting value. Vertices with no neighbors will not appear in the RDD. - * - * @example We can use this function to compute the average follower - * age for each user: - * - * {{{ - * val graph: Graph[Int,Int] = GraphLoader.edgeListFile(sc, "webgraph") - * val averageFollowerAge: RDD[(Int, Int)] = - * graph.aggregateNeighbors[(Int,Double)]( - * (vid, edge) => Some((edge.otherVertex(vid).data, 1)), - * (a, b) => (a._1 + b._1, a._2 + b._2), - * -1, - * EdgeDirection.In) - * .mapValues{ case (sum,followers) => sum.toDouble / followers} - * }}} - */ - def aggregateNeighbors[A: ClassTag]( - mapFunc: (VertexID, EdgeTriplet[VD, ED]) => Option[A], - reduceFunc: (A, A) => A, - dir: EdgeDirection) - : VertexRDD[A] = { - // Define a new map function over edge triplets - val mf = (et: EdgeTriplet[VD,ED]) => { - // Compute the message to the dst vertex - val dst = - if (dir == EdgeDirection.In || dir == EdgeDirection.Both) { - mapFunc(et.dstId, et) - } else { Option.empty[A] } - // Compute the message to the source vertex - val src = - if (dir == EdgeDirection.Out || dir == EdgeDirection.Both) { - mapFunc(et.srcId, et) - } else { Option.empty[A] } - // construct the return array - (src, dst) match { - case (None, None) => Iterator.empty - case (Some(srcA),None) => Iterator((et.srcId, srcA)) - case (None, Some(dstA)) => Iterator((et.dstId, dstA)) - case (Some(srcA), Some(dstA)) => Iterator((et.srcId, srcA), (et.dstId, dstA)) - } - } - graph.mapReduceTriplets(mf, reduceFunc) - } // end of aggregateNeighbors - /** * Collect the neighbor vertex ids for each vertex. * @@ -152,11 +98,11 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { * * @return the vertex set of neighboring vertex attributes for each vertex */ - def collectNeighbors(edgeDirection: EdgeDirection) : - VertexRDD[ Array[(VertexID, VD)] ] = { - val nbrs = graph.aggregateNeighbors[Array[(VertexID,VD)]]( - (vid, edge) => - Some(Array( (edge.otherVertexId(vid), edge.otherVertexAttr(vid)) )), + def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]] = { + val nbrs = graph.mapReduceTriplets[Array[(VertexID,VD)]]( + edge => Iterator( + (edge.srcId, Array((edge.dstId, edge.dstAttr))), + (edge.dstId, Array((edge.srcId, edge.srcAttr)))), (a, b) => a ++ b, edgeDirection) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala index cd3c0bbd30..7a901409d5 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala @@ -8,32 +8,6 @@ import org.scalatest.FunSuite class GraphOpsSuite extends FunSuite with LocalSparkContext { - test("aggregateNeighbors") { - withSpark { sc => - val n = 3 - val star = - Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID))), 1) - - val indegrees = star.aggregateNeighbors( - (vid, edge) => Some(1), - (a: Int, b: Int) => a + b, - EdgeDirection.In) - assert(indegrees.collect().toSet === (1 to n).map(x => (x, 1)).toSet) - - val outdegrees = star.aggregateNeighbors( - (vid, edge) => Some(1), - (a: Int, b: Int) => a + b, - EdgeDirection.Out) - assert(outdegrees.collect().toSet === Set((0, n))) - - val noVertexValues = star.aggregateNeighbors[Int]( - (vid: VertexID, edge: EdgeTriplet[Int, Int]) => None, - (a: Int, b: Int) => throw new Exception("reduceFunc called unexpectedly"), - EdgeDirection.In) - assert(noVertexValues.collect().toSet === Set.empty[(VertexID, Int)]) - } - } - test("joinVertices") { withSpark { sc => val vertices = -- cgit v1.2.3