From ea08537143d58b79b3ae5d083e9b3a5647257da8 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Mon, 11 Feb 2013 13:23:50 -0800 Subject: Fixed an exponential recursion that could happen with doCheckpoint due to lack of memoization --- bagel/src/test/scala/bagel/BagelSuite.scala | 35 ++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 8 deletions(-) (limited to 'bagel/src') diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala index 3c2f9c4616..47829a431e 100644 --- a/bagel/src/test/scala/bagel/BagelSuite.scala +++ b/bagel/src/test/scala/bagel/BagelSuite.scala @@ -1,10 +1,8 @@ package spark.bagel import org.scalatest.{FunSuite, Assertions, BeforeAndAfter} -import org.scalatest.prop.Checkers -import org.scalacheck.Arbitrary._ -import org.scalacheck.Gen -import org.scalacheck.Prop._ +import org.scalatest.concurrent.Timeouts +import org.scalatest.time.SpanSugar._ import scala.collection.mutable.ArrayBuffer @@ -13,7 +11,7 @@ import spark._ class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable class TestMessage(val targetId: String) extends Message[String] with Serializable -class BagelSuite extends FunSuite with Assertions with BeforeAndAfter { +class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeouts { var sc: SparkContext = _ @@ -25,7 +23,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") } - + test("halting by voting") { sc = new SparkContext("local", "test") val verts = sc.parallelize(Array("a", "b", "c", "d").map(id => (id, new TestVertex(true, 0)))) @@ -36,8 +34,9 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter { (self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) => (new TestVertex(superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]()) } - for ((id, vert) <- result.collect) + for ((id, vert) <- result.collect) { assert(vert.age === numSupersteps) + } } test("halting by message silence") { @@ -57,7 +56,27 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter { } (new TestVertex(self.active, self.age + 1), msgsOut) } - for ((id, vert) <- result.collect) + for ((id, vert) <- result.collect) { assert(vert.age === numSupersteps) + } + } + + test("large number of iterations") { + // This tests whether jobs with a large number of iterations finish in a reasonable time, + // because non-memoized recursion in RDD or DAGScheduler used to cause them to hang + failAfter(10 seconds) { + sc = new SparkContext("local", "test") + val verts = sc.parallelize((1 to 4).map(id => (id.toString, new TestVertex(true, 0)))) + val msgs = sc.parallelize(Array[(String, TestMessage)]()) + val numSupersteps = 50 + val result = + Bagel.run(sc, verts, msgs, sc.defaultParallelism) { + (self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) => + (new TestVertex(superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]()) + } + for ((id, vert) <- result.collect) { + assert(vert.age === numSupersteps) + } + } } } -- cgit v1.2.3