diff options
author | Ankur Dave <ankurdave@gmail.com> | 2011-11-08 19:56:44 +0000 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2011-11-08 19:56:44 +0000 |
commit | c5be7d2b2268e44e3eafb460d4bf0fb0badf9b22 (patch) | |
tree | c83baed495a805e2f11374f4f4125e7c23a7d377 /bagel/src/test/scala | |
parent | 9e4c79a4d39b890b3eb55ffe8cef7d21eb31f0e6 (diff) | |
download | spark-c5be7d2b2268e44e3eafb460d4bf0fb0badf9b22.tar.gz spark-c5be7d2b2268e44e3eafb460d4bf0fb0badf9b22.tar.bz2 spark-c5be7d2b2268e44e3eafb460d4bf0fb0badf9b22.zip |
Update Bagel unit tests to reflect API change
Diffstat (limited to 'bagel/src/test/scala')
-rw-r--r-- | bagel/src/test/scala/bagel/BagelSuite.scala | 44 |
1 files changed, 21 insertions, 23 deletions
diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala index 59356e09f0..0eda80af64 100644 --- a/bagel/src/test/scala/bagel/BagelSuite.scala +++ b/bagel/src/test/scala/bagel/BagelSuite.scala @@ -10,45 +10,43 @@ import scala.collection.mutable.ArrayBuffer import spark._ -import spark.bagel.Bagel._ - -class TestVertex(val id: String, val active: Boolean, val age: Int) extends Vertex with Serializable -class TestMessage(val targetId: String) extends Message with Serializable +class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable +class TestMessage(val targetId: String) extends Message[String] with Serializable class BagelSuite extends FunSuite with Assertions { test("halting by voting") { val sc = new SparkContext("local", "test") - val verts = sc.parallelize(Array("a", "b", "c", "d").map(id => (id, new TestVertex(id, true, 0)))) + val verts = sc.parallelize(Array("a", "b", "c", "d").map(id => (id, new TestVertex(true, 0)))) val msgs = sc.parallelize(Array[(String, TestMessage)]()) val numSupersteps = 5 val result = - Bagel.run(sc, verts, msgs)()(addAggregatorArg { - (self: TestVertex, msgs: Option[ArrayBuffer[TestMessage]], superstep: Int) => - (new TestVertex(self.id, superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]()) - }) - for (vert <- result.collect) + Bagel.run(sc, verts, msgs, sc.defaultParallelism) { + (self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) => + (new TestVertex(superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]()) + } + for ((id, vert) <- result.collect) assert(vert.age === numSupersteps) sc.stop() } test("halting by message silence") { val sc = new SparkContext("local", "test") - val verts = sc.parallelize(Array("a", "b", "c", "d").map(id => (id, new TestVertex(id, false, 0)))) + val verts = sc.parallelize(Array("a", "b", "c", "d").map(id => (id, new TestVertex(false, 0)))) val msgs = sc.parallelize(Array("a" -> new TestMessage("a"))) val numSupersteps = 5 val result = - Bagel.run(sc, verts, msgs)()(addAggregatorArg { - (self: TestVertex, msgs: Option[ArrayBuffer[TestMessage]], superstep: Int) => - val msgsOut = - msgs match { - case Some(ms) if (superstep < numSupersteps - 1) => - ms - case _ => - new ArrayBuffer[TestMessage]() - } - (new TestVertex(self.id, self.active, self.age + 1), msgsOut) - }) - for (vert <- result.collect) + Bagel.run(sc, verts, msgs, sc.defaultParallelism) { + (self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) => + val msgsOut = + msgs match { + case Some(ms) if (superstep < numSupersteps - 1) => + ms + case _ => + Array[TestMessage]() + } + (new TestVertex(self.active, self.age + 1), msgsOut) + } + for ((id, vert) <- result.collect) assert(vert.age === numSupersteps) sc.stop() } |