aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala
blob: ee69ea5177921e8b172427b6e6f579f18f70a7a3 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package spark.streaming.dstream

import spark.RDD
import spark.streaming.{Duration, DStream, Job, Time}

private[streaming]
class ForEachDStream[T: ClassManifest] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}