diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-02-16 16:36:12 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-02-19 08:31:58 -0800 |
commit | 35880de42edb30cf705036083710c85a74a351fa (patch) | |
tree | 8edb0259819b287ad353e99c9fd250e73b5d6c01 /streaming/src/main | |
parent | 9d49a6b03fb91d516bf40e50f67e87155c69dba1 (diff) | |
download | spark-35880de42edb30cf705036083710c85a74a351fa.tar.gz spark-35880de42edb30cf705036083710c85a74a351fa.tar.bz2 spark-35880de42edb30cf705036083710c85a74a351fa.zip |
Use RDD type for `transform` operator in Java.
This is an improved implementation of the `transform` operator in Java.
The main difference is that this allows all four possible types of
transform functions:
1. JavaRDD -> JavaRDD
2. JavaRDD -> JavaPairRDD
3. JavaPairRDD -> JavaPairRDD
4. JavaPairRDD -> JavaRDD
whereas previously only (1) and (3) were possible.
Conflicts:
streaming/src/test/java/spark/streaming/JavaAPISuite.java
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala | 40 |
1 files changed, 35 insertions, 5 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala index 4e1458ca9e..f7b1704884 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala @@ -6,7 +6,7 @@ import java.lang.{Long => JLong} import scala.collection.JavaConversions._ import spark.streaming._ -import spark.api.java.{JavaRDDLike, JavaRDD} +import spark.api.java.{JavaPairRDD, JavaRDDLike, JavaRDD} import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _} import java.util import spark.RDD @@ -239,11 +239,11 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * Return a new DStream in which each RDD is generated by applying a function * on each RDD of this DStream. */ - def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = { + def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = { implicit val cm: ClassManifest[U] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] def scalaTransform (in: RDD[T]): RDD[U] = - transformFunc.call(new JavaRDD[T](in)).rdd + transformFunc.call(wrapRDD(in)).rdd dstream.transform(scalaTransform(_)) } @@ -251,11 +251,41 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * Return a new DStream in which each RDD is generated by applying a function * on each RDD of this DStream. 
*/ - def transform[U](transformFunc: JFunction2[JavaRDD[T], Time, JavaRDD[U]]): JavaDStream[U] = { + def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = { implicit val cm: ClassManifest[U] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] def scalaTransform (in: RDD[T], time: Time): RDD[U] = - transformFunc.call(new JavaRDD[T](in), time).rdd + transformFunc.call(wrapRDD(in), time).rdd + dstream.transform(scalaTransform(_, _)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this DStream. + */ + def transform[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]): + JavaPairDStream[K2, V2] = { + implicit val cmk: ClassManifest[K2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] + implicit val cmv: ClassManifest[V2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] + def scalaTransform (in: RDD[T]): RDD[(K2, V2)] = + transformFunc.call(wrapRDD(in)).rdd + dstream.transform(scalaTransform(_)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this DStream. + */ + def transform[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]): + JavaPairDStream[K2, V2] = { + implicit val cmk: ClassManifest[K2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] + implicit val cmv: ClassManifest[V2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] + def scalaTransform (in: RDD[T], time: Time): RDD[(K2, V2)] = + transformFunc.call(wrapRDD(in), time).rdd dstream.transform(scalaTransform(_, _)) } |