path: root/graph/src/test
author    Ankur Dave <ankurdave@gmail.com>  2013-12-15 15:08:08 -0800
committer Ankur Dave <ankurdave@gmail.com>  2013-12-16 17:37:51 -0800
commit    3ade8be8f218cdec3b90e48069595a3c556e0f27 (patch)
tree      ceeb1dd63caabfd8190429e1f5bc406252eb1d24 /graph/src/test
parent    0476c84c516dda8c23e66f7796389995a44a0878 (diff)
download  spark-3ade8be8f218cdec3b90e48069595a3c556e0f27.tar.gz
          spark-3ade8be8f218cdec3b90e48069595a3c556e0f27.tar.bz2
          spark-3ade8be8f218cdec3b90e48069595a3c556e0f27.zip
Add clustered index on edges by source vertex
This allows mapReduceTriplets to scan edges efficiently when many source vertices are inactive. The scan method switches from a full edge scan to a clustered index scan when fewer than 80% of the source vertices are active.
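For illustration only (not part of this commit), here is a minimal Scala sketch of the scan-selection rule described above; ScanStrategySketch, scanEdges, numActive, numSources, edgeScan, and indexScan are hypothetical names, not GraphX API.

// Sketch of choosing between a full edge scan and a clustered index scan
// based on the fraction of active source vertices (hypothetical names).
object ScanStrategySketch {
  def scanEdges[T](
      numActive: Long,
      numSources: Long,
      edgeScan: () => Iterator[T],
      indexScan: () => Iterator[T]): Iterator[T] = {
    val activeFraction = numActive.toDouble / numSources
    // With fewer than 80% of source vertices active, seeking into the
    // clustered index per active source beats scanning every edge.
    if (activeFraction < 0.8) indexScan() else edgeScan()
  }

  def main(args: Array[String]): Unit = {
    val edges = Seq((0L, 1L), (0L, 2L), (1L, 2L))
    // Only vertex 0 of 3 sources is active (about 33% < 80%), so the index scan is chosen.
    val result = scanEdges[(Long, Long)](
      numActive = 1, numSources = 3,
      edgeScan = () => edges.iterator,
      indexScan = () => edges.iterator.filter(_._1 == 0L))
    println(result.toList)  // List((0,1), (0,2))
  }
}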
Diffstat (limited to 'graph/src/test')
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala | 25
1 file changed, 25 insertions, 0 deletions
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
index 68a171b12f..a85a31f79d 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
@@ -1,9 +1,12 @@
package org.apache.spark.graph

+import scala.util.Random
+
import org.scalatest.FunSuite

import org.apache.spark.SparkContext
import org.apache.spark.graph.LocalSparkContext._
+import org.apache.spark.graph.impl.EdgePartitionBuilder
import org.apache.spark.rdd._

class GraphSuite extends FunSuite with LocalSparkContext {
@@ -59,6 +62,13 @@ class GraphSuite extends FunSuite with LocalSparkContext {
// mapVertices changing type
val mappedVAttrs2 = reverseStar.mapVertices((vid, attr) => attr.length)
assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: Vid, 1)).toSet)
+ // groupEdges
+ val doubleStar = Graph.fromEdgeTuples(
+ sc.parallelize((1 to n).flatMap(x => List((0: Vid, x: Vid), (0: Vid, x: Vid))), 1), "v")
+ val star2 = doubleStar.groupEdges { (a, b) => a}
+ assert(star2.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]) ===
+ star.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]))
+ assert(star2.vertices.collect.toSet === star.vertices.collect.toSet)
}
}
@@ -206,4 +216,19 @@ class GraphSuite extends FunSuite with LocalSparkContext {
assert(subgraph.edges.map(_.copy()).collect().toSet === (2 to n by 2).map(x => Edge(0, x, 1)).toSet)
}
}
+
+ test("EdgePartition.sort") {
+ val edgesFrom0 = List(Edge(0, 1, 0))
+ val edgesFrom1 = List(Edge(1, 0, 0), Edge(1, 2, 0))
+ val sortedEdges = edgesFrom0 ++ edgesFrom1
+ val builder = new EdgePartitionBuilder[Int]
+ for (e <- Random.shuffle(sortedEdges)) {
+ builder.add(e.srcId, e.dstId, e.attr)
+ }
+
+ val edgePartition = builder.toEdgePartition
+ assert(edgePartition.iterator.map(_.copy()).toList === sortedEdges)
+ assert(edgePartition.indexIterator(_ == 0).map(_.copy()).toList === edgesFrom0)
+ assert(edgePartition.indexIterator(_ == 1).map(_.copy()).toList === edgesFrom1)
+ }
}