aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-01-14 14:54:47 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-01-14 14:56:36 -0800
commit8ad6220bd376b04084604cf49b4537c97a16257d (patch)
treeab2164a37c2c6e4be3ffbf76061b975492b1e162 /streaming
parent38d9a3a8630a38aa0cb9e6a13256816cfa9ab5a6 (diff)
downloadspark-8ad6220bd376b04084604cf49b4537c97a16257d.tar.gz
spark-8ad6220bd376b04084604cf49b4537c97a16257d.tar.bz2
spark-8ad6220bd376b04084604cf49b4537c97a16257d.zip
Bugfix
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala6
1 files changed, 4 insertions, 2 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index 6f4336a011..0cccb083c5 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -441,8 +441,10 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* key in both RDDs. HashPartitioner is used to partition each generated RDD into default number
* of partitions.
*/
- def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
- dstream.cogroup(other)
+ def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
}
/**