diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-01-07 11:26:38 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-01-14 09:42:36 -0800 |
commit | 056f5efc557a8fcb8871d5abbee082b6398ba78c (patch) | |
tree | ec23ebf4f7138bcbe8dcd18c7433eb8c12f147b9 /streaming/src | |
parent | 6e514a8d3511891a3f7221c594171477a0b5a38f (diff) | |
download | spark-056f5efc557a8fcb8871d5abbee082b6398ba78c.tar.gz spark-056f5efc557a8fcb8871d5abbee082b6398ba78c.tar.bz2 spark-056f5efc557a8fcb8871d5abbee082b6398ba78c.zip |
More pair functions
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala | 43 |
1 files changed, 42 insertions, 1 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala index 01dda24fde..cb80a2f3e7 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala @@ -6,7 +6,7 @@ import scala.collection.JavaConversions._ import spark.streaming._ import spark.streaming.StreamingContext._ -import spark.api.java.function.{Function => JFunction, Function2 => JFunction2} +import spark.api.java.function.{Function => JFunction, Function2 => JFunction2, FlatMapFunction} import spark.Partitioner class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( @@ -115,6 +115,47 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( dstream.countByKeyAndWindow(windowTime, slideTime, numPartitions) } + def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = { + implicit val cm: ClassManifest[U] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + dstream.mapValues(f) + } + + def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairDStream[K, U] = { + import scala.collection.JavaConverters._ + def fn = (x: V) => f.apply(x).asScala + implicit val cm: ClassManifest[U] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + dstream.flatMapValues(fn) + } + + def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) + } + + def cogroup[W](other: JavaPairDStream[K, W], partitioner: Partitioner) + : JavaPairDStream[K, (JList[V], JList[W])] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + dstream.cogroup(other.dstream, partitioner) + .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) + } + + def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + dstream.join(other.dstream) + } + + def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner) + : JavaPairDStream[K, (V, W)] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + dstream.join(other.dstream, partitioner) + } + override val classManifest: ClassManifest[(K, V)] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]] } |