diff options
author | Ankur Dave <ankurdave@gmail.com> | 2011-04-15 13:43:29 -0700 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2011-05-03 15:40:31 -0700 |
commit | 1c8ca0ebe1537c8f424722294794a66ff123f132 (patch) | |
tree | f4375a4cbe417dc440d7bd0a2ecea458f8bddea0 /bagel | |
parent | c5b3ea755ff8a69aa39dd6e46d57cbe9d5bcbcae (diff) | |
download | spark-1c8ca0ebe1537c8f424722294794a66ff123f132.tar.gz spark-1c8ca0ebe1537c8f424722294794a66ff123f132.tar.bz2 spark-1c8ca0ebe1537c8f424722294794a66ff123f132.zip |
Add Bagel test suite
Note: This test suite currently fails for the same reason that the
Spark Core test suite fails: Spark currently seems to have a bug where
any test after the first one fails.
Diffstat (limited to 'bagel')
-rw-r--r-- | bagel/src/main/scala/bagel/Pregel.scala | 8 | ||||
-rw-r--r-- | bagel/src/test/scala/bagel/BagelSuite.scala | 57 |
2 files changed, 65 insertions, 0 deletions
diff --git a/bagel/src/main/scala/bagel/Pregel.scala b/bagel/src/main/scala/bagel/Pregel.scala index 5ef398d783..67bc582fd1 100644 --- a/bagel/src/main/scala/bagel/Pregel.scala +++ b/bagel/src/main/scala/bagel/Pregel.scala @@ -75,6 +75,14 @@ object Pregel extends Logging { run(sc, newVerts, newMsgs, createCombiner, mergeMsg, mergeCombiners, numSplits, superstep + 1)(compute) } } + + def defaultCreateCombiner[M <: Message](msg: M): ArrayBuffer[M] = ArrayBuffer(msg) + def defaultMergeMsg[M <: Message](combiner: ArrayBuffer[M], msg: M): ArrayBuffer[M] = + combiner += msg + def defaultMergeCombiners[M <: Message](a: ArrayBuffer[M], b: ArrayBuffer[M]): ArrayBuffer[M] = + a ++= b + def defaultCompute[V <: Vertex, M <: Message](self: V, msgs: Option[ArrayBuffer[M]], superstep: Int): (V, Iterable[M]) = + (self, List()) } /** diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala new file mode 100644 index 0000000000..72aecb7fd8 --- /dev/null +++ b/bagel/src/test/scala/bagel/BagelSuite.scala @@ -0,0 +1,57 @@ +package bagel + +import org.scalatest.{FunSuite, Assertions} +import org.scalatest.prop.Checkers +import org.scalacheck.Arbitrary._ +import org.scalacheck.Gen +import org.scalacheck.Prop._ + +import scala.collection.mutable.ArrayBuffer + +import spark._ + +@serializable class TestVertex(val id: String, val active: Boolean, val age: Int) extends Vertex +@serializable class TestMessage(val targetId: String) extends Message + +class BagelSuite extends FunSuite with Assertions { + test("halting by voting") { + val sc = new SparkContext("local", "test") + val verts = sc.parallelize(Array("a", "b", "c", "d").map(id => (id, new TestVertex(id, true, 0)))) + val msgs = sc.parallelize(Array[(String, TestMessage)]()) + val numSupersteps = 5 + val result = + Pregel.run(sc, verts, msgs, + Pregel.defaultCreateCombiner[TestMessage], + Pregel.defaultMergeMsg[TestMessage], + Pregel.defaultMergeCombiners[TestMessage], 1) { + (self: TestVertex, msgs: Option[ArrayBuffer[TestMessage]], superstep: Int) => + (new TestVertex(self.id, superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]()) + } + for (vert <- result.collect) + assert(vert.age === numSupersteps) + } + + test("halting by message silence") { + val sc = new SparkContext("local", "test") + val verts = sc.parallelize(Array("a", "b", "c", "d").map(id => (id, new TestVertex(id, false, 0)))) + val msgs = sc.parallelize(Array("a" -> new TestMessage("a"))) + val numSupersteps = 5 + val result = + Pregel.run(sc, verts, msgs, + Pregel.defaultCreateCombiner[TestMessage], + Pregel.defaultMergeMsg[TestMessage], + Pregel.defaultMergeCombiners[TestMessage], 1) { + (self: TestVertex, msgs: Option[ArrayBuffer[TestMessage]], superstep: Int) => + val msgsOut = + msgs match { + case Some(ms) if (superstep < numSupersteps - 1) => + ms + case _ => + new ArrayBuffer[TestMessage]() + } + (new TestVertex(self.id, self.active, self.age + 1), msgsOut) + } + for (vert <- result.collect) + assert(vert.age === numSupersteps) + } +} |