diff options
Diffstat (limited to 'core/src/main/scala')
-rw-r--r-- | core/src/main/scala/spark/PairRDDFunctions.scala | 8 | ||||
-rw-r--r-- | core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 4 |
2 files changed, 6 insertions, 6 deletions
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index cc3cca2571..36b9880cd1 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -361,7 +361,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( throw new SparkException("Default partitioner cannot partition array keys.") } val cg = new CoGroupedRDD[K]( - Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]), + Seq(self.asInstanceOf[RDD[(K, _)]], other.asInstanceOf[RDD[(K, _)]]), partitioner) val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest) prfs.mapValues { @@ -380,9 +380,9 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( throw new SparkException("Default partitioner cannot partition array keys.") } val cg = new CoGroupedRDD[K]( - Seq(self.asInstanceOf[RDD[(_, _)]], - other1.asInstanceOf[RDD[(_, _)]], - other2.asInstanceOf[RDD[(_, _)]]), + Seq(self.asInstanceOf[RDD[(K, _)]], + other1.asInstanceOf[RDD[(K, _)]], + other2.asInstanceOf[RDD[(K, _)]]), partitioner) val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest) prfs.mapValues { diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala index 0a1e2cbee0..868ee5a39f 100644 --- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala @@ -40,8 +40,8 @@ private[spark] class CoGroupAggregator { (b1, b2) => b1 ++ b2 }) with Serializable -class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner) - extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) with Logging { +class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner) + extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) { private val aggr = new CoGroupAggregator |