diff options
4 files changed, 8 insertions, 33 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index d5150382d5..116d1ea700 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -129,15 +129,15 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr))) ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr))) }, - (a, b) => a ++ b, TripletFields.SrcDstOnly) + (a, b) => a ++ b, TripletFields.All) case EdgeDirection.In => graph.aggregateMessages[Array[(VertexId,VD)]]( ctx => ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr))), - (a, b) => a ++ b, TripletFields.SrcOnly) + (a, b) => a ++ b, TripletFields.Src) case EdgeDirection.Out => graph.aggregateMessages[Array[(VertexId,VD)]]( ctx => ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr))), - (a, b) => a ++ b, TripletFields.DstOnly) + (a, b) => a ++ b, TripletFields.Dst) case EdgeDirection.Both => throw new SparkException("collectEdges does not support EdgeDirection.Both. Use" + "EdgeDirection.Either instead.") diff --git a/graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java b/graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java index 8dfccfe2e2..7eb4ae0f44 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java +++ b/graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java @@ -56,39 +56,14 @@ public class TripletFields implements Serializable { public static final TripletFields EdgeOnly = new TripletFields(false, false, true); /** - * Expose only the source field and not the edge or destination field. - */ - public static final TripletFields SrcOnly = new TripletFields(true, false, false); - - /** - * Expose only the destination field and not the edge or source field. - */ - public static final TripletFields DstOnly = new TripletFields(false, true, false); - - /** - * Expose the source and destination fields but not the edge field. - */ - public static final TripletFields SrcDstOnly = new TripletFields(true, true, false); - - /** * Expose the source and edge fields but not the destination field. (Same as Src) */ - public static final TripletFields SrcAndEdge = new TripletFields(true, false, true); - - /** - * Expose the source and edge fields but not the destination field. (Same as SrcAndEdge) - */ - public static final TripletFields Src = SrcAndEdge; + public static final TripletFields Src = new TripletFields(true, false, true); /** * Expose the destination and edge fields but not the source field. (Same as Dst) */ - public static final TripletFields DstAndEdge = new TripletFields(false, true, true); - - /** - * Expose the destination and edge fields but not the source field. (Same as DstAndEdge) - */ - public static final TripletFields Dst = DstAndEdge; + public static final TripletFields Dst = new TripletFields(false, true, true); /** * Expose all the fields (source, edge, and destination). diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index e40ae0d615..e139959c3f 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -85,7 +85,7 @@ object PageRank extends Logging { // Associate the degree with each vertex .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) } // Set the weight on the edges based on the degree - .mapTriplets( e => 1.0 / e.srcAttr, TripletFields.SrcOnly ) + .mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src ) // Set the vertex attributes to the initial pagerank values .mapVertices( (id, attr) => resetProb ) @@ -97,7 +97,7 @@ object PageRank extends Logging { // Compute the outgoing rank contributions of each vertex, perform local preaggregation, and // do the final aggregation at the receiving vertices. Requires a shuffle for aggregation. val rankUpdates = rankGraph.aggregateMessages[Double]( - ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr), _ + _, TripletFields.SrcAndEdge) + ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr), _ + _, TripletFields.Src) // Apply the final rank updates to get the new ranks, using join to preserve ranks of vertices // that didn't receive a message. Requires a shuffle for broadcasting updated ranks to the diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index df773db6e4..a05d1ddb21 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -328,7 +328,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { "expected ctx.dstAttr to be null due to TripletFields, but it was " + ctx.dstAttr) } ctx.sendToDst(ctx.srcAttr) - }, _ + _, TripletFields.SrcOnly) + }, _ + _, TripletFields.Src) assert(agg.collect().toSet === (1 to n).map(x => (x: VertexId, "v")).toSet) } } |