author     Takeshi YAMAMURO <linguin.m.s@gmail.com>    2016-02-15 09:20:49 +0000
committer  Sean Owen <sowen@cloudera.com>              2016-02-15 09:20:49 +0000
commit     56d49397e01306637edf23bbb4f3b9d396cdc6ff (patch)
tree       d1c91e37e2f1903a4d0e75b81058eebdb804178a /graphx
parent     a8bbc4f50ef3faacf4b7fe865a29144ea87f0796 (diff)
[SPARK-12995][GRAPHX] Remove deprecated APIs from Pregel

Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>

Closes #10918 from maropu/RemoveDeprecateInPregel.
Diffstat (limited to 'graphx')
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/Graph.scala           |  49
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala     |  27
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala          |   6
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala  |  25
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala |  11
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala      |  52
6 files changed, 36 insertions(+), 134 deletions(-)
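The patch removes the deprecated `mapReduceTriplets` overload from the public Graph API (and `runSVDPlusPlus` from SVDPlusPlus), keeping a `private[graphx]` shim in GraphXUtils so Pregel and the test suite continue to work. External callers are expected to move to `aggregateMessages`. A minimal sketch of that migration, computing in-degrees; the SparkContext setup and the two-edge graph are illustrative, not part of the patch:

import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, VertexRDD}

object InDegreeSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[*]", "in-degree-sketch")
    val graph = Graph.fromEdges(
      sc.parallelize(Seq(Edge(1L, 2L, 0), Edge(3L, 2L, 0))), defaultValue = 0)

    // Before this commit (deprecated since 1.2.0, see SPARK-3936):
    //   graph.mapReduceTriplets[Int](et => Iterator((et.dstId, 1)), _ + _)

    // After: send one message per edge to its destination, then sum.
    val inDeg: VertexRDD[Int] =
      graph.aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _)
    inDeg.collect().foreach(println)  // prints (2,2) for this toy graph
    sc.stop()
  }
}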
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 869caa340f..fe884d0022 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -341,55 +341,6 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
/**
- * Aggregates values from the neighboring edges and vertices of each vertex. The user supplied
- * `mapFunc` function is invoked on each edge of the graph, generating 0 or more "messages" to be
- * "sent" to either vertex in the edge. The `reduceFunc` is then used to combine the output of
- * the map phase destined to each vertex.
- *
- * This function is deprecated in 1.2.0 because of SPARK-3936. Use aggregateMessages instead.
- *
- * @tparam A the type of "message" to be sent to each vertex
- *
- * @param mapFunc the user defined map function which returns 0 or
- * more messages to neighboring vertices
- *
- * @param reduceFunc the user defined reduce function which should
- * be commutative and associative and is used to combine the output
- * of the map phase
- *
- * @param activeSetOpt an efficient way to run the aggregation on a subset of the edges if
- * desired. This is done by specifying a set of "active" vertices and an edge direction. The
- * `sendMsg` function will then run only on edges connected to active vertices by edges in the
- * specified direction. If the direction is `In`, `sendMsg` will only be run on edges with
- * destination in the active set. If the direction is `Out`, `sendMsg` will only be run on edges
- * originating from vertices in the active set. If the direction is `Either`, `sendMsg` will be
- * run on edges with *either* vertex in the active set. If the direction is `Both`, `sendMsg`
- * will be run on edges with *both* vertices in the active set. The active set must have the
- * same index as the graph's vertices.
- *
- * @example We can use this function to compute the in-degree of each
- * vertex
- * {{{
- * val rawGraph: Graph[(),()] = Graph.textFile("twittergraph")
- * val inDeg: RDD[(VertexId, Int)] =
- * mapReduceTriplets[Int](et => Iterator((et.dst.id, 1)), _ + _)
- * }}}
- *
- * @note By expressing computation at the edge level we achieve
- * maximum parallelism. This is one of the core functions in the
- * Graph API in that it enables neighborhood-level computation. For
- * example this function can be used to count neighbors satisfying a
- * predicate or implement PageRank.
- *
- */
- @deprecated("use aggregateMessages", "1.2.0")
- def mapReduceTriplets[A: ClassTag](
- mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
- reduceFunc: (A, A) => A,
- activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None)
- : VertexRDD[A]
-
- /**
* Aggregates values from the neighboring edges and vertices of each vertex. The user-supplied
* `sendMsg` function is invoked on each edge of the graph, generating 0 or more messages to be
* sent to either vertex in the edge. The `mergeMsg` function is then used to combine all messages
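With the deprecated overload gone, the active-set behavior spelled out in the removed scaladoc (`In`, `Out`, `Either`, `Both`) stays reachable from user code only through Pregel's `activeDirection` parameter; `aggregateMessagesWithActiveSet` itself remains `private[graphx]`. A hedged sketch of picking the direction at the Pregel level, assuming a `Graph[Int, Int]`; the object and method names are hypothetical:

import org.apache.spark.graphx.{EdgeDirection, Graph, Pregel, VertexId}

object ActiveDirectionSketch {
  // Propagate the maximum vertex value forward along edges. With
  // EdgeDirection.Out, sendMsg runs only on edges leaving vertices
  // that received a message in the previous round.
  def propagateMax(graph: Graph[Int, Int]): Graph[Int, Int] =
    Pregel(graph, initialMsg = Int.MinValue,
        activeDirection = EdgeDirection.Out)(
      (id: VertexId, attr: Int, msg: Int) => math.max(attr, msg),  // vprog
      triplet =>                                                   // sendMsg
        if (triplet.srcAttr > triplet.dstAttr) {
          Iterator((triplet.dstId, triplet.srcAttr))
        } else Iterator.empty,
      (a: Int, b: Int) => math.max(a, b))                          // mergeMsg
}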
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala
index 8ec33e1400..ef0b943fc3 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala
@@ -17,6 +17,8 @@
package org.apache.spark.graphx
+import scala.reflect.ClassTag
+
import org.apache.spark.SparkConf
import org.apache.spark.graphx.impl._
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
@@ -24,6 +26,7 @@ import org.apache.spark.util.BoundedPriorityQueue
import org.apache.spark.util.collection.{BitSet, OpenHashSet}
object GraphXUtils {
+
/**
* Registers classes that GraphX uses with Kryo.
*/
@@ -42,4 +45,28 @@ object GraphXUtils {
classOf[OpenHashSet[Int]],
classOf[OpenHashSet[Long]]))
}
+
+ /**
+ * A proxy method to map the obsolete API to the new one.
+ */
+ private[graphx] def mapReduceTriplets[VD: ClassTag, ED: ClassTag, A: ClassTag](
+ g: Graph[VD, ED],
+ mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
+ reduceFunc: (A, A) => A,
+ activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None): VertexRDD[A] = {
+ def sendMsg(ctx: EdgeContext[VD, ED, A]) {
+ mapFunc(ctx.toEdgeTriplet).foreach { kv =>
+ val id = kv._1
+ val msg = kv._2
+ if (id == ctx.srcId) {
+ ctx.sendToSrc(msg)
+ } else {
+ assert(id == ctx.dstId)
+ ctx.sendToDst(msg)
+ }
+ }
+ }
+ g.aggregateMessagesWithActiveSet(
+ sendMsg, reduceFunc, TripletFields.All, activeSetOpt)
+ }
}
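Because the relocated shim is `private[graphx]`, only code compiled into the `org.apache.spark.graphx` package, such as Pregel and the GraphSuite changes below, can call it. A sketch of the calling convention with the explicit type parameters the updated tests use; the enclosing object is hypothetical:

package org.apache.spark.graphx

import org.apache.spark.SparkContext

// Hypothetical in-package caller; mirrors GraphSuite's usage below.
object ShimCallSketch {
  def neighborAttrSums(sc: SparkContext): VertexRDD[Int] = {
    val graph = Graph.fromEdgeTuples(
      sc.parallelize(Seq((1L, 2L), (2L, 3L))), defaultValue = 1)
    // [VD, ED, A] are spelled out explicitly, as in the updated tests below.
    GraphXUtils.mapReduceTriplets[Int, Int, Int](
      graph, et => Iterator((et.dstId, et.srcAttr)), _ + _)
  }
}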
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index 796082721d..3ba73b4c96 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -121,7 +121,7 @@ object Pregel extends Logging {
{
var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
// compute the messages
- var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
+ var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
var activeMessages = messages.count()
// Loop
var prevG: Graph[VD, ED] = null
@@ -135,8 +135,8 @@ object Pregel extends Logging {
// Send new messages, skipping edges where neither side received a message. We must cache
// messages so it can be materialized on the next line, allowing us to uncache the previous
// iteration.
- messages = g.mapReduceTriplets(
- sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
+ messages = GraphXUtils.mapReduceTriplets(
+ g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
// The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
// (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
// and the vertices of g).
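Callers of Pregel itself see no difference; only the internal message computation moved onto the shim. As a sanity check, the canonical single-source shortest paths program reads exactly as before; the three-vertex graph here is illustrative:

import org.apache.spark.SparkContext
import org.apache.spark.graphx._

object SsspSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[*]", "sssp-sketch")
    val graph = Graph.fromEdges(
      sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0))), 0.0)
    val sourceId: VertexId = 1L
    val init = graph.mapVertices((id, _) =>
      if (id == sourceId) 0.0 else Double.PositiveInfinity)
    val sssp = Pregel(init, Double.PositiveInfinity)(
      (id, dist, newDist) => math.min(dist, newDist),   // vertex program
      triplet =>                                        // send message
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        } else Iterator.empty,
      (a, b) => math.min(a, b))                         // merge messages
    sssp.vertices.collect().foreach(println)
    sc.stop()
  }
}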
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 81182adbc6..c5cb533b13 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -187,31 +187,6 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
// Lower level transformation methods
// ///////////////////////////////////////////////////////////////////////////////////////////////
- override def mapReduceTriplets[A: ClassTag](
- mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
- reduceFunc: (A, A) => A,
- activeSetOpt: Option[(VertexRDD[_], EdgeDirection)]): VertexRDD[A] = {
-
- def sendMsg(ctx: EdgeContext[VD, ED, A]) {
- mapFunc(ctx.toEdgeTriplet).foreach { kv =>
- val id = kv._1
- val msg = kv._2
- if (id == ctx.srcId) {
- ctx.sendToSrc(msg)
- } else {
- assert(id == ctx.dstId)
- ctx.sendToDst(msg)
- }
- }
- }
-
- val mapUsesSrcAttr = accessesVertexAttr(mapFunc, "srcAttr")
- val mapUsesDstAttr = accessesVertexAttr(mapFunc, "dstAttr")
- val tripletFields = new TripletFields(mapUsesSrcAttr, mapUsesDstAttr, true)
-
- aggregateMessagesWithActiveSet(sendMsg, reduceFunc, tripletFields, activeSetOpt)
- }
-
override def aggregateMessagesWithActiveSet[A: ClassTag](
sendMsg: EdgeContext[VD, ED, A] => Unit,
mergeMsg: (A, A) => A,
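One nuance: the deleted override inspected the map closure's bytecode (`accessesVertexAttr`) to narrow `TripletFields`, while the GraphXUtils shim always passes `TripletFields.All`. Code migrating to `aggregateMessages` can recover that optimization by declaring the fields it actually reads; a sketch, assuming only the message payload matters:

import org.apache.spark.graphx.{Graph, TripletFields, VertexRDD}

object TripletFieldsSketch {
  // Counting in-degrees reads no vertex attributes, so TripletFields.None
  // lets GraphX avoid shipping attributes to the edge partitions.
  def inDegrees[VD, ED](graph: Graph[VD, ED]): VertexRDD[Int] =
    graph.aggregateMessages[Int](
      ctx => ctx.sendToDst(1), _ + _, TripletFields.None)
}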
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
index 16300e0740..78a5cb057d 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -40,17 +40,6 @@ object SVDPlusPlus {
extends Serializable
/**
- * This method is now replaced by the updated version of `run()` and returns exactly
- * the same result.
- */
- @deprecated("Call run()", "1.4.0")
- def runSVDPlusPlus(edges: RDD[Edge[Double]], conf: Conf)
- : (Graph[(Array[Double], Array[Double], Double, Double), Double], Double) =
- {
- run(edges, conf)
- }
-
- /**
* Implement SVD++ based on "Factorization Meets the Neighborhood:
* a Multifaceted Collaborative Filtering Model",
* available at [[http://public.research.att.com/~volinsky/netflix/kdd08koren.pdf]].
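The SVD++ migration is mechanical: `runSVDPlusPlus(edges, conf)` becomes `run(edges, conf)` with the same return type. A sketch; the ratings and hyperparameter values are illustrative only:

import org.apache.spark.SparkContext
import org.apache.spark.graphx.Edge
import org.apache.spark.graphx.lib.SVDPlusPlus

object SvdppSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[*]", "svdpp-sketch")
    // (user, item, rating) triples encoded as edges; values made up.
    val edges = sc.parallelize(Seq(Edge(1L, 11L, 5.0), Edge(2L, 11L, 3.0)))
    // Conf(rank, maxIters, minVal, maxVal, gamma1, gamma2, gamma6, gamma7)
    val conf = new SVDPlusPlus.Conf(2, 5, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015)
    // Before: val (g, u) = SVDPlusPlus.runSVDPlusPlus(edges, conf)
    val (g, u) = SVDPlusPlus.run(edges, conf)
    println(s"u = $u, model vertices = ${g.vertices.count()}")
    sc.stop()
  }
}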
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 2fbc6f069d..f497e001df 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -221,7 +221,8 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
val vertices: RDD[(VertexId, Int)] = sc.parallelize(Array((1L, 1), (2L, 2)))
val edges: RDD[Edge[Int]] = sc.parallelize(Array(Edge(1L, 2L, 0)))
val graph = Graph(vertices, edges).reverse
- val result = graph.mapReduceTriplets[Int](et => Iterator((et.dstId, et.srcAttr)), _ + _)
+ val result = GraphXUtils.mapReduceTriplets[Int, Int, Int](
+ graph, et => Iterator((et.dstId, et.srcAttr)), _ + _)
assert(result.collect().toSet === Set((1L, 2)))
}
}
@@ -281,49 +282,6 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
}
}
- test("mapReduceTriplets") {
- withSpark { sc =>
- val n = 5
- val star = starGraph(sc, n).mapVertices { (_, _) => 0 }.cache()
- val starDeg = star.joinVertices(star.degrees){ (vid, oldV, deg) => deg }
- val neighborDegreeSums = starDeg.mapReduceTriplets(
- edge => Iterator((edge.srcId, edge.dstAttr), (edge.dstId, edge.srcAttr)),
- (a: Int, b: Int) => a + b)
- assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet)
-
- // activeSetOpt
- val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: VertexId, y: VertexId)
- val complete = Graph.fromEdgeTuples(sc.parallelize(allPairs, 3), 0)
- val vids = complete.mapVertices((vid, attr) => vid).cache()
- val active = vids.vertices.filter { case (vid, attr) => attr % 2 == 0 }
- val numEvenNeighbors = vids.mapReduceTriplets(et => {
- // Map function should only run on edges with destination in the active set
- if (et.dstId % 2 != 0) {
- throw new Exception("map ran on edge with dst vid %d, which is odd".format(et.dstId))
- }
- Iterator((et.srcId, 1))
- }, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect().toSet
- assert(numEvenNeighbors === (1 to n).map(x => (x: VertexId, n / 2)).toSet)
-
- // outerJoinVertices followed by mapReduceTriplets(activeSetOpt)
- val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexId, (x + 1) % n: VertexId)), 3)
- val ring = Graph.fromEdgeTuples(ringEdges, 0) .mapVertices((vid, attr) => vid).cache()
- val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_).cache()
- val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) =>
- newOpt.getOrElse(old)
- }
- val numOddNeighbors = changedGraph.mapReduceTriplets(et => {
- // Map function should only run on edges with source in the active set
- if (et.srcId % 2 != 1) {
- throw new Exception("map ran on edge with src vid %d, which is even".format(et.dstId))
- }
- Iterator((et.dstId, 1))
- }, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect().toSet
- assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexId, 1)).toSet)
-
- }
- }
-
test("aggregateMessages") {
withSpark { sc =>
val n = 5
@@ -347,7 +305,8 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
val reverseStarDegrees = reverseStar.outerJoinVertices(reverseStar.outDegrees) {
(vid, a, bOpt) => bOpt.getOrElse(0)
}
- val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
+ val neighborDegreeSums = GraphXUtils.mapReduceTriplets[Int, Int, Int](
+ reverseStarDegrees,
et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
(a: Int, b: Int) => a + b).collect().toSet
assert(neighborDegreeSums === Set((0: VertexId, n)) ++ (1 to n).map(x => (x: VertexId, 0)))
@@ -420,7 +379,8 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)),
numEdgePartitions)
val graph = Graph.fromEdgeTuples(edges, 1)
- val neighborAttrSums = graph.mapReduceTriplets[Int](
+ val neighborAttrSums = GraphXUtils.mapReduceTriplets[Int, Int, Int](
+ graph,
et => Iterator((et.dstId, et.srcAttr)), _ + _)
assert(neighborAttrSums.collect().toSet === Set((0: VertexId, n)))
} finally {