aboutsummaryrefslogtreecommitdiff
path: root/graph/src/test
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2013-12-05 18:57:36 -0800
committerReynold Xin <rxin@apache.org>2013-12-05 18:57:36 -0800
commit920282c36a6f7181cf936dc3b5ad0b729d20c95f (patch)
tree75d294113ba5a602e05297bd492d6c9759dde99c /graph/src/test
parent1c8500efc0a718b08a15f98c5c0cb23174498b29 (diff)
parent39b0256668c4f7806725fec751c477ea8c76cd84 (diff)
downloadspark-920282c36a6f7181cf936dc3b5ad0b729d20c95f.tar.gz
spark-920282c36a6f7181cf936dc3b5ad0b729d20c95f.tar.bz2
spark-920282c36a6f7181cf936dc3b5ad0b729d20c95f.zip
Merge branch 'pregel-replicate-changed' of github.com:ankurdave/graphx into pregel-replicate-changed
Diffstat (limited to 'graph/src/test')
-rw-r--r--graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala30
1 files changed, 25 insertions, 5 deletions
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
index 1d23911ff9..d5aa36c04f 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
@@ -120,9 +120,10 @@ class GraphSuite extends FunSuite with LocalSparkContext {
test("VertexSetRDD") {
withSpark(new SparkContext("local", "test")) { sc =>
- val a = sc.parallelize((0 to 100).map(x => (x.toLong, x.toLong)), 5)
+ val n = 100
+ val a = sc.parallelize((0 to n).map(x => (x.toLong, x.toLong)), 5)
val b = VertexRDD(a).mapValues(x => -x).cache() // Allow joining b with a derived RDD of b
- assert(b.count === 101)
+ assert(b.count === n + 1)
assert(b.leftJoin(a){ (id, a, bOpt) => a + bOpt.get }.map(x=> x._2).reduce(_+_) === 0)
val c = b.aggregateUsingIndex[Long](a, (x, y) => x)
assert(b.leftJoin(c){ (id, b, cOpt) => b + cOpt.get }.map(x=> x._2).reduce(_+_) === 0)
@@ -130,7 +131,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val e = a.filter(q => ((q._2 % 2) == 0))
assert(d.count === e.count)
assert(b.zipJoin(c)((id, b, c) => b + c).map(x => x._2).reduce(_+_) === 0)
-
+ val f = b.mapValues(x => if (x % 2 == 0) -x else x)
+ assert(b.diff(f).collect().toSet === (2 to n by 2).map(x => (x.toLong, x.toLong)).toSet)
}
}
@@ -143,10 +145,28 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val subgraph = star.subgraph(vpred = (vid, attr) => vid % 2 == 0)
// We should have 5 vertices.
- assert(subgraph.vertices.collect().toSet === (0 to n / 2).map(x => (x * 2, "v")).toSet)
+ assert(subgraph.vertices.collect().toSet === (0 to n by 2).map(x => (x, "v")).toSet)
// And 4 edges.
- assert(subgraph.edges.map(_.copy()).collect().toSet === (1 to n / 2).map(x => Edge(0, x * 2, 1)).toSet)
+ assert(subgraph.edges.map(_.copy()).collect().toSet === (2 to n by 2).map(x => Edge(0, x, 1)).toSet)
+ }
+ }
+
+ test("deltaJoinVertices") {
+ withSpark(new SparkContext("local", "test")) { sc =>
+ // Create a star graph of 10 vertices
+ val n = 10
+ val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), "v1")
+
+ // Modify only vertices whose vids are even
+ val newVerts = star.vertices.mapValues((vid, attr) => if (vid % 2 == 0) "v2" else attr)
+ val changedVerts = star.vertices.diff(newVerts)
+
+ // Apply the modification to the graph
+ val changedStar = star.deltaJoinVertices(newVerts, changedVerts)
+
+ // The graph's vertices should be correct
+ assert(changedStar.vertices.collect().toSet === newVerts.collect().toSet)
}
}