 bagel/src/main/scala/spark/bagel/Bagel.scala | 16
 bagel/src/main/scala/spark/bagel/examples/ShortestPath.scala | 9
 bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala | 12
 bagel/src/test/scala/bagel/BagelSuite.scala | 4
 core/src/main/scala/spark/Accumulators.scala | 6
 core/src/main/scala/spark/Aggregator.scala | 3
 core/src/main/scala/spark/CartesianRDD.scala | 7
 core/src/main/scala/spark/CoGroupedRDD.scala | 9
 core/src/main/scala/spark/Dependency.scala | 3
 core/src/main/scala/spark/HadoopRDD.scala | 4
 core/src/main/scala/spark/HadoopWriter.scala | 2
 core/src/main/scala/spark/PairRDDFunctions.scala | 3
 core/src/main/scala/spark/ParallelCollection.scala | 4
 core/src/main/scala/spark/Partitioner.scala | 3
 core/src/main/scala/spark/RDD.scala | 3
 core/src/main/scala/spark/SampledRDD.scala | 2
 core/src/main/scala/spark/SequenceFileRDDFunctions.scala | 3
 core/src/main/scala/spark/SerializableWritable.scala | 3
 core/src/main/scala/spark/SparkContext.scala | 3
 core/src/main/scala/spark/Split.scala | 2
 core/src/main/scala/spark/Task.scala | 7
 core/src/main/scala/spark/TaskResult.scala | 3
 core/src/main/scala/spark/UnionRDD.scala | 6
 core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala | 3
 core/src/main/scala/spark/broadcast/Broadcast.scala | 14
 core/src/main/scala/spark/broadcast/ChainedBroadcast.scala | 3
 core/src/main/scala/spark/broadcast/DfsBroadcast.scala | 5
 core/src/main/scala/spark/broadcast/SourceInfo.scala | 1
 core/src/main/scala/spark/broadcast/TreeBroadcast.scala | 3
 examples/src/main/scala/spark/examples/Vector.scala | 2
 30 files changed, 58 insertions(+), 90 deletions(-)
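
This commit replaces Scala's deprecated @serializable annotation with the standard Serializable marker trait, written as an explicit extends or with clause. A minimal sketch of the pattern applied throughout (the class names here are hypothetical, not taken from the commit):

// Before: the deprecated annotation form.
//   @serializable class Point(val x: Double, val y: Double)
// After: mix in the marker trait explicitly.
class Point(val x: Double, val y: Double) extends Serializable

// A class that already has a superclass or traits picks up
// Serializable through a `with` clause, as in the hunks below.
abstract class Shape
class Circle(val radius: Double) extends Shape with Serializable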
diff --git a/bagel/src/main/scala/spark/bagel/Bagel.scala b/bagel/src/main/scala/spark/bagel/Bagel.scala
index 92d4132e68..c24c65be2a 100644
--- a/bagel/src/main/scala/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/spark/bagel/Bagel.scala
@@ -111,8 +111,7 @@ trait Aggregator[V, A] {
def mergeAggregators(a: A, b: A): A
}
-@serializable
-class DefaultCombiner[M] extends Combiner[M, ArrayBuffer[M]] {
+class DefaultCombiner[M] extends Combiner[M, ArrayBuffer[M]] with Serializable {
def createCombiner(msg: M): ArrayBuffer[M] =
ArrayBuffer(msg)
def mergeMsg(combiner: ArrayBuffer[M], msg: M): ArrayBuffer[M] =
@@ -121,8 +120,7 @@ class DefaultCombiner[M] extends Combiner[M, ArrayBuffer[M]] {
a ++= b
}
-@serializable
-class NullAggregator[V] extends Aggregator[V, Option[Nothing]] {
+class NullAggregator[V] extends Aggregator[V, Option[Nothing]] with Serializable {
def createAggregator(vert: V): Option[Nothing] = None
def mergeAggregators(a: Option[Nothing], b: Option[Nothing]): Option[Nothing] = None
}
@@ -130,8 +128,8 @@ class NullAggregator[V] extends Aggregator[V, Option[Nothing]] {
/**
* Represents a Bagel vertex.
*
- * Subclasses may store state along with each vertex and must be
- * annotated with @serializable.
+ * Subclasses may store state along with each vertex and must
+ * inherit from java.io.Serializable or scala.Serializable.
*/
trait Vertex {
def id: String
@@ -142,7 +140,7 @@ trait Vertex {
* Represents a Bagel message to a target vertex.
*
* Subclasses may contain a payload to deliver to the target vertex
- * and must be annotated with @serializable.
+ * and must inherit from java.io.Serializable or scala.Serializable.
*/
trait Message {
def targetId: String
@@ -151,8 +149,8 @@ trait Message {
/**
* Represents a directed edge between two vertices.
*
- * Subclasses may store state along each edge and must be annotated
- * with @serializable.
+ * Subclasses may store state along each edge and must inherit from
+ * java.io.Serializable or scala.Serializable.
*/
trait Edge {
def targetId: String
diff --git a/bagel/src/main/scala/spark/bagel/examples/ShortestPath.scala b/bagel/src/main/scala/spark/bagel/examples/ShortestPath.scala
index a7fd386310..691fc55b78 100644
--- a/bagel/src/main/scala/spark/bagel/examples/ShortestPath.scala
+++ b/bagel/src/main/scala/spark/bagel/examples/ShortestPath.scala
@@ -81,8 +81,7 @@ object ShortestPath {
}
}
-@serializable
-object MinCombiner extends Combiner[SPMessage, Int] {
+object MinCombiner extends Combiner[SPMessage, Int] with Serializable {
def createCombiner(msg: SPMessage): Int =
msg.value
def mergeMsg(combiner: Int, msg: SPMessage): Int =
@@ -91,6 +90,6 @@ object MinCombiner extends Combiner[SPMessage, Int] {
min(a, b)
}
-@serializable class SPVertex(val id: String, val value: Int, val outEdges: Seq[SPEdge], val active: Boolean) extends Vertex
-@serializable class SPEdge(val targetId: String, val value: Int) extends Edge
-@serializable class SPMessage(val targetId: String, val value: Int) extends Message
+class SPVertex(val id: String, val value: Int, val outEdges: Seq[SPEdge], val active: Boolean) extends Vertex with Serializable
+class SPEdge(val targetId: String, val value: Int) extends Edge with Serializable
+class SPMessage(val targetId: String, val value: Int) extends Message with Serializable
diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala
index 1bce5bebad..9a0dbbe9d7 100644
--- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala
+++ b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala
@@ -76,8 +76,7 @@ object WikipediaPageRank {
}
}
-@serializable
-object PRCombiner extends Combiner[PRMessage, Double] {
+object PRCombiner extends Combiner[PRMessage, Double] with Serializable {
def createCombiner(msg: PRMessage): Double =
msg.value
def mergeMsg(combiner: Double, msg: PRMessage): Double =
@@ -105,8 +104,7 @@ object PRCombiner extends Combiner[PRMessage, Double] {
}
}
-@serializable
-object PRNoCombiner extends DefaultCombiner[PRMessage] {
+object PRNoCombiner extends DefaultCombiner[PRMessage] with Serializable {
def compute(numVertices: Long, epsilon: Double)(self: PRVertex, messages: Option[ArrayBuffer[PRMessage]], superstep: Int): (PRVertex, Iterable[PRMessage]) =
PRCombiner.compute(numVertices, epsilon)(self, messages match {
case Some(msgs) => Some(msgs.map(_.value).sum)
@@ -114,7 +112,7 @@ object PRNoCombiner extends DefaultCombiner[PRMessage] {
}, superstep)
}
-@serializable class PRVertex() extends Vertex {
+class PRVertex() extends Vertex with Serializable {
var id: String = _
var value: Double = _
var outEdges: ArrayBuffer[PREdge] = _
@@ -129,7 +127,7 @@ object PRNoCombiner extends DefaultCombiner[PRMessage] {
}
}
-@serializable class PRMessage() extends Message {
+class PRMessage() extends Message with Serializable {
var targetId: String = _
var value: Double = _
@@ -140,7 +138,7 @@ object PRNoCombiner extends DefaultCombiner[PRMessage] {
}
}
-@serializable class PREdge() extends Edge {
+class PREdge() extends Edge with Serializable {
var targetId: String = _
def this(targetId: String) {
diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala
index 9e64d3f136..59356e09f0 100644
--- a/bagel/src/test/scala/bagel/BagelSuite.scala
+++ b/bagel/src/test/scala/bagel/BagelSuite.scala
@@ -12,8 +12,8 @@ import spark._
import spark.bagel.Bagel._
-@serializable class TestVertex(val id: String, val active: Boolean, val age: Int) extends Vertex
-@serializable class TestMessage(val targetId: String) extends Message
+class TestVertex(val id: String, val active: Boolean, val age: Int) extends Vertex with Serializable
+class TestMessage(val targetId: String) extends Message with Serializable
class BagelSuite extends FunSuite with Assertions {
test("halting by voting") {
diff --git a/core/src/main/scala/spark/Accumulators.scala b/core/src/main/scala/spark/Accumulators.scala
index 4f51826d9d..a808536146 100644
--- a/core/src/main/scala/spark/Accumulators.scala
+++ b/core/src/main/scala/spark/Accumulators.scala
@@ -4,8 +4,8 @@ import java.io._
import scala.collection.mutable.Map
-@serializable class Accumulator[T](
- @transient initialValue: T, param: AccumulatorParam[T])
+class Accumulator[T] (
+ @transient initialValue: T, param: AccumulatorParam[T]) extends Serializable
{
val id = Accumulators.newId
@transient var value_ = initialValue // Current value on master
@@ -32,7 +32,7 @@ import scala.collection.mutable.Map
override def toString = value_.toString
}
-@serializable trait AccumulatorParam[T] {
+trait AccumulatorParam[T] extends Serializable {
def addInPlace(t1: T, t2: T): T
def zero(initialValue: T): T
}
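
As an illustration of the AccumulatorParam contract above, a hypothetical parameter object (not part of this commit) is now serializable purely by inheriting the trait:

// Sums doubles: addInPlace merges partial results from tasks,
// and zero gives the identity value for a given initial value.
object DoubleAccumulatorParam extends AccumulatorParam[Double] {
  def addInPlace(t1: Double, t2: Double): Double = t1 + t2
  def zero(initialValue: Double): Double = 0.0
}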
diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala
index 87453c9c15..36e70f7403 100644
--- a/core/src/main/scala/spark/Aggregator.scala
+++ b/core/src/main/scala/spark/Aggregator.scala
@@ -1,8 +1,7 @@
package spark
-@serializable
class Aggregator[K, V, C] (
val createCombiner: V => C,
val mergeValue: (C, V) => C,
val mergeCombiners: (C, C) => C
-)
\ No newline at end of file
+) extends Serializable
\ No newline at end of file
diff --git a/core/src/main/scala/spark/CartesianRDD.scala b/core/src/main/scala/spark/CartesianRDD.scala
index 42a9b3b23c..df822b3552 100644
--- a/core/src/main/scala/spark/CartesianRDD.scala
+++ b/core/src/main/scala/spark/CartesianRDD.scala
@@ -1,14 +1,13 @@
package spark
-@serializable class CartesianSplit(idx: Int, val s1: Split, val s2: Split)
-extends Split {
+class CartesianSplit(idx: Int, val s1: Split, val s2: Split)
+extends Split with Serializable {
override val index = idx
}
-@serializable
class CartesianRDD[T: ClassManifest, U:ClassManifest](
sc: SparkContext, rdd1: RDD[T], rdd2: RDD[U])
-extends RDD[Pair[T, U]](sc) {
+extends RDD[Pair[T, U]](sc) with Serializable {
val numSplitsInRdd2 = rdd2.splits.size
@transient val splits_ = {
diff --git a/core/src/main/scala/spark/CoGroupedRDD.scala b/core/src/main/scala/spark/CoGroupedRDD.scala
index a159ca1534..4a8fa6d3fc 100644
--- a/core/src/main/scala/spark/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/CoGroupedRDD.scala
@@ -6,24 +6,21 @@ import java.io.ObjectInputStream
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
-@serializable
-sealed trait CoGroupSplitDep
+sealed trait CoGroupSplitDep extends Serializable
case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplitDep
case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
-@serializable
class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep])
-extends Split {
+extends Split with Serializable {
override val index = idx
override def hashCode(): Int = idx
}
-@serializable
class CoGroupAggregator extends Aggregator[Any, Any, ArrayBuffer[Any]] (
{ x => ArrayBuffer(x) },
{ (b, x) => b += x },
{ (b1, b2) => b1 ++ b2 }
-)
+) with Serializable
class CoGroupedRDD[K](rdds: Seq[RDD[(_, _)]], part: Partitioner)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context) with Logging {
diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala
index c83736a424..bd20634fb9 100644
--- a/core/src/main/scala/spark/Dependency.scala
+++ b/core/src/main/scala/spark/Dependency.scala
@@ -1,7 +1,6 @@
package spark
-@serializable
-abstract class Dependency[T](val rdd: RDD[T], val isShuffle: Boolean)
+abstract class Dependency[T](val rdd: RDD[T], val isShuffle: Boolean) extends Serializable
abstract class NarrowDependency[T](rdd: RDD[T])
extends Dependency(rdd, false) {
diff --git a/core/src/main/scala/spark/HadoopRDD.scala b/core/src/main/scala/spark/HadoopRDD.scala
index c87fa844c3..47286e0a65 100644
--- a/core/src/main/scala/spark/HadoopRDD.scala
+++ b/core/src/main/scala/spark/HadoopRDD.scala
@@ -13,8 +13,8 @@ import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.ReflectionUtils
/** A Spark split class that wraps around a Hadoop InputSplit */
-@serializable class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit)
-extends Split {
+class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit)
+extends Split with Serializable {
val inputSplit = new SerializableWritable[InputSplit](s)
override def hashCode(): Int = (41 * (41 + rddId) + idx).toInt
diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/HadoopWriter.scala
index ae421a243e..73c8876eb6 100644
--- a/core/src/main/scala/spark/HadoopWriter.scala
+++ b/core/src/main/scala/spark/HadoopWriter.scala
@@ -20,7 +20,7 @@ import spark.Logging
* also contain an output key class, an output value class, a filename to write to, etc
* exactly like in a Hadoop job.
*/
-@serializable class HadoopWriter(@transient jobConf: JobConf) extends Logging {
+class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializable {
private val now = new Date()
private val conf = new SerializableWritable(jobConf)
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 71936eda02..73827deeda 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -30,8 +30,7 @@ import SparkContext._
/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
*/
-@serializable
-class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) extends Logging {
+class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) extends Logging with Serializable {
def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = {
def mergeMaps(m1: HashMap[K, V], m2: HashMap[K, V]): HashMap[K, V] = {
for ((k, v) <- m2) {
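
Since PairRDDFunctions is reached through the implicit conversion mentioned in its comment (imported via SparkContext._), a minimal hypothetical driver shows reduceByKeyToDriver in use; the local master string and sample data are assumptions, not from the commit:

import spark.SparkContext
import spark.SparkContext._

object PairFunctionsDemo {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "PairFunctionsDemo")
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)
    // The implicit conversion from SparkContext._ adds the
    // pair-RDD methods, including reduceByKeyToDriver.
    val counts = pairs.reduceByKeyToDriver(_ + _)
    println(counts) // expected: Map(a -> 4, b -> 2)
  }
}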
diff --git a/core/src/main/scala/spark/ParallelCollection.scala b/core/src/main/scala/spark/ParallelCollection.scala
index a2e271b028..b45f29091b 100644
--- a/core/src/main/scala/spark/ParallelCollection.scala
+++ b/core/src/main/scala/spark/ParallelCollection.scala
@@ -2,9 +2,9 @@ package spark
import java.util.concurrent.atomic.AtomicLong
-@serializable class ParallelCollectionSplit[T: ClassManifest](
+class ParallelCollectionSplit[T: ClassManifest](
val rddId: Long, val slice: Int, values: Seq[T])
-extends Split {
+extends Split with Serializable {
def iterator(): Iterator[T] = values.iterator
override def hashCode(): Int = (41 * (41 + rddId) + slice).toInt
diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala
index 92057604da..4491de1734 100644
--- a/core/src/main/scala/spark/Partitioner.scala
+++ b/core/src/main/scala/spark/Partitioner.scala
@@ -1,7 +1,6 @@
package spark
-@serializable
-abstract class Partitioner {
+abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}
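
For illustration, a hypothetical subclass of the Partitioner abstract class above; it inherits Serializable with no annotation needed:

class ModuloPartitioner(partitions: Int) extends Partitioner {
  def numPartitions: Int = partitions
  def getPartition(key: Any): Int = {
    // Keep the result non-negative even when hashCode is negative.
    val mod = key.hashCode % partitions
    if (mod < 0) mod + partitions else mod
  }
}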
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index a0c4e29771..445d520bc2 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -44,8 +44,7 @@ import SparkContext._
* In addition, PairRDDFunctions contains extra methods available on RDDs of key-value pairs,
* and SequenceFileRDDFunctions contains extra methods for saving RDDs to Hadoop SequenceFiles.
*/
-@serializable
-abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
+abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serializable {
// Methods that must be implemented by subclasses
def splits: Array[Split]
def compute(split: Split): Iterator[T]
diff --git a/core/src/main/scala/spark/SampledRDD.scala b/core/src/main/scala/spark/SampledRDD.scala
index 2eeafedcdd..21c1148b63 100644
--- a/core/src/main/scala/spark/SampledRDD.scala
+++ b/core/src/main/scala/spark/SampledRDD.scala
@@ -2,7 +2,7 @@ package spark
import java.util.Random
-@serializable class SampledRDDSplit(val prev: Split, val seed: Int) extends Split {
+class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Serializable {
override val index = prev.index
}
diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
index 6ca0556d9f..bd4a526b89 100644
--- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
@@ -31,8 +31,7 @@ import SparkContext._
* through an implicit conversion. Note that this can't be part of PairRDDFunctions because
* we need more implicit parameters to convert our keys and values to Writable.
*/
-@serializable
-class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](self: RDD[(K,V)]) extends Logging {
+class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](self: RDD[(K,V)]) extends Logging with Serializable {
def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
val c = {
if (classOf[Writable].isAssignableFrom(classManifest[T].erasure))
diff --git a/core/src/main/scala/spark/SerializableWritable.scala b/core/src/main/scala/spark/SerializableWritable.scala
index ae393d06d3..8306fbf570 100644
--- a/core/src/main/scala/spark/SerializableWritable.scala
+++ b/core/src/main/scala/spark/SerializableWritable.scala
@@ -6,8 +6,7 @@ import org.apache.hadoop.io.ObjectWritable
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.JobConf
-@serializable
-class SerializableWritable[T <: Writable](@transient var t: T) {
+class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
def value = t
override def toString = t.toString
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 18bd5c8817..dbbe7a63d5 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -363,5 +363,4 @@ object SparkContext {
* that doesn't know the type of T when it is created. This sounds strange but is necessary to
* support converting subclasses of Writable to themselves (writableWritableConverter).
*/
-@serializable
-class WritableConverter[T](val writableClass: ClassManifest[T] => Class[_ <: Writable], val convert: Writable => T) {}
+class WritableConverter[T](val writableClass: ClassManifest[T] => Class[_ <: Writable], val convert: Writable => T) extends Serializable
diff --git a/core/src/main/scala/spark/Split.scala b/core/src/main/scala/spark/Split.scala
index 62bb5f82c5..831f7672e6 100644
--- a/core/src/main/scala/spark/Split.scala
+++ b/core/src/main/scala/spark/Split.scala
@@ -3,7 +3,7 @@ package spark
/**
* A partition of an RDD.
*/
-@serializable trait Split {
+trait Split extends Serializable {
/**
* Get the split's index within its parent RDD
*/
diff --git a/core/src/main/scala/spark/Task.scala b/core/src/main/scala/spark/Task.scala
index 03274167e1..c34083416f 100644
--- a/core/src/main/scala/spark/Task.scala
+++ b/core/src/main/scala/spark/Task.scala
@@ -1,11 +1,8 @@
package spark
-@serializable
-class TaskContext(val stageId: Int, val splitId: Int, val attemptId: Int) {
-}
+class TaskContext(val stageId: Int, val splitId: Int, val attemptId: Int) extends Serializable
-@serializable
-abstract class Task[T] {
+abstract class Task[T] extends Serializable {
def run (id: Int): T
def preferredLocations: Seq[String] = Nil
def generation: Option[Long] = None
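
A hypothetical Task subclass (illustration only) shows the minimal contract: run is required, preferredLocations and generation keep their defaults, and serializability now comes from the base class rather than an annotation:

class EchoTask(message: String) extends Task[String] {
  def run(id: Int): String = "task " + id + ": " + message
}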
diff --git a/core/src/main/scala/spark/TaskResult.scala b/core/src/main/scala/spark/TaskResult.scala
index db33c9ff44..2b7fd1a4b2 100644
--- a/core/src/main/scala/spark/TaskResult.scala
+++ b/core/src/main/scala/spark/TaskResult.scala
@@ -5,5 +5,4 @@ import scala.collection.mutable.Map
// Task result. Also contains updates to accumulator variables.
// TODO: Use of distributed cache to return result is a hack to get around
// what seems to be a bug with messages over 60KB in libprocess; fix it
-@serializable
-private class TaskResult[T](val value: T, val accumUpdates: Map[Long, Any])
+private class TaskResult[T](val value: T, val accumUpdates: Map[Long, Any]) extends Serializable
diff --git a/core/src/main/scala/spark/UnionRDD.scala b/core/src/main/scala/spark/UnionRDD.scala
index 78297be4f3..dadfd94eef 100644
--- a/core/src/main/scala/spark/UnionRDD.scala
+++ b/core/src/main/scala/spark/UnionRDD.scala
@@ -2,17 +2,15 @@ package spark
import scala.collection.mutable.ArrayBuffer
-@serializable
class UnionSplit[T: ClassManifest](idx: Int, rdd: RDD[T], split: Split)
-extends Split {
+extends Split with Serializable {
def iterator() = rdd.iterator(split)
def preferredLocations() = rdd.preferredLocations(split)
override val index = idx
}
-@serializable
class UnionRDD[T: ClassManifest](sc: SparkContext, rdds: Seq[RDD[T]])
-extends RDD[T](sc) {
+extends RDD[T](sc) with Serializable {
@transient val splits_ : Array[Split] = {
val array = new Array[Split](rdds.map(_.splits.size).sum)
var pos = 0
diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
index 220456a210..6960339bf8 100644
--- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
@@ -10,9 +10,8 @@ import scala.math
import spark._
-@serializable
class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: Boolean)
-extends Broadcast[T] with Logging {
+extends Broadcast[T] with Logging with Serializable {
def value = value_
diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala b/core/src/main/scala/spark/broadcast/Broadcast.scala
index f39fb9de69..837129c665 100644
--- a/core/src/main/scala/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/spark/broadcast/Broadcast.scala
@@ -7,8 +7,7 @@ import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
import spark._
-@serializable
-trait Broadcast[T] {
+trait Broadcast[T] extends Serializable {
val uuid = UUID.randomUUID
def value: T
@@ -20,7 +19,7 @@ trait Broadcast[T] {
}
object Broadcast
-extends Logging {
+extends Logging with Serializable {
// Messages
val REGISTER_BROADCAST_TRACKER = 0
val UNREGISTER_BROADCAST_TRACKER = 1
@@ -191,18 +190,15 @@ extends Logging {
}
}
-@serializable
-case class BroadcastBlock (val blockID: Int, val byteArray: Array[Byte]) { }
+case class BroadcastBlock (val blockID: Int, val byteArray: Array[Byte]) extends Serializable
-@serializable
case class VariableInfo (@transient val arrayOfBlocks : Array[BroadcastBlock],
val totalBlocks: Int,
- val totalBytes: Int) {
+ val totalBytes: Int) extends Serializable {
@transient var hasBlocks = 0
}
-@serializable
-class SpeedTracker {
+class SpeedTracker extends Serializable {
// Mapping 'source' to '(totalTime, numBlocks)'
private var sourceToSpeedMap = Map[SourceInfo, (Long, Int)] ()
diff --git a/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala b/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala
index 3afe923bae..e33ef78e8a 100644
--- a/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala
@@ -9,9 +9,8 @@ import scala.math
import spark._
-@serializable
class ChainedBroadcast[T](@transient var value_ : T, isLocal: Boolean)
-extends Broadcast[T] with Logging {
+extends Broadcast[T] with Logging with Serializable {
def value = value_
diff --git a/core/src/main/scala/spark/broadcast/DfsBroadcast.scala b/core/src/main/scala/spark/broadcast/DfsBroadcast.scala
index e541c09216..076f18afac 100644
--- a/core/src/main/scala/spark/broadcast/DfsBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/DfsBroadcast.scala
@@ -11,9 +11,8 @@ import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}
import spark._
-@serializable
-class DfsBroadcast[T](@transient var value_ : T, isLocal: Boolean)
-extends Broadcast[T] with Logging {
+class DfsBroadcast[T](@transient var value_ : T, isLocal: Boolean)
+extends Broadcast[T] with Logging with Serializable {
def value = value_
diff --git a/core/src/main/scala/spark/broadcast/SourceInfo.scala b/core/src/main/scala/spark/broadcast/SourceInfo.scala
index 064142590a..03f928953d 100644
--- a/core/src/main/scala/spark/broadcast/SourceInfo.scala
+++ b/core/src/main/scala/spark/broadcast/SourceInfo.scala
@@ -10,7 +10,6 @@ import spark._
* CHANGED: Keep track of the blockSize for THIS broadcast variable.
* Broadcast.BlockSize is expected to be updated across different broadcasts
*/
-@serializable
case class SourceInfo (val hostAddress: String,
val listenPort: Int,
val totalBlocks: Int = SourceInfo.UnusedParam,
diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
index 79dcd317ec..945d8cd8a4 100644
--- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
@@ -9,9 +9,8 @@ import scala.math
import spark._
-@serializable
class TreeBroadcast[T](@transient var value_ : T, isLocal: Boolean)
-extends Broadcast[T] with Logging {
+extends Broadcast[T] with Logging with Serializable {
def value = value_
diff --git a/examples/src/main/scala/spark/examples/Vector.scala b/examples/src/main/scala/spark/examples/Vector.scala
index ea70626e71..dd34dffee5 100644
--- a/examples/src/main/scala/spark/examples/Vector.scala
+++ b/examples/src/main/scala/spark/examples/Vector.scala
@@ -1,6 +1,6 @@
package spark.examples
-@serializable class Vector(val elements: Array[Double]) {
+class Vector(val elements: Array[Double]) extends Serializable {
def length = elements.length
def apply(index: Int) = elements(index)
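
To see what these changes buy, a standalone round-trip sketch (hypothetical test code, using only java.io) mirrors what Java serialization does when Spark ships an object like Vector to a worker:

import java.io._
import spark.examples.Vector

object SerializationRoundTrip {
  def main(args: Array[String]) {
    val v = new Vector(Array(1.0, 2.0, 3.0))
    // Serialize to a byte buffer and read the object back; this throws
    // NotSerializableException unless Vector extends Serializable.
    val bytes = new ByteArrayOutputStream
    val out = new ObjectOutputStream(bytes)
    out.writeObject(v)
    out.flush()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val copy = in.readObject().asInstanceOf[Vector]
    assert(copy.length == 3 && copy(2) == 3.0)
  }
}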