aboutsummaryrefslogtreecommitdiff
path: root/graphx
diff options
context:
space:
mode:
authorJoseph E. Gonzalez <joseph.e.gonzalez@gmail.com>2014-11-26 00:55:28 -0800
committerReynold Xin <rxin@databricks.com>2014-11-26 00:55:28 -0800
commit288ce583b05004a8c71dcd836fab23caff5d4ba7 (patch)
tree6e2b5404528b473dd402141d0a23db638a4ddd77 /graphx
parente7f4d2534bb3361ec4b7af0d42bc798a7a425226 (diff)
downloadspark-288ce583b05004a8c71dcd836fab23caff5d4ba7.tar.gz
spark-288ce583b05004a8c71dcd836fab23caff5d4ba7.tar.bz2
spark-288ce583b05004a8c71dcd836fab23caff5d4ba7.zip
Removing confusing TripletFields
After additional discussion with rxin, I think having all the possible `TripletField` options is confusing. This pull request reduces the triplet fields to: ```java /** * None of the triplet fields are exposed. */ public static final TripletFields None = new TripletFields(false, false, false); /** * Expose only the edge field and not the source or destination field. */ public static final TripletFields EdgeOnly = new TripletFields(false, false, true); /** * Expose the source and edge fields but not the destination field. (Same as Src) */ public static final TripletFields Src = new TripletFields(true, false, true); /** * Expose the destination and edge fields but not the source field. (Same as Dst) */ public static final TripletFields Dst = new TripletFields(false, true, true); /** * Expose all the fields (source, edge, and destination). */ public static final TripletFields All = new TripletFields(true, true, true); ``` Author: Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com> Closes #3472 from jegonzal/SimplifyTripletFields and squashes the following commits: 91796b5 [Joseph E. Gonzalez] removing confusing triplet fields
Diffstat (limited to 'graphx')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala6
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java29
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala4
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala2
4 files changed, 8 insertions, 33 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index d5150382d5..116d1ea700 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -129,15 +129,15 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))
ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))
},
- (a, b) => a ++ b, TripletFields.SrcDstOnly)
+ (a, b) => a ++ b, TripletFields.All)
case EdgeDirection.In =>
graph.aggregateMessages[Array[(VertexId,VD)]](
ctx => ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr))),
- (a, b) => a ++ b, TripletFields.SrcOnly)
+ (a, b) => a ++ b, TripletFields.Src)
case EdgeDirection.Out =>
graph.aggregateMessages[Array[(VertexId,VD)]](
ctx => ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr))),
- (a, b) => a ++ b, TripletFields.DstOnly)
+ (a, b) => a ++ b, TripletFields.Dst)
case EdgeDirection.Both =>
throw new SparkException("collectEdges does not support EdgeDirection.Both. Use" +
"EdgeDirection.Either instead.")
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java b/graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java
index 8dfccfe2e2..7eb4ae0f44 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java
+++ b/graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java
@@ -56,39 +56,14 @@ public class TripletFields implements Serializable {
public static final TripletFields EdgeOnly = new TripletFields(false, false, true);
/**
- * Expose only the source field and not the edge or destination field.
- */
- public static final TripletFields SrcOnly = new TripletFields(true, false, false);
-
- /**
- * Expose only the destination field and not the edge or source field.
- */
- public static final TripletFields DstOnly = new TripletFields(false, true, false);
-
- /**
- * Expose the source and destination fields but not the edge field.
- */
- public static final TripletFields SrcDstOnly = new TripletFields(true, true, false);
-
- /**
* Expose the source and edge fields but not the destination field. (Same as Src)
*/
- public static final TripletFields SrcAndEdge = new TripletFields(true, false, true);
-
- /**
- * Expose the source and edge fields but not the destination field. (Same as SrcAndEdge)
- */
- public static final TripletFields Src = SrcAndEdge;
+ public static final TripletFields Src = new TripletFields(true, false, true);
/**
* Expose the destination and edge fields but not the source field. (Same as Dst)
*/
- public static final TripletFields DstAndEdge = new TripletFields(false, true, true);
-
- /**
- * Expose the destination and edge fields but not the source field. (Same as DstAndEdge)
- */
- public static final TripletFields Dst = DstAndEdge;
+ public static final TripletFields Dst = new TripletFields(false, true, true);
/**
* Expose all the fields (source, edge, and destination).
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index e40ae0d615..e139959c3f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -85,7 +85,7 @@ object PageRank extends Logging {
// Associate the degree with each vertex
.outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
// Set the weight on the edges based on the degree
- .mapTriplets( e => 1.0 / e.srcAttr, TripletFields.SrcOnly )
+ .mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src )
// Set the vertex attributes to the initial pagerank values
.mapVertices( (id, attr) => resetProb )
@@ -97,7 +97,7 @@ object PageRank extends Logging {
// Compute the outgoing rank contributions of each vertex, perform local preaggregation, and
// do the final aggregation at the receiving vertices. Requires a shuffle for aggregation.
val rankUpdates = rankGraph.aggregateMessages[Double](
- ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr), _ + _, TripletFields.SrcAndEdge)
+ ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr), _ + _, TripletFields.Src)
// Apply the final rank updates to get the new ranks, using join to preserve ranks of vertices
// that didn't receive a message. Requires a shuffle for broadcasting updated ranks to the
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index df773db6e4..a05d1ddb21 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -328,7 +328,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
"expected ctx.dstAttr to be null due to TripletFields, but it was " + ctx.dstAttr)
}
ctx.sendToDst(ctx.srcAttr)
- }, _ + _, TripletFields.SrcOnly)
+ }, _ + _, TripletFields.Src)
assert(agg.collect().toSet === (1 to n).map(x => (x: VertexId, "v")).toSet)
}
}