author    Tathagata Das <tathagata.das1565@gmail.com>  2013-01-09 12:42:10 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2013-01-09 12:42:10 -0800
commit    156e8b47ef24cd1a54ee9f1141a91c20e26ac037 (patch)
tree      a3a944fe3ac6b03a65c9903e6eca76bcc6d0dae6 /streaming/src
parent    6c502e37934c737494c7288e63f0ed82b21604b5 (diff)
Split Time into Time (an absolute instant of time) and Duration (a length of time).
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/spark/streaming/Checkpoint.scala |  2
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala | 26
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStreamGraph.scala |  8
-rw-r--r--  streaming/src/main/scala/spark/streaming/Duration.scala | 62
-rw-r--r--  streaming/src/main/scala/spark/streaming/Interval.scala | 30
-rw-r--r--  streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala | 40
-rw-r--r--  streaming/src/main/scala/spark/streaming/Scheduler.scala |  5
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala | 12
-rw-r--r--  streaming/src/main/scala/spark/streaming/Time.scala | 55
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala |  4
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala |  4
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala |  4
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala |  4
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala |  4
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala |  4
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala |  4
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala |  4
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala |  4
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala |  4
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala | 16
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala |  4
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala |  4
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala |  4
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala |  4
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala | 14
25 files changed, 174 insertions(+), 152 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index 11a7232d7b..a9c6e65d62 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -17,7 +17,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
val jars = ssc.sc.jars
val graph = ssc.graph
val checkpointDir = ssc.checkpointDir
- val checkpointInterval = ssc.checkpointInterval
+ val checkpointInterval: Duration = ssc.checkpointInterval
def validate() {
assert(master != null, "Checkpoint.master is null")
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index beba9cfd4f..7611598fde 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -2,7 +2,7 @@ package spark.streaming
import spark.streaming.dstream._
import StreamingContext._
-import Time._
+//import Time._
import spark.{RDD, Logging}
import spark.storage.StorageLevel
@@ -47,7 +47,7 @@ abstract class DStream[T: ClassManifest] (
// =======================================================================
/** Time interval after which the DStream generates a RDD */
- def slideTime: Time
+ def slideTime: Duration
/** List of parent DStreams on which this DStream depends on */
def dependencies: List[DStream[_]]
@@ -67,14 +67,14 @@ abstract class DStream[T: ClassManifest] (
protected[streaming] var zeroTime: Time = null
// Duration for which the DStream will remember each RDD created
- protected[streaming] var rememberDuration: Time = null
+ protected[streaming] var rememberDuration: Duration = null
// Storage level of the RDDs in the stream
protected[streaming] var storageLevel: StorageLevel = StorageLevel.NONE
// Checkpoint details
protected[streaming] val mustCheckpoint = false
- protected[streaming] var checkpointInterval: Time = null
+ protected[streaming] var checkpointInterval: Duration = null
protected[streaming] var checkpointData = new DStreamCheckpointData(HashMap[Time, Any]())
// Reference to whole DStream graph
@@ -108,7 +108,7 @@ abstract class DStream[T: ClassManifest] (
* Enable periodic checkpointing of RDDs of this DStream
* @param interval Time interval after which generated RDD will be checkpointed
*/
- def checkpoint(interval: Time): DStream[T] = {
+ def checkpoint(interval: Duration): DStream[T] = {
if (isInitialized) {
throw new UnsupportedOperationException(
"Cannot change checkpoint interval of an DStream after streaming context has started")
@@ -224,7 +224,7 @@ abstract class DStream[T: ClassManifest] (
dependencies.foreach(_.setGraph(graph))
}
- protected[streaming] def remember(duration: Time) {
+ protected[streaming] def remember(duration: Duration) {
if (duration != null && duration > rememberDuration) {
rememberDuration = duration
logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this)
@@ -531,7 +531,7 @@ abstract class DStream[T: ClassManifest] (
* @param windowTime width of the window; must be a multiple of this DStream's interval.
* @return
*/
- def window(windowTime: Time): DStream[T] = window(windowTime, this.slideTime)
+ def window(windowTime: Duration): DStream[T] = window(windowTime, this.slideTime)
/**
* Return a new DStream which is computed based on windowed batches of this DStream.
@@ -541,7 +541,7 @@ abstract class DStream[T: ClassManifest] (
* the new DStream will generate RDDs); must be a multiple of this
* DStream's interval
*/
- def window(windowTime: Time, slideTime: Time): DStream[T] = {
+ def window(windowTime: Duration, slideTime: Duration): DStream[T] = {
new WindowedDStream(this, windowTime, slideTime)
}
@@ -550,22 +550,22 @@ abstract class DStream[T: ClassManifest] (
* This is equivalent to window(batchTime, batchTime).
* @param batchTime tumbling window duration; must be a multiple of this DStream's interval
*/
- def tumble(batchTime: Time): DStream[T] = window(batchTime, batchTime)
+ def tumble(batchTime: Duration): DStream[T] = window(batchTime, batchTime)
/**
* Returns a new DStream in which each RDD has a single element generated by reducing all
* elements in a window over this DStream. windowTime and slideTime are as defined in the
* window() operation. This is equivalent to window(windowTime, slideTime).reduce(reduceFunc)
*/
- def reduceByWindow(reduceFunc: (T, T) => T, windowTime: Time, slideTime: Time): DStream[T] = {
+ def reduceByWindow(reduceFunc: (T, T) => T, windowTime: Duration, slideTime: Duration): DStream[T] = {
this.window(windowTime, slideTime).reduce(reduceFunc)
}
def reduceByWindow(
reduceFunc: (T, T) => T,
invReduceFunc: (T, T) => T,
- windowTime: Time,
- slideTime: Time
+ windowTime: Duration,
+ slideTime: Duration
): DStream[T] = {
this.map(x => (1, x))
.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime, 1)
@@ -577,7 +577,7 @@ abstract class DStream[T: ClassManifest] (
* of elements in a window over this DStream. windowTime and slideTime are as defined in the
* window() operation. This is equivalent to window(windowTime, slideTime).count()
*/
- def countByWindow(windowTime: Time, slideTime: Time): DStream[Int] = {
+ def countByWindow(windowTime: Duration, slideTime: Duration): DStream[Int] = {
this.map(_ => 1).reduceByWindow(_ + _, _ - _, windowTime, slideTime)
}
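
A minimal sketch of the retyped window operations above, assuming an existing DStream[String] named lines (hypothetical, its input source is out of scope here); the behavior is unchanged, only the window parameters are now Durations:

import spark.streaming.{DStream, Minutes, Seconds}

object WindowDemo {
  def windowDemo(lines: DStream[String]): Unit = {
    val last30s  = lines.window(Seconds(30), Seconds(10))  // 30 s window, sliding every 10 s
    val tumbling = lines.tumble(Minutes(1))                // window length == slide length
    val counts: DStream[Int] = lines.countByWindow(Seconds(30), Seconds(10))
    last30s.checkpoint(Seconds(20))                        // checkpoint interval is a Duration too
  }
}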
diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
index c72429370e..bc4a40d7bc 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
@@ -12,8 +12,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
private val outputStreams = new ArrayBuffer[DStream[_]]()
private[streaming] var zeroTime: Time = null
- private[streaming] var batchDuration: Time = null
- private[streaming] var rememberDuration: Time = null
+ private[streaming] var batchDuration: Duration = null
+ private[streaming] var rememberDuration: Duration = null
private[streaming] var checkpointInProgress = false
private[streaming] def start(time: Time) {
@@ -41,7 +41,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
}
}
- private[streaming] def setBatchDuration(duration: Time) {
+ private[streaming] def setBatchDuration(duration: Duration) {
this.synchronized {
if (batchDuration != null) {
throw new Exception("Batch duration already set as " + batchDuration +
@@ -51,7 +51,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
batchDuration = duration
}
- private[streaming] def remember(duration: Time) {
+ private[streaming] def remember(duration: Duration) {
this.synchronized {
if (rememberDuration != null) {
throw new Exception("Batch duration already set as " + batchDuration +
diff --git a/streaming/src/main/scala/spark/streaming/Duration.scala b/streaming/src/main/scala/spark/streaming/Duration.scala
new file mode 100644
index 0000000000..d2728d9dca
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/Duration.scala
@@ -0,0 +1,62 @@
+package spark.streaming
+
+class Duration (private val millis: Long) {
+
+ def < (that: Duration): Boolean = (this.millis < that.millis)
+
+ def <= (that: Duration): Boolean = (this.millis <= that.millis)
+
+ def > (that: Duration): Boolean = (this.millis > that.millis)
+
+ def >= (that: Duration): Boolean = (this.millis >= that.millis)
+
+ def + (that: Duration): Duration = new Duration(millis + that.millis)
+
+ def - (that: Duration): Duration = new Duration(millis - that.millis)
+
+ def * (times: Int): Duration = new Duration(millis * times)
+
+ def / (that: Duration): Long = millis / that.millis
+
+ def isMultipleOf(that: Duration): Boolean =
+ (this.millis % that.millis == 0)
+
+ def min(that: Duration): Duration = if (this < that) this else that
+
+ def max(that: Duration): Duration = if (this > that) this else that
+
+ def isZero: Boolean = (this.millis == 0)
+
+ override def toString: String = (millis.toString + " ms")
+
+ def toFormattedString: String = millis.toString
+
+ def milliseconds: Long = millis
+}
+
+
+/**
+ * Helper object that creates instance of [[spark.streaming.Duration]] representing
+ * a given number of milliseconds.
+ */
+object Milliseconds {
+ def apply(milliseconds: Long) = new Duration(milliseconds)
+}
+
+/**
+ * Helper object that creates instance of [[spark.streaming.Duration]] representing
+ * a given number of seconds.
+ */
+object Seconds {
+ def apply(seconds: Long) = new Duration(seconds * 1000)
+}
+
+/**
+ * Helper object that creates instance of [[spark.streaming.Duration]] representing
+ * a given number of minutes.
+ */
+object Minutes {
+ def apply(minutes: Long) = new Duration(minutes * 60000)
+}
+
+
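
A minimal usage sketch of the Duration arithmetic defined above (DurationDemo is a hypothetical name); note that Duration is a plain class, so values should be compared via the ordering operators or milliseconds rather than ==:

import spark.streaming.{Duration, Milliseconds, Minutes, Seconds}

object DurationDemo {
  def main(args: Array[String]) {
    val batch: Duration = Seconds(2)            // 2000 ms
    val window = batch * 5                      // 10000 ms
    assert(window.isMultipleOf(batch))          // 10000 % 2000 == 0
    assert(window / batch == 5L)                // division yields a Long
    assert((window - batch).milliseconds == 8000)
    println(window.max(Minutes(1)))             // prints "60000 ms"
    println(Milliseconds(1500) + Seconds(1))    // prints "2500 ms"
  }
}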
diff --git a/streaming/src/main/scala/spark/streaming/Interval.scala b/streaming/src/main/scala/spark/streaming/Interval.scala
index fa0b7ce19d..dc21dfb722 100644
--- a/streaming/src/main/scala/spark/streaming/Interval.scala
+++ b/streaming/src/main/scala/spark/streaming/Interval.scala
@@ -1,16 +1,16 @@
package spark.streaming
private[streaming]
-case class Interval(beginTime: Time, endTime: Time) {
- def this(beginMs: Long, endMs: Long) = this(Time(beginMs), new Time(endMs))
+class Interval(val beginTime: Time, val endTime: Time) {
+ def this(beginMs: Long, endMs: Long) = this(new Time(beginMs), new Time(endMs))
- def duration(): Time = endTime - beginTime
+ def duration(): Duration = endTime - beginTime
- def + (time: Time): Interval = {
+ def + (time: Duration): Interval = {
new Interval(beginTime + time, endTime + time)
}
- def - (time: Time): Interval = {
+ def - (time: Duration): Interval = {
new Interval(beginTime - time, endTime - time)
}
@@ -27,24 +27,14 @@ case class Interval(beginTime: Time, endTime: Time) {
def >= (that: Interval) = !(this < that)
- def next(): Interval = {
- this + (endTime - beginTime)
- }
-
- def isZero = (beginTime.isZero && endTime.isZero)
-
- def toFormattedString = beginTime.toFormattedString + "-" + endTime.toFormattedString
-
- override def toString = "[" + beginTime + ", " + endTime + "]"
+ override def toString = "[" + beginTime + ", " + endTime + "]"
}
object Interval {
- def zero() = new Interval (Time.zero, Time.zero)
-
- def currentInterval(intervalDuration: Time): Interval = {
- val time = Time(System.currentTimeMillis)
- val intervalBegin = time.floor(intervalDuration)
- Interval(intervalBegin, intervalBegin + intervalDuration)
+ def currentInterval(duration: Duration): Interval = {
+ val time = new Time(System.currentTimeMillis)
+ val intervalBegin = time.floor(duration)
+ new Interval(intervalBegin, intervalBegin + duration)
}
}
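
A hypothetical sketch of the reworked Interval; since Interval is private[streaming], the demo has to live in the spark.streaming package to compile:

package spark.streaming

object IntervalDemo {
  def main(args: Array[String]) {
    val batch = Seconds(10)
    val current = Interval.currentInterval(batch)  // begins on a 10-second boundary
    val next = current + batch                     // shifts both endpoints by one Duration
    assert(current.duration().milliseconds == batch.milliseconds)
    println(current + " then " + next)
  }
}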
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index b0a208e67f..dd64064138 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -69,21 +69,21 @@ extends Serializable {
self.map(x => (x._1, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
}
- def groupByKeyAndWindow(windowTime: Time, slideTime: Time): DStream[(K, Seq[V])] = {
+ def groupByKeyAndWindow(windowTime: Duration, slideTime: Duration): DStream[(K, Seq[V])] = {
groupByKeyAndWindow(windowTime, slideTime, defaultPartitioner())
}
def groupByKeyAndWindow(
- windowTime: Time,
- slideTime: Time,
+ windowTime: Duration,
+ slideTime: Duration,
numPartitions: Int
): DStream[(K, Seq[V])] = {
groupByKeyAndWindow(windowTime, slideTime, defaultPartitioner(numPartitions))
}
def groupByKeyAndWindow(
- windowTime: Time,
- slideTime: Time,
+ windowTime: Duration,
+ slideTime: Duration,
partitioner: Partitioner
): DStream[(K, Seq[V])] = {
self.window(windowTime, slideTime).groupByKey(partitioner)
@@ -91,23 +91,23 @@ extends Serializable {
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
- windowTime: Time
+ windowTime: Duration
): DStream[(K, V)] = {
reduceByKeyAndWindow(reduceFunc, windowTime, self.slideTime, defaultPartitioner())
}
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
- windowTime: Time,
- slideTime: Time
+ windowTime: Duration,
+ slideTime: Duration
): DStream[(K, V)] = {
reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, defaultPartitioner())
}
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
- windowTime: Time,
- slideTime: Time,
+ windowTime: Duration,
+ slideTime: Duration,
numPartitions: Int
): DStream[(K, V)] = {
reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, defaultPartitioner(numPartitions))
@@ -115,8 +115,8 @@ extends Serializable {
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
- windowTime: Time,
- slideTime: Time,
+ windowTime: Duration,
+ slideTime: Duration,
partitioner: Partitioner
): DStream[(K, V)] = {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
@@ -134,8 +134,8 @@ extends Serializable {
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
- windowTime: Time,
- slideTime: Time
+ windowTime: Duration,
+ slideTime: Duration
): DStream[(K, V)] = {
reduceByKeyAndWindow(
@@ -145,8 +145,8 @@ extends Serializable {
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
- windowTime: Time,
- slideTime: Time,
+ windowTime: Duration,
+ slideTime: Duration,
numPartitions: Int
): DStream[(K, V)] = {
@@ -157,8 +157,8 @@ extends Serializable {
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
- windowTime: Time,
- slideTime: Time,
+ windowTime: Duration,
+ slideTime: Duration,
partitioner: Partitioner
): DStream[(K, V)] = {
@@ -169,8 +169,8 @@ extends Serializable {
}
def countByKeyAndWindow(
- windowTime: Time,
- slideTime: Time,
+ windowTime: Duration,
+ slideTime: Duration,
numPartitions: Int = self.ssc.sc.defaultParallelism
): DStream[(K, Long)] = {
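
A minimal sketch of the Duration-typed pair-DStream windows, assuming an existing DStream[(String, Int)] named pairs (hypothetical); the implicit conversion imported from StreamingContext._ supplies these methods:

import spark.streaming.{DStream, Seconds}
import spark.streaming.StreamingContext._

object PairWindowDemo {
  def demo(pairs: DStream[(String, Int)]): Unit = {
    val summed = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
    // Incremental variant: add batches entering the window, subtract those leaving it.
    val incremental = pairs.reduceByKeyAndWindow(
      (a: Int, b: Int) => a + b,
      (a: Int, b: Int) => a - b,
      Seconds(30), Seconds(10))
    val grouped = pairs.groupByKeyAndWindow(Seconds(30), Seconds(10))
  }
}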
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index eb40affe6d..10845e3a5e 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -22,7 +22,8 @@ class Scheduler(ssc: StreamingContext) extends Logging {
val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock")
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
- val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, generateRDDs(_))
+ val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
+ longTime => generateRDDs(new Time(longTime)))
def start() {
// If context was started from checkpoint, then restart timer such that
@@ -41,7 +42,7 @@ class Scheduler(ssc: StreamingContext) extends Logging {
timer.restart(graph.zeroTime.milliseconds)
logInfo("Scheduler's timer restarted")
} else {
- val firstTime = Time(timer.start())
+ val firstTime = new Time(timer.start())
graph.start(firstTime - ssc.graph.batchDuration)
logInfo("Scheduler's timer started")
}
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 215246ba2e..ee8314df3f 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -26,7 +26,7 @@ import java.util.UUID
class StreamingContext private (
sc_ : SparkContext,
cp_ : Checkpoint,
- batchDur_ : Time
+ batchDur_ : Duration
) extends Logging {
/**
@@ -34,7 +34,7 @@ class StreamingContext private (
* @param sparkContext Existing SparkContext
* @param batchDuration The time interval at which streaming data will be divided into batches
*/
- def this(sparkContext: SparkContext, batchDuration: Time) = this(sparkContext, null, batchDuration)
+ def this(sparkContext: SparkContext, batchDuration: Duration) = this(sparkContext, null, batchDuration)
/**
* Creates a StreamingContext by providing the details necessary for creating a new SparkContext.
@@ -42,7 +42,7 @@ class StreamingContext private (
* @param frameworkName A name for your job, to display on the cluster web UI
* @param batchDuration The time interval at which streaming data will be divided into batches
*/
- def this(master: String, frameworkName: String, batchDuration: Time) =
+ def this(master: String, frameworkName: String, batchDuration: Duration) =
this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration)
/**
@@ -96,7 +96,7 @@ class StreamingContext private (
}
}
- protected[streaming] var checkpointInterval: Time = if (isCheckpointPresent) cp_.checkpointInterval else null
+ protected[streaming] var checkpointInterval: Duration = if (isCheckpointPresent) cp_.checkpointInterval else null
protected[streaming] var receiverJobThread: Thread = null
protected[streaming] var scheduler: Scheduler = null
@@ -107,7 +107,7 @@ class StreamingContext private (
* if the developer wishes to query old data outside the DStream computation).
* @param duration Minimum duration that each DStream should remember its RDDs
*/
- def remember(duration: Time) {
+ def remember(duration: Duration) {
graph.remember(duration)
}
@@ -117,7 +117,7 @@ class StreamingContext private (
* @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
* @param interval checkpoint interval
*/
- def checkpoint(directory: String, interval: Time = null) {
+ def checkpoint(directory: String, interval: Duration = null) {
if (directory != null) {
sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory))
checkpointDir = directory
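
A minimal sketch of the Duration-typed StreamingContext API; the master string, job name, and checkpoint directory below are assumptions:

import spark.streaming.{Minutes, Seconds, StreamingContext}

object ContextDemo {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local", "ContextDemo", Seconds(1))  // 1 s batches
    ssc.remember(Minutes(5))                         // keep generated RDDs for 5 minutes
    ssc.checkpoint("/tmp/checkpoints", Seconds(10))  // checkpoint interval is a Duration
  }
}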
diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala
index 3c6fd5d967..069df82e52 100644
--- a/streaming/src/main/scala/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/spark/streaming/Time.scala
@@ -7,7 +7,7 @@ package spark.streaming
* @param millis Time in UTC.
*/
-case class Time(private val millis: Long) {
+class Time(private val millis: Long) {
def < (that: Time): Boolean = (this.millis < that.millis)
@@ -17,63 +17,32 @@ case class Time(private val millis: Long) {
def >= (that: Time): Boolean = (this.millis >= that.millis)
- def + (that: Time): Time = Time(millis + that.millis)
-
- def - (that: Time): Time = Time(millis - that.millis)
-
- def * (times: Int): Time = Time(millis * times)
+ def + (that: Duration): Time = new Time(millis + that.milliseconds)
+
+ def - (that: Time): Duration = new Duration(millis - that.millis)
- def / (that: Time): Long = millis / that.millis
+ def - (that: Duration): Time = new Time(millis - that.milliseconds)
- def floor(that: Time): Time = {
- val t = that.millis
+ def floor(that: Duration): Time = {
+ val t = that.milliseconds
val m = math.floor(this.millis / t).toLong
- Time(m * t)
+ new Time(m * t)
}
- def isMultipleOf(that: Time): Boolean =
- (this.millis % that.millis == 0)
+ def isMultipleOf(that: Duration): Boolean =
+ (this.millis % that.milliseconds == 0)
def min(that: Time): Time = if (this < that) this else that
def max(that: Time): Time = if (this > that) this else that
- def isZero: Boolean = (this.millis == 0)
-
override def toString: String = (millis.toString + " ms")
- def toFormattedString: String = millis.toString
-
def milliseconds: Long = millis
}
-private[streaming] object Time {
- val zero = Time(0)
-
+/*private[streaming] object Time {
implicit def toTime(long: Long) = Time(long)
}
-
-/**
- * Helper object that creates instance of [[spark.streaming.Time]] representing
- * a given number of milliseconds.
- */
-object Milliseconds {
- def apply(milliseconds: Long) = Time(milliseconds)
-}
-
-/**
- * Helper object that creates instance of [[spark.streaming.Time]] representing
- * a given number of seconds.
- */
-object Seconds {
- def apply(seconds: Long) = Time(seconds * 1000)
-}
-
-/**
- * Helper object that creates instance of [[spark.streaming.Time]] representing
- * a given number of minutes.
- */
-object Minutes {
- def apply(minutes: Long) = Time(minutes * 60000)
-}
+*/
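
A minimal sketch of the resulting Time/Duration algebra (TimeDemo is a hypothetical name): subtracting two Times yields a Duration, while adding or subtracting a Duration yields another Time:

import spark.streaming.{Duration, Seconds, Time}

object TimeDemo {
  def main(args: Array[String]) {
    val batch = Seconds(2)
    val now = new Time(System.currentTimeMillis)
    val later: Time = now + batch             // Time + Duration => Time
    val elapsed: Duration = later - now       // Time - Time => Duration
    assert(elapsed.milliseconds == 2000)
    val aligned = now.floor(batch)            // round down to a batch boundary
    assert(aligned.isMultipleOf(batch) && aligned <= now)
  }
}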
diff --git a/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
index bc23d423d3..ca178fd384 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
@@ -2,7 +2,7 @@ package spark.streaming.dstream
import spark.{RDD, Partitioner}
import spark.rdd.CoGroupedRDD
-import spark.streaming.{Time, DStream}
+import spark.streaming.{Time, DStream, Duration}
private[streaming]
class CoGroupedDStream[K : ClassManifest](
@@ -24,7 +24,7 @@ class CoGroupedDStream[K : ClassManifest](
override def dependencies = parents.toList
- override def slideTime = parents.head.slideTime
+ override def slideTime: Duration = parents.head.slideTime
override def compute(validTime: Time): Option[RDD[(K, Seq[Seq[_]])]] = {
val part = partitioner
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala
index 1cbb4d536e..76b9e58029 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala
@@ -1,6 +1,6 @@
package spark.streaming.dstream
-import spark.streaming.{DStream, Time}
+import spark.streaming.{Duration, DStream, Time}
import spark.RDD
private[streaming]
@@ -11,7 +11,7 @@ class FilteredDStream[T: ClassManifest](
override def dependencies = List(parent)
- override def slideTime: Time = parent.slideTime
+ override def slideTime: Duration = parent.slideTime
override def compute(validTime: Time): Option[RDD[T]] = {
parent.getOrCompute(validTime).map(_.filter(filterFunc))
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala
index 11ed8cf317..28e9a456ac 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala
@@ -1,6 +1,6 @@
package spark.streaming.dstream
-import spark.streaming.{DStream, Time}
+import spark.streaming.{Duration, DStream, Time}
import spark.RDD
import spark.SparkContext._
@@ -12,7 +12,7 @@ class FlatMapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]
override def dependencies = List(parent)
- override def slideTime: Time = parent.slideTime
+ override def slideTime: Duration = parent.slideTime
override def compute(validTime: Time): Option[RDD[(K, U)]] = {
parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc))
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala
index a13b4c9ff9..ef305b66f1 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala
@@ -1,6 +1,6 @@
package spark.streaming.dstream
-import spark.streaming.{DStream, Time}
+import spark.streaming.{Duration, DStream, Time}
import spark.RDD
private[streaming]
@@ -11,7 +11,7 @@ class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
override def dependencies = List(parent)
- override def slideTime: Time = parent.slideTime
+ override def slideTime: Duration = parent.slideTime
override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala
index 41c629a225..f8af0a38a7 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala
@@ -1,7 +1,7 @@
package spark.streaming.dstream
import spark.RDD
-import spark.streaming.{DStream, Job, Time}
+import spark.streaming.{Duration, DStream, Job, Time}
private[streaming]
class ForEachDStream[T: ClassManifest] (
@@ -11,7 +11,7 @@ class ForEachDStream[T: ClassManifest] (
override def dependencies = List(parent)
- override def slideTime: Time = parent.slideTime
+ override def slideTime: Duration = parent.slideTime
override def compute(validTime: Time): Option[RDD[Unit]] = None
diff --git a/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala
index 92ea503cae..19cccea735 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala
@@ -1,6 +1,6 @@
package spark.streaming.dstream
-import spark.streaming.{DStream, Time}
+import spark.streaming.{Duration, DStream, Time}
import spark.RDD
private[streaming]
@@ -9,7 +9,7 @@ class GlommedDStream[T: ClassManifest](parent: DStream[T])
override def dependencies = List(parent)
- override def slideTime: Time = parent.slideTime
+ override def slideTime: Duration = parent.slideTime
override def compute(validTime: Time): Option[RDD[Array[T]]] = {
parent.getOrCompute(validTime).map(_.glom())
diff --git a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
index 4959c66b06..50f0f45796 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
@@ -1,13 +1,13 @@
package spark.streaming.dstream
-import spark.streaming.{StreamingContext, DStream}
+import spark.streaming.{Duration, StreamingContext, DStream}
abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext)
extends DStream[T](ssc_) {
override def dependencies = List()
- override def slideTime = {
+ override def slideTime: Duration = {
if (ssc == null) throw new Exception("ssc is null")
if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
ssc.graph.batchDuration
diff --git a/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala
index daf78c6893..e9ca668aa6 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala
@@ -1,6 +1,6 @@
package spark.streaming.dstream
-import spark.streaming.{DStream, Time}
+import spark.streaming.{Duration, DStream, Time}
import spark.RDD
private[streaming]
@@ -12,7 +12,7 @@ class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
override def dependencies = List(parent)
- override def slideTime: Time = parent.slideTime
+ override def slideTime: Duration = parent.slideTime
override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
diff --git a/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala
index 689caeef0e..ebc7d0698b 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala
@@ -1,6 +1,6 @@
package spark.streaming.dstream
-import spark.streaming.{DStream, Time}
+import spark.streaming.{Duration, DStream, Time}
import spark.RDD
import spark.SparkContext._
@@ -12,7 +12,7 @@ class MapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
override def dependencies = List(parent)
- override def slideTime: Time = parent.slideTime
+ override def slideTime: Duration = parent.slideTime
override def compute(validTime: Time): Option[RDD[(K, U)]] = {
parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc))
diff --git a/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala
index 786b9966f2..3af8e7ab88 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala
@@ -1,6 +1,6 @@
package spark.streaming.dstream
-import spark.streaming.{DStream, Time}
+import spark.streaming.{Duration, DStream, Time}
import spark.RDD
private[streaming]
@@ -11,7 +11,7 @@ class MappedDStream[T: ClassManifest, U: ClassManifest] (
override def dependencies = List(parent)
- override def slideTime: Time = parent.slideTime
+ override def slideTime: Duration = parent.slideTime
override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.map[U](mapFunc))
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
index d289ed2a3f..a685a778ce 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -9,15 +9,15 @@ import spark.SparkContext._
import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
-import spark.streaming.{Interval, Time, DStream}
+import spark.streaming.{Duration, Interval, Time, DStream}
private[streaming]
class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
parent: DStream[(K, V)],
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
- _windowTime: Time,
- _slideTime: Time,
+ _windowTime: Duration,
+ _slideTime: Duration,
partitioner: Partitioner
) extends DStream[(K,V)](parent.ssc) {
@@ -39,15 +39,15 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
super.persist(StorageLevel.MEMORY_ONLY_SER)
reducedStream.persist(StorageLevel.MEMORY_ONLY_SER)
- def windowTime: Time = _windowTime
+ def windowTime: Duration = _windowTime
override def dependencies = List(reducedStream)
- override def slideTime: Time = _slideTime
+ override def slideTime: Duration = _slideTime
override val mustCheckpoint = true
- override def parentRememberDuration: Time = rememberDuration + windowTime
+ override def parentRememberDuration: Duration = rememberDuration + windowTime
override def persist(storageLevel: StorageLevel): DStream[(K,V)] = {
super.persist(storageLevel)
@@ -55,7 +55,7 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
this
}
- override def checkpoint(interval: Time): DStream[(K, V)] = {
+ override def checkpoint(interval: Duration): DStream[(K, V)] = {
super.checkpoint(interval)
//reducedStream.checkpoint(interval)
this
@@ -66,7 +66,7 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
val invReduceF = invReduceFunc
val currentTime = validTime
- val currentWindow = Interval(currentTime - windowTime + parent.slideTime, currentTime)
+ val currentWindow = new Interval(currentTime - windowTime + parent.slideTime, currentTime)
val previousWindow = currentWindow - slideTime
logDebug("Window time = " + windowTime)
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala
index 6854bbe665..7612804b96 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala
@@ -2,7 +2,7 @@ package spark.streaming.dstream
import spark.{RDD, Partitioner}
import spark.SparkContext._
-import spark.streaming.{DStream, Time}
+import spark.streaming.{Duration, DStream, Time}
private[streaming]
class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
@@ -15,7 +15,7 @@ class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
override def dependencies = List(parent)
- override def slideTime: Time = parent.slideTime
+ override def slideTime: Duration = parent.slideTime
override def compute(validTime: Time): Option[RDD[(K,C)]] = {
parent.getOrCompute(validTime) match {
diff --git a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
index 175b3060c1..ce4f486825 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
@@ -4,7 +4,7 @@ import spark.RDD
import spark.Partitioner
import spark.SparkContext._
import spark.storage.StorageLevel
-import spark.streaming.{Time, DStream}
+import spark.streaming.{Duration, Time, DStream}
private[streaming]
class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
@@ -18,7 +18,7 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife
override def dependencies = List(parent)
- override def slideTime = parent.slideTime
+ override def slideTime: Duration = parent.slideTime
override val mustCheckpoint = true
diff --git a/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala
index 0337579514..5a2c5bc0f0 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala
@@ -1,7 +1,7 @@
package spark.streaming.dstream
import spark.RDD
-import spark.streaming.{DStream, Time}
+import spark.streaming.{Duration, DStream, Time}
private[streaming]
class TransformedDStream[T: ClassManifest, U: ClassManifest] (
@@ -11,7 +11,7 @@ class TransformedDStream[T: ClassManifest, U: ClassManifest] (
override def dependencies = List(parent)
- override def slideTime: Time = parent.slideTime
+ override def slideTime: Duration = parent.slideTime
override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(transformFunc(_, validTime))
diff --git a/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala
index 3bf4c2ecea..224a19842b 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala
@@ -1,6 +1,6 @@
package spark.streaming.dstream
-import spark.streaming.{DStream, Time}
+import spark.streaming.{Duration, DStream, Time}
import spark.RDD
import collection.mutable.ArrayBuffer
import spark.rdd.UnionRDD
@@ -23,7 +23,7 @@ class UnionDStream[T: ClassManifest](parents: Array[DStream[T]])
override def dependencies = parents.toList
- override def slideTime: Time = parents.head.slideTime
+ override def slideTime: Duration = parents.head.slideTime
override def compute(validTime: Time): Option[RDD[T]] = {
val rdds = new ArrayBuffer[RDD[T]]()
diff --git a/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala
index 7718794cbf..45689b25ce 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala
@@ -3,13 +3,13 @@ package spark.streaming.dstream
import spark.RDD
import spark.rdd.UnionRDD
import spark.storage.StorageLevel
-import spark.streaming.{Interval, Time, DStream}
+import spark.streaming.{Duration, Interval, Time, DStream}
private[streaming]
class WindowedDStream[T: ClassManifest](
parent: DStream[T],
- _windowTime: Time,
- _slideTime: Time)
+ _windowTime: Duration,
+ _slideTime: Duration)
extends DStream[T](parent.ssc) {
if (!_windowTime.isMultipleOf(parent.slideTime))
@@ -22,16 +22,16 @@ class WindowedDStream[T: ClassManifest](
parent.persist(StorageLevel.MEMORY_ONLY_SER)
- def windowTime: Time = _windowTime
+ def windowTime: Duration = _windowTime
override def dependencies = List(parent)
- override def slideTime: Time = _slideTime
+ override def slideTime: Duration = _slideTime
- override def parentRememberDuration: Time = rememberDuration + windowTime
+ override def parentRememberDuration: Duration = rememberDuration + windowTime
override def compute(validTime: Time): Option[RDD[T]] = {
- val currentWindow = Interval(validTime - windowTime + parent.slideTime, validTime)
+ val currentWindow = new Interval(validTime - windowTime + parent.slideTime, validTime)
Some(new UnionRDD(ssc.sc, parent.slice(currentWindow)))
}
}