From 56d49397e01306637edf23bbb4f3b9d396cdc6ff Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Mon, 15 Feb 2016 09:20:49 +0000 Subject: [SPARK-12995][GRAPHX] Remove deprecate APIs from Pregel Author: Takeshi YAMAMURO Closes #10918 from maropu/RemoveDeprecateInPregel. --- .../main/scala/org/apache/spark/graphx/Graph.scala | 49 -------------------- .../org/apache/spark/graphx/GraphXUtils.scala | 27 +++++++++++ .../scala/org/apache/spark/graphx/Pregel.scala | 6 +-- .../org/apache/spark/graphx/impl/GraphImpl.scala | 25 ----------- .../org/apache/spark/graphx/lib/SVDPlusPlus.scala | 11 ----- .../scala/org/apache/spark/graphx/GraphSuite.scala | 52 +++------------------- 6 files changed, 36 insertions(+), 134 deletions(-) (limited to 'graphx') diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index 869caa340f..fe884d0022 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -340,55 +340,6 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab */ def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] - /** - * Aggregates values from the neighboring edges and vertices of each vertex. The user supplied - * `mapFunc` function is invoked on each edge of the graph, generating 0 or more "messages" to be - * "sent" to either vertex in the edge. The `reduceFunc` is then used to combine the output of - * the map phase destined to each vertex. - * - * This function is deprecated in 1.2.0 because of SPARK-3936. Use aggregateMessages instead. - * - * @tparam A the type of "message" to be sent to each vertex - * - * @param mapFunc the user defined map function which returns 0 or - * more messages to neighboring vertices - * - * @param reduceFunc the user defined reduce function which should - * be commutative and associative and is used to combine the output - * of the map phase - * - * @param activeSetOpt an efficient way to run the aggregation on a subset of the edges if - * desired. This is done by specifying a set of "active" vertices and an edge direction. The - * `sendMsg` function will then run only on edges connected to active vertices by edges in the - * specified direction. If the direction is `In`, `sendMsg` will only be run on edges with - * destination in the active set. If the direction is `Out`, `sendMsg` will only be run on edges - * originating from vertices in the active set. If the direction is `Either`, `sendMsg` will be - * run on edges with *either* vertex in the active set. If the direction is `Both`, `sendMsg` - * will be run on edges with *both* vertices in the active set. The active set must have the - * same index as the graph's vertices. - * - * @example We can use this function to compute the in-degree of each - * vertex - * {{{ - * val rawGraph: Graph[(),()] = Graph.textFile("twittergraph") - * val inDeg: RDD[(VertexId, Int)] = - * mapReduceTriplets[Int](et => Iterator((et.dst.id, 1)), _ + _) - * }}} - * - * @note By expressing computation at the edge level we achieve - * maximum parallelism. This is one of the core functions in the - * Graph API in that enables neighborhood level computation. For - * example this function can be used to count neighbors satisfying a - * predicate or implement PageRank. - * - */ - @deprecated("use aggregateMessages", "1.2.0") - def mapReduceTriplets[A: ClassTag]( - mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], - reduceFunc: (A, A) => A, - activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) - : VertexRDD[A] - /** * Aggregates values from the neighboring edges and vertices of each vertex. The user-supplied * `sendMsg` function is invoked on each edge of the graph, generating 0 or more messages to be diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala index 8ec33e1400..ef0b943fc3 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala @@ -17,6 +17,8 @@ package org.apache.spark.graphx +import scala.reflect.ClassTag + import org.apache.spark.SparkConf import org.apache.spark.graphx.impl._ import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap @@ -24,6 +26,7 @@ import org.apache.spark.util.BoundedPriorityQueue import org.apache.spark.util.collection.{BitSet, OpenHashSet} object GraphXUtils { + /** * Registers classes that GraphX uses with Kryo. */ @@ -42,4 +45,28 @@ object GraphXUtils { classOf[OpenHashSet[Int]], classOf[OpenHashSet[Long]])) } + + /** + * A proxy method to map the obsolete API to the new one. + */ + private[graphx] def mapReduceTriplets[VD: ClassTag, ED: ClassTag, A: ClassTag]( + g: Graph[VD, ED], + mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], + reduceFunc: (A, A) => A, + activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None): VertexRDD[A] = { + def sendMsg(ctx: EdgeContext[VD, ED, A]) { + mapFunc(ctx.toEdgeTriplet).foreach { kv => + val id = kv._1 + val msg = kv._2 + if (id == ctx.srcId) { + ctx.sendToSrc(msg) + } else { + assert(id == ctx.dstId) + ctx.sendToDst(msg) + } + } + } + g.aggregateMessagesWithActiveSet( + sendMsg, reduceFunc, TripletFields.All, activeSetOpt) + } } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index 796082721d..3ba73b4c96 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -121,7 +121,7 @@ object Pregel extends Logging { { var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache() // compute the messages - var messages = g.mapReduceTriplets(sendMsg, mergeMsg) + var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg) var activeMessages = messages.count() // Loop var prevG: Graph[VD, ED] = null @@ -135,8 +135,8 @@ object Pregel extends Logging { // Send new messages, skipping edges where neither side received a message. We must cache // messages so it can be materialized on the next line, allowing us to uncache the previous // iteration. - messages = g.mapReduceTriplets( - sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache() + messages = GraphXUtils.mapReduceTriplets( + g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache() // The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages // (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages // and the vertices of g). diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 81182adbc6..c5cb533b13 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -187,31 +187,6 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( // Lower level transformation methods // /////////////////////////////////////////////////////////////////////////////////////////////// - override def mapReduceTriplets[A: ClassTag]( - mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], - reduceFunc: (A, A) => A, - activeSetOpt: Option[(VertexRDD[_], EdgeDirection)]): VertexRDD[A] = { - - def sendMsg(ctx: EdgeContext[VD, ED, A]) { - mapFunc(ctx.toEdgeTriplet).foreach { kv => - val id = kv._1 - val msg = kv._2 - if (id == ctx.srcId) { - ctx.sendToSrc(msg) - } else { - assert(id == ctx.dstId) - ctx.sendToDst(msg) - } - } - } - - val mapUsesSrcAttr = accessesVertexAttr(mapFunc, "srcAttr") - val mapUsesDstAttr = accessesVertexAttr(mapFunc, "dstAttr") - val tripletFields = new TripletFields(mapUsesSrcAttr, mapUsesDstAttr, true) - - aggregateMessagesWithActiveSet(sendMsg, reduceFunc, tripletFields, activeSetOpt) - } - override def aggregateMessagesWithActiveSet[A: ClassTag]( sendMsg: EdgeContext[VD, ED, A] => Unit, mergeMsg: (A, A) => A, diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala index 16300e0740..78a5cb057d 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala @@ -39,17 +39,6 @@ object SVDPlusPlus { var gamma7: Double) extends Serializable - /** - * This method is now replaced by the updated version of `run()` and returns exactly - * the same result. - */ - @deprecated("Call run()", "1.4.0") - def runSVDPlusPlus(edges: RDD[Edge[Double]], conf: Conf) - : (Graph[(Array[Double], Array[Double], Double, Double), Double], Double) = - { - run(edges, conf) - } - /** * Implement SVD++ based on "Factorization Meets the Neighborhood: * a Multifaceted Collaborative Filtering Model", diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index 2fbc6f069d..f497e001df 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -221,7 +221,8 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext { val vertices: RDD[(VertexId, Int)] = sc.parallelize(Array((1L, 1), (2L, 2))) val edges: RDD[Edge[Int]] = sc.parallelize(Array(Edge(1L, 2L, 0))) val graph = Graph(vertices, edges).reverse - val result = graph.mapReduceTriplets[Int](et => Iterator((et.dstId, et.srcAttr)), _ + _) + val result = GraphXUtils.mapReduceTriplets[Int, Int, Int]( + graph, et => Iterator((et.dstId, et.srcAttr)), _ + _) assert(result.collect().toSet === Set((1L, 2))) } } @@ -281,49 +282,6 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext { } } - test("mapReduceTriplets") { - withSpark { sc => - val n = 5 - val star = starGraph(sc, n).mapVertices { (_, _) => 0 }.cache() - val starDeg = star.joinVertices(star.degrees){ (vid, oldV, deg) => deg } - val neighborDegreeSums = starDeg.mapReduceTriplets( - edge => Iterator((edge.srcId, edge.dstAttr), (edge.dstId, edge.srcAttr)), - (a: Int, b: Int) => a + b) - assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet) - - // activeSetOpt - val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: VertexId, y: VertexId) - val complete = Graph.fromEdgeTuples(sc.parallelize(allPairs, 3), 0) - val vids = complete.mapVertices((vid, attr) => vid).cache() - val active = vids.vertices.filter { case (vid, attr) => attr % 2 == 0 } - val numEvenNeighbors = vids.mapReduceTriplets(et => { - // Map function should only run on edges with destination in the active set - if (et.dstId % 2 != 0) { - throw new Exception("map ran on edge with dst vid %d, which is odd".format(et.dstId)) - } - Iterator((et.srcId, 1)) - }, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect().toSet - assert(numEvenNeighbors === (1 to n).map(x => (x: VertexId, n / 2)).toSet) - - // outerJoinVertices followed by mapReduceTriplets(activeSetOpt) - val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexId, (x + 1) % n: VertexId)), 3) - val ring = Graph.fromEdgeTuples(ringEdges, 0) .mapVertices((vid, attr) => vid).cache() - val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_).cache() - val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) => - newOpt.getOrElse(old) - } - val numOddNeighbors = changedGraph.mapReduceTriplets(et => { - // Map function should only run on edges with source in the active set - if (et.srcId % 2 != 1) { - throw new Exception("map ran on edge with src vid %d, which is even".format(et.dstId)) - } - Iterator((et.dstId, 1)) - }, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect().toSet - assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexId, 1)).toSet) - - } - } - test("aggregateMessages") { withSpark { sc => val n = 5 @@ -347,7 +305,8 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext { val reverseStarDegrees = reverseStar.outerJoinVertices(reverseStar.outDegrees) { (vid, a, bOpt) => bOpt.getOrElse(0) } - val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets( + val neighborDegreeSums = GraphXUtils.mapReduceTriplets[Int, Int, Int]( + reverseStarDegrees, et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)), (a: Int, b: Int) => a + b).collect().toSet assert(neighborDegreeSums === Set((0: VertexId, n)) ++ (1 to n).map(x => (x: VertexId, 0))) @@ -420,7 +379,8 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext { val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)), numEdgePartitions) val graph = Graph.fromEdgeTuples(edges, 1) - val neighborAttrSums = graph.mapReduceTriplets[Int]( + val neighborAttrSums = GraphXUtils.mapReduceTriplets[Int, Int, Int]( + graph, et => Iterator((et.dstId, et.srcAttr)), _ + _) assert(neighborAttrSums.collect().toSet === Set((0: VertexId, n))) } finally { -- cgit v1.2.3