diff options
Diffstat (limited to 'core/src/main/scala/spark/CoGroupedRDD.scala')
-rw-r--r-- | core/src/main/scala/spark/CoGroupedRDD.scala | 9 |
1 files changed, 3 insertions, 6 deletions
diff --git a/core/src/main/scala/spark/CoGroupedRDD.scala b/core/src/main/scala/spark/CoGroupedRDD.scala index a159ca1534..4a8fa6d3fc 100644 --- a/core/src/main/scala/spark/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/CoGroupedRDD.scala @@ -6,24 +6,21 @@ import java.io.ObjectInputStream import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap -@serializable -sealed trait CoGroupSplitDep +sealed trait CoGroupSplitDep extends Serializable case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplitDep case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep -@serializable class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep]) -extends Split { +extends Split with Serializable { override val index = idx override def hashCode(): Int = idx } -@serializable class CoGroupAggregator extends Aggregator[Any, Any, ArrayBuffer[Any]] ( { x => ArrayBuffer(x) }, { (b, x) => b += x }, { (b1, b2) => b1 ++ b2 } -) +) with Serializable class CoGroupedRDD[K](rdds: Seq[RDD[(_, _)]], part: Partitioner) extends RDD[(K, Seq[Seq[_]])](rdds.head.context) with Logging { |