diff options
Diffstat (limited to 'bagel/src')
4 files changed, 18 insertions, 23 deletions
diff --git a/bagel/src/main/scala/spark/bagel/Bagel.scala b/bagel/src/main/scala/spark/bagel/Bagel.scala index 92d4132e68..c24c65be2a 100644 --- a/bagel/src/main/scala/spark/bagel/Bagel.scala +++ b/bagel/src/main/scala/spark/bagel/Bagel.scala @@ -111,8 +111,7 @@ trait Aggregator[V, A] { def mergeAggregators(a: A, b: A): A } -@serializable -class DefaultCombiner[M] extends Combiner[M, ArrayBuffer[M]] { +class DefaultCombiner[M] extends Combiner[M, ArrayBuffer[M]] with Serializable { def createCombiner(msg: M): ArrayBuffer[M] = ArrayBuffer(msg) def mergeMsg(combiner: ArrayBuffer[M], msg: M): ArrayBuffer[M] = @@ -121,8 +120,7 @@ class DefaultCombiner[M] extends Combiner[M, ArrayBuffer[M]] { a ++= b } -@serializable -class NullAggregator[V] extends Aggregator[V, Option[Nothing]] { +class NullAggregator[V] extends Aggregator[V, Option[Nothing]] with Serializable { def createAggregator(vert: V): Option[Nothing] = None def mergeAggregators(a: Option[Nothing], b: Option[Nothing]): Option[Nothing] = None } @@ -130,8 +128,8 @@ class NullAggregator[V] extends Aggregator[V, Option[Nothing]] { /** * Represents a Bagel vertex. * - * Subclasses may store state along with each vertex and must be - * annotated with @serializable. + * Subclasses may store state along with each vertex and must + * inherit from java.io.Serializable or scala.Serializable. */ trait Vertex { def id: String @@ -142,7 +140,7 @@ trait Vertex { * Represents a Bagel message to a target vertex. * * Subclasses may contain a payload to deliver to the target vertex - * and must be annotated with @serializable. + * and must inherit from java.io.Serializable or scala.Serializable. */ trait Message { def targetId: String @@ -151,8 +149,8 @@ trait Message { /** * Represents a directed edge between two vertices. * - * Subclasses may store state along each edge and must be annotated - * with @serializable. + * Subclasses may store state along each edge and must inherit from + * java.io.Serializable or scala.Serializable. */ trait Edge { def targetId: String diff --git a/bagel/src/main/scala/spark/bagel/examples/ShortestPath.scala b/bagel/src/main/scala/spark/bagel/examples/ShortestPath.scala index a7fd386310..691fc55b78 100644 --- a/bagel/src/main/scala/spark/bagel/examples/ShortestPath.scala +++ b/bagel/src/main/scala/spark/bagel/examples/ShortestPath.scala @@ -81,8 +81,7 @@ object ShortestPath { } } -@serializable -object MinCombiner extends Combiner[SPMessage, Int] { +object MinCombiner extends Combiner[SPMessage, Int] with Serializable { def createCombiner(msg: SPMessage): Int = msg.value def mergeMsg(combiner: Int, msg: SPMessage): Int = @@ -91,6 +90,6 @@ object MinCombiner extends Combiner[SPMessage, Int] { min(a, b) } -@serializable class SPVertex(val id: String, val value: Int, val outEdges: Seq[SPEdge], val active: Boolean) extends Vertex -@serializable class SPEdge(val targetId: String, val value: Int) extends Edge -@serializable class SPMessage(val targetId: String, val value: Int) extends Message +class SPVertex(val id: String, val value: Int, val outEdges: Seq[SPEdge], val active: Boolean) extends Vertex with Serializable +class SPEdge(val targetId: String, val value: Int) extends Edge with Serializable +class SPMessage(val targetId: String, val value: Int) extends Message with Serializable diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala index 1bce5bebad..9a0dbbe9d7 100644 --- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala +++ b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala @@ -76,8 +76,7 @@ object WikipediaPageRank { } } -@serializable -object PRCombiner extends Combiner[PRMessage, Double] { +object PRCombiner extends Combiner[PRMessage, Double] with Serializable { def createCombiner(msg: PRMessage): Double = msg.value def mergeMsg(combiner: Double, msg: PRMessage): Double = @@ -105,8 +104,7 @@ object PRCombiner extends Combiner[PRMessage, Double] { } } -@serializable -object PRNoCombiner extends DefaultCombiner[PRMessage] { +object PRNoCombiner extends DefaultCombiner[PRMessage] with Serializable { def compute(numVertices: Long, epsilon: Double)(self: PRVertex, messages: Option[ArrayBuffer[PRMessage]], superstep: Int): (PRVertex, Iterable[PRMessage]) = PRCombiner.compute(numVertices, epsilon)(self, messages match { case Some(msgs) => Some(msgs.map(_.value).sum) @@ -114,7 +112,7 @@ object PRNoCombiner extends DefaultCombiner[PRMessage] { }, superstep) } -@serializable class PRVertex() extends Vertex { +class PRVertex() extends Vertex with Serializable { var id: String = _ var value: Double = _ var outEdges: ArrayBuffer[PREdge] = _ @@ -129,7 +127,7 @@ object PRNoCombiner extends DefaultCombiner[PRMessage] { } } -@serializable class PRMessage() extends Message { +class PRMessage() extends Message with Serializable { var targetId: String = _ var value: Double = _ @@ -140,7 +138,7 @@ object PRNoCombiner extends DefaultCombiner[PRMessage] { } } -@serializable class PREdge() extends Edge { +class PREdge() extends Edge with Serializable { var targetId: String = _ def this(targetId: String) { diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala index 9e64d3f136..59356e09f0 100644 --- a/bagel/src/test/scala/bagel/BagelSuite.scala +++ b/bagel/src/test/scala/bagel/BagelSuite.scala @@ -12,8 +12,8 @@ import spark._ import spark.bagel.Bagel._ -@serializable class TestVertex(val id: String, val active: Boolean, val age: Int) extends Vertex -@serializable class TestMessage(val targetId: String) extends Message +class TestVertex(val id: String, val active: Boolean, val age: Int) extends Vertex with Serializable +class TestMessage(val targetId: String) extends Message with Serializable class BagelSuite extends FunSuite with Assertions { test("halting by voting") { |