diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-01-14 14:54:47 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-01-14 14:56:36 -0800 |
commit | 8ad6220bd376b04084604cf49b4537c97a16257d (patch) | |
tree | ab2164a37c2c6e4be3ffbf76061b975492b1e162 /streaming/src | |
parent | 38d9a3a8630a38aa0cb9e6a13256816cfa9ab5a6 (diff) | |
download | spark-8ad6220bd376b04084604cf49b4537c97a16257d.tar.gz spark-8ad6220bd376b04084604cf49b4537c97a16257d.tar.bz2 spark-8ad6220bd376b04084604cf49b4537c97a16257d.zip |
Bugfix
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala index 6f4336a011..0cccb083c5 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala @@ -441,8 +441,10 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * key in both RDDs. HashPartitioner is used to partition each generated RDD into default number * of partitions. */ - def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = { - dstream.cogroup(other) + def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) } /** |