aboutsummaryrefslogtreecommitdiff
path: root/graphx/src/test/scala/org/apache
diff options
context:
space:
mode:
authorTakeshi YAMAMURO <linguin.m.s@gmail.com>2016-02-15 09:20:49 +0000
committerSean Owen <sowen@cloudera.com>2016-02-15 09:20:49 +0000
commit56d49397e01306637edf23bbb4f3b9d396cdc6ff (patch)
treed1c91e37e2f1903a4d0e75b81058eebdb804178a /graphx/src/test/scala/org/apache
parenta8bbc4f50ef3faacf4b7fe865a29144ea87f0796 (diff)
downloadspark-56d49397e01306637edf23bbb4f3b9d396cdc6ff.tar.gz
spark-56d49397e01306637edf23bbb4f3b9d396cdc6ff.tar.bz2
spark-56d49397e01306637edf23bbb4f3b9d396cdc6ff.zip
[SPARK-12995][GRAPHX] Remove deprecate APIs from Pregel
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com> Closes #10918 from maropu/RemoveDeprecateInPregel.
Diffstat (limited to 'graphx/src/test/scala/org/apache')
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala52
1 file changed, 6 insertions, 46 deletions
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 2fbc6f069d..f497e001df 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -221,7 +221,8 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
val vertices: RDD[(VertexId, Int)] = sc.parallelize(Array((1L, 1), (2L, 2)))
val edges: RDD[Edge[Int]] = sc.parallelize(Array(Edge(1L, 2L, 0)))
val graph = Graph(vertices, edges).reverse
- val result = graph.mapReduceTriplets[Int](et => Iterator((et.dstId, et.srcAttr)), _ + _)
+ val result = GraphXUtils.mapReduceTriplets[Int, Int, Int](
+ graph, et => Iterator((et.dstId, et.srcAttr)), _ + _)
assert(result.collect().toSet === Set((1L, 2)))
}
}
@@ -281,49 +282,6 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
}
}
- test("mapReduceTriplets") {
- withSpark { sc =>
- val n = 5
- val star = starGraph(sc, n).mapVertices { (_, _) => 0 }.cache()
- val starDeg = star.joinVertices(star.degrees){ (vid, oldV, deg) => deg }
- val neighborDegreeSums = starDeg.mapReduceTriplets(
- edge => Iterator((edge.srcId, edge.dstAttr), (edge.dstId, edge.srcAttr)),
- (a: Int, b: Int) => a + b)
- assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet)
-
- // activeSetOpt
- val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: VertexId, y: VertexId)
- val complete = Graph.fromEdgeTuples(sc.parallelize(allPairs, 3), 0)
- val vids = complete.mapVertices((vid, attr) => vid).cache()
- val active = vids.vertices.filter { case (vid, attr) => attr % 2 == 0 }
- val numEvenNeighbors = vids.mapReduceTriplets(et => {
- // Map function should only run on edges with destination in the active set
- if (et.dstId % 2 != 0) {
- throw new Exception("map ran on edge with dst vid %d, which is odd".format(et.dstId))
- }
- Iterator((et.srcId, 1))
- }, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect().toSet
- assert(numEvenNeighbors === (1 to n).map(x => (x: VertexId, n / 2)).toSet)
-
- // outerJoinVertices followed by mapReduceTriplets(activeSetOpt)
- val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexId, (x + 1) % n: VertexId)), 3)
- val ring = Graph.fromEdgeTuples(ringEdges, 0) .mapVertices((vid, attr) => vid).cache()
- val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_).cache()
- val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) =>
- newOpt.getOrElse(old)
- }
- val numOddNeighbors = changedGraph.mapReduceTriplets(et => {
- // Map function should only run on edges with source in the active set
- if (et.srcId % 2 != 1) {
- throw new Exception("map ran on edge with src vid %d, which is even".format(et.dstId))
- }
- Iterator((et.dstId, 1))
- }, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect().toSet
- assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexId, 1)).toSet)
-
- }
- }
-
test("aggregateMessages") {
withSpark { sc =>
val n = 5
@@ -347,7 +305,8 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
val reverseStarDegrees = reverseStar.outerJoinVertices(reverseStar.outDegrees) {
(vid, a, bOpt) => bOpt.getOrElse(0)
}
- val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
+ val neighborDegreeSums = GraphXUtils.mapReduceTriplets[Int, Int, Int](
+ reverseStarDegrees,
et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
(a: Int, b: Int) => a + b).collect().toSet
assert(neighborDegreeSums === Set((0: VertexId, n)) ++ (1 to n).map(x => (x: VertexId, 0)))
@@ -420,7 +379,8 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)),
numEdgePartitions)
val graph = Graph.fromEdgeTuples(edges, 1)
- val neighborAttrSums = graph.mapReduceTriplets[Int](
+ val neighborAttrSums = GraphXUtils.mapReduceTriplets[Int, Int, Int](
+ graph,
et => Iterator((et.dstId, et.srcAttr)), _ + _)
assert(neighborAttrSums.collect().toSet === Set((0: VertexId, n)))
} finally {