From 0fba22b3d216548e5e47a23a1b2e84e0e46835e9 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 2 Aug 2011 10:16:33 +0100 Subject: Fix issue #65: Change @serializable to extends Serializable in 2.9 branch Note that we use scala.Serializable introduced in Scala 2.9 instead of java.io.Serializable. Also, case classes inherit from scala.Serializable by default. --- bagel/src/main/scala/spark/bagel/Bagel.scala | 16 +++++++--------- .../main/scala/spark/bagel/examples/ShortestPath.scala | 9 ++++----- .../scala/spark/bagel/examples/WikipediaPageRank.scala | 12 +++++------- bagel/src/test/scala/bagel/BagelSuite.scala | 4 ++-- core/src/main/scala/spark/Accumulators.scala | 6 +++--- core/src/main/scala/spark/Aggregator.scala | 3 +-- core/src/main/scala/spark/CartesianRDD.scala | 7 +++---- core/src/main/scala/spark/CoGroupedRDD.scala | 9 +++------ core/src/main/scala/spark/Dependency.scala | 3 +-- core/src/main/scala/spark/HadoopRDD.scala | 4 ++-- core/src/main/scala/spark/HadoopWriter.scala | 2 +- core/src/main/scala/spark/PairRDDFunctions.scala | 3 +-- core/src/main/scala/spark/ParallelCollection.scala | 4 ++-- core/src/main/scala/spark/Partitioner.scala | 3 +-- core/src/main/scala/spark/RDD.scala | 3 +-- core/src/main/scala/spark/SampledRDD.scala | 2 +- core/src/main/scala/spark/SequenceFileRDDFunctions.scala | 3 +-- core/src/main/scala/spark/SerializableWritable.scala | 3 +-- core/src/main/scala/spark/SparkContext.scala | 3 +-- core/src/main/scala/spark/Split.scala | 2 +- core/src/main/scala/spark/Task.scala | 7 ++----- core/src/main/scala/spark/TaskResult.scala | 3 +-- core/src/main/scala/spark/UnionRDD.scala | 6 ++---- .../main/scala/spark/broadcast/BitTorrentBroadcast.scala | 3 +-- core/src/main/scala/spark/broadcast/Broadcast.scala | 14 +++++--------- .../main/scala/spark/broadcast/ChainedBroadcast.scala | 3 +-- core/src/main/scala/spark/broadcast/DfsBroadcast.scala | 5 ++--- core/src/main/scala/spark/broadcast/SourceInfo.scala | 1 - core/src/main/scala/spark/broadcast/TreeBroadcast.scala | 3 +-- examples/src/main/scala/spark/examples/Vector.scala | 2 +- 30 files changed, 58 insertions(+), 90 deletions(-) diff --git a/bagel/src/main/scala/spark/bagel/Bagel.scala b/bagel/src/main/scala/spark/bagel/Bagel.scala index 92d4132e68..c24c65be2a 100644 --- a/bagel/src/main/scala/spark/bagel/Bagel.scala +++ b/bagel/src/main/scala/spark/bagel/Bagel.scala @@ -111,8 +111,7 @@ trait Aggregator[V, A] { def mergeAggregators(a: A, b: A): A } -@serializable -class DefaultCombiner[M] extends Combiner[M, ArrayBuffer[M]] { +class DefaultCombiner[M] extends Combiner[M, ArrayBuffer[M]] with Serializable { def createCombiner(msg: M): ArrayBuffer[M] = ArrayBuffer(msg) def mergeMsg(combiner: ArrayBuffer[M], msg: M): ArrayBuffer[M] = @@ -121,8 +120,7 @@ class DefaultCombiner[M] extends Combiner[M, ArrayBuffer[M]] { a ++= b } -@serializable -class NullAggregator[V] extends Aggregator[V, Option[Nothing]] { +class NullAggregator[V] extends Aggregator[V, Option[Nothing]] with Serializable { def createAggregator(vert: V): Option[Nothing] = None def mergeAggregators(a: Option[Nothing], b: Option[Nothing]): Option[Nothing] = None } @@ -130,8 +128,8 @@ class NullAggregator[V] extends Aggregator[V, Option[Nothing]] { /** * Represents a Bagel vertex. * - * Subclasses may store state along with each vertex and must be - * annotated with @serializable. + * Subclasses may store state along with each vertex and must + * inherit from java.io.Serializable or scala.Serializable. */ trait Vertex { def id: String @@ -142,7 +140,7 @@ trait Vertex { * Represents a Bagel message to a target vertex. * * Subclasses may contain a payload to deliver to the target vertex - * and must be annotated with @serializable. + * and must inherit from java.io.Serializable or scala.Serializable. */ trait Message { def targetId: String @@ -151,8 +149,8 @@ trait Message { /** * Represents a directed edge between two vertices. * - * Subclasses may store state along each edge and must be annotated - * with @serializable. + * Subclasses may store state along each edge and must inherit from + * java.io.Serializable or scala.Serializable. */ trait Edge { def targetId: String diff --git a/bagel/src/main/scala/spark/bagel/examples/ShortestPath.scala b/bagel/src/main/scala/spark/bagel/examples/ShortestPath.scala index a7fd386310..691fc55b78 100644 --- a/bagel/src/main/scala/spark/bagel/examples/ShortestPath.scala +++ b/bagel/src/main/scala/spark/bagel/examples/ShortestPath.scala @@ -81,8 +81,7 @@ object ShortestPath { } } -@serializable -object MinCombiner extends Combiner[SPMessage, Int] { +object MinCombiner extends Combiner[SPMessage, Int] with Serializable { def createCombiner(msg: SPMessage): Int = msg.value def mergeMsg(combiner: Int, msg: SPMessage): Int = @@ -91,6 +90,6 @@ object MinCombiner extends Combiner[SPMessage, Int] { min(a, b) } -@serializable class SPVertex(val id: String, val value: Int, val outEdges: Seq[SPEdge], val active: Boolean) extends Vertex -@serializable class SPEdge(val targetId: String, val value: Int) extends Edge -@serializable class SPMessage(val targetId: String, val value: Int) extends Message +class SPVertex(val id: String, val value: Int, val outEdges: Seq[SPEdge], val active: Boolean) extends Vertex with Serializable +class SPEdge(val targetId: String, val value: Int) extends Edge with Serializable +class SPMessage(val targetId: String, val value: Int) extends Message with Serializable diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala index 1bce5bebad..9a0dbbe9d7 100644 --- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala +++ b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala @@ -76,8 +76,7 @@ object WikipediaPageRank { } } -@serializable -object PRCombiner extends Combiner[PRMessage, Double] { +object PRCombiner extends Combiner[PRMessage, Double] with Serializable { def createCombiner(msg: PRMessage): Double = msg.value def mergeMsg(combiner: Double, msg: PRMessage): Double = @@ -105,8 +104,7 @@ object PRCombiner extends Combiner[PRMessage, Double] { } } -@serializable -object PRNoCombiner extends DefaultCombiner[PRMessage] { +object PRNoCombiner extends DefaultCombiner[PRMessage] with Serializable { def compute(numVertices: Long, epsilon: Double)(self: PRVertex, messages: Option[ArrayBuffer[PRMessage]], superstep: Int): (PRVertex, Iterable[PRMessage]) = PRCombiner.compute(numVertices, epsilon)(self, messages match { case Some(msgs) => Some(msgs.map(_.value).sum) @@ -114,7 +112,7 @@ object PRNoCombiner extends DefaultCombiner[PRMessage] { }, superstep) } -@serializable class PRVertex() extends Vertex { +class PRVertex() extends Vertex with Serializable { var id: String = _ var value: Double = _ var outEdges: ArrayBuffer[PREdge] = _ @@ -129,7 +127,7 @@ object PRNoCombiner extends DefaultCombiner[PRMessage] { } } -@serializable class PRMessage() extends Message { +class PRMessage() extends Message with Serializable { var targetId: String = _ var value: Double = _ @@ -140,7 +138,7 @@ object PRNoCombiner extends DefaultCombiner[PRMessage] { } } -@serializable class PREdge() extends Edge { +class PREdge() extends Edge with Serializable { var targetId: String = _ def this(targetId: String) { diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala index 9e64d3f136..59356e09f0 100644 --- a/bagel/src/test/scala/bagel/BagelSuite.scala +++ b/bagel/src/test/scala/bagel/BagelSuite.scala @@ -12,8 +12,8 @@ import spark._ import spark.bagel.Bagel._ -@serializable class TestVertex(val id: String, val active: Boolean, val age: Int) extends Vertex -@serializable class TestMessage(val targetId: String) extends Message +class TestVertex(val id: String, val active: Boolean, val age: Int) extends Vertex with Serializable +class TestMessage(val targetId: String) extends Message with Serializable class BagelSuite extends FunSuite with Assertions { test("halting by voting") { diff --git a/core/src/main/scala/spark/Accumulators.scala b/core/src/main/scala/spark/Accumulators.scala index 4f51826d9d..a808536146 100644 --- a/core/src/main/scala/spark/Accumulators.scala +++ b/core/src/main/scala/spark/Accumulators.scala @@ -4,8 +4,8 @@ import java.io._ import scala.collection.mutable.Map -@serializable class Accumulator[T]( - @transient initialValue: T, param: AccumulatorParam[T]) +class Accumulator[T] ( + @transient initialValue: T, param: AccumulatorParam[T]) extends Serializable { val id = Accumulators.newId @transient var value_ = initialValue // Current value on master @@ -32,7 +32,7 @@ import scala.collection.mutable.Map override def toString = value_.toString } -@serializable trait AccumulatorParam[T] { +trait AccumulatorParam[T] extends Serializable { def addInPlace(t1: T, t2: T): T def zero(initialValue: T): T } diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala index 87453c9c15..36e70f7403 100644 --- a/core/src/main/scala/spark/Aggregator.scala +++ b/core/src/main/scala/spark/Aggregator.scala @@ -1,8 +1,7 @@ package spark -@serializable class Aggregator[K, V, C] ( val createCombiner: V => C, val mergeValue: (C, V) => C, val mergeCombiners: (C, C) => C -) \ No newline at end of file +) extends Serializable \ No newline at end of file diff --git a/core/src/main/scala/spark/CartesianRDD.scala b/core/src/main/scala/spark/CartesianRDD.scala index 42a9b3b23c..df822b3552 100644 --- a/core/src/main/scala/spark/CartesianRDD.scala +++ b/core/src/main/scala/spark/CartesianRDD.scala @@ -1,14 +1,13 @@ package spark -@serializable class CartesianSplit(idx: Int, val s1: Split, val s2: Split) -extends Split { +class CartesianSplit(idx: Int, val s1: Split, val s2: Split) +extends Split with Serializable { override val index = idx } -@serializable class CartesianRDD[T: ClassManifest, U:ClassManifest]( sc: SparkContext, rdd1: RDD[T], rdd2: RDD[U]) -extends RDD[Pair[T, U]](sc) { +extends RDD[Pair[T, U]](sc) with Serializable { val numSplitsInRdd2 = rdd2.splits.size @transient val splits_ = { diff --git a/core/src/main/scala/spark/CoGroupedRDD.scala b/core/src/main/scala/spark/CoGroupedRDD.scala index a159ca1534..4a8fa6d3fc 100644 --- a/core/src/main/scala/spark/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/CoGroupedRDD.scala @@ -6,24 +6,21 @@ import java.io.ObjectInputStream import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap -@serializable -sealed trait CoGroupSplitDep +sealed trait CoGroupSplitDep extends Serializable case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplitDep case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep -@serializable class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep]) -extends Split { +extends Split with Serializable { override val index = idx override def hashCode(): Int = idx } -@serializable class CoGroupAggregator extends Aggregator[Any, Any, ArrayBuffer[Any]] ( { x => ArrayBuffer(x) }, { (b, x) => b += x }, { (b1, b2) => b1 ++ b2 } -) +) with Serializable class CoGroupedRDD[K](rdds: Seq[RDD[(_, _)]], part: Partitioner) extends RDD[(K, Seq[Seq[_]])](rdds.head.context) with Logging { diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala index c83736a424..bd20634fb9 100644 --- a/core/src/main/scala/spark/Dependency.scala +++ b/core/src/main/scala/spark/Dependency.scala @@ -1,7 +1,6 @@ package spark -@serializable -abstract class Dependency[T](val rdd: RDD[T], val isShuffle: Boolean) +abstract class Dependency[T](val rdd: RDD[T], val isShuffle: Boolean) extends Serializable abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd, false) { diff --git a/core/src/main/scala/spark/HadoopRDD.scala b/core/src/main/scala/spark/HadoopRDD.scala index c87fa844c3..47286e0a65 100644 --- a/core/src/main/scala/spark/HadoopRDD.scala +++ b/core/src/main/scala/spark/HadoopRDD.scala @@ -13,8 +13,8 @@ import org.apache.hadoop.mapred.Reporter import org.apache.hadoop.util.ReflectionUtils /** A Spark split class that wraps around a Hadoop InputSplit */ -@serializable class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit) -extends Split { +class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit) +extends Split with Serializable { val inputSplit = new SerializableWritable[InputSplit](s) override def hashCode(): Int = (41 * (41 + rddId) + idx).toInt diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/HadoopWriter.scala index ae421a243e..73c8876eb6 100644 --- a/core/src/main/scala/spark/HadoopWriter.scala +++ b/core/src/main/scala/spark/HadoopWriter.scala @@ -20,7 +20,7 @@ import spark.Logging * also contain an output key class, an output value class, a filename to write to, etc * exactly like in a Hadoop job. */ -@serializable class HadoopWriter(@transient jobConf: JobConf) extends Logging { +class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializable { private val now = new Date() private val conf = new SerializableWritable(jobConf) diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 71936eda02..73827deeda 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -30,8 +30,7 @@ import SparkContext._ /** * Extra functions available on RDDs of (key, value) pairs through an implicit conversion. */ -@serializable -class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) extends Logging { +class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) extends Logging with Serializable { def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = { def mergeMaps(m1: HashMap[K, V], m2: HashMap[K, V]): HashMap[K, V] = { for ((k, v) <- m2) { diff --git a/core/src/main/scala/spark/ParallelCollection.scala b/core/src/main/scala/spark/ParallelCollection.scala index a2e271b028..b45f29091b 100644 --- a/core/src/main/scala/spark/ParallelCollection.scala +++ b/core/src/main/scala/spark/ParallelCollection.scala @@ -2,9 +2,9 @@ package spark import java.util.concurrent.atomic.AtomicLong -@serializable class ParallelCollectionSplit[T: ClassManifest]( +class ParallelCollectionSplit[T: ClassManifest]( val rddId: Long, val slice: Int, values: Seq[T]) -extends Split { +extends Split with Serializable { def iterator(): Iterator[T] = values.iterator override def hashCode(): Int = (41 * (41 + rddId) + slice).toInt diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala index 92057604da..4491de1734 100644 --- a/core/src/main/scala/spark/Partitioner.scala +++ b/core/src/main/scala/spark/Partitioner.scala @@ -1,7 +1,6 @@ package spark -@serializable -abstract class Partitioner { +abstract class Partitioner extends Serializable { def numPartitions: Int def getPartition(key: Any): Int } diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index a0c4e29771..445d520bc2 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -44,8 +44,7 @@ import SparkContext._ * In addition, PairRDDFunctions contains extra methods available on RDDs of key-value pairs, * and SequenceFileRDDFunctions contains extra methods for saving RDDs to Hadoop SequenceFiles. */ -@serializable -abstract class RDD[T: ClassManifest](@transient sc: SparkContext) { +abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serializable { // Methods that must be implemented by subclasses def splits: Array[Split] def compute(split: Split): Iterator[T] diff --git a/core/src/main/scala/spark/SampledRDD.scala b/core/src/main/scala/spark/SampledRDD.scala index 2eeafedcdd..21c1148b63 100644 --- a/core/src/main/scala/spark/SampledRDD.scala +++ b/core/src/main/scala/spark/SampledRDD.scala @@ -2,7 +2,7 @@ package spark import java.util.Random -@serializable class SampledRDDSplit(val prev: Split, val seed: Int) extends Split { +class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Serializable { override val index = prev.index } diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala index 6ca0556d9f..bd4a526b89 100644 --- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala +++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala @@ -31,8 +31,7 @@ import SparkContext._ * through an implicit conversion. Note that this can't be part of PairRDDFunctions because * we need more implicit parameters to convert our keys and values to Writable. */ -@serializable -class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](self: RDD[(K,V)]) extends Logging { +class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](self: RDD[(K,V)]) extends Logging with Serializable { def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = { val c = { if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) diff --git a/core/src/main/scala/spark/SerializableWritable.scala b/core/src/main/scala/spark/SerializableWritable.scala index ae393d06d3..8306fbf570 100644 --- a/core/src/main/scala/spark/SerializableWritable.scala +++ b/core/src/main/scala/spark/SerializableWritable.scala @@ -6,8 +6,7 @@ import org.apache.hadoop.io.ObjectWritable import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred.JobConf -@serializable -class SerializableWritable[T <: Writable](@transient var t: T) { +class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable { def value = t override def toString = t.toString diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 18bd5c8817..dbbe7a63d5 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -363,5 +363,4 @@ object SparkContext { * that doesn't know the type of T when it is created. This sounds strange but is necessary to * support converting subclasses of Writable to themselves (writableWritableConverter). */ -@serializable -class WritableConverter[T](val writableClass: ClassManifest[T] => Class[_ <: Writable], val convert: Writable => T) {} +class WritableConverter[T](val writableClass: ClassManifest[T] => Class[_ <: Writable], val convert: Writable => T) extends Serializable diff --git a/core/src/main/scala/spark/Split.scala b/core/src/main/scala/spark/Split.scala index 62bb5f82c5..831f7672e6 100644 --- a/core/src/main/scala/spark/Split.scala +++ b/core/src/main/scala/spark/Split.scala @@ -3,7 +3,7 @@ package spark /** * A partition of an RDD. */ -@serializable trait Split { +trait Split extends Serializable { /** * Get the split's index within its parent RDD */ diff --git a/core/src/main/scala/spark/Task.scala b/core/src/main/scala/spark/Task.scala index 03274167e1..c34083416f 100644 --- a/core/src/main/scala/spark/Task.scala +++ b/core/src/main/scala/spark/Task.scala @@ -1,11 +1,8 @@ package spark -@serializable -class TaskContext(val stageId: Int, val splitId: Int, val attemptId: Int) { -} +class TaskContext(val stageId: Int, val splitId: Int, val attemptId: Int) extends Serializable -@serializable -abstract class Task[T] { +abstract class Task[T] extends Serializable { def run (id: Int): T def preferredLocations: Seq[String] = Nil def generation: Option[Long] = None diff --git a/core/src/main/scala/spark/TaskResult.scala b/core/src/main/scala/spark/TaskResult.scala index db33c9ff44..2b7fd1a4b2 100644 --- a/core/src/main/scala/spark/TaskResult.scala +++ b/core/src/main/scala/spark/TaskResult.scala @@ -5,5 +5,4 @@ import scala.collection.mutable.Map // Task result. Also contains updates to accumulator variables. // TODO: Use of distributed cache to return result is a hack to get around // what seems to be a bug with messages over 60KB in libprocess; fix it -@serializable -private class TaskResult[T](val value: T, val accumUpdates: Map[Long, Any]) +private class TaskResult[T](val value: T, val accumUpdates: Map[Long, Any]) extends Serializable diff --git a/core/src/main/scala/spark/UnionRDD.scala b/core/src/main/scala/spark/UnionRDD.scala index 78297be4f3..dadfd94eef 100644 --- a/core/src/main/scala/spark/UnionRDD.scala +++ b/core/src/main/scala/spark/UnionRDD.scala @@ -2,17 +2,15 @@ package spark import scala.collection.mutable.ArrayBuffer -@serializable class UnionSplit[T: ClassManifest](idx: Int, rdd: RDD[T], split: Split) -extends Split { +extends Split with Serializable { def iterator() = rdd.iterator(split) def preferredLocations() = rdd.preferredLocations(split) override val index = idx } -@serializable class UnionRDD[T: ClassManifest](sc: SparkContext, rdds: Seq[RDD[T]]) -extends RDD[T](sc) { +extends RDD[T](sc) with Serializable { @transient val splits_ : Array[Split] = { val array = new Array[Split](rdds.map(_.splits.size).sum) var pos = 0 diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala index 220456a210..6960339bf8 100644 --- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala @@ -10,9 +10,8 @@ import scala.math import spark._ -@serializable class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: Boolean) -extends Broadcast[T] with Logging { +extends Broadcast[T] with Logging with Serializable { def value = value_ diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala b/core/src/main/scala/spark/broadcast/Broadcast.scala index f39fb9de69..837129c665 100644 --- a/core/src/main/scala/spark/broadcast/Broadcast.scala +++ b/core/src/main/scala/spark/broadcast/Broadcast.scala @@ -7,8 +7,7 @@ import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor} import spark._ -@serializable -trait Broadcast[T] { +trait Broadcast[T] extends Serializable { val uuid = UUID.randomUUID def value: T @@ -20,7 +19,7 @@ trait Broadcast[T] { } object Broadcast -extends Logging { +extends Logging with Serializable { // Messages val REGISTER_BROADCAST_TRACKER = 0 val UNREGISTER_BROADCAST_TRACKER = 1 @@ -191,18 +190,15 @@ extends Logging { } } -@serializable -case class BroadcastBlock (val blockID: Int, val byteArray: Array[Byte]) { } +case class BroadcastBlock (val blockID: Int, val byteArray: Array[Byte]) extends Serializable -@serializable case class VariableInfo (@transient val arrayOfBlocks : Array[BroadcastBlock], val totalBlocks: Int, - val totalBytes: Int) { + val totalBytes: Int) extends Serializable { @transient var hasBlocks = 0 } -@serializable -class SpeedTracker { +class SpeedTracker extends Serializable { // Mapping 'source' to '(totalTime, numBlocks)' private var sourceToSpeedMap = Map[SourceInfo, (Long, Int)] () diff --git a/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala b/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala index 3afe923bae..e33ef78e8a 100644 --- a/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala @@ -9,9 +9,8 @@ import scala.math import spark._ -@serializable class ChainedBroadcast[T](@transient var value_ : T, isLocal: Boolean) -extends Broadcast[T] with Logging { +extends Broadcast[T] with Logging with Serializable { def value = value_ diff --git a/core/src/main/scala/spark/broadcast/DfsBroadcast.scala b/core/src/main/scala/spark/broadcast/DfsBroadcast.scala index e541c09216..076f18afac 100644 --- a/core/src/main/scala/spark/broadcast/DfsBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/DfsBroadcast.scala @@ -11,9 +11,8 @@ import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem} import spark._ -@serializable -class DfsBroadcast[T](@transient var value_ : T, isLocal: Boolean) -extends Broadcast[T] with Logging { +class DfsBroadcast[T](@transient var value_ : T, isLocal: Boolean) +extends Broadcast[T] with Logging with Serializable { def value = value_ diff --git a/core/src/main/scala/spark/broadcast/SourceInfo.scala b/core/src/main/scala/spark/broadcast/SourceInfo.scala index 064142590a..03f928953d 100644 --- a/core/src/main/scala/spark/broadcast/SourceInfo.scala +++ b/core/src/main/scala/spark/broadcast/SourceInfo.scala @@ -10,7 +10,6 @@ import spark._ * CHANGED: Keep track of the blockSize for THIS broadcast variable. * Broadcast.BlockSize is expected to be updated across different broadcasts */ -@serializable case class SourceInfo (val hostAddress: String, val listenPort: Int, val totalBlocks: Int = SourceInfo.UnusedParam, diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala index 79dcd317ec..945d8cd8a4 100644 --- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala @@ -9,9 +9,8 @@ import scala.math import spark._ -@serializable class TreeBroadcast[T](@transient var value_ : T, isLocal: Boolean) -extends Broadcast[T] with Logging { +extends Broadcast[T] with Logging with Serializable { def value = value_ diff --git a/examples/src/main/scala/spark/examples/Vector.scala b/examples/src/main/scala/spark/examples/Vector.scala index ea70626e71..dd34dffee5 100644 --- a/examples/src/main/scala/spark/examples/Vector.scala +++ b/examples/src/main/scala/spark/examples/Vector.scala @@ -1,6 +1,6 @@ package spark.examples -@serializable class Vector(val elements: Array[Double]) { +class Vector(val elements: Array[Double]) extends Serializable { def length = elements.length def apply(index: Int) = elements(index) -- cgit v1.2.3