aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bagel/src/test/scala/bagel/BagelSuite.scala18
1 files changed, 18 insertions, 0 deletions
diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala
index 47829a431e..25db395c22 100644
--- a/bagel/src/test/scala/bagel/BagelSuite.scala
+++ b/bagel/src/test/scala/bagel/BagelSuite.scala
@@ -7,6 +7,7 @@ import org.scalatest.time.SpanSugar._
import scala.collection.mutable.ArrayBuffer
import spark._
+import storage.StorageLevel
class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
class TestMessage(val targetId: String) extends Message[String] with Serializable
@@ -79,4 +80,21 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo
}
}
}
+
+ test("using non-default persistence level") {
+ failAfter(10 seconds) {
+ sc = new SparkContext("local", "test")
+ val verts = sc.parallelize((1 to 4).map(id => (id.toString, new TestVertex(true, 0))))
+ val msgs = sc.parallelize(Array[(String, TestMessage)]())
+ val numSupersteps = 50
+ val result =
+ Bagel.run(sc, verts, msgs, sc.defaultParallelism, StorageLevel.DISK_ONLY) {
+ (self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) =>
+ (new TestVertex(superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]())
+ }
+ for ((id, vert) <- result.collect) {
+ assert(vert.age === numSupersteps)
+ }
+ }
+ }
}