diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-01-05 15:06:20 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-01-14 09:42:36 -0800 |
commit | f144e0413a1e42d193a86fa04af769e2da9dc58b (patch) | |
tree | 1678a899ef0052021d995de14e612e718c0c11eb /streaming/src/main | |
parent | 0d0bab25bd0dfefdd5a91d22a4e81d347d255cf3 (diff) | |
download | spark-f144e0413a1e42d193a86fa04af769e2da9dc58b.tar.gz spark-f144e0413a1e42d193a86fa04af769e2da9dc58b.tar.bz2 spark-f144e0413a1e42d193a86fa04af769e2da9dc58b.zip |
Adding transform and union
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala | 14 |
1 files changed, 14 insertions, 0 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala index d0fa06ba7b..56e54c719a 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala @@ -86,6 +86,20 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) = { dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time)) } + + def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = { + implicit val cm: ClassManifest[U] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + def scalaTransform (in: RDD[T]): RDD[U] = { + transformFunc.call(new JavaRDD[T](in)).rdd + } + dstream.transform(scalaTransform(_)) + } + // TODO: transform with time + + def union(that: JavaDStream[T]): JavaDStream[T] = { + dstream.union(that.dstream) + } } object JavaDStream { |