aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-01-07 11:26:38 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-01-14 09:42:36 -0800
commit056f5efc557a8fcb8871d5abbee082b6398ba78c (patch)
treeec23ebf4f7138bcbe8dcd18c7433eb8c12f147b9 /streaming
parent6e514a8d3511891a3f7221c594171477a0b5a38f (diff)
downloadspark-056f5efc557a8fcb8871d5abbee082b6398ba78c.tar.gz
spark-056f5efc557a8fcb8871d5abbee082b6398ba78c.tar.bz2
spark-056f5efc557a8fcb8871d5abbee082b6398ba78c.zip
More pair functions
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala43
1 files changed, 42 insertions, 1 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index 01dda24fde..cb80a2f3e7 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -6,7 +6,7 @@ import scala.collection.JavaConversions._
import spark.streaming._
import spark.streaming.StreamingContext._
-import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import spark.api.java.function.{Function => JFunction, Function2 => JFunction2, FlatMapFunction}
import spark.Partitioner
class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
@@ -115,6 +115,47 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.countByKeyAndWindow(windowTime, slideTime, numPartitions)
}
+ def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = {
+ implicit val cm: ClassManifest[U] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ dstream.mapValues(f)
+ }
+
+ def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairDStream[K, U] = {
+ import scala.collection.JavaConverters._
+ def fn = (x: V) => f.apply(x).asScala
+ implicit val cm: ClassManifest[U] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ dstream.flatMapValues(fn)
+ }
+
+ def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
+ }
+
+ def cogroup[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
+ : JavaPairDStream[K, (JList[V], JList[W])] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ dstream.cogroup(other.dstream, partitioner)
+ .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
+ }
+
+ def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ dstream.join(other.dstream)
+ }
+
+ def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
+ : JavaPairDStream[K, (V, W)] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ dstream.join(other.dstream, partitioner)
+ }
+
override val classManifest: ClassManifest[(K, V)] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
}