From 156e8b47ef24cd1a54ee9f1141a91c20e26ac037 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 9 Jan 2013 12:42:10 -0800 Subject: Split Time into Time (an absolute instant of time) and Duration (a duration of time). --- .../main/scala/spark/streaming/Checkpoint.scala | 2 +- .../src/main/scala/spark/streaming/DStream.scala | 26 ++++----- .../main/scala/spark/streaming/DStreamGraph.scala | 8 +-- .../src/main/scala/spark/streaming/Duration.scala | 62 ++++++++++++++++++++++ .../src/main/scala/spark/streaming/Interval.scala | 30 ++++------- .../spark/streaming/PairDStreamFunctions.scala | 40 +++++++------- .../src/main/scala/spark/streaming/Scheduler.scala | 5 +- .../scala/spark/streaming/StreamingContext.scala | 12 ++--- .../src/main/scala/spark/streaming/Time.scala | 55 +++++-------------- .../spark/streaming/dstream/CoGroupedDStream.scala | 4 +- .../spark/streaming/dstream/FilteredDStream.scala | 4 +- .../streaming/dstream/FlatMapValuedDStream.scala | 4 +- .../streaming/dstream/FlatMappedDStream.scala | 4 +- .../spark/streaming/dstream/ForEachDStream.scala | 4 +- .../spark/streaming/dstream/GlommedDStream.scala | 4 +- .../spark/streaming/dstream/InputDStream.scala | 4 +- .../streaming/dstream/MapPartitionedDStream.scala | 4 +- .../spark/streaming/dstream/MapValuedDStream.scala | 4 +- .../spark/streaming/dstream/MappedDStream.scala | 4 +- .../streaming/dstream/ReducedWindowedDStream.scala | 16 +++--- .../spark/streaming/dstream/ShuffledDStream.scala | 4 +- .../spark/streaming/dstream/StateDStream.scala | 4 +- .../streaming/dstream/TransformedDStream.scala | 4 +- .../spark/streaming/dstream/UnionDStream.scala | 4 +- .../spark/streaming/dstream/WindowedDStream.scala | 14 ++--- 25 files changed, 174 insertions(+), 152 deletions(-) create mode 100644 streaming/src/main/scala/spark/streaming/Duration.scala diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala index 11a7232d7b..a9c6e65d62 100644 --- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala @@ -17,7 +17,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) val jars = ssc.sc.jars val graph = ssc.graph val checkpointDir = ssc.checkpointDir - val checkpointInterval = ssc.checkpointInterval + val checkpointInterval: Duration = ssc.checkpointInterval def validate() { assert(master != null, "Checkpoint.master is null") diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index beba9cfd4f..7611598fde 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -2,7 +2,7 @@ package spark.streaming import spark.streaming.dstream._ import StreamingContext._ -import Time._ +//import Time._ import spark.{RDD, Logging} import spark.storage.StorageLevel @@ -47,7 +47,7 @@ abstract class DStream[T: ClassManifest] ( // ======================================================================= /** Time interval after which the DStream generates an RDD */ - def slideTime: Time + def slideTime: Duration /** List of parent DStreams on which this DStream depends */ def dependencies: List[DStream[_]] @@ -67,14 +67,14 @@ abstract class DStream[T: ClassManifest] ( protected[streaming] var zeroTime: Time = null // Duration for which the DStream will remember each RDD created - protected[streaming] var rememberDuration: 
Time = null + protected[streaming] var rememberDuration: Duration = null // Storage level of the RDDs in the stream protected[streaming] var storageLevel: StorageLevel = StorageLevel.NONE // Checkpoint details protected[streaming] val mustCheckpoint = false - protected[streaming] var checkpointInterval: Time = null + protected[streaming] var checkpointInterval: Duration = null protected[streaming] var checkpointData = new DStreamCheckpointData(HashMap[Time, Any]()) // Reference to whole DStream graph @@ -108,7 +108,7 @@ abstract class DStream[T: ClassManifest] ( * Enable periodic checkpointing of RDDs of this DStream * @param interval Time interval after which generated RDD will be checkpointed */ - def checkpoint(interval: Time): DStream[T] = { + def checkpoint(interval: Duration): DStream[T] = { if (isInitialized) { throw new UnsupportedOperationException( "Cannot change checkpoint interval of a DStream after streaming context has started") @@ -224,7 +224,7 @@ abstract class DStream[T: ClassManifest] ( dependencies.foreach(_.setGraph(graph)) } - protected[streaming] def remember(duration: Time) { + protected[streaming] def remember(duration: Duration) { if (duration != null && duration > rememberDuration) { rememberDuration = duration logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this) @@ -531,7 +531,7 @@ abstract class DStream[T: ClassManifest] ( * @param windowTime width of the window; must be a multiple of this DStream's interval. * @return */ - def window(windowTime: Time): DStream[T] = window(windowTime, this.slideTime) + def window(windowTime: Duration): DStream[T] = window(windowTime, this.slideTime) /** * Return a new DStream which is computed based on windowed batches of this DStream. * @param windowTime duration (i.e., width) of the window; * @param slideTime sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's interval */ - def window(windowTime: Time, slideTime: Time): DStream[T] = { + def window(windowTime: Duration, slideTime: Duration): DStream[T] = { new WindowedDStream(this, windowTime, slideTime) } /** * Return a new DStream which is computed based on tumbling window on this DStream. * This is equivalent to window(batchTime, batchTime). * @param batchTime tumbling window duration; must be a multiple of this DStream's interval */ - def tumble(batchTime: Time): DStream[T] = window(batchTime, batchTime) + def tumble(batchTime: Duration): DStream[T] = window(batchTime, batchTime) /** * Returns a new DStream in which each RDD has a single element generated by reducing all * elements in a window over this DStream. windowTime and slideTime are as defined in the * window() operation. This is equivalent to window(windowTime, slideTime).reduce(reduceFunc) */ - def reduceByWindow(reduceFunc: (T, T) => T, windowTime: Time, slideTime: Time): DStream[T] = { + def reduceByWindow(reduceFunc: (T, T) => T, windowTime: Duration, slideTime: Duration): DStream[T] = { this.window(windowTime, slideTime).reduce(reduceFunc) } def reduceByWindow( reduceFunc: (T, T) => T, invReduceFunc: (T, T) => T, - windowTime: Time, - slideTime: Time + windowTime: Duration, + slideTime: Duration ): DStream[T] = { this.map(x => (1, x)) .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime, 1) .map(_._2) } /** * Returns a new DStream in which each RDD has a single element generated by counting the number * of elements in a window over this DStream. windowTime and slideTime are as defined in the * window() operation. This is equivalent to window(windowTime, slideTime).count() */ - def countByWindow(windowTime: Time, slideTime: Time): DStream[Int] = { + def countByWindow(windowTime: Duration, slideTime: Duration): DStream[Int] = { this.map(_ => 1).reduceByWindow(_ + _, _ - _, windowTime, slideTime) }
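For reference, a minimal sketch of how the windowing API above reads after this change. This is illustrative only and not part of the patch; "lines" stands for a hypothetical DStream[String]:

    // Window widths and slide intervals are now Durations; batch timestamps remain Times.
    val windowed = lines.window(Seconds(30), Seconds(10))   // 30 s window, sliding every 10 s
    val counts   = lines.countByWindow(Minutes(1), Seconds(10))
    // Incremental form: the second function "subtracts" elements that leave the window.
    val chars    = lines.map(_.length).reduceByWindow(_ + _, _ - _, Seconds(30), Seconds(10))
    windowed.checkpoint(Seconds(60))                        // the checkpoint interval is a Duration too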
diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala index c72429370e..bc4a40d7bc 100644 --- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala @@ -12,8 +12,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { private val outputStreams = new ArrayBuffer[DStream[_]]() private[streaming] var zeroTime: Time = null - private[streaming] var batchDuration: Time = null - private[streaming] var rememberDuration: Time = null + private[streaming] var batchDuration: Duration = null + private[streaming] var rememberDuration: Duration = null private[streaming] var checkpointInProgress = false private[streaming] def start(time: Time) { @@ -41,7 +41,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { } } - private[streaming] def setBatchDuration(duration: Time) { + private[streaming] def setBatchDuration(duration: Duration) { this.synchronized { if (batchDuration != null) { throw new Exception("Batch duration already set as " + batchDuration + @@ -51,7 +51,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { batchDuration = duration } - private[streaming] def remember(duration: Time) { + private[streaming] def remember(duration: Duration) { this.synchronized { if (rememberDuration != null) { throw new Exception("Remember duration already set as " + rememberDuration + diff --git a/streaming/src/main/scala/spark/streaming/Duration.scala b/streaming/src/main/scala/spark/streaming/Duration.scala new file mode 100644 index 0000000000..d2728d9dca --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/Duration.scala @@ -0,0 +1,62 @@ +package spark.streaming + +class Duration (private val millis: Long) { + + def < (that: Duration): Boolean = (this.millis < that.millis) + + def <= (that: Duration): Boolean = (this.millis <= that.millis) + + def > (that: Duration): Boolean = (this.millis > that.millis) + + def >= (that: Duration): Boolean = (this.millis >= that.millis) + + def + (that: Duration): Duration = new Duration(millis + that.millis) + + def - (that: Duration): Duration = new Duration(millis - that.millis) + + def * (times: Int): Duration = new Duration(millis * times) + + def / (that: Duration): Long = millis / that.millis + + def isMultipleOf(that: Duration): Boolean = + (this.millis % that.millis == 0) + + def min(that: Duration): Duration = if (this < that) this else that + + def max(that: Duration): Duration = if (this > that) this else that + + def isZero: Boolean = (this.millis == 0) + + override def toString: String = (millis.toString + " ms") + + def toFormattedString: String = millis.toString + + def milliseconds: Long = millis +} + + +/** + * Helper object that creates an instance of [[spark.streaming.Duration]] representing + * a given number of milliseconds. + */ +object Milliseconds { + def apply(milliseconds: Long) = new Duration(milliseconds) +} + +/** + * Helper object that creates an instance of [[spark.streaming.Duration]] representing + * a given number of seconds. + */ +object Seconds { + def apply(seconds: Long) = new Duration(seconds * 1000) +} + +/** + * Helper object that creates an instance of [[spark.streaming.Duration]] representing + * a given number of minutes. + */ +object Minutes { + def apply(minutes: Long) = new Duration(minutes * 60000) +} + +
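The new Duration class wraps a plain millisecond count in a small arithmetic algebra. A quick sketch of how the operators above compose (illustrative only, not part of the patch):

    val batch  = Seconds(2)                 // 2000 ms
    val window = Minutes(1)                 // 60000 ms
    window.isMultipleOf(batch)              // true: 60000 % 2000 == 0
    window / batch                          // 30 (Long): integer division of the millisecond counts
    (batch * 3) + Milliseconds(500)         // a Duration of 6500 ms; toString renders "6500 ms"
    batch.max(window)                       // Minutes(1)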
diff --git a/streaming/src/main/scala/spark/streaming/Interval.scala b/streaming/src/main/scala/spark/streaming/Interval.scala index fa0b7ce19d..dc21dfb722 100644 --- a/streaming/src/main/scala/spark/streaming/Interval.scala +++ b/streaming/src/main/scala/spark/streaming/Interval.scala @@ -1,16 +1,16 @@ package spark.streaming private[streaming] -case class Interval(beginTime: Time, endTime: Time) { - def this(beginMs: Long, endMs: Long) = this(Time(beginMs), new Time(endMs)) +class Interval(val beginTime: Time, val endTime: Time) { + def this(beginMs: Long, endMs: Long) = this(new Time(beginMs), new Time(endMs)) - def duration(): Time = endTime - beginTime + def duration(): Duration = endTime - beginTime - def + (time: Time): Interval = { + def + (time: Duration): Interval = { new Interval(beginTime + time, endTime + time) } - def - (time: Time): Interval = { + def - (time: Duration): Interval = { new Interval(beginTime - time, endTime - time) } @@ -27,24 +27,14 @@ case class Interval(beginTime: Time, endTime: Time) { def >= (that: Interval) = !(this < that) - def next(): Interval = { - this + (endTime - beginTime) - } - - def isZero = (beginTime.isZero && endTime.isZero) - - def toFormattedString = beginTime.toFormattedString + "-" + endTime.toFormattedString - - override def toString = "[" + beginTime + ", " + endTime + "]" + override def toString = "[" + beginTime + ", " + endTime + "]" } object Interval { - def zero() = new Interval (Time.zero, Time.zero) - - def currentInterval(intervalDuration: Time): Interval = { - val time = Time(System.currentTimeMillis) - val intervalBegin = time.floor(intervalDuration) - Interval(intervalBegin, intervalBegin + intervalDuration) + def currentInterval(duration: Duration): Interval = { + val time = new Time(System.currentTimeMillis) + val intervalBegin = time.floor(duration) + new Interval(intervalBegin, intervalBegin + duration) } } diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala index b0a208e67f..dd64064138 100644 --- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala @@ -69,21 +69,21 @@ extends Serializable { self.map(x => (x._1, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions) } - def groupByKeyAndWindow(windowTime: Time, slideTime: Time): DStream[(K, Seq[V])] = { + def groupByKeyAndWindow(windowTime: Duration, slideTime: Duration): DStream[(K, Seq[V])] = { groupByKeyAndWindow(windowTime, slideTime, defaultPartitioner()) } def groupByKeyAndWindow( - windowTime: Time, - slideTime: Time, + windowTime: Duration, + slideTime: Duration, numPartitions: Int ): DStream[(K, Seq[V])] = { groupByKeyAndWindow(windowTime, slideTime, defaultPartitioner(numPartitions)) } def groupByKeyAndWindow( - windowTime: Time, - slideTime: Time, + windowTime: Duration, + slideTime: Duration, partitioner: Partitioner ): DStream[(K, Seq[V])] = { self.window(windowTime, slideTime).groupByKey(partitioner) @@ -91,23 +91,23 @@ extends Serializable { def reduceByKeyAndWindow( reduceFunc: (V, V) => V, - windowTime: Time + windowTime: Duration ): DStream[(K, V)] = { 
reduceByKeyAndWindow(reduceFunc, windowTime, self.slideTime, defaultPartitioner()) } def reduceByKeyAndWindow( reduceFunc: (V, V) => V, - windowTime: Time, - slideTime: Time + windowTime: Duration, + slideTime: Duration ): DStream[(K, V)] = { reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, defaultPartitioner()) } def reduceByKeyAndWindow( reduceFunc: (V, V) => V, - windowTime: Time, - slideTime: Time, + windowTime: Duration, + slideTime: Duration, numPartitions: Int ): DStream[(K, V)] = { reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, defaultPartitioner(numPartitions)) @@ -115,8 +115,8 @@ extends Serializable { def reduceByKeyAndWindow( reduceFunc: (V, V) => V, - windowTime: Time, - slideTime: Time, + windowTime: Duration, + slideTime: Duration, partitioner: Partitioner ): DStream[(K, V)] = { val cleanedReduceFunc = ssc.sc.clean(reduceFunc) @@ -134,8 +134,8 @@ extends Serializable { def reduceByKeyAndWindow( reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, - windowTime: Time, - slideTime: Time + windowTime: Duration, + slideTime: Duration ): DStream[(K, V)] = { reduceByKeyAndWindow( @@ -145,8 +145,8 @@ extends Serializable { def reduceByKeyAndWindow( reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, - windowTime: Time, - slideTime: Time, + windowTime: Duration, + slideTime: Duration, numPartitions: Int ): DStream[(K, V)] = { @@ -157,8 +157,8 @@ extends Serializable { def reduceByKeyAndWindow( reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, - windowTime: Time, - slideTime: Time, + windowTime: Duration, + slideTime: Duration, partitioner: Partitioner ): DStream[(K, V)] = { @@ -169,8 +169,8 @@ extends Serializable { } def countByKeyAndWindow( - windowTime: Time, - slideTime: Time, + windowTime: Duration, + slideTime: Duration, numPartitions: Int = self.ssc.sc.defaultParallelism ): DStream[(K, Long)] = { diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala index eb40affe6d..10845e3a5e 100644 --- a/streaming/src/main/scala/spark/streaming/Scheduler.scala +++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala @@ -22,7 +22,8 @@ class Scheduler(ssc: StreamingContext) extends Logging { val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock") val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock] - val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, generateRDDs(_)) + val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, + longTime => generateRDDs(new Time(longTime))) def start() { // If context was started from checkpoint, then restart timer such that @@ -41,7 +42,7 @@ class Scheduler(ssc: StreamingContext) extends Logging { timer.restart(graph.zeroTime.milliseconds) logInfo("Scheduler's timer restarted") } else { - val firstTime = Time(timer.start()) + val firstTime = new Time(timer.start()) graph.start(firstTime - ssc.graph.batchDuration) logInfo("Scheduler's timer started") } diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 215246ba2e..ee8314df3f 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -26,7 +26,7 @@ import java.util.UUID class StreamingContext private ( sc_ : SparkContext, cp_ : Checkpoint, - batchDur_ : Time + batchDur_ : Duration ) extends Logging { /** @@ -34,7 +34,7 @@ 
class StreamingContext private ( * @param sparkContext Existing SparkContext * @param batchDuration The time interval at which streaming data will be divided into batches */ - def this(sparkContext: SparkContext, batchDuration: Time) = this(sparkContext, null, batchDuration) + def this(sparkContext: SparkContext, batchDuration: Duration) = this(sparkContext, null, batchDuration) /** * Creates a StreamingContext by providing the details necessary for creating a new SparkContext. @@ -42,7 +42,7 @@ class StreamingContext private ( * @param frameworkName A name for your job, to display on the cluster web UI * @param batchDuration The time interval at which streaming data will be divided into batches */ - def this(master: String, frameworkName: String, batchDuration: Time) = + def this(master: String, frameworkName: String, batchDuration: Duration) = this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration) /** @@ -96,7 +96,7 @@ class StreamingContext private ( } } - protected[streaming] var checkpointInterval: Time = if (isCheckpointPresent) cp_.checkpointInterval else null + protected[streaming] var checkpointInterval: Duration = if (isCheckpointPresent) cp_.checkpointInterval else null protected[streaming] var receiverJobThread: Thread = null protected[streaming] var scheduler: Scheduler = null @@ -107,7 +107,7 @@ class StreamingContext private ( * if the developer wishes to query old data outside the DStream computation). * @param duration Minimum duration that each DStream should remember its RDDs */ - def remember(duration: Time) { + def remember(duration: Duration) { graph.remember(duration) } @@ -117,7 +117,7 @@ class StreamingContext private ( * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored * @param interval checkpoint interval */ - def checkpoint(directory: String, interval: Time = null) { + def checkpoint(directory: String, interval: Duration = null) { if (directory != null) { sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory)) checkpointDir = directory diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala index 3c6fd5d967..069df82e52 100644 --- a/streaming/src/main/scala/spark/streaming/Time.scala +++ b/streaming/src/main/scala/spark/streaming/Time.scala @@ -7,7 +7,7 @@ package spark.streaming * @param millis Time in UTC. 
*/ -case class Time(private val millis: Long) { +class Time(private val millis: Long) { def < (that: Time): Boolean = (this.millis < that.millis) @@ -17,63 +17,32 @@ case class Time(private val millis: Long) { def >= (that: Time): Boolean = (this.millis >= that.millis) - def + (that: Time): Time = Time(millis + that.millis) - - def - (that: Time): Time = Time(millis - that.millis) - - def * (times: Int): Time = Time(millis * times) + def + (that: Duration): Time = new Time(millis + that.milliseconds) + + def - (that: Time): Duration = new Duration(millis - that.millis) - def / (that: Time): Long = millis / that.millis + def - (that: Duration): Time = new Time(millis - that.milliseconds) - def floor(that: Time): Time = { - val t = that.millis + def floor(that: Duration): Time = { + val t = that.milliseconds val m = math.floor(this.millis / t).toLong - Time(m * t) + new Time(m * t) } - def isMultipleOf(that: Time): Boolean = - (this.millis % that.millis == 0) + def isMultipleOf(that: Duration): Boolean = + (this.millis % that.milliseconds == 0) def min(that: Time): Time = if (this < that) this else that def max(that: Time): Time = if (this > that) this else that - def isZero: Boolean = (this.millis == 0) - override def toString: String = (millis.toString + " ms") - def toFormattedString: String = millis.toString - def milliseconds: Long = millis } -private[streaming] object Time { - val zero = Time(0) - +/*private[streaming] object Time { implicit def toTime(long: Long) = Time(long) } - -/** - * Helper object that creates instance of [[spark.streaming.Time]] representing - * a given number of milliseconds. - */ -object Milliseconds { - def apply(milliseconds: Long) = Time(milliseconds) -} - -/** - * Helper object that creates instance of [[spark.streaming.Time]] representing - * a given number of seconds. - */ -object Seconds { - def apply(seconds: Long) = Time(seconds * 1000) -} - -/** - * Helper object that creates instance of [[spark.streaming.Time]] representing - * a given number of minutes. 
- */ -object Minutes { - def apply(minutes: Long) = Time(minutes * 60000) -} +*/ diff --git a/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala index bc23d423d3..ca178fd384 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala @@ -2,7 +2,7 @@ package spark.streaming.dstream import spark.{RDD, Partitioner} import spark.rdd.CoGroupedRDD -import spark.streaming.{Time, DStream} +import spark.streaming.{Time, DStream, Duration} private[streaming] class CoGroupedDStream[K : ClassManifest]( @@ -24,7 +24,7 @@ class CoGroupedDStream[K : ClassManifest]( override def dependencies = parents.toList - override def slideTime = parents.head.slideTime + override def slideTime: Duration = parents.head.slideTime override def compute(validTime: Time): Option[RDD[(K, Seq[Seq[_]])]] = { val part = partitioner diff --git a/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala index 1cbb4d536e..76b9e58029 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala @@ -1,6 +1,6 @@ package spark.streaming.dstream -import spark.streaming.{DStream, Time} +import spark.streaming.{Duration, DStream, Time} import spark.RDD private[streaming] @@ -11,7 +11,7 @@ class FilteredDStream[T: ClassManifest]( override def dependencies = List(parent) - override def slideTime: Time = parent.slideTime + override def slideTime: Duration = parent.slideTime override def compute(validTime: Time): Option[RDD[T]] = { parent.getOrCompute(validTime).map(_.filter(filterFunc)) diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala index 11ed8cf317..28e9a456ac 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala @@ -1,6 +1,6 @@ package spark.streaming.dstream -import spark.streaming.{DStream, Time} +import spark.streaming.{Duration, DStream, Time} import spark.RDD import spark.SparkContext._ @@ -12,7 +12,7 @@ class FlatMapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest] override def dependencies = List(parent) - override def slideTime: Time = parent.slideTime + override def slideTime: Duration = parent.slideTime override def compute(validTime: Time): Option[RDD[(K, U)]] = { parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc)) diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala index a13b4c9ff9..ef305b66f1 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala @@ -1,6 +1,6 @@ package spark.streaming.dstream -import spark.streaming.{DStream, Time} +import spark.streaming.{Duration, DStream, Time} import spark.RDD private[streaming] @@ -11,7 +11,7 @@ class FlatMappedDStream[T: ClassManifest, U: ClassManifest]( override def dependencies = List(parent) - override def slideTime: Time = parent.slideTime + override def slideTime: Duration = parent.slideTime override def 
compute(validTime: Time): Option[RDD[U]] = { parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc)) diff --git a/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala index 41c629a225..f8af0a38a7 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala @@ -1,7 +1,7 @@ package spark.streaming.dstream import spark.RDD -import spark.streaming.{DStream, Job, Time} +import spark.streaming.{Duration, DStream, Job, Time} private[streaming] class ForEachDStream[T: ClassManifest] ( @@ -11,7 +11,7 @@ class ForEachDStream[T: ClassManifest] ( override def dependencies = List(parent) - override def slideTime: Time = parent.slideTime + override def slideTime: Duration = parent.slideTime override def compute(validTime: Time): Option[RDD[Unit]] = None diff --git a/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala index 92ea503cae..19cccea735 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala @@ -1,6 +1,6 @@ package spark.streaming.dstream -import spark.streaming.{DStream, Time} +import spark.streaming.{Duration, DStream, Time} import spark.RDD private[streaming] @@ -9,7 +9,7 @@ class GlommedDStream[T: ClassManifest](parent: DStream[T]) override def dependencies = List(parent) - override def slideTime: Time = parent.slideTime + override def slideTime: Duration = parent.slideTime override def compute(validTime: Time): Option[RDD[Array[T]]] = { parent.getOrCompute(validTime).map(_.glom()) diff --git a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala index 4959c66b06..50f0f45796 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala @@ -1,13 +1,13 @@ package spark.streaming.dstream -import spark.streaming.{StreamingContext, DStream} +import spark.streaming.{Duration, StreamingContext, DStream} abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext) extends DStream[T](ssc_) { override def dependencies = List() - override def slideTime = { + override def slideTime: Duration = { if (ssc == null) throw new Exception("ssc is null") if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null") ssc.graph.batchDuration diff --git a/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala index daf78c6893..e9ca668aa6 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala @@ -1,6 +1,6 @@ package spark.streaming.dstream -import spark.streaming.{DStream, Time} +import spark.streaming.{Duration, DStream, Time} import spark.RDD private[streaming] @@ -12,7 +12,7 @@ class MapPartitionedDStream[T: ClassManifest, U: ClassManifest]( override def dependencies = List(parent) - override def slideTime: Time = parent.slideTime + override def slideTime: Duration = parent.slideTime override def compute(validTime: Time): Option[RDD[U]] = { parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, 
preservePartitioning)) diff --git a/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala index 689caeef0e..ebc7d0698b 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala @@ -1,6 +1,6 @@ package spark.streaming.dstream -import spark.streaming.{DStream, Time} +import spark.streaming.{Duration, DStream, Time} import spark.RDD import spark.SparkContext._ @@ -12,7 +12,7 @@ class MapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]( override def dependencies = List(parent) - override def slideTime: Time = parent.slideTime + override def slideTime: Duration = parent.slideTime override def compute(validTime: Time): Option[RDD[(K, U)]] = { parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc)) diff --git a/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala index 786b9966f2..3af8e7ab88 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala @@ -1,6 +1,6 @@ package spark.streaming.dstream -import spark.streaming.{DStream, Time} +import spark.streaming.{Duration, DStream, Time} import spark.RDD private[streaming] @@ -11,7 +11,7 @@ class MappedDStream[T: ClassManifest, U: ClassManifest] ( override def dependencies = List(parent) - override def slideTime: Time = parent.slideTime + override def slideTime: Duration = parent.slideTime override def compute(validTime: Time): Option[RDD[U]] = { parent.getOrCompute(validTime).map(_.map[U](mapFunc)) diff --git a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala index d289ed2a3f..a685a778ce 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala @@ -9,15 +9,15 @@ import spark.SparkContext._ import spark.storage.StorageLevel import scala.collection.mutable.ArrayBuffer -import spark.streaming.{Interval, Time, DStream} +import spark.streaming.{Duration, Interval, Time, DStream} private[streaming] class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( parent: DStream[(K, V)], reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, - _windowTime: Time, - _slideTime: Time, + _windowTime: Duration, + _slideTime: Duration, partitioner: Partitioner ) extends DStream[(K,V)](parent.ssc) { @@ -39,15 +39,15 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( super.persist(StorageLevel.MEMORY_ONLY_SER) reducedStream.persist(StorageLevel.MEMORY_ONLY_SER) - def windowTime: Time = _windowTime + def windowTime: Duration = _windowTime override def dependencies = List(reducedStream) - override def slideTime: Time = _slideTime + override def slideTime: Duration = _slideTime override val mustCheckpoint = true - override def parentRememberDuration: Time = rememberDuration + windowTime + override def parentRememberDuration: Duration = rememberDuration + windowTime override def persist(storageLevel: StorageLevel): DStream[(K,V)] = { super.persist(storageLevel) @@ -55,7 +55,7 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( this } - override def checkpoint(interval: Time): DStream[(K, V)] = { + override def 
checkpoint(interval: Duration): DStream[(K, V)] = { super.checkpoint(interval) //reducedStream.checkpoint(interval) this @@ -66,7 +66,7 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( val invReduceF = invReduceFunc val currentTime = validTime - val currentWindow = Interval(currentTime - windowTime + parent.slideTime, currentTime) + val currentWindow = new Interval(currentTime - windowTime + parent.slideTime, currentTime) val previousWindow = currentWindow - slideTime logDebug("Window time = " + windowTime) diff --git a/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala index 6854bbe665..7612804b96 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala @@ -2,7 +2,7 @@ package spark.streaming.dstream import spark.{RDD, Partitioner} import spark.SparkContext._ -import spark.streaming.{DStream, Time} +import spark.streaming.{Duration, DStream, Time} private[streaming] class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest]( @@ -15,7 +15,7 @@ class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest]( override def dependencies = List(parent) - override def slideTime: Time = parent.slideTime + override def slideTime: Duration = parent.slideTime override def compute(validTime: Time): Option[RDD[(K,C)]] = { parent.getOrCompute(validTime) match { diff --git a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala index 175b3060c1..ce4f486825 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala @@ -4,7 +4,7 @@ import spark.RDD import spark.Partitioner import spark.SparkContext._ import spark.storage.StorageLevel -import spark.streaming.{Time, DStream} +import spark.streaming.{Duration, Time, DStream} private[streaming] class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest]( @@ -18,7 +18,7 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife override def dependencies = List(parent) - override def slideTime = parent.slideTime + override def slideTime: Duration = parent.slideTime override val mustCheckpoint = true diff --git a/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala index 0337579514..5a2c5bc0f0 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala @@ -1,7 +1,7 @@ package spark.streaming.dstream import spark.RDD -import spark.streaming.{DStream, Time} +import spark.streaming.{Duration, DStream, Time} private[streaming] class TransformedDStream[T: ClassManifest, U: ClassManifest] ( @@ -11,7 +11,7 @@ class TransformedDStream[T: ClassManifest, U: ClassManifest] ( override def dependencies = List(parent) - override def slideTime: Time = parent.slideTime + override def slideTime: Duration = parent.slideTime override def compute(validTime: Time): Option[RDD[U]] = { parent.getOrCompute(validTime).map(transformFunc(_, validTime)) diff --git a/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala index 
3bf4c2ecea..224a19842b 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala @@ -1,6 +1,6 @@ package spark.streaming.dstream -import spark.streaming.{DStream, Time} +import spark.streaming.{Duration, DStream, Time} import spark.RDD import collection.mutable.ArrayBuffer import spark.rdd.UnionRDD @@ -23,7 +23,7 @@ class UnionDStream[T: ClassManifest](parents: Array[DStream[T]]) override def dependencies = parents.toList - override def slideTime: Time = parents.head.slideTime + override def slideTime: Duration = parents.head.slideTime override def compute(validTime: Time): Option[RDD[T]] = { val rdds = new ArrayBuffer[RDD[T]]() diff --git a/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala index 7718794cbf..45689b25ce 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala @@ -3,13 +3,13 @@ package spark.streaming.dstream import spark.RDD import spark.rdd.UnionRDD import spark.storage.StorageLevel -import spark.streaming.{Interval, Time, DStream} +import spark.streaming.{Duration, Interval, Time, DStream} private[streaming] class WindowedDStream[T: ClassManifest]( parent: DStream[T], - _windowTime: Time, - _slideTime: Time) + _windowTime: Duration, + _slideTime: Duration) extends DStream[T](parent.ssc) { if (!_windowTime.isMultipleOf(parent.slideTime)) @@ -22,16 +22,16 @@ class WindowedDStream[T: ClassManifest]( parent.persist(StorageLevel.MEMORY_ONLY_SER) - def windowTime: Time = _windowTime + def windowTime: Duration = _windowTime override def dependencies = List(parent) - override def slideTime: Time = _slideTime + override def slideTime: Duration = _slideTime - override def parentRememberDuration: Time = rememberDuration + windowTime + override def parentRememberDuration: Duration = rememberDuration + windowTime override def compute(validTime: Time): Option[RDD[T]] = { - val currentWindow = Interval(validTime - windowTime + parent.slideTime, validTime) + val currentWindow = new Interval(validTime - windowTime + parent.slideTime, validTime) Some(new UnionRDD(ssc.sc, parent.slice(currentWindow))) } } -- cgit v1.2.3
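With the split in place, Time and Duration have distinct algebras: subtracting one Time from another yields a Duration, adding or subtracting a Duration to or from a Time yields another Time, and adding two absolute Times no longer compiles. A closing sketch (illustrative only, not part of the patch):

    val t1 = new Time(System.currentTimeMillis)
    val t2 = t1 + Seconds(10)           // Time + Duration => Time
    val d: Duration = t2 - t1           // Time - Time     => Duration (10000 ms)
    val aligned = t1.floor(Seconds(2))  // Time rounded down to a 2-second boundary
    aligned.isMultipleOf(Seconds(2))    // true
    // t1 + t2                          // does not compile: Time + Time is undefined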