aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-01-05 15:06:20 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-01-14 09:42:36 -0800
commitf144e0413a1e42d193a86fa04af769e2da9dc58b (patch)
tree1678a899ef0052021d995de14e612e718c0c11eb /streaming/src/main
parent0d0bab25bd0dfefdd5a91d22a4e81d347d255cf3 (diff)
downloadspark-f144e0413a1e42d193a86fa04af769e2da9dc58b.tar.gz
spark-f144e0413a1e42d193a86fa04af769e2da9dc58b.tar.bz2
spark-f144e0413a1e42d193a86fa04af769e2da9dc58b.zip
Adding transform and union
Diffstat (limited to 'streaming/src/main')
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala14
1 files changed, 14 insertions, 0 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
index d0fa06ba7b..56e54c719a 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
@@ -86,6 +86,20 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) = {
dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time))
}
+
+ def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = {
+ implicit val cm: ClassManifest[U] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ def scalaTransform (in: RDD[T]): RDD[U] = {
+ transformFunc.call(new JavaRDD[T](in)).rdd
+ }
+ dstream.transform(scalaTransform(_))
+ }
+ // TODO: transform with time
+
+ def union(that: JavaDStream[T]): JavaDStream[T] = {
+ dstream.union(that.dstream)
+ }
}
object JavaDStream {