aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala
diff options
context:
space:
mode:
authorStephen Haberman <stephen@exigencecorp.com>2013-02-16 13:10:31 -0600
committerStephen Haberman <stephen@exigencecorp.com>2013-02-16 13:10:31 -0600
commitae2234687d9040b42619c374eadfd40c896d386d (patch)
tree13ab3ae363423b1576be4340b73dfbf634b8f039 /core/src/main/scala
parentbeb7ab870858541d736033cc3c6fad4dad657aa3 (diff)
downloadspark-ae2234687d9040b42619c374eadfd40c896d386d.tar.gz
spark-ae2234687d9040b42619c374eadfd40c896d386d.tar.bz2
spark-ae2234687d9040b42619c374eadfd40c896d386d.zip
Make CoGroupedRDDs explicitly have the same key type.
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--core/src/main/scala/spark/PairRDDFunctions.scala8
-rw-r--r--core/src/main/scala/spark/rdd/CoGroupedRDD.scala4
2 files changed, 6 insertions, 6 deletions
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index cc3cca2571..36b9880cd1 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -361,7 +361,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](
- Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]),
+ Seq(self.asInstanceOf[RDD[(K, _)]], other.asInstanceOf[RDD[(K, _)]]),
partitioner)
val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
prfs.mapValues {
@@ -380,9 +380,9 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](
- Seq(self.asInstanceOf[RDD[(_, _)]],
- other1.asInstanceOf[RDD[(_, _)]],
- other2.asInstanceOf[RDD[(_, _)]]),
+ Seq(self.asInstanceOf[RDD[(K, _)]],
+ other1.asInstanceOf[RDD[(K, _)]],
+ other2.asInstanceOf[RDD[(K, _)]]),
partitioner)
val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
prfs.mapValues {
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index 0a1e2cbee0..868ee5a39f 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -40,8 +40,8 @@ private[spark] class CoGroupAggregator
{ (b1, b2) => b1 ++ b2 })
with Serializable
-class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
- extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) with Logging {
+class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
+ extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
private val aggr = new CoGroupAggregator