aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/spark/rdd/CoGroupedRDD.scala')
-rw-r--r--core/src/main/scala/spark/rdd/CoGroupedRDD.scala12
1 files changed, 8 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index 019b12d2d5..c2d95dc060 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -60,12 +60,16 @@ class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
* @param rdds parent RDDs.
* @param part partitioner used to partition the shuffle output.
*/
-class CoGroupedRDD[K](
- @transient var rdds: Seq[RDD[(K, _)]],
- part: Partitioner,
- val serializerClass: String = null)
+class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
+ private var serializerClass: String = null
+
+ def setSerializer(cls: String): CoGroupedRDD[K] = {
+ serializerClass = cls
+ this
+ }
+
override def getDependencies: Seq[Dependency[_]] = {
rdds.map { rdd: RDD[(K, _)] =>
if (rdd.partitioner == Some(part)) {