aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala')
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala20
1 files changed, 20 insertions, 0 deletions
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala
new file mode 100644
index 0000000000..a13b4c9ff9
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala
@@ -0,0 +1,20 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+
+private[streaming]
+class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
+ parent: DStream[T],
+ flatMapFunc: T => Traversable[U]
+ ) extends DStream[U](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[U]] = {
+ parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
+ }
+}
+