aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/CoGroupedRDD.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/spark/CoGroupedRDD.scala')
-rw-r--r--core/src/main/scala/spark/CoGroupedRDD.scala9
1 files changed, 3 insertions, 6 deletions
diff --git a/core/src/main/scala/spark/CoGroupedRDD.scala b/core/src/main/scala/spark/CoGroupedRDD.scala
index a159ca1534..4a8fa6d3fc 100644
--- a/core/src/main/scala/spark/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/CoGroupedRDD.scala
@@ -6,24 +6,21 @@ import java.io.ObjectInputStream
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
-@serializable
-sealed trait CoGroupSplitDep
+sealed trait CoGroupSplitDep extends Serializable
case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplitDep
case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
-@serializable
class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep])
-extends Split {
+extends Split with Serializable {
override val index = idx
override def hashCode(): Int = idx
}
-@serializable
class CoGroupAggregator extends Aggregator[Any, Any, ArrayBuffer[Any]] (
{ x => ArrayBuffer(x) },
{ (b, x) => b += x },
{ (b1, b2) => b1 ++ b2 }
-)
+) with Serializable
class CoGroupedRDD[K](rdds: Seq[RDD[(_, _)]], part: Partitioner)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context) with Logging {