diff options
author | Ismael Juma <ismael@juma.me.uk> | 2011-08-02 10:16:33 +0100 |
---|---|---|
committer | Ismael Juma <ismael@juma.me.uk> | 2011-08-02 10:16:33 +0100 |
commit | 0fba22b3d216548e5e47a23a1b2e84e0e46835e9 (patch) | |
tree | 1be71e2dcaac4d58d36650ee24c86f49521f3e0a | |
parent | 2e57338896b6a926ed0e32b88bdfa02333d1cc9f (diff) | |
download | spark-0fba22b3d216548e5e47a23a1b2e84e0e46835e9.tar.gz spark-0fba22b3d216548e5e47a23a1b2e84e0e46835e9.tar.bz2 spark-0fba22b3d216548e5e47a23a1b2e84e0e46835e9.zip |
Fix issue #65: Change @serializable to extends Serializable in 2.9 branch
Note that we use scala.Serializable introduced in Scala 2.9 instead of
java.io.Serializable. Also, case classes inherit from scala.Serializable by
default.
30 files changed, 58 insertions, 90 deletions
diff --git a/bagel/src/main/scala/spark/bagel/Bagel.scala b/bagel/src/main/scala/spark/bagel/Bagel.scala index 92d4132e68..c24c65be2a 100644 --- a/bagel/src/main/scala/spark/bagel/Bagel.scala +++ b/bagel/src/main/scala/spark/bagel/Bagel.scala @@ -111,8 +111,7 @@ trait Aggregator[V, A] { def mergeAggregators(a: A, b: A): A } -@serializable -class DefaultCombiner[M] extends Combiner[M, ArrayBuffer[M]] { +class DefaultCombiner[M] extends Combiner[M, ArrayBuffer[M]] with Serializable { def createCombiner(msg: M): ArrayBuffer[M] = ArrayBuffer(msg) def mergeMsg(combiner: ArrayBuffer[M], msg: M): ArrayBuffer[M] = @@ -121,8 +120,7 @@ class DefaultCombiner[M] extends Combiner[M, ArrayBuffer[M]] { a ++= b } -@serializable -class NullAggregator[V] extends Aggregator[V, Option[Nothing]] { +class NullAggregator[V] extends Aggregator[V, Option[Nothing]] with Serializable { def createAggregator(vert: V): Option[Nothing] = None def mergeAggregators(a: Option[Nothing], b: Option[Nothing]): Option[Nothing] = None } @@ -130,8 +128,8 @@ class NullAggregator[V] extends Aggregator[V, Option[Nothing]] { /** * Represents a Bagel vertex. * - * Subclasses may store state along with each vertex and must be - * annotated with @serializable. + * Subclasses may store state along with each vertex and must + * inherit from java.io.Serializable or scala.Serializable. */ trait Vertex { def id: String @@ -142,7 +140,7 @@ trait Vertex { * Represents a Bagel message to a target vertex. * * Subclasses may contain a payload to deliver to the target vertex - * and must be annotated with @serializable. + * and must inherit from java.io.Serializable or scala.Serializable. */ trait Message { def targetId: String @@ -151,8 +149,8 @@ trait Message { /** * Represents a directed edge between two vertices. * - * Subclasses may store state along each edge and must be annotated - * with @serializable. + * Subclasses may store state along each edge and must inherit from + * java.io.Serializable or scala.Serializable. */ trait Edge { def targetId: String diff --git a/bagel/src/main/scala/spark/bagel/examples/ShortestPath.scala b/bagel/src/main/scala/spark/bagel/examples/ShortestPath.scala index a7fd386310..691fc55b78 100644 --- a/bagel/src/main/scala/spark/bagel/examples/ShortestPath.scala +++ b/bagel/src/main/scala/spark/bagel/examples/ShortestPath.scala @@ -81,8 +81,7 @@ object ShortestPath { } } -@serializable -object MinCombiner extends Combiner[SPMessage, Int] { +object MinCombiner extends Combiner[SPMessage, Int] with Serializable { def createCombiner(msg: SPMessage): Int = msg.value def mergeMsg(combiner: Int, msg: SPMessage): Int = @@ -91,6 +90,6 @@ object MinCombiner extends Combiner[SPMessage, Int] { min(a, b) } -@serializable class SPVertex(val id: String, val value: Int, val outEdges: Seq[SPEdge], val active: Boolean) extends Vertex -@serializable class SPEdge(val targetId: String, val value: Int) extends Edge -@serializable class SPMessage(val targetId: String, val value: Int) extends Message +class SPVertex(val id: String, val value: Int, val outEdges: Seq[SPEdge], val active: Boolean) extends Vertex with Serializable +class SPEdge(val targetId: String, val value: Int) extends Edge with Serializable +class SPMessage(val targetId: String, val value: Int) extends Message with Serializable diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala index 1bce5bebad..9a0dbbe9d7 100644 --- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala +++ b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala @@ -76,8 +76,7 @@ object WikipediaPageRank { } } -@serializable -object PRCombiner extends Combiner[PRMessage, Double] { +object PRCombiner extends Combiner[PRMessage, Double] with Serializable { def createCombiner(msg: PRMessage): Double = msg.value def mergeMsg(combiner: Double, msg: PRMessage): Double = @@ -105,8 +104,7 @@ object PRCombiner extends Combiner[PRMessage, Double] { } } -@serializable -object PRNoCombiner extends DefaultCombiner[PRMessage] { +object PRNoCombiner extends DefaultCombiner[PRMessage] with Serializable { def compute(numVertices: Long, epsilon: Double)(self: PRVertex, messages: Option[ArrayBuffer[PRMessage]], superstep: Int): (PRVertex, Iterable[PRMessage]) = PRCombiner.compute(numVertices, epsilon)(self, messages match { case Some(msgs) => Some(msgs.map(_.value).sum) @@ -114,7 +112,7 @@ object PRNoCombiner extends DefaultCombiner[PRMessage] { }, superstep) } -@serializable class PRVertex() extends Vertex { +class PRVertex() extends Vertex with Serializable { var id: String = _ var value: Double = _ var outEdges: ArrayBuffer[PREdge] = _ @@ -129,7 +127,7 @@ object PRNoCombiner extends DefaultCombiner[PRMessage] { } } -@serializable class PRMessage() extends Message { +class PRMessage() extends Message with Serializable { var targetId: String = _ var value: Double = _ @@ -140,7 +138,7 @@ object PRNoCombiner extends DefaultCombiner[PRMessage] { } } -@serializable class PREdge() extends Edge { +class PREdge() extends Edge with Serializable { var targetId: String = _ def this(targetId: String) { diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala index 9e64d3f136..59356e09f0 100644 --- a/bagel/src/test/scala/bagel/BagelSuite.scala +++ b/bagel/src/test/scala/bagel/BagelSuite.scala @@ -12,8 +12,8 @@ import spark._ import spark.bagel.Bagel._ -@serializable class TestVertex(val id: String, val active: Boolean, val age: Int) extends Vertex -@serializable class TestMessage(val targetId: String) extends Message +class TestVertex(val id: String, val active: Boolean, val age: Int) extends Vertex with Serializable +class TestMessage(val targetId: String) extends Message with Serializable class BagelSuite extends FunSuite with Assertions { test("halting by voting") { diff --git a/core/src/main/scala/spark/Accumulators.scala b/core/src/main/scala/spark/Accumulators.scala index 4f51826d9d..a808536146 100644 --- a/core/src/main/scala/spark/Accumulators.scala +++ b/core/src/main/scala/spark/Accumulators.scala @@ -4,8 +4,8 @@ import java.io._ import scala.collection.mutable.Map -@serializable class Accumulator[T]( - @transient initialValue: T, param: AccumulatorParam[T]) +class Accumulator[T] ( + @transient initialValue: T, param: AccumulatorParam[T]) extends Serializable { val id = Accumulators.newId @transient var value_ = initialValue // Current value on master @@ -32,7 +32,7 @@ import scala.collection.mutable.Map override def toString = value_.toString } -@serializable trait AccumulatorParam[T] { +trait AccumulatorParam[T] extends Serializable { def addInPlace(t1: T, t2: T): T def zero(initialValue: T): T } diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala index 87453c9c15..36e70f7403 100644 --- a/core/src/main/scala/spark/Aggregator.scala +++ b/core/src/main/scala/spark/Aggregator.scala @@ -1,8 +1,7 @@ package spark -@serializable class Aggregator[K, V, C] ( val createCombiner: V => C, val mergeValue: (C, V) => C, val mergeCombiners: (C, C) => C -)
\ No newline at end of file +) extends Serializable
\ No newline at end of file diff --git a/core/src/main/scala/spark/CartesianRDD.scala b/core/src/main/scala/spark/CartesianRDD.scala index 42a9b3b23c..df822b3552 100644 --- a/core/src/main/scala/spark/CartesianRDD.scala +++ b/core/src/main/scala/spark/CartesianRDD.scala @@ -1,14 +1,13 @@ package spark -@serializable class CartesianSplit(idx: Int, val s1: Split, val s2: Split) -extends Split { +class CartesianSplit(idx: Int, val s1: Split, val s2: Split) +extends Split with Serializable { override val index = idx } -@serializable class CartesianRDD[T: ClassManifest, U:ClassManifest]( sc: SparkContext, rdd1: RDD[T], rdd2: RDD[U]) -extends RDD[Pair[T, U]](sc) { +extends RDD[Pair[T, U]](sc) with Serializable { val numSplitsInRdd2 = rdd2.splits.size @transient val splits_ = { diff --git a/core/src/main/scala/spark/CoGroupedRDD.scala b/core/src/main/scala/spark/CoGroupedRDD.scala index a159ca1534..4a8fa6d3fc 100644 --- a/core/src/main/scala/spark/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/CoGroupedRDD.scala @@ -6,24 +6,21 @@ import java.io.ObjectInputStream import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap -@serializable -sealed trait CoGroupSplitDep +sealed trait CoGroupSplitDep extends Serializable case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplitDep case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep -@serializable class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep]) -extends Split { +extends Split with Serializable { override val index = idx override def hashCode(): Int = idx } -@serializable class CoGroupAggregator extends Aggregator[Any, Any, ArrayBuffer[Any]] ( { x => ArrayBuffer(x) }, { (b, x) => b += x }, { (b1, b2) => b1 ++ b2 } -) +) with Serializable class CoGroupedRDD[K](rdds: Seq[RDD[(_, _)]], part: Partitioner) extends RDD[(K, Seq[Seq[_]])](rdds.head.context) with Logging { diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala index c83736a424..bd20634fb9 100644 --- a/core/src/main/scala/spark/Dependency.scala +++ b/core/src/main/scala/spark/Dependency.scala @@ -1,7 +1,6 @@ package spark -@serializable -abstract class Dependency[T](val rdd: RDD[T], val isShuffle: Boolean) +abstract class Dependency[T](val rdd: RDD[T], val isShuffle: Boolean) extends Serializable abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd, false) { diff --git a/core/src/main/scala/spark/HadoopRDD.scala b/core/src/main/scala/spark/HadoopRDD.scala index c87fa844c3..47286e0a65 100644 --- a/core/src/main/scala/spark/HadoopRDD.scala +++ b/core/src/main/scala/spark/HadoopRDD.scala @@ -13,8 +13,8 @@ import org.apache.hadoop.mapred.Reporter import org.apache.hadoop.util.ReflectionUtils /** A Spark split class that wraps around a Hadoop InputSplit */ -@serializable class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit) -extends Split { +class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit) +extends Split with Serializable { val inputSplit = new SerializableWritable[InputSplit](s) override def hashCode(): Int = (41 * (41 + rddId) + idx).toInt diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/HadoopWriter.scala index ae421a243e..73c8876eb6 100644 --- a/core/src/main/scala/spark/HadoopWriter.scala +++ b/core/src/main/scala/spark/HadoopWriter.scala @@ -20,7 +20,7 @@ import spark.Logging * also contain an output key class, an output value class, a filename to write to, etc * exactly like in a Hadoop job. */ -@serializable class HadoopWriter(@transient jobConf: JobConf) extends Logging { +class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializable { private val now = new Date() private val conf = new SerializableWritable(jobConf) diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 71936eda02..73827deeda 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -30,8 +30,7 @@ import SparkContext._ /** * Extra functions available on RDDs of (key, value) pairs through an implicit conversion. */ -@serializable -class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) extends Logging { +class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) extends Logging with Serializable { def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = { def mergeMaps(m1: HashMap[K, V], m2: HashMap[K, V]): HashMap[K, V] = { for ((k, v) <- m2) { diff --git a/core/src/main/scala/spark/ParallelCollection.scala b/core/src/main/scala/spark/ParallelCollection.scala index a2e271b028..b45f29091b 100644 --- a/core/src/main/scala/spark/ParallelCollection.scala +++ b/core/src/main/scala/spark/ParallelCollection.scala @@ -2,9 +2,9 @@ package spark import java.util.concurrent.atomic.AtomicLong -@serializable class ParallelCollectionSplit[T: ClassManifest]( +class ParallelCollectionSplit[T: ClassManifest]( val rddId: Long, val slice: Int, values: Seq[T]) -extends Split { +extends Split with Serializable { def iterator(): Iterator[T] = values.iterator override def hashCode(): Int = (41 * (41 + rddId) + slice).toInt diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala index 92057604da..4491de1734 100644 --- a/core/src/main/scala/spark/Partitioner.scala +++ b/core/src/main/scala/spark/Partitioner.scala @@ -1,7 +1,6 @@ package spark -@serializable -abstract class Partitioner { +abstract class Partitioner extends Serializable { def numPartitions: Int def getPartition(key: Any): Int } diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index a0c4e29771..445d520bc2 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -44,8 +44,7 @@ import SparkContext._ * In addition, PairRDDFunctions contains extra methods available on RDDs of key-value pairs, * and SequenceFileRDDFunctions contains extra methods for saving RDDs to Hadoop SequenceFiles. */ -@serializable -abstract class RDD[T: ClassManifest](@transient sc: SparkContext) { +abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serializable { // Methods that must be implemented by subclasses def splits: Array[Split] def compute(split: Split): Iterator[T] diff --git a/core/src/main/scala/spark/SampledRDD.scala b/core/src/main/scala/spark/SampledRDD.scala index 2eeafedcdd..21c1148b63 100644 --- a/core/src/main/scala/spark/SampledRDD.scala +++ b/core/src/main/scala/spark/SampledRDD.scala @@ -2,7 +2,7 @@ package spark import java.util.Random -@serializable class SampledRDDSplit(val prev: Split, val seed: Int) extends Split { +class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Serializable { override val index = prev.index } diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala index 6ca0556d9f..bd4a526b89 100644 --- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala +++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala @@ -31,8 +31,7 @@ import SparkContext._ * through an implicit conversion. Note that this can't be part of PairRDDFunctions because * we need more implicit parameters to convert our keys and values to Writable. */ -@serializable -class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](self: RDD[(K,V)]) extends Logging { +class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](self: RDD[(K,V)]) extends Logging with Serializable { def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = { val c = { if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) diff --git a/core/src/main/scala/spark/SerializableWritable.scala b/core/src/main/scala/spark/SerializableWritable.scala index ae393d06d3..8306fbf570 100644 --- a/core/src/main/scala/spark/SerializableWritable.scala +++ b/core/src/main/scala/spark/SerializableWritable.scala @@ -6,8 +6,7 @@ import org.apache.hadoop.io.ObjectWritable import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred.JobConf -@serializable -class SerializableWritable[T <: Writable](@transient var t: T) { +class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable { def value = t override def toString = t.toString diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 18bd5c8817..dbbe7a63d5 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -363,5 +363,4 @@ object SparkContext { * that doesn't know the type of T when it is created. This sounds strange but is necessary to * support converting subclasses of Writable to themselves (writableWritableConverter). */ -@serializable -class WritableConverter[T](val writableClass: ClassManifest[T] => Class[_ <: Writable], val convert: Writable => T) {} +class WritableConverter[T](val writableClass: ClassManifest[T] => Class[_ <: Writable], val convert: Writable => T) extends Serializable diff --git a/core/src/main/scala/spark/Split.scala b/core/src/main/scala/spark/Split.scala index 62bb5f82c5..831f7672e6 100644 --- a/core/src/main/scala/spark/Split.scala +++ b/core/src/main/scala/spark/Split.scala @@ -3,7 +3,7 @@ package spark /** * A partition of an RDD. */ -@serializable trait Split { +trait Split extends Serializable { /** * Get the split's index within its parent RDD */ diff --git a/core/src/main/scala/spark/Task.scala b/core/src/main/scala/spark/Task.scala index 03274167e1..c34083416f 100644 --- a/core/src/main/scala/spark/Task.scala +++ b/core/src/main/scala/spark/Task.scala @@ -1,11 +1,8 @@ package spark -@serializable -class TaskContext(val stageId: Int, val splitId: Int, val attemptId: Int) { -} +class TaskContext(val stageId: Int, val splitId: Int, val attemptId: Int) extends Serializable -@serializable -abstract class Task[T] { +abstract class Task[T] extends Serializable { def run (id: Int): T def preferredLocations: Seq[String] = Nil def generation: Option[Long] = None diff --git a/core/src/main/scala/spark/TaskResult.scala b/core/src/main/scala/spark/TaskResult.scala index db33c9ff44..2b7fd1a4b2 100644 --- a/core/src/main/scala/spark/TaskResult.scala +++ b/core/src/main/scala/spark/TaskResult.scala @@ -5,5 +5,4 @@ import scala.collection.mutable.Map // Task result. Also contains updates to accumulator variables. // TODO: Use of distributed cache to return result is a hack to get around // what seems to be a bug with messages over 60KB in libprocess; fix it -@serializable -private class TaskResult[T](val value: T, val accumUpdates: Map[Long, Any]) +private class TaskResult[T](val value: T, val accumUpdates: Map[Long, Any]) extends Serializable diff --git a/core/src/main/scala/spark/UnionRDD.scala b/core/src/main/scala/spark/UnionRDD.scala index 78297be4f3..dadfd94eef 100644 --- a/core/src/main/scala/spark/UnionRDD.scala +++ b/core/src/main/scala/spark/UnionRDD.scala @@ -2,17 +2,15 @@ package spark import scala.collection.mutable.ArrayBuffer -@serializable class UnionSplit[T: ClassManifest](idx: Int, rdd: RDD[T], split: Split) -extends Split { +extends Split with Serializable { def iterator() = rdd.iterator(split) def preferredLocations() = rdd.preferredLocations(split) override val index = idx } -@serializable class UnionRDD[T: ClassManifest](sc: SparkContext, rdds: Seq[RDD[T]]) -extends RDD[T](sc) { +extends RDD[T](sc) with Serializable { @transient val splits_ : Array[Split] = { val array = new Array[Split](rdds.map(_.splits.size).sum) var pos = 0 diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala index 220456a210..6960339bf8 100644 --- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala @@ -10,9 +10,8 @@ import scala.math import spark._ -@serializable class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: Boolean) -extends Broadcast[T] with Logging { +extends Broadcast[T] with Logging with Serializable { def value = value_ diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala b/core/src/main/scala/spark/broadcast/Broadcast.scala index f39fb9de69..837129c665 100644 --- a/core/src/main/scala/spark/broadcast/Broadcast.scala +++ b/core/src/main/scala/spark/broadcast/Broadcast.scala @@ -7,8 +7,7 @@ import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor} import spark._ -@serializable -trait Broadcast[T] { +trait Broadcast[T] extends Serializable { val uuid = UUID.randomUUID def value: T @@ -20,7 +19,7 @@ trait Broadcast[T] { } object Broadcast -extends Logging { +extends Logging with Serializable { // Messages val REGISTER_BROADCAST_TRACKER = 0 val UNREGISTER_BROADCAST_TRACKER = 1 @@ -191,18 +190,15 @@ extends Logging { } } -@serializable -case class BroadcastBlock (val blockID: Int, val byteArray: Array[Byte]) { } +case class BroadcastBlock (val blockID: Int, val byteArray: Array[Byte]) extends Serializable -@serializable case class VariableInfo (@transient val arrayOfBlocks : Array[BroadcastBlock], val totalBlocks: Int, - val totalBytes: Int) { + val totalBytes: Int) extends Serializable { @transient var hasBlocks = 0 } -@serializable -class SpeedTracker { +class SpeedTracker extends Serializable { // Mapping 'source' to '(totalTime, numBlocks)' private var sourceToSpeedMap = Map[SourceInfo, (Long, Int)] () diff --git a/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala b/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala index 3afe923bae..e33ef78e8a 100644 --- a/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala @@ -9,9 +9,8 @@ import scala.math import spark._ -@serializable class ChainedBroadcast[T](@transient var value_ : T, isLocal: Boolean) -extends Broadcast[T] with Logging { +extends Broadcast[T] with Logging with Serializable { def value = value_ diff --git a/core/src/main/scala/spark/broadcast/DfsBroadcast.scala b/core/src/main/scala/spark/broadcast/DfsBroadcast.scala index e541c09216..076f18afac 100644 --- a/core/src/main/scala/spark/broadcast/DfsBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/DfsBroadcast.scala @@ -11,9 +11,8 @@ import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem} import spark._ -@serializable -class DfsBroadcast[T](@transient var value_ : T, isLocal: Boolean) -extends Broadcast[T] with Logging { +class DfsBroadcast[T](@transient var value_ : T, isLocal: Boolean) +extends Broadcast[T] with Logging with Serializable { def value = value_ diff --git a/core/src/main/scala/spark/broadcast/SourceInfo.scala b/core/src/main/scala/spark/broadcast/SourceInfo.scala index 064142590a..03f928953d 100644 --- a/core/src/main/scala/spark/broadcast/SourceInfo.scala +++ b/core/src/main/scala/spark/broadcast/SourceInfo.scala @@ -10,7 +10,6 @@ import spark._ * CHANGED: Keep track of the blockSize for THIS broadcast variable. * Broadcast.BlockSize is expected to be updated across different broadcasts */ -@serializable case class SourceInfo (val hostAddress: String, val listenPort: Int, val totalBlocks: Int = SourceInfo.UnusedParam, diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala index 79dcd317ec..945d8cd8a4 100644 --- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala @@ -9,9 +9,8 @@ import scala.math import spark._ -@serializable class TreeBroadcast[T](@transient var value_ : T, isLocal: Boolean) -extends Broadcast[T] with Logging { +extends Broadcast[T] with Logging with Serializable { def value = value_ diff --git a/examples/src/main/scala/spark/examples/Vector.scala b/examples/src/main/scala/spark/examples/Vector.scala index ea70626e71..dd34dffee5 100644 --- a/examples/src/main/scala/spark/examples/Vector.scala +++ b/examples/src/main/scala/spark/examples/Vector.scala @@ -1,6 +1,6 @@ package spark.examples -@serializable class Vector(val elements: Array[Double]) { +class Vector(val elements: Array[Double]) extends Serializable { def length = elements.length def apply(index: Int) = elements(index) |