diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-02-12 13:57:57 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-02-12 14:02:32 -0800 |
commit | 3f3e77f28b08fc1db110c3b14b2c90eaa6dca8ef (patch) | |
tree | bee3c620d3df36ecf7ca27a2c9c38afbbc91f2dd /streaming/src/main | |
parent | fd7e414bd0eab4f8d82e225d9981d2eba036e756 (diff) | |
download | spark-3f3e77f28b08fc1db110c3b14b2c90eaa6dca8ef.tar.gz spark-3f3e77f28b08fc1db110c3b14b2c90eaa6dca8ef.tar.bz2 spark-3f3e77f28b08fc1db110c3b14b2c90eaa6dca8ef.zip |
STREAMING-50: Support transform workaround in JavaPairDStream
This ports a useful workaround (the `transform` function) to
JavaPairDStream. It is necessary to do things like sorting which
are not supported yet in the core streaming API.
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala | 34 |
1 files changed, 32 insertions, 2 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala index ef10c091ca..eb2495e3ac 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala @@ -8,11 +8,11 @@ import scala.collection.JavaConversions._ import spark.streaming._ import spark.streaming.StreamingContext._ import spark.api.java.function.{Function => JFunction, Function2 => JFunction2} -import spark.Partitioner +import spark.{RDD, Partitioner} import org.apache.hadoop.mapred.{JobConf, OutputFormat} import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} import org.apache.hadoop.conf.Configuration -import spark.api.java.JavaPairRDD +import spark.api.java.{JavaRDD, JavaPairRDD} import spark.storage.StorageLevel import com.google.common.base.Optional @@ -81,6 +81,36 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] = dstream.union(that.dstream) + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this DStream. + */ + def transform[K2, V2](transformFunc: JFunction[JavaPairRDD[K, V], JavaPairRDD[K2, V2]]): + JavaPairDStream[K2, V2] = { + implicit val cmk: ClassManifest[K2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] + implicit val cmv: ClassManifest[V2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] + def scalaTransform (in: RDD[(K, V)]): RDD[(K2, V2)] = + transformFunc.call(new JavaPairRDD[K, V](in)).rdd + dstream.transform(scalaTransform(_)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this DStream. + */ + def transform[K2, V2](transformFunc: JFunction2[JavaPairRDD[K, V], Time, JavaPairRDD[K2, V2]]): + JavaPairDStream[K2, V2] = { + implicit val cmk: ClassManifest[K2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] + implicit val cmv: ClassManifest[V2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] + def scalaTransform (in: RDD[(K, V)], time: Time): RDD[(K2, V2)] = + transformFunc.call(new JavaPairRDD[K, V](in), time).rdd + dstream.transform(scalaTransform(_, _)) + } + // ======================================================================= // Methods only for PairDStream's // ======================================================================= |