aboutsummaryrefslogtreecommitdiff
path: root/bagel
diff options
context:
space:
mode:
authorNick Pentreath <nick.pentreath@gmail.com>2013-03-09 12:52:16 +0200
committerNick Pentreath <nick.pentreath@gmail.com>2013-03-09 12:52:16 +0200
commitd35c5a5176601b4c8e12d022cc72e42d6986c0a4 (patch)
tree70fe7c1eb02b9cd1128f0f95268c2c2effec7e3d /bagel
parent1e981d8b260423bfed6d4e67bd94ba4c07a4ff71 (diff)
downloadspark-d35c5a5176601b4c8e12d022cc72e42d6986c0a4.tar.gz
spark-d35c5a5176601b4c8e12d022cc72e42d6986c0a4.tar.bz2
spark-d35c5a5176601b4c8e12d022cc72e42d6986c0a4.zip
Adding test for non-default persistence level
Diffstat (limited to 'bagel')
-rw-r--r--bagel/src/test/scala/bagel/BagelSuite.scala18
1 files changed, 18 insertions, 0 deletions
diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala
index 47829a431e..25db395c22 100644
--- a/bagel/src/test/scala/bagel/BagelSuite.scala
+++ b/bagel/src/test/scala/bagel/BagelSuite.scala
@@ -7,6 +7,7 @@ import org.scalatest.time.SpanSugar._
import scala.collection.mutable.ArrayBuffer
import spark._
+import storage.StorageLevel
class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
class TestMessage(val targetId: String) extends Message[String] with Serializable
@@ -79,4 +80,21 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo
}
}
}
+
+ test("using non-default persistence level") {
+ failAfter(10 seconds) {
+ sc = new SparkContext("local", "test")
+ val verts = sc.parallelize((1 to 4).map(id => (id.toString, new TestVertex(true, 0))))
+ val msgs = sc.parallelize(Array[(String, TestMessage)]())
+ val numSupersteps = 50
+ val result =
+ Bagel.run(sc, verts, msgs, sc.defaultParallelism, StorageLevel.DISK_ONLY) {
+ (self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) =>
+ (new TestVertex(superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]())
+ }
+ for ((id, vert) <- result.collect) {
+ assert(vert.age === numSupersteps)
+ }
+ }
+ }
}