diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-02-11 13:23:50 -0800 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-02-11 13:23:50 -0800 |
commit | ea08537143d58b79b3ae5d083e9b3a5647257da8 (patch) | |
tree | ff5329ab6dc12a5326ab95f85cd1b0e8eee2b231 /bagel/src/test/scala | |
parent | da8afbc77e5796d45686034db5560f18c057d3c9 (diff) | |
download | spark-ea08537143d58b79b3ae5d083e9b3a5647257da8.tar.gz spark-ea08537143d58b79b3ae5d083e9b3a5647257da8.tar.bz2 spark-ea08537143d58b79b3ae5d083e9b3a5647257da8.zip |
Fixed an exponential recursion that could happen with doCheckpoint due
to lack of memoization
Diffstat (limited to 'bagel/src/test/scala')
-rw-r--r-- | bagel/src/test/scala/bagel/BagelSuite.scala | 35 |
1 files changed, 27 insertions, 8 deletions
diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala index 3c2f9c4616..47829a431e 100644 --- a/bagel/src/test/scala/bagel/BagelSuite.scala +++ b/bagel/src/test/scala/bagel/BagelSuite.scala @@ -1,10 +1,8 @@ package spark.bagel import org.scalatest.{FunSuite, Assertions, BeforeAndAfter} -import org.scalatest.prop.Checkers -import org.scalacheck.Arbitrary._ -import org.scalacheck.Gen -import org.scalacheck.Prop._ +import org.scalatest.concurrent.Timeouts +import org.scalatest.time.SpanSugar._ import scala.collection.mutable.ArrayBuffer @@ -13,7 +11,7 @@ import spark._ class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable class TestMessage(val targetId: String) extends Message[String] with Serializable -class BagelSuite extends FunSuite with Assertions with BeforeAndAfter { +class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeouts { var sc: SparkContext = _ @@ -25,7 +23,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") } - + test("halting by voting") { sc = new SparkContext("local", "test") val verts = sc.parallelize(Array("a", "b", "c", "d").map(id => (id, new TestVertex(true, 0)))) @@ -36,8 +34,9 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter { (self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) => (new TestVertex(superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]()) } - for ((id, vert) <- result.collect) + for ((id, vert) <- result.collect) { assert(vert.age === numSupersteps) + } } test("halting by message silence") { @@ -57,7 +56,27 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter { } (new TestVertex(self.active, self.age + 1), msgsOut) } - for ((id, vert) <- result.collect) + for ((id, vert) <- result.collect) { assert(vert.age === numSupersteps) + } + } + + test("large number of iterations") { + // This tests whether jobs with a large number of iterations finish in a reasonable time, + // because non-memoized recursion in RDD or DAGScheduler used to cause them to hang + failAfter(10 seconds) { + sc = new SparkContext("local", "test") + val verts = sc.parallelize((1 to 4).map(id => (id.toString, new TestVertex(true, 0)))) + val msgs = sc.parallelize(Array[(String, TestMessage)]()) + val numSupersteps = 50 + val result = + Bagel.run(sc, verts, msgs, sc.defaultParallelism) { + (self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) => + (new TestVertex(superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]()) + } + for ((id, vert) <- result.collect) { + assert(vert.age === numSupersteps) + } + } } } |