diff options
author | Stephen Haberman <stephen@exigencecorp.com> | 2013-02-16 13:10:31 -0600 |
---|---|---|
committer | Stephen Haberman <stephen@exigencecorp.com> | 2013-02-16 13:10:31 -0600 |
commit | ae2234687d9040b42619c374eadfd40c896d386d (patch) | |
tree | 13ab3ae363423b1576be4340b73dfbf634b8f039 /core/src | |
parent | beb7ab870858541d736033cc3c6fad4dad657aa3 (diff) | |
download | spark-ae2234687d9040b42619c374eadfd40c896d386d.tar.gz spark-ae2234687d9040b42619c374eadfd40c896d386d.tar.bz2 spark-ae2234687d9040b42619c374eadfd40c896d386d.zip |
Make CoGroupedRDDs explicitly have the same key type.
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/spark/PairRDDFunctions.scala | 8 | ||||
-rw-r--r-- | core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 4 | ||||
-rw-r--r-- | core/src/test/scala/spark/CheckpointSuite.scala | 2 |
3 files changed, 7 insertions, 7 deletions
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index cc3cca2571..36b9880cd1 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -361,7 +361,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( throw new SparkException("Default partitioner cannot partition array keys.") } val cg = new CoGroupedRDD[K]( - Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]), + Seq(self.asInstanceOf[RDD[(K, _)]], other.asInstanceOf[RDD[(K, _)]]), partitioner) val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest) prfs.mapValues { @@ -380,9 +380,9 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( throw new SparkException("Default partitioner cannot partition array keys.") } val cg = new CoGroupedRDD[K]( - Seq(self.asInstanceOf[RDD[(_, _)]], - other1.asInstanceOf[RDD[(_, _)]], - other2.asInstanceOf[RDD[(_, _)]]), + Seq(self.asInstanceOf[RDD[(K, _)]], + other1.asInstanceOf[RDD[(K, _)]], + other2.asInstanceOf[RDD[(K, _)]]), partitioner) val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest) prfs.mapValues { diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala index 0a1e2cbee0..868ee5a39f 100644 --- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala @@ -40,8 +40,8 @@ private[spark] class CoGroupAggregator { (b1, b2) => b1 ++ b2 }) with Serializable -class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner) - extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) with Logging { +class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner) + extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) { private val aggr = new CoGroupAggregator diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala index 0d08fd2396..51ff966ae4 100644 --- a/core/src/test/scala/spark/CheckpointSuite.scala +++ b/core/src/test/scala/spark/CheckpointSuite.scala @@ -347,7 +347,7 @@ object CheckpointSuite { def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner) = { //println("First = " + first + ", second = " + second) new CoGroupedRDD[K]( - Seq(first.asInstanceOf[RDD[(_, _)]], second.asInstanceOf[RDD[(_, _)]]), + Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]), part ).asInstanceOf[RDD[(K, Seq[Seq[V]])]] } |