aboutsummaryrefslogtreecommitdiff
path: root/bagel
diff options
context:
space:
mode:
Diffstat (limited to 'bagel')
-rw-r--r--bagel/src/main/scala/spark/bagel/Bagel.scala16
-rw-r--r--bagel/src/main/scala/spark/bagel/examples/ShortestPath.scala9
-rw-r--r--bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala12
-rw-r--r--bagel/src/test/scala/bagel/BagelSuite.scala4
4 files changed, 18 insertions, 23 deletions
diff --git a/bagel/src/main/scala/spark/bagel/Bagel.scala b/bagel/src/main/scala/spark/bagel/Bagel.scala
index 92d4132e68..c24c65be2a 100644
--- a/bagel/src/main/scala/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/spark/bagel/Bagel.scala
@@ -111,8 +111,7 @@ trait Aggregator[V, A] {
def mergeAggregators(a: A, b: A): A
}
-@serializable
-class DefaultCombiner[M] extends Combiner[M, ArrayBuffer[M]] {
+class DefaultCombiner[M] extends Combiner[M, ArrayBuffer[M]] with Serializable {
def createCombiner(msg: M): ArrayBuffer[M] =
ArrayBuffer(msg)
def mergeMsg(combiner: ArrayBuffer[M], msg: M): ArrayBuffer[M] =
@@ -121,8 +120,7 @@ class DefaultCombiner[M] extends Combiner[M, ArrayBuffer[M]] {
a ++= b
}
-@serializable
-class NullAggregator[V] extends Aggregator[V, Option[Nothing]] {
+class NullAggregator[V] extends Aggregator[V, Option[Nothing]] with Serializable {
def createAggregator(vert: V): Option[Nothing] = None
def mergeAggregators(a: Option[Nothing], b: Option[Nothing]): Option[Nothing] = None
}
@@ -130,8 +128,8 @@ class NullAggregator[V] extends Aggregator[V, Option[Nothing]] {
/**
* Represents a Bagel vertex.
*
- * Subclasses may store state along with each vertex and must be
- * annotated with @serializable.
+ * Subclasses may store state along with each vertex and must
+ * inherit from java.io.Serializable or scala.Serializable.
*/
trait Vertex {
def id: String
@@ -142,7 +140,7 @@ trait Vertex {
* Represents a Bagel message to a target vertex.
*
* Subclasses may contain a payload to deliver to the target vertex
- * and must be annotated with @serializable.
+ * and must inherit from java.io.Serializable or scala.Serializable.
*/
trait Message {
def targetId: String
@@ -151,8 +149,8 @@ trait Message {
/**
* Represents a directed edge between two vertices.
*
- * Subclasses may store state along each edge and must be annotated
- * with @serializable.
+ * Subclasses may store state along each edge and must inherit from
+ * java.io.Serializable or scala.Serializable.
*/
trait Edge {
def targetId: String
diff --git a/bagel/src/main/scala/spark/bagel/examples/ShortestPath.scala b/bagel/src/main/scala/spark/bagel/examples/ShortestPath.scala
index a7fd386310..691fc55b78 100644
--- a/bagel/src/main/scala/spark/bagel/examples/ShortestPath.scala
+++ b/bagel/src/main/scala/spark/bagel/examples/ShortestPath.scala
@@ -81,8 +81,7 @@ object ShortestPath {
}
}
-@serializable
-object MinCombiner extends Combiner[SPMessage, Int] {
+object MinCombiner extends Combiner[SPMessage, Int] with Serializable {
def createCombiner(msg: SPMessage): Int =
msg.value
def mergeMsg(combiner: Int, msg: SPMessage): Int =
@@ -91,6 +90,6 @@ object MinCombiner extends Combiner[SPMessage, Int] {
min(a, b)
}
-@serializable class SPVertex(val id: String, val value: Int, val outEdges: Seq[SPEdge], val active: Boolean) extends Vertex
-@serializable class SPEdge(val targetId: String, val value: Int) extends Edge
-@serializable class SPMessage(val targetId: String, val value: Int) extends Message
+class SPVertex(val id: String, val value: Int, val outEdges: Seq[SPEdge], val active: Boolean) extends Vertex with Serializable
+class SPEdge(val targetId: String, val value: Int) extends Edge with Serializable
+class SPMessage(val targetId: String, val value: Int) extends Message with Serializable
diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala
index 1bce5bebad..9a0dbbe9d7 100644
--- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala
+++ b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala
@@ -76,8 +76,7 @@ object WikipediaPageRank {
}
}
-@serializable
-object PRCombiner extends Combiner[PRMessage, Double] {
+object PRCombiner extends Combiner[PRMessage, Double] with Serializable {
def createCombiner(msg: PRMessage): Double =
msg.value
def mergeMsg(combiner: Double, msg: PRMessage): Double =
@@ -105,8 +104,7 @@ object PRCombiner extends Combiner[PRMessage, Double] {
}
}
-@serializable
-object PRNoCombiner extends DefaultCombiner[PRMessage] {
+object PRNoCombiner extends DefaultCombiner[PRMessage] with Serializable {
def compute(numVertices: Long, epsilon: Double)(self: PRVertex, messages: Option[ArrayBuffer[PRMessage]], superstep: Int): (PRVertex, Iterable[PRMessage]) =
PRCombiner.compute(numVertices, epsilon)(self, messages match {
case Some(msgs) => Some(msgs.map(_.value).sum)
@@ -114,7 +112,7 @@ object PRNoCombiner extends DefaultCombiner[PRMessage] {
}, superstep)
}
-@serializable class PRVertex() extends Vertex {
+class PRVertex() extends Vertex with Serializable {
var id: String = _
var value: Double = _
var outEdges: ArrayBuffer[PREdge] = _
@@ -129,7 +127,7 @@ object PRNoCombiner extends DefaultCombiner[PRMessage] {
}
}
-@serializable class PRMessage() extends Message {
+class PRMessage() extends Message with Serializable {
var targetId: String = _
var value: Double = _
@@ -140,7 +138,7 @@ object PRNoCombiner extends DefaultCombiner[PRMessage] {
}
}
-@serializable class PREdge() extends Edge {
+class PREdge() extends Edge with Serializable {
var targetId: String = _
def this(targetId: String) {
diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala
index 9e64d3f136..59356e09f0 100644
--- a/bagel/src/test/scala/bagel/BagelSuite.scala
+++ b/bagel/src/test/scala/bagel/BagelSuite.scala
@@ -12,8 +12,8 @@ import spark._
import spark.bagel.Bagel._
-@serializable class TestVertex(val id: String, val active: Boolean, val age: Int) extends Vertex
-@serializable class TestMessage(val targetId: String) extends Message
+class TestVertex(val id: String, val active: Boolean, val age: Int) extends Vertex with Serializable
+class TestMessage(val targetId: String) extends Message with Serializable
class BagelSuite extends FunSuite with Assertions {
test("halting by voting") {