diff options
author | Ankur Dave <ankurdave@gmail.com> | 2011-04-23 13:38:37 -0700 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2011-05-03 15:40:45 -0700 |
commit | 563c5e717cc75869c328bba17116313eab9e976b (patch) | |
tree | 99c2894a1bc9d4317b11d7de26c98f514b879794 /bagel/src/test/scala | |
parent | c18fa3ebc6848d2da19ac2f68c9e22870e135ecd (diff) | |
download | spark-563c5e717cc75869c328bba17116313eab9e976b.tar.gz spark-563c5e717cc75869c328bba17116313eab9e976b.tar.bz2 spark-563c5e717cc75869c328bba17116313eab9e976b.zip |
Refactor and add aggregator support
Refactored out the agg() and comp() methods from Pregel.run.
Defined an implicit conversion to allow applications that don't use
aggregators to avoid including a null argument for the result of the
aggregator in the compute function.
Diffstat (limited to 'bagel/src/test/scala')
-rw-r--r-- | bagel/src/test/scala/bagel/BagelSuite.scala | 10 |
1 files changed, 6 insertions, 4 deletions
diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala index 29f5f0c358..53a93a6b80 100644 --- a/bagel/src/test/scala/bagel/BagelSuite.scala +++ b/bagel/src/test/scala/bagel/BagelSuite.scala @@ -10,6 +10,8 @@ import scala.collection.mutable.ArrayBuffer import spark._ +import bagel.Pregel._ + @serializable class TestVertex(val id: String, val active: Boolean, val age: Int) extends Vertex @serializable class TestMessage(val targetId: String) extends Message @@ -20,10 +22,10 @@ class BagelSuite extends FunSuite with Assertions { val msgs = sc.parallelize(Array[(String, TestMessage)]()) val numSupersteps = 5 val result = - Pregel.run(sc, verts, msgs, new DefaultCombiner[TestMessage], 1) { + Pregel.run(sc, verts, msgs)()(addAggregatorArg { (self: TestVertex, msgs: Option[ArrayBuffer[TestMessage]], superstep: Int) => (new TestVertex(self.id, superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]()) - } + }) for (vert <- result.collect) assert(vert.age === numSupersteps) } @@ -34,7 +36,7 @@ class BagelSuite extends FunSuite with Assertions { val msgs = sc.parallelize(Array("a" -> new TestMessage("a"))) val numSupersteps = 5 val result = - Pregel.run(sc, verts, msgs, new DefaultCombiner[TestMessage], 1) { + Pregel.run(sc, verts, msgs)()(addAggregatorArg { (self: TestVertex, msgs: Option[ArrayBuffer[TestMessage]], superstep: Int) => val msgsOut = msgs match { @@ -44,7 +46,7 @@ class BagelSuite extends FunSuite with Assertions { new ArrayBuffer[TestMessage]() } (new TestVertex(self.id, self.active, self.age + 1), msgsOut) - } + }) for (vert <- result.collect) assert(vert.age === numSupersteps) } |