diff options
Diffstat (limited to 'core/src/main/scala/spark/rdd/CoGroupedRDD.scala')
-rw-r--r-- | core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 12 |
1 files changed, 8 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala index 019b12d2d5..c2d95dc060 100644 --- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala @@ -60,12 +60,16 @@ class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]) * @param rdds parent RDDs. * @param part partitioner used to partition the shuffle output. */ -class CoGroupedRDD[K]( - @transient var rdds: Seq[RDD[(K, _)]], - part: Partitioner, - val serializerClass: String = null) +class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner) extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) { + private var serializerClass: String = null + + def setSerializer(cls: String): CoGroupedRDD[K] = { + serializerClass = cls + this + } + override def getDependencies: Seq[Dependency[_]] = { rdds.map { rdd: RDD[(K, _)] => if (rdd.partitioner == Some(part)) { |