aboutsummaryrefslogtreecommitdiff
path: root/graphx/src/main
diff options
context:
space:
mode:
authorRobin East <robin.east@xense.co.uk>2016-02-21 17:07:17 -0800
committerReynold Xin <rxin@databricks.com>2016-02-21 17:07:17 -0800
commit3d79f1065cd02133ad9dd4423c09b8c8b52b38e2 (patch)
treeb9271d70cbd970647fc285da7903aeefb3e3fa59 /graphx/src/main
parent0f90f4e6ac9e9ca694e3622b866f33d3fdf1a459 (diff)
downloadspark-3d79f1065cd02133ad9dd4423c09b8c8b52b38e2.tar.gz
spark-3d79f1065cd02133ad9dd4423c09b8c8b52b38e2.tar.bz2
spark-3d79f1065cd02133ad9dd4423c09b8c8b52b38e2.zip
[SPARK-3650][GRAPHX] Triangle Count handles reverse edges incorrectly
jegonzal ankurdave please could you review ## What changes were proposed in this pull request? Reworking of jegonzal PR #2495 to address the issue identified in SPARK-3650. Code amended to use the convertToCanonicalEdges method. ## How was this patch tested? Patch was tested using the unit tests created in PR #2495 Author: Robin East <robin.east@xense.co.uk> Author: Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com> Closes #11290 from insidedctm/spark-3650.
Diffstat (limited to 'graphx/src/main')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala14
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala63
2 files changed, 57 insertions, 20 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 97a82239a9..3e8c385302 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -17,10 +17,15 @@
package org.apache.spark.graphx
+import scala.reflect.{classTag, ClassTag}
import scala.reflect.ClassTag
import scala.util.Random
+import org.apache.spark.HashPartitioner
+import org.apache.spark.SparkContext._
import org.apache.spark.SparkException
+import org.apache.spark.graphx.impl.EdgePartitionBuilder
+import org.apache.spark.graphx.impl.GraphImpl
import org.apache.spark.graphx.lib._
import org.apache.spark.rdd.RDD
@@ -184,6 +189,15 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
}
/**
+ * Remove self edges.
+ *
+ * @return a graph with all self edges removed
+ */
+ def removeSelfEdges(): Graph[VD, ED] = {
+ graph.subgraph(epred = e => e.srcId != e.dstId)
+ }
+
+ /**
* Join the vertices with an RDD and then apply a function from the
* vertex and RDD entry to a new vertex value. The input table
* should contain at most one entry for each vertex. If no entry is
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
index a5d598053f..51bcdf20de 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
@@ -20,6 +20,7 @@ package org.apache.spark.graphx.lib
import scala.reflect.ClassTag
import org.apache.spark.graphx._
+import org.apache.spark.graphx.PartitionStrategy.EdgePartition2D
/**
* Compute the number of triangles passing through each vertex.
@@ -27,25 +28,47 @@ import org.apache.spark.graphx._
* The algorithm is relatively straightforward and can be computed in three steps:
*
* <ul>
- * <li>Compute the set of neighbors for each vertex
- * <li>For each edge compute the intersection of the sets and send the count to both vertices.
- * <li> Compute the sum at each vertex and divide by two since each triangle is counted twice.
+ * <li> Compute the set of neighbors for each vertex</li>
+ * <li> For each edge compute the intersection of the sets and send the count to both vertices.</li>
+ * <li> Compute the sum at each vertex and divide by two since each triangle is counted twice.</li>
* </ul>
*
- * Note that the input graph should have its edges in canonical direction
- * (i.e. the `sourceId` less than `destId`). Also the graph must have been partitioned
- * using [[org.apache.spark.graphx.Graph#partitionBy]].
+ * There are two implementations. The default `TriangleCount.run` implementation first removes
+ * self cycles and canonicalizes the graph to ensure that the following conditions hold:
+ * <ul>
+ * <li> There are no self edges</li>
+ * <li> All edges are oriented src < dst</li>
+ * <li> There are no duplicate edges</li>
+ * </ul>
+ * However, the canonicalization procedure is costly as it requires repartitioning the graph.
+ * If the input data is already in "canonical form" with self cycles removed, then
+ * `TriangleCount.runPreCanonicalized` should be used instead.
+ *
+ * {{{
+ * val canonicalGraph = graph.mapEdges(e => 1).removeSelfEdges().convertToCanonicalEdges()
+ * val counts = TriangleCount.runPreCanonicalized(canonicalGraph).vertices
+ * }}}
+ *
*/
object TriangleCount {
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = {
- // Remove redundant edges
- val g = graph.groupEdges((a, b) => a).cache()
+ // Transform the edge data into something cheap to shuffle and then canonicalize
+ val canonicalGraph = graph.mapEdges(e => true).removeSelfEdges().convertToCanonicalEdges()
+ // Get the triangle counts
+ val counters = runPreCanonicalized(canonicalGraph).vertices
+ // Join them back with the original graph
+ graph.outerJoinVertices(counters) { (vid, _, optCounter: Option[Int]) =>
+ optCounter.getOrElse(0)
+ }
+ }
+
+ def runPreCanonicalized[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = {
// Construct set representations of the neighborhoods
val nbrSets: VertexRDD[VertexSet] =
- g.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) =>
- val set = new VertexSet(4)
+ graph.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) =>
+ val set = new VertexSet(nbrs.length)
var i = 0
while (i < nbrs.size) {
// prevent self cycle
@@ -56,14 +79,14 @@ object TriangleCount {
}
set
}
+
// join the sets with the graph
- val setGraph: Graph[VertexSet, ED] = g.outerJoinVertices(nbrSets) {
+ val setGraph: Graph[VertexSet, ED] = graph.outerJoinVertices(nbrSets) {
(vid, _, optSet) => optSet.getOrElse(null)
}
+
// Edge function computes intersection of smaller vertex with larger vertex
def edgeFunc(ctx: EdgeContext[VertexSet, ED, Int]) {
- assert(ctx.srcAttr != null)
- assert(ctx.dstAttr != null)
val (smallSet, largeSet) = if (ctx.srcAttr.size < ctx.dstAttr.size) {
(ctx.srcAttr, ctx.dstAttr)
} else {
@@ -80,15 +103,15 @@ object TriangleCount {
ctx.sendToSrc(counter)
ctx.sendToDst(counter)
}
+
// compute the intersection along edges
val counters: VertexRDD[Int] = setGraph.aggregateMessages(edgeFunc, _ + _)
// Merge counters with the graph and divide by two since each triangle is counted twice
- g.outerJoinVertices(counters) {
- (vid, _, optCounter: Option[Int]) =>
- val dblCount = optCounter.getOrElse(0)
- // double count should be even (divisible by two)
- assert((dblCount & 1) == 0)
- dblCount / 2
+ graph.outerJoinVertices(counters) { (_, _, optCounter: Option[Int]) =>
+ val dblCount = optCounter.getOrElse(0)
+ // This algorithm double counts each triangle so the final count should be even
+ require(dblCount % 2 == 0, "Triangle count resulted in an invalid number of triangles.")
+ dblCount / 2
}
- } // end of TriangleCount
+ }
}