aboutsummaryrefslogtreecommitdiff
path: root/graphx/src/test
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2014-05-10 14:48:07 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-05-10 14:48:07 -0700
commit905173df57b90f90ebafb22e43f55164445330e6 (patch)
tree805b4c6a80d206268807bfea23554e280f0b9eee /graphx/src/test
parent6c2691d0a0ed46a8b8093e05a4708706cf187168 (diff)
downloadspark-905173df57b90f90ebafb22e43f55164445330e6.tar.gz
spark-905173df57b90f90ebafb22e43f55164445330e6.tar.bz2
spark-905173df57b90f90ebafb22e43f55164445330e6.zip
Unify GraphImpl RDDs + other graph load optimizations
This PR makes the following changes, primarily in e4fbd329aef85fe2c38b0167255d2a712893d683: 1. *Unify RDDs to avoid zipPartitions.* A graph used to be four RDDs: vertices, edges, routing table, and triplet view. This commit merges them down to two: vertices (with routing table), and edges (with replicated vertices). 2. *Avoid duplicate shuffle in graph building.* We used to do two shuffles when building a graph: one to extract routing information from the edges and move it to the vertices, and another to find nonexistent vertices referred to by edges. With this commit, the latter is done as a side effect of the former. 3. *Avoid no-op shuffle when joins are fully eliminated.* This is a side effect of unifying the edges and the triplet view. 4. *Join elimination for mapTriplets.* 5. *Ship only the needed vertex attributes when upgrading the triplet view.* If the triplet view already contains source attributes, and we now need both attributes, only ship destination attributes rather than re-shipping both. This is done in `ReplicatedVertexView#upgrade`. Author: Ankur Dave <ankurdave@gmail.com> Closes #497 from ankurdave/unify-rdds and squashes the following commits: 332ab43 [Ankur Dave] Merge remote-tracking branch 'apache-spark/master' into unify-rdds 4933e2e [Ankur Dave] Exclude RoutingTable from binary compatibility check 5ba8789 [Ankur Dave] Add GraphX upgrade guide from Spark 0.9.1 13ac845 [Ankur Dave] Merge remote-tracking branch 'apache-spark/master' into unify-rdds a04765c [Ankur Dave] Remove unnecessary toOps call 57202e8 [Ankur Dave] Replace case with pair parameter 75af062 [Ankur Dave] Add explicit return types 04d3ae5 [Ankur Dave] Convert implicit parameter to context bound c88b269 [Ankur Dave] Revert upgradeIterator to if-in-a-loop 0d3584c [Ankur Dave] EdgePartition.size should be val 2a928b2 [Ankur Dave] Set locality wait 10b3596 [Ankur Dave] Clean up public API ae36110 [Ankur Dave] Fix style errors e4fbd32 [Ankur Dave] Unify GraphImpl RDDs + other graph load optimizations d6d60e2 [Ankur Dave] In GraphLoader, coalesce to minEdgePartitions 62c7b78 [Ankur Dave] In Analytics, take PageRank numIter d64e8d4 [Ankur Dave] Log current Pregel iteration
Diffstat (limited to 'graphx/src/test')
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala10
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala48
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala10
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala11
4 files changed, 49 insertions, 30 deletions
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 32b5fe4813..7b9bac5d9c 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -110,7 +110,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val p = 100
val verts = 1 to n
val graph = Graph.fromEdgeTuples(sc.parallelize(verts.flatMap(x =>
- verts.filter(y => y % x == 0).map(y => (x: VertexId, y: VertexId))), p), 0)
+ verts.withFilter(y => y % x == 0).map(y => (x: VertexId, y: VertexId))), p), 0)
assert(graph.edges.partitions.length === p)
val partitionedGraph = graph.partitionBy(EdgePartition2D)
assert(graph.edges.partitions.length === p)
@@ -120,7 +120,13 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val part = iter.next()._2
Iterator((part.srcIds ++ part.dstIds).toSet)
}.collect
- assert(verts.forall(id => partitionSets.count(_.contains(id)) <= bound))
+ if (!verts.forall(id => partitionSets.count(_.contains(id)) <= bound)) {
+ val numFailures = verts.count(id => partitionSets.count(_.contains(id)) > bound)
+ val failure = verts.maxBy(id => partitionSets.count(_.contains(id)))
+ fail(("Replication bound test failed for %d/%d vertices. " +
+ "Example: vertex %d replicated to %d (> %f) partitions.").format(
+ numFailures, n, failure, partitionSets.count(_.contains(failure)), bound))
+ }
// This should not be true for the default hash partitioning
val partitionSetsUnpartitioned = graph.edges.partitionsRDD.mapPartitions { iter =>
val part = iter.next()._2
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
index e135d1d7ad..d2e0c01bc3 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
@@ -26,10 +26,16 @@ import org.apache.spark.graphx._
class EdgePartitionSuite extends FunSuite {
+ def makeEdgePartition[A: ClassTag](xs: Iterable[(Int, Int, A)]): EdgePartition[A, Int] = {
+ val builder = new EdgePartitionBuilder[A, Int]
+ for ((src, dst, attr) <- xs) { builder.add(src: VertexId, dst: VertexId, attr) }
+ builder.toEdgePartition
+ }
+
test("reverse") {
val edges = List(Edge(0, 1, 0), Edge(1, 2, 0), Edge(2, 0, 0))
val reversedEdges = List(Edge(0, 2, 0), Edge(1, 0, 0), Edge(2, 1, 0))
- val builder = new EdgePartitionBuilder[Int]
+ val builder = new EdgePartitionBuilder[Int, Nothing]
for (e <- edges) {
builder.add(e.srcId, e.dstId, e.attr)
}
@@ -40,7 +46,7 @@ class EdgePartitionSuite extends FunSuite {
test("map") {
val edges = List(Edge(0, 1, 0), Edge(1, 2, 0), Edge(2, 0, 0))
- val builder = new EdgePartitionBuilder[Int]
+ val builder = new EdgePartitionBuilder[Int, Nothing]
for (e <- edges) {
builder.add(e.srcId, e.dstId, e.attr)
}
@@ -49,11 +55,22 @@ class EdgePartitionSuite extends FunSuite {
edges.map(e => e.copy(attr = e.srcId + e.dstId)))
}
+ test("filter") {
+ val edges = List(Edge(0, 1, 0), Edge(0, 2, 0), Edge(2, 0, 0))
+ val builder = new EdgePartitionBuilder[Int, Int]
+ for (e <- edges) {
+ builder.add(e.srcId, e.dstId, e.attr)
+ }
+ val edgePartition = builder.toEdgePartition
+ val filtered = edgePartition.filter(et => et.srcId == 0, (vid, attr) => vid == 0 || vid == 1)
+ assert(filtered.tripletIterator().toList.map(et => (et.srcId, et.dstId)) === List((0L, 1L)))
+ }
+
test("groupEdges") {
val edges = List(
Edge(0, 1, 1), Edge(1, 2, 2), Edge(2, 0, 4), Edge(0, 1, 8), Edge(1, 2, 16), Edge(2, 0, 32))
val groupedEdges = List(Edge(0, 1, 9), Edge(1, 2, 18), Edge(2, 0, 36))
- val builder = new EdgePartitionBuilder[Int]
+ val builder = new EdgePartitionBuilder[Int, Nothing]
for (e <- edges) {
builder.add(e.srcId, e.dstId, e.attr)
}
@@ -61,11 +78,19 @@ class EdgePartitionSuite extends FunSuite {
assert(edgePartition.groupEdges(_ + _).iterator.map(_.copy()).toList === groupedEdges)
}
+ test("upgradeIterator") {
+ val edges = List((0, 1, 0), (1, 0, 0))
+ val verts = List((0L, 1), (1L, 2))
+ val part = makeEdgePartition(edges).updateVertices(verts.iterator)
+ assert(part.upgradeIterator(part.iterator).map(_.toTuple).toList ===
+ part.tripletIterator().toList.map(_.toTuple))
+ }
+
test("indexIterator") {
val edgesFrom0 = List(Edge(0, 1, 0))
val edgesFrom1 = List(Edge(1, 0, 0), Edge(1, 2, 0))
val sortedEdges = edgesFrom0 ++ edgesFrom1
- val builder = new EdgePartitionBuilder[Int]
+ val builder = new EdgePartitionBuilder[Int, Nothing]
for (e <- Random.shuffle(sortedEdges)) {
builder.add(e.srcId, e.dstId, e.attr)
}
@@ -77,11 +102,6 @@ class EdgePartitionSuite extends FunSuite {
}
test("innerJoin") {
- def makeEdgePartition[A: ClassTag](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = {
- val builder = new EdgePartitionBuilder[A]
- for ((src, dst, attr) <- xs) { builder.add(src: VertexId, dst: VertexId, attr) }
- builder.toEdgePartition
- }
val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
val bList = List((0, 1, 0), (1, 0, 0), (1, 1, 0), (3, 4, 0), (5, 5, 0))
val a = makeEdgePartition(aList)
@@ -90,4 +110,14 @@ class EdgePartitionSuite extends FunSuite {
assert(a.innerJoin(b) { (src, dst, a, b) => a }.iterator.map(_.copy()).toList ===
List(Edge(0, 1, 0), Edge(1, 0, 0), Edge(5, 5, 0)))
}
+
+ test("isActive, numActives, replaceActives") {
+ val ep = new EdgePartitionBuilder[Nothing, Nothing].toEdgePartition
+ .withActiveSet(Iterator(0L, 2L, 0L))
+ assert(ep.isActive(0))
+ assert(!ep.isActive(1))
+ assert(ep.isActive(2))
+ assert(!ep.isActive(-1))
+ assert(ep.numActives == Some(2))
+ }
}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala
index 9cbb2d2acd..49b2704390 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala
@@ -26,17 +26,11 @@ import org.apache.spark.graphx._
class EdgeTripletIteratorSuite extends FunSuite {
test("iterator.toList") {
- val builder = new EdgePartitionBuilder[Int]
+ val builder = new EdgePartitionBuilder[Int, Int]
builder.add(1, 2, 0)
builder.add(1, 3, 0)
builder.add(1, 4, 0)
- val vidmap = new VertexIdToIndexMap
- vidmap.add(1)
- vidmap.add(2)
- vidmap.add(3)
- vidmap.add(4)
- val vs = Array.fill(vidmap.capacity)(0)
- val iter = new EdgeTripletIterator[Int, Int](vidmap, vs, builder.toEdgePartition)
+ val iter = new EdgeTripletIterator[Int, Int](builder.toEdgePartition, true, true)
val result = iter.toList.map(et => (et.srcId, et.dstId))
assert(result === Seq((1, 2), (1, 3), (1, 4)))
}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
index a048d13fd1..8bf1384d51 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
@@ -30,17 +30,6 @@ class VertexPartitionSuite extends FunSuite {
assert(!vp.isDefined(-1))
}
- test("isActive, numActives, replaceActives") {
- val vp = VertexPartition(Iterator((0L, 1), (1L, 1)))
- .filter { (vid, attr) => vid == 0 }
- .replaceActives(Iterator(0, 2, 0))
- assert(vp.isActive(0))
- assert(!vp.isActive(1))
- assert(vp.isActive(2))
- assert(!vp.isActive(-1))
- assert(vp.numActives == Some(2))
- }
-
test("map") {
val vp = VertexPartition(Iterator((0L, 1), (1L, 1))).map { (vid, attr) => 2 }
assert(vp(0) === 2)