diff options
author | Ankur Dave <ankurdave@gmail.com> | 2014-01-13 16:48:11 -0800 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2014-01-13 17:03:03 -0800 |
commit | ae4b75d94a4a0f2545e6d90d6f9b8f162bf70ded (patch) | |
tree | 2ed8615d2239f7f349fab128cb3eaeec191f3abb /graphx/src/test/scala | |
parent | 1bd5cefcae2769d48ad5ef4b8197193371c754da (diff) | |
download | spark-ae4b75d94a4a0f2545e6d90d6f9b8f162bf70ded.tar.gz spark-ae4b75d94a4a0f2545e6d90d6f9b8f162bf70ded.tar.bz2 spark-ae4b75d94a4a0f2545e6d90d6f9b8f162bf70ded.zip |
Add EdgeDirection.Either and use it to fix CC bug
The bug was due to a misunderstanding of the activeSetOpt parameter to
Graph.mapReduceTriplets. Passing EdgeDirection.Both causes
mapReduceTriplets to run only on edges with *both* vertices in the
active set. This commit adds EdgeDirection.Either, which causes
mapReduceTriplets to run on edges with *either* vertex in the active
set. This is what connected components needed.
Diffstat (limited to 'graphx/src/test/scala')
3 files changed, 3 insertions, 3 deletions
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala index 7a901409d5..280f50e39a 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala @@ -28,7 +28,7 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext { val chain = (0 until 100).map(x => (x, (x+1)%100) ) val rawEdges = sc.parallelize(chain, 3).map { case (s,d) => (s.toLong, d.toLong) } val graph = Graph.fromEdgeTuples(rawEdges, 1.0).cache() - val nbrs = graph.collectNeighborIds(EdgeDirection.Both).cache() + val nbrs = graph.collectNeighborIds(EdgeDirection.Either).cache() assert(nbrs.count === chain.size) assert(graph.numVertices === nbrs.count) nbrs.collect.foreach { case (vid, nbrs) => assert(nbrs.size === 2) } diff --git a/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala index 1ff3d75633..bceff11b8e 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala @@ -32,7 +32,7 @@ class PregelSuite extends FunSuite with LocalSparkContext { Set((1: VertexID, 1)) ++ (2 to n).map(x => (x: VertexID, 0)).toSet) val result = Pregel(chainWithSeed, 0)( (vid, attr, msg) => math.max(msg, attr), - et => Iterator((et.dstId, et.srcAttr)), + et => if (et.dstAttr != et.srcAttr) Iterator((et.dstId, et.srcAttr)) else Iterator.empty, (a: Int, b: Int) => math.max(a, b)) assert(result.vertices.collect.toSet === chain.vertices.mapValues { (vid, attr) => attr + 1 }.collect.toSet) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala index 86da8f1b46..27c8705bca 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala @@ -102,7 +102,7 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext { val defaultUser = ("John Doe", "Missing") // Build the initial Graph val graph = Graph(users, relationships, defaultUser) - val ccGraph = graph.connectedComponents(undirected = true) + val ccGraph = graph.connectedComponents() val vertices = ccGraph.vertices.collect for ( (id, cc) <- vertices ) { assert(cc == 0) |