aboutsummaryrefslogtreecommitdiff
path: root/graphx/src/main/scala/org/apache
diff options
context:
space:
mode:
Diffstat (limited to 'graphx/src/main/scala/org/apache')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/Graph.scala49
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala27
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala6
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala25
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala11
5 files changed, 30 insertions, 88 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 869caa340f..fe884d0022 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -341,55 +341,6 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
/**
- * Aggregates values from the neighboring edges and vertices of each vertex. The user supplied
- * `mapFunc` function is invoked on each edge of the graph, generating 0 or more "messages" to be
- * "sent" to either vertex in the edge. The `reduceFunc` is then used to combine the output of
- * the map phase destined to each vertex.
- *
- * This function is deprecated in 1.2.0 because of SPARK-3936. Use aggregateMessages instead.
- *
- * @tparam A the type of "message" to be sent to each vertex
- *
- * @param mapFunc the user defined map function which returns 0 or
- * more messages to neighboring vertices
- *
- * @param reduceFunc the user defined reduce function which should
- * be commutative and associative and is used to combine the output
- * of the map phase
- *
- * @param activeSetOpt an efficient way to run the aggregation on a subset of the edges if
- * desired. This is done by specifying a set of "active" vertices and an edge direction. The
- * `sendMsg` function will then run only on edges connected to active vertices by edges in the
- * specified direction. If the direction is `In`, `sendMsg` will only be run on edges with
- * destination in the active set. If the direction is `Out`, `sendMsg` will only be run on edges
- * originating from vertices in the active set. If the direction is `Either`, `sendMsg` will be
- * run on edges with *either* vertex in the active set. If the direction is `Both`, `sendMsg`
- * will be run on edges with *both* vertices in the active set. The active set must have the
- * same index as the graph's vertices.
- *
- * @example We can use this function to compute the in-degree of each
- * vertex
- * {{{
- * val rawGraph: Graph[(),()] = Graph.textFile("twittergraph")
- * val inDeg: RDD[(VertexId, Int)] =
- * mapReduceTriplets[Int](et => Iterator((et.dst.id, 1)), _ + _)
- * }}}
- *
- * @note By expressing computation at the edge level we achieve
- * maximum parallelism. This is one of the core functions in the
- * Graph API in that enables neighborhood level computation. For
- * example this function can be used to count neighbors satisfying a
- * predicate or implement PageRank.
- *
- */
- @deprecated("use aggregateMessages", "1.2.0")
- def mapReduceTriplets[A: ClassTag](
- mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
- reduceFunc: (A, A) => A,
- activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None)
- : VertexRDD[A]
-
- /**
* Aggregates values from the neighboring edges and vertices of each vertex. The user-supplied
* `sendMsg` function is invoked on each edge of the graph, generating 0 or more messages to be
* sent to either vertex in the edge. The `mergeMsg` function is then used to combine all messages
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala
index 8ec33e1400..ef0b943fc3 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala
@@ -17,6 +17,8 @@
package org.apache.spark.graphx
+import scala.reflect.ClassTag
+
import org.apache.spark.SparkConf
import org.apache.spark.graphx.impl._
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
@@ -24,6 +26,7 @@ import org.apache.spark.util.BoundedPriorityQueue
import org.apache.spark.util.collection.{BitSet, OpenHashSet}
object GraphXUtils {
+
/**
* Registers classes that GraphX uses with Kryo.
*/
@@ -42,4 +45,28 @@ object GraphXUtils {
classOf[OpenHashSet[Int]],
classOf[OpenHashSet[Long]]))
}
+
+ /**
+ * A proxy method to map the obsolete API to the new one.
+ */
+ private[graphx] def mapReduceTriplets[VD: ClassTag, ED: ClassTag, A: ClassTag](
+ g: Graph[VD, ED],
+ mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
+ reduceFunc: (A, A) => A,
+ activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None): VertexRDD[A] = {
+ def sendMsg(ctx: EdgeContext[VD, ED, A]) {
+ mapFunc(ctx.toEdgeTriplet).foreach { kv =>
+ val id = kv._1
+ val msg = kv._2
+ if (id == ctx.srcId) {
+ ctx.sendToSrc(msg)
+ } else {
+ assert(id == ctx.dstId)
+ ctx.sendToDst(msg)
+ }
+ }
+ }
+ g.aggregateMessagesWithActiveSet(
+ sendMsg, reduceFunc, TripletFields.All, activeSetOpt)
+ }
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index 796082721d..3ba73b4c96 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -121,7 +121,7 @@ object Pregel extends Logging {
{
var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
// compute the messages
- var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
+ var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
var activeMessages = messages.count()
// Loop
var prevG: Graph[VD, ED] = null
@@ -135,8 +135,8 @@ object Pregel extends Logging {
// Send new messages, skipping edges where neither side received a message. We must cache
// messages so it can be materialized on the next line, allowing us to uncache the previous
// iteration.
- messages = g.mapReduceTriplets(
- sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
+ messages = GraphXUtils.mapReduceTriplets(
+ g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
// The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
// (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
// and the vertices of g).
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 81182adbc6..c5cb533b13 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -187,31 +187,6 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
// Lower level transformation methods
// ///////////////////////////////////////////////////////////////////////////////////////////////
- override def mapReduceTriplets[A: ClassTag](
- mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
- reduceFunc: (A, A) => A,
- activeSetOpt: Option[(VertexRDD[_], EdgeDirection)]): VertexRDD[A] = {
-
- def sendMsg(ctx: EdgeContext[VD, ED, A]) {
- mapFunc(ctx.toEdgeTriplet).foreach { kv =>
- val id = kv._1
- val msg = kv._2
- if (id == ctx.srcId) {
- ctx.sendToSrc(msg)
- } else {
- assert(id == ctx.dstId)
- ctx.sendToDst(msg)
- }
- }
- }
-
- val mapUsesSrcAttr = accessesVertexAttr(mapFunc, "srcAttr")
- val mapUsesDstAttr = accessesVertexAttr(mapFunc, "dstAttr")
- val tripletFields = new TripletFields(mapUsesSrcAttr, mapUsesDstAttr, true)
-
- aggregateMessagesWithActiveSet(sendMsg, reduceFunc, tripletFields, activeSetOpt)
- }
-
override def aggregateMessagesWithActiveSet[A: ClassTag](
sendMsg: EdgeContext[VD, ED, A] => Unit,
mergeMsg: (A, A) => A,
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
index 16300e0740..78a5cb057d 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -40,17 +40,6 @@ object SVDPlusPlus {
extends Serializable
/**
- * This method is now replaced by the updated version of `run()` and returns exactly
- * the same result.
- */
- @deprecated("Call run()", "1.4.0")
- def runSVDPlusPlus(edges: RDD[Edge[Double]], conf: Conf)
- : (Graph[(Array[Double], Array[Double], Double, Double), Double], Double) =
- {
- run(edges, conf)
- }
-
- /**
* Implement SVD++ based on "Factorization Meets the Neighborhood:
* a Multifaceted Collaborative Filtering Model",
* available at [[http://public.research.att.com/~volinsky/netflix/kdd08koren.pdf]].