aboutsummaryrefslogtreecommitdiff
path: root/graphx/src/test
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2014-01-13 16:48:11 -0800
committerAnkur Dave <ankurdave@gmail.com>2014-01-13 17:03:03 -0800
commitae4b75d94a4a0f2545e6d90d6f9b8f162bf70ded (patch)
tree2ed8615d2239f7f349fab128cb3eaeec191f3abb /graphx/src/test
parent1bd5cefcae2769d48ad5ef4b8197193371c754da (diff)
downloadspark-ae4b75d94a4a0f2545e6d90d6f9b8f162bf70ded.tar.gz
spark-ae4b75d94a4a0f2545e6d90d6f9b8f162bf70ded.tar.bz2
spark-ae4b75d94a4a0f2545e6d90d6f9b8f162bf70ded.zip
Add EdgeDirection.Either and use it to fix CC bug
The bug was due to a misunderstanding of the activeSetOpt parameter to Graph.mapReduceTriplets. Passing EdgeDirection.Both causes mapReduceTriplets to run only on edges with *both* vertices in the active set. This commit adds EdgeDirection.Either, which causes mapReduceTriplets to run on edges with *either* vertex in the active set. This is what connected components needed.
Diffstat (limited to 'graphx/src/test')
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala2
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala2
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala2
3 files changed, 3 insertions, 3 deletions
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
index 7a901409d5..280f50e39a 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
@@ -28,7 +28,7 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
val chain = (0 until 100).map(x => (x, (x+1)%100) )
val rawEdges = sc.parallelize(chain, 3).map { case (s,d) => (s.toLong, d.toLong) }
val graph = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
- val nbrs = graph.collectNeighborIds(EdgeDirection.Both).cache()
+ val nbrs = graph.collectNeighborIds(EdgeDirection.Either).cache()
assert(nbrs.count === chain.size)
assert(graph.numVertices === nbrs.count)
nbrs.collect.foreach { case (vid, nbrs) => assert(nbrs.size === 2) }
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala
index 1ff3d75633..bceff11b8e 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala
@@ -32,7 +32,7 @@ class PregelSuite extends FunSuite with LocalSparkContext {
Set((1: VertexID, 1)) ++ (2 to n).map(x => (x: VertexID, 0)).toSet)
val result = Pregel(chainWithSeed, 0)(
(vid, attr, msg) => math.max(msg, attr),
- et => Iterator((et.dstId, et.srcAttr)),
+ et => if (et.dstAttr != et.srcAttr) Iterator((et.dstId, et.srcAttr)) else Iterator.empty,
(a: Int, b: Int) => math.max(a, b))
assert(result.vertices.collect.toSet ===
chain.vertices.mapValues { (vid, attr) => attr + 1 }.collect.toSet)
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
index 86da8f1b46..27c8705bca 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
@@ -102,7 +102,7 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
- val ccGraph = graph.connectedComponents(undirected = true)
+ val ccGraph = graph.connectedComponents()
val vertices = ccGraph.vertices.collect
for ( (id, cc) <- vertices ) {
assert(cc == 0)