aboutsummaryrefslogtreecommitdiff
path: root/bagel/src/test
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2011-04-23 13:38:37 -0700
committerAnkur Dave <ankurdave@gmail.com>2011-05-03 15:40:45 -0700
commit563c5e717cc75869c328bba17116313eab9e976b (patch)
tree99c2894a1bc9d4317b11d7de26c98f514b879794 /bagel/src/test
parentc18fa3ebc6848d2da19ac2f68c9e22870e135ecd (diff)
downloadspark-563c5e717cc75869c328bba17116313eab9e976b.tar.gz
spark-563c5e717cc75869c328bba17116313eab9e976b.tar.bz2
spark-563c5e717cc75869c328bba17116313eab9e976b.zip
Refactor and add aggregator support
Refactored out the agg() and comp() methods from Pregel.run. Defined an implicit conversion to allow applications that don't use aggregators to avoid including a null argument for the result of the aggregator in the compute function.
Diffstat (limited to 'bagel/src/test')
-rw-r--r--bagel/src/test/scala/bagel/BagelSuite.scala10
1 files changed, 6 insertions, 4 deletions
diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala
index 29f5f0c358..53a93a6b80 100644
--- a/bagel/src/test/scala/bagel/BagelSuite.scala
+++ b/bagel/src/test/scala/bagel/BagelSuite.scala
@@ -10,6 +10,8 @@ import scala.collection.mutable.ArrayBuffer
import spark._
+import bagel.Pregel._
+
@serializable class TestVertex(val id: String, val active: Boolean, val age: Int) extends Vertex
@serializable class TestMessage(val targetId: String) extends Message
@@ -20,10 +22,10 @@ class BagelSuite extends FunSuite with Assertions {
val msgs = sc.parallelize(Array[(String, TestMessage)]())
val numSupersteps = 5
val result =
- Pregel.run(sc, verts, msgs, new DefaultCombiner[TestMessage], 1) {
+ Pregel.run(sc, verts, msgs)()(addAggregatorArg {
(self: TestVertex, msgs: Option[ArrayBuffer[TestMessage]], superstep: Int) =>
(new TestVertex(self.id, superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]())
- }
+ })
for (vert <- result.collect)
assert(vert.age === numSupersteps)
}
@@ -34,7 +36,7 @@ class BagelSuite extends FunSuite with Assertions {
val msgs = sc.parallelize(Array("a" -> new TestMessage("a")))
val numSupersteps = 5
val result =
- Pregel.run(sc, verts, msgs, new DefaultCombiner[TestMessage], 1) {
+ Pregel.run(sc, verts, msgs)()(addAggregatorArg {
(self: TestVertex, msgs: Option[ArrayBuffer[TestMessage]], superstep: Int) =>
val msgsOut =
msgs match {
@@ -44,7 +46,7 @@ class BagelSuite extends FunSuite with Assertions {
new ArrayBuffer[TestMessage]()
}
(new TestVertex(self.id, self.active, self.age + 1), msgsOut)
- }
+ })
for (vert <- result.collect)
assert(vert.age === numSupersteps)
}