author     Matei Zaharia <matei@eecs.berkeley.edu>  2013-02-11 13:23:50 -0800
committer  Matei Zaharia <matei@eecs.berkeley.edu>  2013-02-11 13:23:50 -0800
commit     ea08537143d58b79b3ae5d083e9b3a5647257da8 (patch)
tree       ff5329ab6dc12a5326ab95f85cd1b0e8eee2b231 /bagel/src
parent     da8afbc77e5796d45686034db5560f18c057d3c9 (diff)
Fixed an exponential recursion that could happen with doCheckpoint due
to lack of memoization
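The change to RDD/DAGScheduler itself is outside this filtered view, so purely as context: below is a minimal sketch of the memoization idea the commit message describes. The Node class and the doCheckpointCalled flag are illustrative names, not the actual Spark code. Without such a flag, a node that is shared by several lineage paths is revisited once per path, which grows exponentially with the depth of the DAG; with it, each node does its checkpoint work at most once.

    // Illustrative sketch only, not the actual Spark implementation.
    class Node(val deps: Seq[Node]) {
      private var doCheckpointCalled = false  // remembers that this node was already visited

      def doCheckpoint(): Unit = {
        if (!doCheckpointCalled) {
          doCheckpointCalled = true
          // Recurse into parents exactly once per node, so a diamond-shaped
          // DAG is traversed in linear rather than exponential time.
          deps.foreach(_.doCheckpoint())
        }
      }
    }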
Diffstat (limited to 'bagel/src')
-rw-r--r--  bagel/src/test/scala/bagel/BagelSuite.scala | 35
1 file changed, 27 insertions(+), 8 deletions(-)
diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala
index 3c2f9c4616..47829a431e 100644
--- a/bagel/src/test/scala/bagel/BagelSuite.scala
+++ b/bagel/src/test/scala/bagel/BagelSuite.scala
@@ -1,10 +1,8 @@
package spark.bagel
import org.scalatest.{FunSuite, Assertions, BeforeAndAfter}
-import org.scalatest.prop.Checkers
-import org.scalacheck.Arbitrary._
-import org.scalacheck.Gen
-import org.scalacheck.Prop._
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.time.SpanSugar._
import scala.collection.mutable.ArrayBuffer
@@ -13,7 +11,7 @@ import spark._
class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
class TestMessage(val targetId: String) extends Message[String] with Serializable
-class BagelSuite extends FunSuite with Assertions with BeforeAndAfter {
+class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeouts {
var sc: SparkContext = _
@@ -25,7 +23,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
}
-
+
test("halting by voting") {
sc = new SparkContext("local", "test")
val verts = sc.parallelize(Array("a", "b", "c", "d").map(id => (id, new TestVertex(true, 0))))
@@ -36,8 +34,9 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter {
(self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) =>
(new TestVertex(superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]())
}
- for ((id, vert) <- result.collect)
+ for ((id, vert) <- result.collect) {
assert(vert.age === numSupersteps)
+ }
}
test("halting by message silence") {
@@ -57,7 +56,27 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter {
}
(new TestVertex(self.active, self.age + 1), msgsOut)
}
- for ((id, vert) <- result.collect)
+ for ((id, vert) <- result.collect) {
assert(vert.age === numSupersteps)
+ }
+ }
+
+ test("large number of iterations") {
+ // This tests whether jobs with a large number of iterations finish in a reasonable time,
+ // because non-memoized recursion in RDD or DAGScheduler used to cause them to hang
+ failAfter(10 seconds) {
+ sc = new SparkContext("local", "test")
+ val verts = sc.parallelize((1 to 4).map(id => (id.toString, new TestVertex(true, 0))))
+ val msgs = sc.parallelize(Array[(String, TestMessage)]())
+ val numSupersteps = 50
+ val result =
+ Bagel.run(sc, verts, msgs, sc.defaultParallelism) {
+ (self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) =>
+ (new TestVertex(superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]())
+ }
+ for ((id, vert) <- result.collect) {
+ assert(vert.age === numSupersteps)
+ }
+ }
}
}
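For reference, the timeout pattern the new test relies on comes from ScalaTest's Timeouts trait together with SpanSugar, the same imports the diff adds. A standalone sketch of that pattern (the suite and test names here are made up for illustration):

    import org.scalatest.FunSuite
    import org.scalatest.concurrent.Timeouts
    import org.scalatest.time.SpanSugar._

    // Mixing in Timeouts provides failAfter; SpanSugar supplies the "10 seconds" syntax.
    class TimeoutExampleSuite extends FunSuite with Timeouts {
      test("finishes within the time limit") {
        failAfter(10 seconds) {
          // Work that must complete before the deadline, otherwise the test fails.
          (1 to 1000).sum
        }
      }
    }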