diff options
author | Ankur Dave <ankurdave@gmail.com> | 2014-01-13 16:15:10 -0800 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2014-01-13 17:03:03 -0800 |
commit | 1bd5cefcae2769d48ad5ef4b8197193371c754da (patch) | |
tree | 8248ab428af3a76b89f4cfbe526a839a31284d81 /graphx | |
parent | e2d25d2dfeb1d43d1e36f169250d8efef4ac232a (diff) | |
download | spark-1bd5cefcae2769d48ad5ef4b8197193371c754da.tar.gz spark-1bd5cefcae2769d48ad5ef4b8197193371c754da.tar.bz2 spark-1bd5cefcae2769d48ad5ef4b8197193371c754da.zip |
Remove aggregateNeighbors
Diffstat (limited to 'graphx')
-rw-r--r-- | graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala | 64 | ||||
-rw-r--r-- | graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala | 26 |
2 files changed, 5 insertions, 85 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index a0a40e2d9a..578eb331c1 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -56,60 +56,6 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { } /** - * Computes a statistic for the neighborhood of each vertex. - * - * @param mapFunc the function applied to each edge adjacent to each vertex. The mapFunc can - * optionally return `None`, in which case it does not contribute to the final sum. - * @param reduceFunc the function used to merge the results of each map operation - * @param direction the direction of edges to consider (e.g., In, Out, Both). - * @tparam A the aggregation type - * - * @return an RDD containing tuples of vertex identifiers and - * their resulting value. Vertices with no neighbors will not appear in the RDD. - * - * @example We can use this function to compute the average follower - * age for each user: - * - * {{{ - * val graph: Graph[Int,Int] = GraphLoader.edgeListFile(sc, "webgraph") - * val averageFollowerAge: RDD[(Int, Int)] = - * graph.aggregateNeighbors[(Int,Double)]( - * (vid, edge) => Some((edge.otherVertex(vid).data, 1)), - * (a, b) => (a._1 + b._1, a._2 + b._2), - * -1, - * EdgeDirection.In) - * .mapValues{ case (sum,followers) => sum.toDouble / followers} - * }}} - */ - def aggregateNeighbors[A: ClassTag]( - mapFunc: (VertexID, EdgeTriplet[VD, ED]) => Option[A], - reduceFunc: (A, A) => A, - dir: EdgeDirection) - : VertexRDD[A] = { - // Define a new map function over edge triplets - val mf = (et: EdgeTriplet[VD,ED]) => { - // Compute the message to the dst vertex - val dst = - if (dir == EdgeDirection.In || dir == EdgeDirection.Both) { - mapFunc(et.dstId, et) - } else { Option.empty[A] } - // Compute the message to the source vertex - val src = - if (dir == EdgeDirection.Out || dir == EdgeDirection.Both) { - mapFunc(et.srcId, et) - } else { Option.empty[A] } - // construct the return array - (src, dst) match { - case (None, None) => Iterator.empty - case (Some(srcA),None) => Iterator((et.srcId, srcA)) - case (None, Some(dstA)) => Iterator((et.dstId, dstA)) - case (Some(srcA), Some(dstA)) => Iterator((et.srcId, srcA), (et.dstId, dstA)) - } - } - graph.mapReduceTriplets(mf, reduceFunc) - } // end of aggregateNeighbors - - /** * Collect the neighbor vertex ids for each vertex. * * @param edgeDirection the direction along which to collect @@ -152,11 +98,11 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { * * @return the vertex set of neighboring vertex attributes for each vertex */ - def collectNeighbors(edgeDirection: EdgeDirection) : - VertexRDD[ Array[(VertexID, VD)] ] = { - val nbrs = graph.aggregateNeighbors[Array[(VertexID,VD)]]( - (vid, edge) => - Some(Array( (edge.otherVertexId(vid), edge.otherVertexAttr(vid)) )), + def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]] = { + val nbrs = graph.mapReduceTriplets[Array[(VertexID,VD)]]( + edge => Iterator( + (edge.srcId, Array((edge.dstId, edge.dstAttr))), + (edge.dstId, Array((edge.srcId, edge.srcAttr)))), (a, b) => a ++ b, edgeDirection) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala index cd3c0bbd30..7a901409d5 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala @@ -8,32 +8,6 @@ import org.scalatest.FunSuite class GraphOpsSuite extends FunSuite with LocalSparkContext { - test("aggregateNeighbors") { - withSpark { sc => - val n = 3 - val star = - Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID))), 1) - - val indegrees = star.aggregateNeighbors( - (vid, edge) => Some(1), - (a: Int, b: Int) => a + b, - EdgeDirection.In) - assert(indegrees.collect().toSet === (1 to n).map(x => (x, 1)).toSet) - - val outdegrees = star.aggregateNeighbors( - (vid, edge) => Some(1), - (a: Int, b: Int) => a + b, - EdgeDirection.Out) - assert(outdegrees.collect().toSet === Set((0, n))) - - val noVertexValues = star.aggregateNeighbors[Int]( - (vid: VertexID, edge: EdgeTriplet[Int, Int]) => None, - (a: Int, b: Int) => throw new Exception("reduceFunc called unexpectedly"), - EdgeDirection.In) - assert(noVertexValues.collect().toSet === Set.empty[(VertexID, Int)]) - } - } - test("joinVertices") { withSpark { sc => val vertices = |