aboutsummaryrefslogtreecommitdiff
path: root/graphx
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2014-01-13 16:15:10 -0800
committerAnkur Dave <ankurdave@gmail.com>2014-01-13 17:03:03 -0800
commit1bd5cefcae2769d48ad5ef4b8197193371c754da (patch)
tree8248ab428af3a76b89f4cfbe526a839a31284d81 /graphx
parente2d25d2dfeb1d43d1e36f169250d8efef4ac232a (diff)
downloadspark-1bd5cefcae2769d48ad5ef4b8197193371c754da.tar.gz
spark-1bd5cefcae2769d48ad5ef4b8197193371c754da.tar.bz2
spark-1bd5cefcae2769d48ad5ef4b8197193371c754da.zip
Remove aggregateNeighbors
Diffstat (limited to 'graphx')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala64
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala26
2 files changed, 5 insertions, 85 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index a0a40e2d9a..578eb331c1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -56,60 +56,6 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
}
/**
- * Computes a statistic for the neighborhood of each vertex.
- *
- * @param mapFunc the function applied to each edge adjacent to each vertex. The mapFunc can
- * optionally return `None`, in which case it does not contribute to the final sum.
- * @param reduceFunc the function used to merge the results of each map operation
- * @param direction the direction of edges to consider (e.g., In, Out, Both).
- * @tparam A the aggregation type
- *
- * @return an RDD containing tuples of vertex identifiers and
- * their resulting value. Vertices with no neighbors will not appear in the RDD.
- *
- * @example We can use this function to compute the average follower
- * age for each user:
- *
- * {{{
- * val graph: Graph[Int,Int] = GraphLoader.edgeListFile(sc, "webgraph")
- * val averageFollowerAge: RDD[(Int, Int)] =
- * graph.aggregateNeighbors[(Int,Double)](
- * (vid, edge) => Some((edge.otherVertex(vid).data, 1)),
- * (a, b) => (a._1 + b._1, a._2 + b._2),
- * -1,
- * EdgeDirection.In)
- * .mapValues{ case (sum,followers) => sum.toDouble / followers}
- * }}}
- */
- def aggregateNeighbors[A: ClassTag](
- mapFunc: (VertexID, EdgeTriplet[VD, ED]) => Option[A],
- reduceFunc: (A, A) => A,
- dir: EdgeDirection)
- : VertexRDD[A] = {
- // Define a new map function over edge triplets
- val mf = (et: EdgeTriplet[VD,ED]) => {
- // Compute the message to the dst vertex
- val dst =
- if (dir == EdgeDirection.In || dir == EdgeDirection.Both) {
- mapFunc(et.dstId, et)
- } else { Option.empty[A] }
- // Compute the message to the source vertex
- val src =
- if (dir == EdgeDirection.Out || dir == EdgeDirection.Both) {
- mapFunc(et.srcId, et)
- } else { Option.empty[A] }
- // construct the return array
- (src, dst) match {
- case (None, None) => Iterator.empty
- case (Some(srcA),None) => Iterator((et.srcId, srcA))
- case (None, Some(dstA)) => Iterator((et.dstId, dstA))
- case (Some(srcA), Some(dstA)) => Iterator((et.srcId, srcA), (et.dstId, dstA))
- }
- }
- graph.mapReduceTriplets(mf, reduceFunc)
- } // end of aggregateNeighbors
-
- /**
* Collect the neighbor vertex ids for each vertex.
*
* @param edgeDirection the direction along which to collect
@@ -152,11 +98,11 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
*
* @return the vertex set of neighboring vertex attributes for each vertex
*/
- def collectNeighbors(edgeDirection: EdgeDirection) :
- VertexRDD[ Array[(VertexID, VD)] ] = {
- val nbrs = graph.aggregateNeighbors[Array[(VertexID,VD)]](
- (vid, edge) =>
- Some(Array( (edge.otherVertexId(vid), edge.otherVertexAttr(vid)) )),
+ def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]] = {
+ val nbrs = graph.mapReduceTriplets[Array[(VertexID,VD)]](
+ edge => Iterator(
+ (edge.srcId, Array((edge.dstId, edge.dstAttr))),
+ (edge.dstId, Array((edge.srcId, edge.srcAttr)))),
(a, b) => a ++ b,
edgeDirection)
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
index cd3c0bbd30..7a901409d5 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
@@ -8,32 +8,6 @@ import org.scalatest.FunSuite
class GraphOpsSuite extends FunSuite with LocalSparkContext {
- test("aggregateNeighbors") {
- withSpark { sc =>
- val n = 3
- val star =
- Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID))), 1)
-
- val indegrees = star.aggregateNeighbors(
- (vid, edge) => Some(1),
- (a: Int, b: Int) => a + b,
- EdgeDirection.In)
- assert(indegrees.collect().toSet === (1 to n).map(x => (x, 1)).toSet)
-
- val outdegrees = star.aggregateNeighbors(
- (vid, edge) => Some(1),
- (a: Int, b: Int) => a + b,
- EdgeDirection.Out)
- assert(outdegrees.collect().toSet === Set((0, n)))
-
- val noVertexValues = star.aggregateNeighbors[Int](
- (vid: VertexID, edge: EdgeTriplet[Int, Int]) => None,
- (a: Int, b: Int) => throw new Exception("reduceFunc called unexpectedly"),
- EdgeDirection.In)
- assert(noVertexValues.collect().toSet === Set.empty[(VertexID, Int)])
- }
- }
-
test("joinVertices") {
withSpark { sc =>
val vertices =