aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-02-12 13:57:57 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-02-12 14:02:32 -0800
commit3f3e77f28b08fc1db110c3b14b2c90eaa6dca8ef (patch)
treebee3c620d3df36ecf7ca27a2c9c38afbbc91f2dd /streaming/src/main
parentfd7e414bd0eab4f8d82e225d9981d2eba036e756 (diff)
downloadspark-3f3e77f28b08fc1db110c3b14b2c90eaa6dca8ef.tar.gz
spark-3f3e77f28b08fc1db110c3b14b2c90eaa6dca8ef.tar.bz2
spark-3f3e77f28b08fc1db110c3b14b2c90eaa6dca8ef.zip
STREAMING-50: Support transform workaround in JavaPairDStream
This ports a useful workaround (the `transform` function) to JavaPairDStream. It is necessary to do things like sorting which are not supported yet in the core streaming API.
Diffstat (limited to 'streaming/src/main')
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala34
1 files changed, 32 insertions, 2 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index ef10c091ca..eb2495e3ac 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -8,11 +8,11 @@ import scala.collection.JavaConversions._
import spark.streaming._
import spark.streaming.StreamingContext._
import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
-import spark.Partitioner
+import spark.{RDD, Partitioner}
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.conf.Configuration
-import spark.api.java.JavaPairRDD
+import spark.api.java.{JavaRDD, JavaPairRDD}
import spark.storage.StorageLevel
import com.google.common.base.Optional
@@ -81,6 +81,36 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] =
dstream.union(that.dstream)
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this DStream.
+ */
+ def transform[K2, V2](transformFunc: JFunction[JavaPairRDD[K, V], JavaPairRDD[K2, V2]]):
+ JavaPairDStream[K2, V2] = {
+ implicit val cmk: ClassManifest[K2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
+ implicit val cmv: ClassManifest[V2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
+ def scalaTransform (in: RDD[(K, V)]): RDD[(K2, V2)] =
+ transformFunc.call(new JavaPairRDD[K, V](in)).rdd
+ dstream.transform(scalaTransform(_))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this DStream.
+ */
+ def transform[K2, V2](transformFunc: JFunction2[JavaPairRDD[K, V], Time, JavaPairRDD[K2, V2]]):
+ JavaPairDStream[K2, V2] = {
+ implicit val cmk: ClassManifest[K2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
+ implicit val cmv: ClassManifest[V2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
+ def scalaTransform (in: RDD[(K, V)], time: Time): RDD[(K2, V2)] =
+ transformFunc.call(new JavaPairRDD[K, V](in), time).rdd
+ dstream.transform(scalaTransform(_, _))
+ }
+
// =======================================================================
// Methods only for PairDStream's
// =======================================================================