diff options
author | Robin East <robin.east@xense.co.uk> | 2016-02-21 17:07:17 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-02-21 17:07:17 -0800 |
commit | 3d79f1065cd02133ad9dd4423c09b8c8b52b38e2 (patch) | |
tree | b9271d70cbd970647fc285da7903aeefb3e3fa59 /graphx/src/main | |
parent | 0f90f4e6ac9e9ca694e3622b866f33d3fdf1a459 (diff) | |
download | spark-3d79f1065cd02133ad9dd4423c09b8c8b52b38e2.tar.gz spark-3d79f1065cd02133ad9dd4423c09b8c8b52b38e2.tar.bz2 spark-3d79f1065cd02133ad9dd4423c09b8c8b52b38e2.zip |
[SPARK-3650][GRAPHX] Triangle Count handles reverse edges incorrectly
jegonzal ankurdave please could you review
## What changes were proposed in this pull request?
Reworking of jegonzal PR #2495 to address the issue identified in SPARK-3650. Code amended to use the convertToCanonicalEdges method.
## How was this patch tested?
Patch was tested using the unit tests created in PR #2495
Author: Robin East <robin.east@xense.co.uk>
Author: Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com>
Closes #11290 from insidedctm/spark-3650.
Diffstat (limited to 'graphx/src/main')
-rw-r--r-- | graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala | 14 | ||||
-rw-r--r-- | graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala | 63 |
2 files changed, 57 insertions, 20 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 97a82239a9..3e8c385302 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -17,10 +17,15 @@ package org.apache.spark.graphx +import scala.reflect.{classTag, ClassTag} import scala.reflect.ClassTag import scala.util.Random +import org.apache.spark.HashPartitioner +import org.apache.spark.SparkContext._ import org.apache.spark.SparkException +import org.apache.spark.graphx.impl.EdgePartitionBuilder +import org.apache.spark.graphx.impl.GraphImpl import org.apache.spark.graphx.lib._ import org.apache.spark.rdd.RDD @@ -184,6 +189,15 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali } /** + * Remove self edges. + * + * @return a graph with all self edges removed + */ + def removeSelfEdges(): Graph[VD, ED] = { + graph.subgraph(epred = e => e.srcId != e.dstId) + } + + /** * Join the vertices with an RDD and then apply a function from the * vertex and RDD entry to a new vertex value. The input table * should contain at most one entry for each vertex. If no entry is diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala index a5d598053f..51bcdf20de 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala @@ -20,6 +20,7 @@ package org.apache.spark.graphx.lib import scala.reflect.ClassTag import org.apache.spark.graphx._ +import org.apache.spark.graphx.PartitionStrategy.EdgePartition2D /** * Compute the number of triangles passing through each vertex. 
@@ -27,25 +28,47 @@ import org.apache.spark.graphx._ * The algorithm is relatively straightforward and can be computed in three steps: * * <ul> - * <li>Compute the set of neighbors for each vertex - * <li>For each edge compute the intersection of the sets and send the count to both vertices. - * <li> Compute the sum at each vertex and divide by two since each triangle is counted twice. + * <li> Compute the set of neighbors for each vertex</li> + * <li> For each edge compute the intersection of the sets and send the count to both vertices.</li> + * <li> Compute the sum at each vertex and divide by two since each triangle is counted twice.</li> * </ul> * - * Note that the input graph should have its edges in canonical direction - * (i.e. the `sourceId` less than `destId`). Also the graph must have been partitioned - * using [[org.apache.spark.graphx.Graph#partitionBy]]. + * There are two implementations. The default `TriangleCount.run` implementation first removes + * self cycles and canonicalizes the graph to ensure that the following conditions hold: + * <ul> + * <li> There are no self edges</li> + * <li> All edges are oriented src > dst</li> + * <li> There are no duplicate edges</li> + * </ul> + * However, the canonicalization procedure is costly as it requires repartitioning the graph. + * If the input data is already in "canonical form" with self cycles removed then the + * `TriangleCount.runPreCanonicalized` should be used instead. 
+ * + * {{{ + * val canonicalGraph = graph.mapEdges(e => 1).removeSelfEdges().canonicalizeEdges() + * val counts = TriangleCount.runPreCanonicalized(canonicalGraph).vertices + * }}} + * */ object TriangleCount { def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = { - // Remove redundant edges - val g = graph.groupEdges((a, b) => a).cache() + // Transform the edge data something cheap to shuffle and then canonicalize + val canonicalGraph = graph.mapEdges(e => true).removeSelfEdges().convertToCanonicalEdges() + // Get the triangle counts + val counters = runPreCanonicalized(canonicalGraph).vertices + // Join them bath with the original graph + graph.outerJoinVertices(counters) { (vid, _, optCounter: Option[Int]) => + optCounter.getOrElse(0) + } + } + + def runPreCanonicalized[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = { // Construct set representations of the neighborhoods val nbrSets: VertexRDD[VertexSet] = - g.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) => - val set = new VertexSet(4) + graph.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) => + val set = new VertexSet(nbrs.length) var i = 0 while (i < nbrs.size) { // prevent self cycle @@ -56,14 +79,14 @@ object TriangleCount { } set } + // join the sets with the graph - val setGraph: Graph[VertexSet, ED] = g.outerJoinVertices(nbrSets) { + val setGraph: Graph[VertexSet, ED] = graph.outerJoinVertices(nbrSets) { (vid, _, optSet) => optSet.getOrElse(null) } + // Edge function computes intersection of smaller vertex with larger vertex def edgeFunc(ctx: EdgeContext[VertexSet, ED, Int]) { - assert(ctx.srcAttr != null) - assert(ctx.dstAttr != null) val (smallSet, largeSet) = if (ctx.srcAttr.size < ctx.dstAttr.size) { (ctx.srcAttr, ctx.dstAttr) } else { @@ -80,15 +103,15 @@ object TriangleCount { ctx.sendToSrc(counter) ctx.sendToDst(counter) } + // compute the intersection along edges val counters: VertexRDD[Int] = 
setGraph.aggregateMessages(edgeFunc, _ + _) // Merge counters with the graph and divide by two since each triangle is counted twice - g.outerJoinVertices(counters) { - (vid, _, optCounter: Option[Int]) => - val dblCount = optCounter.getOrElse(0) - // double count should be even (divisible by two) - assert((dblCount & 1) == 0) - dblCount / 2 + graph.outerJoinVertices(counters) { (_, _, optCounter: Option[Int]) => + val dblCount = optCounter.getOrElse(0) + // This algorithm double counts each triangle so the final count should be even + require(dblCount % 2 == 0, "Triangle count resulted in an invalid number of triangles.") + dblCount / 2 } - } // end of TriangleCount + } } |