aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2011-04-15 13:43:29 -0700
committerAnkur Dave <ankurdave@gmail.com>2011-05-03 15:40:31 -0700
commit1c8ca0ebe1537c8f424722294794a66ff123f132 (patch)
treef4375a4cbe417dc440d7bd0a2ecea458f8bddea0
parentc5b3ea755ff8a69aa39dd6e46d57cbe9d5bcbcae (diff)
downloadspark-1c8ca0ebe1537c8f424722294794a66ff123f132.tar.gz
spark-1c8ca0ebe1537c8f424722294794a66ff123f132.tar.bz2
spark-1c8ca0ebe1537c8f424722294794a66ff123f132.zip
Add Bagel test suite
Note: This test suite currently fails for the same reason that the Spark Core test suite fails: Spark currently seems to have a bug where any test after the first one fails.
-rw-r--r--bagel/src/main/scala/bagel/Pregel.scala8
-rw-r--r--bagel/src/test/scala/bagel/BagelSuite.scala57
-rw-r--r--project/build/SparkProject.scala6
3 files changed, 70 insertions, 1 deletions
diff --git a/bagel/src/main/scala/bagel/Pregel.scala b/bagel/src/main/scala/bagel/Pregel.scala
index 5ef398d783..67bc582fd1 100644
--- a/bagel/src/main/scala/bagel/Pregel.scala
+++ b/bagel/src/main/scala/bagel/Pregel.scala
@@ -75,6 +75,14 @@ object Pregel extends Logging {
run(sc, newVerts, newMsgs, createCombiner, mergeMsg, mergeCombiners, numSplits, superstep + 1)(compute)
}
}
+
+ def defaultCreateCombiner[M <: Message](msg: M): ArrayBuffer[M] = ArrayBuffer(msg)
+ def defaultMergeMsg[M <: Message](combiner: ArrayBuffer[M], msg: M): ArrayBuffer[M] =
+ combiner += msg
+ def defaultMergeCombiners[M <: Message](a: ArrayBuffer[M], b: ArrayBuffer[M]): ArrayBuffer[M] =
+ a ++= b
+ def defaultCompute[V <: Vertex, M <: Message](self: V, msgs: Option[ArrayBuffer[M]], superstep: Int): (V, Iterable[M]) =
+ (self, List())
}
/**
diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala
new file mode 100644
index 0000000000..72aecb7fd8
--- /dev/null
+++ b/bagel/src/test/scala/bagel/BagelSuite.scala
@@ -0,0 +1,57 @@
+package bagel
+
+import org.scalatest.{FunSuite, Assertions}
+import org.scalatest.prop.Checkers
+import org.scalacheck.Arbitrary._
+import org.scalacheck.Gen
+import org.scalacheck.Prop._
+
+import scala.collection.mutable.ArrayBuffer
+
+import spark._
+
+@serializable class TestVertex(val id: String, val active: Boolean, val age: Int) extends Vertex
+@serializable class TestMessage(val targetId: String) extends Message
+
+class BagelSuite extends FunSuite with Assertions {
+ test("halting by voting") {
+ val sc = new SparkContext("local", "test")
+ val verts = sc.parallelize(Array("a", "b", "c", "d").map(id => (id, new TestVertex(id, true, 0))))
+ val msgs = sc.parallelize(Array[(String, TestMessage)]())
+ val numSupersteps = 5
+ val result =
+ Pregel.run(sc, verts, msgs,
+ Pregel.defaultCreateCombiner[TestMessage],
+ Pregel.defaultMergeMsg[TestMessage],
+ Pregel.defaultMergeCombiners[TestMessage], 1) {
+ (self: TestVertex, msgs: Option[ArrayBuffer[TestMessage]], superstep: Int) =>
+ (new TestVertex(self.id, superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]())
+ }
+ for (vert <- result.collect)
+ assert(vert.age === numSupersteps)
+ }
+
+ test("halting by message silence") {
+ val sc = new SparkContext("local", "test")
+ val verts = sc.parallelize(Array("a", "b", "c", "d").map(id => (id, new TestVertex(id, false, 0))))
+ val msgs = sc.parallelize(Array("a" -> new TestMessage("a")))
+ val numSupersteps = 5
+ val result =
+ Pregel.run(sc, verts, msgs,
+ Pregel.defaultCreateCombiner[TestMessage],
+ Pregel.defaultMergeMsg[TestMessage],
+ Pregel.defaultMergeCombiners[TestMessage], 1) {
+ (self: TestVertex, msgs: Option[ArrayBuffer[TestMessage]], superstep: Int) =>
+ val msgsOut =
+ msgs match {
+ case Some(ms) if (superstep < numSupersteps - 1) =>
+ ms
+ case _ =>
+ new ArrayBuffer[TestMessage]()
+ }
+ (new TestVertex(self.id, self.active, self.age + 1), msgsOut)
+ }
+ for (vert <- result.collect)
+ assert(vert.age === numSupersteps)
+ }
+}
diff --git a/project/build/SparkProject.scala b/project/build/SparkProject.scala
index a6ee25bc3d..a6520d1f03 100644
--- a/project/build/SparkProject.scala
+++ b/project/build/SparkProject.scala
@@ -14,7 +14,7 @@ extends ParentProject(info) with IdeaProject
lazy val examples =
project("examples", "Spark Examples", new ExamplesProject(_), core)
- lazy val bagel = project("bagel", "Bagel", core)
+ lazy val bagel = project("bagel", "Bagel", new BagelProject(_), core)
class CoreProject(info: ProjectInfo)
extends DefaultProject(info) with Eclipsify with IdeaProject with DepJar with XmlTestReport
@@ -23,6 +23,10 @@ extends ParentProject(info) with IdeaProject
class ExamplesProject(info: ProjectInfo)
extends DefaultProject(info) with Eclipsify with IdeaProject
{}
+
+ class BagelProject(info: ProjectInfo)
+ extends DefaultProject(info) with DepJar with XmlTestReport
+ {}
}