aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorStephen Haberman <stephen@exigencecorp.com>2013-02-16 13:10:31 -0600
committerStephen Haberman <stephen@exigencecorp.com>2013-02-16 13:10:31 -0600
commitae2234687d9040b42619c374eadfd40c896d386d (patch)
tree13ab3ae363423b1576be4340b73dfbf634b8f039 /streaming
parentbeb7ab870858541d736033cc3c6fad4dad657aa3 (diff)
downloadspark-ae2234687d9040b42619c374eadfd40c896d386d.tar.gz
spark-ae2234687d9040b42619c374eadfd40c896d386d.tar.bz2
spark-ae2234687d9040b42619c374eadfd40c896d386d.zip
Make CoGroupedRDDs explicitly have the same key type.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala2
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala2
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala2
3 files changed, 3 insertions, 3 deletions
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index fbcf061126..5db3844f1d 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -457,7 +457,7 @@ extends Serializable {
): DStream[(K, (Seq[V], Seq[W]))] = {
val cgd = new CoGroupedDStream[K](
- Seq(self.asInstanceOf[DStream[(_, _)]], other.asInstanceOf[DStream[(_, _)]]),
+ Seq(self.asInstanceOf[DStream[(K, _)]], other.asInstanceOf[DStream[(K, _)]]),
partitioner
)
val pdfs = new PairDStreamFunctions[K, Seq[Seq[_]]](cgd)(
diff --git a/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
index ddb1bf6b28..4ef4bb7de1 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
@@ -6,7 +6,7 @@ import spark.streaming.{Time, DStream, Duration}
private[streaming]
class CoGroupedDStream[K : ClassManifest](
- parents: Seq[DStream[(_, _)]],
+ parents: Seq[DStream[(K, _)]],
partitioner: Partitioner
) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) {
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
index 733d5c4a25..263655039c 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -101,7 +101,7 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs
// Cogroup the reduced RDDs and merge the reduced values
- val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner)
+ val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]], partitioner)
//val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _
val numOldValues = oldRDDs.size