author	Tathagata Das <tathagata.das1565@gmail.com>	2013-01-09 14:29:25 -0800
committer	Tathagata Das <tathagata.das1565@gmail.com>	2013-01-09 14:29:25 -0800
commit	365506fb038a76ff3810957f5bc5823f5f16af40 (patch)
tree	d08c425e4e24b117bf8084ac700010d431380421 /streaming/src
parent	156e8b47ef24cd1a54ee9f1141a91c20e26ac037 (diff)
Changed variable names from ***Time to ***Duration to keep things consistent.
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/spark/streaming/Checkpoint.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala | 90
-rw-r--r--  streaming/src/main/scala/spark/streaming/Duration.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala | 70
-rw-r--r--  streaming/src/main/scala/spark/streaming/Scheduler.scala | 4
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala | 12
-rw-r--r--  streaming/src/main/scala/spark/streaming/Time.scala | 22
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala | 4
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala | 34
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala | 4
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala | 4
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala | 24
-rw-r--r--  streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala | 18
-rw-r--r--  streaming/src/test/scala/spark/streaming/TestSuiteBase.scala | 2
-rw-r--r--  streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala | 48
26 files changed, 176 insertions, 186 deletions
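
For callers, the rename is purely mechanical: the windowing operations now take windowDuration and slideDuration parameters (both of type Duration) where they previously took windowTime and slideTime. A minimal caller-side sketch follows; only reduceByKeyAndWindow and the Seconds helper are taken from the diff below, while the pair-DStream implicit import, the object name, and the 30 s / 10 s values are illustrative assumptions:

import spark.streaming.{DStream, Duration, Seconds}
import spark.streaming.StreamingContext._  // assumed to provide the implicit pair-DStream conversions

object WindowRenameSketch {
  // Sum counts per word over a 30-second window that is re-evaluated every 10 seconds.
  def windowedWordCounts(pairs: DStream[(String, Int)]): DStream[(String, Int)] = {
    val windowDuration: Duration = Seconds(30) // width of the window (formerly windowTime)
    val slideDuration: Duration = Seconds(10)  // interval between window evaluations (formerly slideTime)
    pairs.reduceByKeyAndWindow(_ + _, windowDuration, slideDuration)
  }
}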
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index a9c6e65d62..2f3adb39c2 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -17,7 +17,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
val jars = ssc.sc.jars
val graph = ssc.graph
val checkpointDir = ssc.checkpointDir
- val checkpointInterval: Duration = ssc.checkpointInterval
+ val checkpointDuration: Duration = ssc.checkpointDuration
def validate() {
assert(master != null, "Checkpoint.master is null")
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 7611598fde..c89fb7723e 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -47,7 +47,7 @@ abstract class DStream[T: ClassManifest] (
// =======================================================================
/** Time interval after which the DStream generates a RDD */
- def slideTime: Duration
+ def slideDuration: Duration
/** List of parent DStreams on which this DStream depends on */
def dependencies: List[DStream[_]]
@@ -74,7 +74,7 @@ abstract class DStream[T: ClassManifest] (
// Checkpoint details
protected[streaming] val mustCheckpoint = false
- protected[streaming] var checkpointInterval: Duration = null
+ protected[streaming] var checkpointDuration: Duration = null
protected[streaming] var checkpointData = new DStreamCheckpointData(HashMap[Time, Any]())
// Reference to whole DStream graph
@@ -114,7 +114,7 @@ abstract class DStream[T: ClassManifest] (
"Cannot change checkpoint interval of an DStream after streaming context has started")
}
persist()
- checkpointInterval = interval
+ checkpointDuration = interval
this
}
@@ -130,16 +130,16 @@ abstract class DStream[T: ClassManifest] (
}
zeroTime = time
- // Set the checkpoint interval to be slideTime or 10 seconds, which ever is larger
- if (mustCheckpoint && checkpointInterval == null) {
- checkpointInterval = slideTime.max(Seconds(10))
- logInfo("Checkpoint interval automatically set to " + checkpointInterval)
+ // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
+ if (mustCheckpoint && checkpointDuration == null) {
+ checkpointDuration = slideDuration.max(Seconds(10))
+ logInfo("Checkpoint interval automatically set to " + checkpointDuration)
}
// Set the minimum value of the rememberDuration if not already set
- var minRememberDuration = slideTime
- if (checkpointInterval != null && minRememberDuration <= checkpointInterval) {
- minRememberDuration = checkpointInterval * 2 // times 2 just to be sure that the latest checkpoint is not forgetten
+ var minRememberDuration = slideDuration
+ if (checkpointDuration != null && minRememberDuration <= checkpointDuration) {
+ minRememberDuration = checkpointDuration * 2 // times 2 just to be sure that the latest checkpoint is not forgetten
}
if (rememberDuration == null || rememberDuration < minRememberDuration) {
rememberDuration = minRememberDuration
@@ -153,37 +153,37 @@ abstract class DStream[T: ClassManifest] (
assert(rememberDuration != null, "Remember duration is set to null")
assert(
- !mustCheckpoint || checkpointInterval != null,
+ !mustCheckpoint || checkpointDuration != null,
"The checkpoint interval for " + this.getClass.getSimpleName + " has not been set. " +
" Please use DStream.checkpoint() to set the interval."
)
assert(
- checkpointInterval == null || checkpointInterval >= slideTime,
+ checkpointDuration == null || checkpointDuration >= slideDuration,
"The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
- checkpointInterval + " which is lower than its slide time (" + slideTime + "). " +
- "Please set it to at least " + slideTime + "."
+ checkpointDuration + " which is lower than its slide time (" + slideDuration + "). " +
+ "Please set it to at least " + slideDuration + "."
)
assert(
- checkpointInterval == null || checkpointInterval.isMultipleOf(slideTime),
+ checkpointDuration == null || checkpointDuration.isMultipleOf(slideDuration),
"The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
- checkpointInterval + " which not a multiple of its slide time (" + slideTime + "). " +
- "Please set it to a multiple " + slideTime + "."
+ checkpointDuration + " which not a multiple of its slide time (" + slideDuration + "). " +
+ "Please set it to a multiple " + slideDuration + "."
)
assert(
- checkpointInterval == null || storageLevel != StorageLevel.NONE,
+ checkpointDuration == null || storageLevel != StorageLevel.NONE,
"" + this.getClass.getSimpleName + " has been marked for checkpointing but the storage " +
"level has not been set to enable persisting. Please use DStream.persist() to set the " +
"storage level to use memory for better checkpointing performance."
)
assert(
- checkpointInterval == null || rememberDuration > checkpointInterval,
+ checkpointDuration == null || rememberDuration > checkpointDuration,
"The remember duration for " + this.getClass.getSimpleName + " has been set to " +
rememberDuration + " which is not more than the checkpoint interval (" +
- checkpointInterval + "). Please set it to higher than " + checkpointInterval + "."
+ checkpointDuration + "). Please set it to higher than " + checkpointDuration + "."
)
val metadataCleanerDelay = spark.util.MetadataCleaner.getDelaySeconds
@@ -200,9 +200,9 @@ abstract class DStream[T: ClassManifest] (
dependencies.foreach(_.validate())
- logInfo("Slide time = " + slideTime)
+ logInfo("Slide time = " + slideDuration)
logInfo("Storage level = " + storageLevel)
- logInfo("Checkpoint interval = " + checkpointInterval)
+ logInfo("Checkpoint interval = " + checkpointDuration)
logInfo("Remember duration = " + rememberDuration)
logInfo("Initialized and validated " + this)
}
@@ -232,11 +232,11 @@ abstract class DStream[T: ClassManifest] (
dependencies.foreach(_.remember(parentRememberDuration))
}
- /** This method checks whether the 'time' is valid wrt slideTime for generating RDD */
+ /** This method checks whether the 'time' is valid wrt slideDuration for generating RDD */
protected def isTimeValid(time: Time): Boolean = {
if (!isInitialized) {
throw new Exception (this + " has not been initialized")
- } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideTime)) {
+ } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
false
} else {
true
@@ -266,7 +266,7 @@ abstract class DStream[T: ClassManifest] (
newRDD.persist(storageLevel)
logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel + " at time " + time)
}
- if (checkpointInterval != null && (time - zeroTime).isMultipleOf(checkpointInterval)) {
+ if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
newRDD.checkpoint()
logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing at time " + time)
}
@@ -528,21 +528,21 @@ abstract class DStream[T: ClassManifest] (
/**
* Return a new DStream which is computed based on windowed batches of this DStream.
* The new DStream generates RDDs with the same interval as this DStream.
- * @param windowTime width of the window; must be a multiple of this DStream's interval.
+ * @param windowDuration width of the window; must be a multiple of this DStream's interval.
* @return
*/
- def window(windowTime: Duration): DStream[T] = window(windowTime, this.slideTime)
+ def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)
/**
* Return a new DStream which is computed based on windowed batches of this DStream.
- * @param windowTime duration (i.e., width) of the window;
+ * @param windowDuration duration (i.e., width) of the window;
* must be a multiple of this DStream's interval
- * @param slideTime sliding interval of the window (i.e., the interval after which
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's interval
*/
- def window(windowTime: Duration, slideTime: Duration): DStream[T] = {
- new WindowedDStream(this, windowTime, slideTime)
+ def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = {
+ new WindowedDStream(this, windowDuration, slideDuration)
}
/**
@@ -554,36 +554,36 @@ abstract class DStream[T: ClassManifest] (
/**
* Returns a new DStream in which each RDD has a single element generated by reducing all
- * elements in a window over this DStream. windowTime and slideTime are as defined in the
- * window() operation. This is equivalent to window(windowTime, slideTime).reduce(reduceFunc)
+ * elements in a window over this DStream. windowDuration and slideDuration are as defined in the
+ * window() operation. This is equivalent to window(windowDuration, slideDuration).reduce(reduceFunc)
*/
- def reduceByWindow(reduceFunc: (T, T) => T, windowTime: Duration, slideTime: Duration): DStream[T] = {
- this.window(windowTime, slideTime).reduce(reduceFunc)
+ def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T] = {
+ this.window(windowDuration, slideDuration).reduce(reduceFunc)
}
def reduceByWindow(
reduceFunc: (T, T) => T,
invReduceFunc: (T, T) => T,
- windowTime: Duration,
- slideTime: Duration
+ windowDuration: Duration,
+ slideDuration: Duration
): DStream[T] = {
this.map(x => (1, x))
- .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime, 1)
+ .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
.map(_._2)
}
/**
* Returns a new DStream in which each RDD has a single element generated by counting the number
- * of elements in a window over this DStream. windowTime and slideTime are as defined in the
- * window() operation. This is equivalent to window(windowTime, slideTime).count()
+ * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
+ * window() operation. This is equivalent to window(windowDuration, slideDuration).count()
*/
- def countByWindow(windowTime: Duration, slideTime: Duration): DStream[Int] = {
- this.map(_ => 1).reduceByWindow(_ + _, _ - _, windowTime, slideTime)
+ def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Int] = {
+ this.map(_ => 1).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
}
/**
* Returns a new DStream by unifying data of another DStream with this DStream.
- * @param that Another DStream having the same interval (i.e., slideTime) as this DStream.
+ * @param that Another DStream having the same slideDuration as this DStream.
*/
def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that))
@@ -599,13 +599,13 @@ abstract class DStream[T: ClassManifest] (
*/
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
val rdds = new ArrayBuffer[RDD[T]]()
- var time = toTime.floor(slideTime)
+ var time = toTime.floor(slideDuration)
while (time >= zeroTime && time >= fromTime) {
getOrCompute(time) match {
case Some(rdd) => rdds += rdd
case None => //throw new Exception("Could not get RDD for time " + time)
}
- time -= slideTime
+ time -= slideDuration
}
rdds.toSeq
}
diff --git a/streaming/src/main/scala/spark/streaming/Duration.scala b/streaming/src/main/scala/spark/streaming/Duration.scala
index d2728d9dca..e4dc579a17 100644
--- a/streaming/src/main/scala/spark/streaming/Duration.scala
+++ b/streaming/src/main/scala/spark/streaming/Duration.scala
@@ -1,6 +1,6 @@
package spark.streaming
-class Duration (private val millis: Long) {
+case class Duration (private val millis: Long) {
def < (that: Duration): Boolean = (this.millis < that.millis)
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index dd64064138..482d01300d 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -21,14 +21,10 @@ extends Serializable {
def ssc = self.ssc
- def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
+ private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
new HashPartitioner(numPartitions)
}
- /* ---------------------------------- */
- /* DStream operations for key-value pairs */
- /* ---------------------------------- */
-
def groupByKey(): DStream[(K, Seq[V])] = {
groupByKey(defaultPartitioner())
}
@@ -69,59 +65,59 @@ extends Serializable {
self.map(x => (x._1, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
}
- def groupByKeyAndWindow(windowTime: Duration, slideTime: Duration): DStream[(K, Seq[V])] = {
- groupByKeyAndWindow(windowTime, slideTime, defaultPartitioner())
+ def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = {
+ groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner())
}
def groupByKeyAndWindow(
- windowTime: Duration,
- slideTime: Duration,
+ windowDuration: Duration,
+ slideDuration: Duration,
numPartitions: Int
): DStream[(K, Seq[V])] = {
- groupByKeyAndWindow(windowTime, slideTime, defaultPartitioner(numPartitions))
+ groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions))
}
def groupByKeyAndWindow(
- windowTime: Duration,
- slideTime: Duration,
+ windowDuration: Duration,
+ slideDuration: Duration,
partitioner: Partitioner
): DStream[(K, Seq[V])] = {
- self.window(windowTime, slideTime).groupByKey(partitioner)
+ self.window(windowDuration, slideDuration).groupByKey(partitioner)
}
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
- windowTime: Duration
+ windowDuration: Duration
): DStream[(K, V)] = {
- reduceByKeyAndWindow(reduceFunc, windowTime, self.slideTime, defaultPartitioner())
+ reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner())
}
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
- windowTime: Duration,
- slideTime: Duration
+ windowDuration: Duration,
+ slideDuration: Duration
): DStream[(K, V)] = {
- reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, defaultPartitioner())
+ reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
}
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
- windowTime: Duration,
- slideTime: Duration,
+ windowDuration: Duration,
+ slideDuration: Duration,
numPartitions: Int
): DStream[(K, V)] = {
- reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, defaultPartitioner(numPartitions))
+ reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions))
}
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
- windowTime: Duration,
- slideTime: Duration,
+ windowDuration: Duration,
+ slideDuration: Duration,
partitioner: Partitioner
): DStream[(K, V)] = {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
self.reduceByKey(cleanedReduceFunc, partitioner)
- .window(windowTime, slideTime)
+ .window(windowDuration, slideDuration)
.reduceByKey(cleanedReduceFunc, partitioner)
}
@@ -134,51 +130,51 @@ extends Serializable {
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
- windowTime: Duration,
- slideTime: Duration
+ windowDuration: Duration,
+ slideDuration: Duration
): DStream[(K, V)] = {
reduceByKeyAndWindow(
- reduceFunc, invReduceFunc, windowTime, slideTime, defaultPartitioner())
+ reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner())
}
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
- windowTime: Duration,
- slideTime: Duration,
+ windowDuration: Duration,
+ slideDuration: Duration,
numPartitions: Int
): DStream[(K, V)] = {
reduceByKeyAndWindow(
- reduceFunc, invReduceFunc, windowTime, slideTime, defaultPartitioner(numPartitions))
+ reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions))
}
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
- windowTime: Duration,
- slideTime: Duration,
+ windowDuration: Duration,
+ slideDuration: Duration,
partitioner: Partitioner
): DStream[(K, V)] = {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
new ReducedWindowedDStream[K, V](
- self, cleanedReduceFunc, cleanedInvReduceFunc, windowTime, slideTime, partitioner)
+ self, cleanedReduceFunc, cleanedInvReduceFunc, windowDuration, slideDuration, partitioner)
}
def countByKeyAndWindow(
- windowTime: Duration,
- slideTime: Duration,
+ windowDuration: Duration,
+ slideDuration: Duration,
numPartitions: Int = self.ssc.sc.defaultParallelism
): DStream[(K, Long)] = {
self.map(x => (x._1, 1L)).reduceByKeyAndWindow(
(x: Long, y: Long) => x + y,
(x: Long, y: Long) => x - y,
- windowTime,
- slideTime,
+ windowDuration,
+ slideDuration,
numPartitions
)
}
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index 10845e3a5e..c04ed37de8 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -14,7 +14,7 @@ class Scheduler(ssc: StreamingContext) extends Logging {
val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
val jobManager = new JobManager(ssc, concurrentJobs)
- val checkpointWriter = if (ssc.checkpointInterval != null && ssc.checkpointDir != null) {
+ val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
new CheckpointWriter(ssc.checkpointDir)
} else {
null
@@ -65,7 +65,7 @@ class Scheduler(ssc: StreamingContext) extends Logging {
}
private def doCheckpoint(time: Time) {
- if (ssc.checkpointInterval != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointInterval)) {
+ if (ssc.checkpointDuration != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
val startTime = System.currentTimeMillis()
ssc.graph.updateCheckpointData(time)
checkpointWriter.write(new Checkpoint(ssc, time))
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index ee8314df3f..14500bdcb1 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -96,7 +96,7 @@ class StreamingContext private (
}
}
- protected[streaming] var checkpointInterval: Duration = if (isCheckpointPresent) cp_.checkpointInterval else null
+ protected[streaming] var checkpointDuration: Duration = if (isCheckpointPresent) cp_.checkpointDuration else null
protected[streaming] var receiverJobThread: Thread = null
protected[streaming] var scheduler: Scheduler = null
@@ -121,10 +121,10 @@ class StreamingContext private (
if (directory != null) {
sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory))
checkpointDir = directory
- checkpointInterval = interval
+ checkpointDuration = interval
} else {
checkpointDir = null
- checkpointInterval = null
+ checkpointDuration = null
}
}
@@ -327,7 +327,7 @@ class StreamingContext private (
graph.validate()
assert(
- checkpointDir == null || checkpointInterval != null,
+ checkpointDir == null || checkpointDuration != null,
"Checkpoint directory has been set, but the graph checkpointing interval has " +
"not been set. Please use StreamingContext.checkpoint() to set the interval."
)
@@ -337,8 +337,8 @@ class StreamingContext private (
* Starts the execution of the streams.
*/
def start() {
- if (checkpointDir != null && checkpointInterval == null && graph != null) {
- checkpointInterval = graph.batchDuration
+ if (checkpointDir != null && checkpointDuration == null && graph != null) {
+ checkpointDuration = graph.batchDuration
}
validate()
diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala
index 069df82e52..5daeb761dd 100644
--- a/streaming/src/main/scala/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/spark/streaming/Time.scala
@@ -1,14 +1,15 @@
package spark.streaming
/**
- * This is a simple class that represents time. Internally, it represents time as UTC.
- * The recommended way to create instances of Time is to use helper objects
- * [[spark.streaming.Milliseconds]], [[spark.streaming.Seconds]], and [[spark.streaming.Minutes]].
- * @param millis Time in UTC.
+ * This is a simple class that represents an absolute instant of time.
+ * Internally, it represents time as the difference, measured in milliseconds, between the current
+ * time and midnight, January 1, 1970 UTC. This is the same format as what is returned by
+ * System.currentTimeMillis.
*/
+case class Time(private val millis: Long) {
+
+ def milliseconds: Long = millis
-class Time(private val millis: Long) {
-
def < (that: Time): Boolean = (this.millis < that.millis)
def <= (that: Time): Boolean = (this.millis <= that.millis)
@@ -38,11 +39,4 @@ class Time(private val millis: Long) {
override def toString: String = (millis.toString + " ms")
- def milliseconds: Long = millis
-}
-
-/*private[streaming] object Time {
- implicit def toTime(long: Long) = Time(long)
-}
-*/
-
+}
\ No newline at end of file
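
Duration and Time are now case classes over their millisecond value, so independently constructed instances compare equal and hash consistently; the updated BasicOperationsSuite assertions further below build keys such as Time(10000) directly when checking generatedRDDs. A small self-contained sketch of that behavior, using simplified stand-ins rather than the real classes:

// Simplified stand-ins mirroring the declarations in this commit; the real
// classes also define arithmetic and comparison operators.
case class Duration(private val millis: Long)
case class Time(private val millis: Long)

object CaseClassEqualitySketch extends App {
  // Case classes get value-based equals and hashCode for free.
  assert(Duration(10000) == Duration(10000))
  assert(Time(8000) == Time(8000))
  // So a freshly built Time works as a lookup key, as the tests rely on.
  val generated = Map(Time(10000) -> "rdd generated at t = 10 s")
  assert(generated.contains(Time(10000)))
  println("case-class equality holds")
}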
diff --git a/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
index ca178fd384..ddb1bf6b28 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
@@ -18,13 +18,13 @@ class CoGroupedDStream[K : ClassManifest](
throw new IllegalArgumentException("Array of parents have different StreamingContexts")
}
- if (parents.map(_.slideTime).distinct.size > 1) {
+ if (parents.map(_.slideDuration).distinct.size > 1) {
throw new IllegalArgumentException("Array of parents have different slide times")
}
override def dependencies = parents.toList
- override def slideTime: Duration = parents.head.slideTime
+ override def slideDuration: Duration = parents.head.slideDuration
override def compute(validTime: Time): Option[RDD[(K, Seq[Seq[_]])]] = {
val part = partitioner
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala
index 76b9e58029..e993164f99 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala
@@ -11,7 +11,7 @@ class FilteredDStream[T: ClassManifest](
override def dependencies = List(parent)
- override def slideTime: Duration = parent.slideTime
+ override def slideDuration: Duration = parent.slideDuration
override def compute(validTime: Time): Option[RDD[T]] = {
parent.getOrCompute(validTime).map(_.filter(filterFunc))
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala
index 28e9a456ac..cabd34f5f2 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala
@@ -12,7 +12,7 @@ class FlatMapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]
override def dependencies = List(parent)
- override def slideTime: Duration = parent.slideTime
+ override def slideDuration: Duration = parent.slideDuration
override def compute(validTime: Time): Option[RDD[(K, U)]] = {
parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc))
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala
index ef305b66f1..a69af60589 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala
@@ -11,7 +11,7 @@ class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
override def dependencies = List(parent)
- override def slideTime: Duration = parent.slideTime
+ override def slideDuration: Duration = parent.slideDuration
override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala
index f8af0a38a7..ee69ea5177 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala
@@ -11,7 +11,7 @@ class ForEachDStream[T: ClassManifest] (
override def dependencies = List(parent)
- override def slideTime: Duration = parent.slideTime
+ override def slideDuration: Duration = parent.slideDuration
override def compute(validTime: Time): Option[RDD[Unit]] = None
diff --git a/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala
index 19cccea735..b589cbd4d5 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala
@@ -9,7 +9,7 @@ class GlommedDStream[T: ClassManifest](parent: DStream[T])
override def dependencies = List(parent)
- override def slideTime: Duration = parent.slideTime
+ override def slideDuration: Duration = parent.slideDuration
override def compute(validTime: Time): Option[RDD[Array[T]]] = {
parent.getOrCompute(validTime).map(_.glom())
diff --git a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
index 50f0f45796..980ca5177e 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
@@ -7,7 +7,7 @@ abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContex
override def dependencies = List()
- override def slideTime: Duration = {
+ override def slideDuration: Duration = {
if (ssc == null) throw new Exception("ssc is null")
if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
ssc.graph.batchDuration
diff --git a/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala
index e9ca668aa6..848afecfad 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala
@@ -12,7 +12,7 @@ class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
override def dependencies = List(parent)
- override def slideTime: Duration = parent.slideTime
+ override def slideDuration: Duration = parent.slideDuration
override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
diff --git a/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala
index ebc7d0698b..6055aa6a05 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala
@@ -12,7 +12,7 @@ class MapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
override def dependencies = List(parent)
- override def slideTime: Duration = parent.slideTime
+ override def slideDuration: Duration = parent.slideDuration
override def compute(validTime: Time): Option[RDD[(K, U)]] = {
parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc))
diff --git a/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala
index 3af8e7ab88..20818a0cab 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala
@@ -11,7 +11,7 @@ class MappedDStream[T: ClassManifest, U: ClassManifest] (
override def dependencies = List(parent)
- override def slideTime: Duration = parent.slideTime
+ override def slideDuration: Duration = parent.slideDuration
override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.map[U](mapFunc))
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
index a685a778ce..733d5c4a25 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -16,19 +16,19 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
parent: DStream[(K, V)],
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
- _windowTime: Duration,
- _slideTime: Duration,
+ _windowDuration: Duration,
+ _slideDuration: Duration,
partitioner: Partitioner
) extends DStream[(K,V)](parent.ssc) {
- assert(_windowTime.isMultipleOf(parent.slideTime),
- "The window duration of ReducedWindowedDStream (" + _slideTime + ") " +
- "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")"
+ assert(_windowDuration.isMultipleOf(parent.slideDuration),
+ "The window duration of ReducedWindowedDStream (" + _slideDuration + ") " +
+ "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
)
- assert(_slideTime.isMultipleOf(parent.slideTime),
- "The slide duration of ReducedWindowedDStream (" + _slideTime + ") " +
- "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")"
+ assert(_slideDuration.isMultipleOf(parent.slideDuration),
+ "The slide duration of ReducedWindowedDStream (" + _slideDuration + ") " +
+ "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
)
// Reduce each batch of data using reduceByKey which will be further reduced by window
@@ -39,15 +39,15 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
super.persist(StorageLevel.MEMORY_ONLY_SER)
reducedStream.persist(StorageLevel.MEMORY_ONLY_SER)
- def windowTime: Duration = _windowTime
+ def windowDuration: Duration = _windowDuration
override def dependencies = List(reducedStream)
- override def slideTime: Duration = _slideTime
+ override def slideDuration: Duration = _slideDuration
override val mustCheckpoint = true
- override def parentRememberDuration: Duration = rememberDuration + windowTime
+ override def parentRememberDuration: Duration = rememberDuration + windowDuration
override def persist(storageLevel: StorageLevel): DStream[(K,V)] = {
super.persist(storageLevel)
@@ -66,11 +66,11 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
val invReduceF = invReduceFunc
val currentTime = validTime
- val currentWindow = new Interval(currentTime - windowTime + parent.slideTime, currentTime)
- val previousWindow = currentWindow - slideTime
+ val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration, currentTime)
+ val previousWindow = currentWindow - slideDuration
- logDebug("Window time = " + windowTime)
- logDebug("Slide time = " + slideTime)
+ logDebug("Window time = " + windowDuration)
+ logDebug("Slide time = " + slideDuration)
logDebug("ZeroTime = " + zeroTime)
logDebug("Current window = " + currentWindow)
logDebug("Previous window = " + previousWindow)
@@ -87,11 +87,11 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
//
// Get the RDDs of the reduced values in "old time steps"
- val oldRDDs = reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideTime)
+ val oldRDDs = reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
logDebug("# old RDDs = " + oldRDDs.size)
// Get the RDDs of the reduced values in "new time steps"
- val newRDDs = reducedStream.slice(previousWindow.endTime + parent.slideTime, currentWindow.endTime)
+ val newRDDs = reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)
logDebug("# new RDDs = " + newRDDs.size)
// Get the RDD of the reduced value of the previous window
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala
index 7612804b96..1f9548bfb8 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala
@@ -15,7 +15,7 @@ class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
override def dependencies = List(parent)
- override def slideTime: Duration = parent.slideTime
+ override def slideDuration: Duration = parent.slideDuration
override def compute(validTime: Time): Option[RDD[(K,C)]] = {
parent.getOrCompute(validTime) match {
diff --git a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
index ce4f486825..a1ec2f5454 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
@@ -18,14 +18,14 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife
override def dependencies = List(parent)
- override def slideTime: Duration = parent.slideTime
+ override def slideDuration: Duration = parent.slideDuration
override val mustCheckpoint = true
override def compute(validTime: Time): Option[RDD[(K, S)]] = {
// Try to get the previous state RDD
- getOrCompute(validTime - slideTime) match {
+ getOrCompute(validTime - slideDuration) match {
case Some(prevStateRDD) => { // If previous state RDD exists
diff --git a/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala
index 5a2c5bc0f0..99660d9dee 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala
@@ -11,7 +11,7 @@ class TransformedDStream[T: ClassManifest, U: ClassManifest] (
override def dependencies = List(parent)
- override def slideTime: Duration = parent.slideTime
+ override def slideDuration: Duration = parent.slideDuration
override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(transformFunc(_, validTime))
diff --git a/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala
index 224a19842b..00bad5da34 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala
@@ -17,13 +17,13 @@ class UnionDStream[T: ClassManifest](parents: Array[DStream[T]])
throw new IllegalArgumentException("Array of parents have different StreamingContexts")
}
- if (parents.map(_.slideTime).distinct.size > 1) {
+ if (parents.map(_.slideDuration).distinct.size > 1) {
throw new IllegalArgumentException("Array of parents have different slide times")
}
override def dependencies = parents.toList
- override def slideTime: Duration = parents.head.slideTime
+ override def slideDuration: Duration = parents.head.slideDuration
override def compute(validTime: Time): Option[RDD[T]] = {
val rdds = new ArrayBuffer[RDD[T]]()
diff --git a/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala
index 45689b25ce..cbf0c88108 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala
@@ -8,30 +8,30 @@ import spark.streaming.{Duration, Interval, Time, DStream}
private[streaming]
class WindowedDStream[T: ClassManifest](
parent: DStream[T],
- _windowTime: Duration,
- _slideTime: Duration)
+ _windowDuration: Duration,
+ _slideDuration: Duration)
extends DStream[T](parent.ssc) {
- if (!_windowTime.isMultipleOf(parent.slideTime))
- throw new Exception("The window duration of WindowedDStream (" + _slideTime + ") " +
- "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")")
+ if (!_windowDuration.isMultipleOf(parent.slideDuration))
+ throw new Exception("The window duration of WindowedDStream (" + _slideDuration + ") " +
+ "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
- if (!_slideTime.isMultipleOf(parent.slideTime))
- throw new Exception("The slide duration of WindowedDStream (" + _slideTime + ") " +
- "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")")
+ if (!_slideDuration.isMultipleOf(parent.slideDuration))
+ throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " +
+ "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
parent.persist(StorageLevel.MEMORY_ONLY_SER)
- def windowTime: Duration = _windowTime
+ def windowDuration: Duration = _windowDuration
override def dependencies = List(parent)
- override def slideTime: Duration = _slideTime
+ override def slideDuration: Duration = _slideDuration
- override def parentRememberDuration: Duration = rememberDuration + windowTime
+ override def parentRememberDuration: Duration = rememberDuration + windowDuration
override def compute(validTime: Time): Option[RDD[T]] = {
- val currentWindow = new Interval(validTime - windowTime + parent.slideTime, validTime)
+ val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
Some(new UnionRDD(ssc.sc, parent.slice(currentWindow)))
}
}
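
WindowedDStream (like ReducedWindowedDStream above) requires both the window duration and the slide duration to be multiples of the parent DStream's slideDuration. A short sketch of that constraint using the Seconds helper and Duration.isMultipleOf seen in the diff; the 2/4/6-second values are illustrative:

import spark.streaming.Seconds

object WindowConstraintSketch extends App {
  val parentSlide = Seconds(2)      // slide duration of a hypothetical parent DStream
  val windowDuration = Seconds(6)   // 6 s is a multiple of 2 s, so this check passes
  val slideDuration = Seconds(4)    // 4 s is a multiple of 2 s, so this check passes
  require(windowDuration.isMultipleOf(parentSlide),
    "The window duration must be a multiple of the parent's slide duration")
  require(slideDuration.isMultipleOf(parentSlide),
    "The slide duration must be a multiple of the parent's slide duration")
}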
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index dc38ef4912..f9e03c607d 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -196,18 +196,18 @@ class BasicOperationsSuite extends TestSuiteBase {
// MappedStream should remember till 7 seconds: 10, 9, 8, 7, 6, 5, 4, 3, 2
// WindowedStream2
- assert(windowedStream2.generatedRDDs.contains(Seconds(10)))
- assert(windowedStream2.generatedRDDs.contains(Seconds(8)))
- assert(!windowedStream2.generatedRDDs.contains(Seconds(6)))
+ assert(windowedStream2.generatedRDDs.contains(Time(10000)))
+ assert(windowedStream2.generatedRDDs.contains(Time(8000)))
+ assert(!windowedStream2.generatedRDDs.contains(Time(6000)))
// WindowedStream1
- assert(windowedStream1.generatedRDDs.contains(Seconds(10)))
- assert(windowedStream1.generatedRDDs.contains(Seconds(4)))
- assert(!windowedStream1.generatedRDDs.contains(Seconds(3)))
+ assert(windowedStream1.generatedRDDs.contains(Time(10000)))
+ assert(windowedStream1.generatedRDDs.contains(Time(4000)))
+ assert(!windowedStream1.generatedRDDs.contains(Time(3000)))
// MappedStream
- assert(mappedStream.generatedRDDs.contains(Seconds(10)))
- assert(mappedStream.generatedRDDs.contains(Seconds(2)))
- assert(!mappedStream.generatedRDDs.contains(Seconds(1)))
+ assert(mappedStream.generatedRDDs.contains(Time(10000)))
+ assert(mappedStream.generatedRDDs.contains(Time(2000)))
+ assert(!mappedStream.generatedRDDs.contains(Time(1000)))
}
}
diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
index 28bdd53c3c..a76f61d4ad 100644
--- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
@@ -26,7 +26,7 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[
def compute(validTime: Time): Option[RDD[T]] = {
logInfo("Computing RDD for time " + validTime)
- val index = ((validTime - zeroTime) / slideTime - 1).toInt
+ val index = ((validTime - zeroTime) / slideDuration - 1).toInt
val selectedInput = if (index < input.size) input(index) else Seq[T]()
val rdd = ssc.sc.makeRDD(selectedInput, numPartitions)
logInfo("Created RDD " + rdd.id + " with " + selectedInput)
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index 4bc5229465..fa117cfcf0 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -207,11 +207,11 @@ class WindowOperationsSuite extends TestSuiteBase {
test("groupByKeyAndWindow") {
val input = bigInput
val expectedOutput = bigGroupByOutput.map(_.map(x => (x._1, x._2.toSet)))
- val windowTime = Seconds(2)
- val slideTime = Seconds(1)
- val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
+ val windowDuration = Seconds(2)
+ val slideDuration = Seconds(1)
+ val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
- s.groupByKeyAndWindow(windowTime, slideTime)
+ s.groupByKeyAndWindow(windowDuration, slideDuration)
.map(x => (x._1, x._2.toSet))
.persist()
}
@@ -221,21 +221,21 @@ class WindowOperationsSuite extends TestSuiteBase {
test("countByWindow") {
val input = Seq(Seq(1), Seq(1), Seq(1, 2), Seq(0), Seq(), Seq() )
val expectedOutput = Seq( Seq(1), Seq(2), Seq(3), Seq(3), Seq(1), Seq(0))
- val windowTime = Seconds(2)
- val slideTime = Seconds(1)
- val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
- val operation = (s: DStream[Int]) => s.countByWindow(windowTime, slideTime)
+ val windowDuration = Seconds(2)
+ val slideDuration = Seconds(1)
+ val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
+ val operation = (s: DStream[Int]) => s.countByWindow(windowDuration, slideDuration)
testOperation(input, operation, expectedOutput, numBatches, true)
}
test("countByKeyAndWindow") {
val input = Seq(Seq(("a", 1)), Seq(("b", 1), ("b", 2)), Seq(("a", 10), ("b", 20)))
val expectedOutput = Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 2)), Seq(("a", 1), ("b", 3)))
- val windowTime = Seconds(2)
- val slideTime = Seconds(1)
- val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
+ val windowDuration = Seconds(2)
+ val slideDuration = Seconds(1)
+ val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
- s.countByKeyAndWindow(windowTime, slideTime).map(x => (x._1, x._2.toInt))
+ s.countByKeyAndWindow(windowDuration, slideDuration).map(x => (x._1, x._2.toInt))
}
testOperation(input, operation, expectedOutput, numBatches, true)
}
@@ -247,12 +247,12 @@ class WindowOperationsSuite extends TestSuiteBase {
name: String,
input: Seq[Seq[Int]],
expectedOutput: Seq[Seq[Int]],
- windowTime: Time = Seconds(2),
- slideTime: Time = Seconds(1)
+ windowDuration: Duration = Seconds(2),
+ slideDuration: Duration = Seconds(1)
) {
test("window - " + name) {
- val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
- val operation = (s: DStream[Int]) => s.window(windowTime, slideTime)
+ val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
+ val operation = (s: DStream[Int]) => s.window(windowDuration, slideDuration)
testOperation(input, operation, expectedOutput, numBatches, true)
}
}
@@ -261,13 +261,13 @@ class WindowOperationsSuite extends TestSuiteBase {
name: String,
input: Seq[Seq[(String, Int)]],
expectedOutput: Seq[Seq[(String, Int)]],
- windowTime: Time = Seconds(2),
- slideTime: Time = Seconds(1)
+ windowDuration: Duration = Seconds(2),
+ slideDuration: Duration = Seconds(1)
) {
test("reduceByKeyAndWindow - " + name) {
- val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
+ val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
- s.reduceByKeyAndWindow(_ + _, windowTime, slideTime).persist()
+ s.reduceByKeyAndWindow(_ + _, windowDuration, slideDuration).persist()
}
testOperation(input, operation, expectedOutput, numBatches, true)
}
@@ -277,13 +277,13 @@ class WindowOperationsSuite extends TestSuiteBase {
name: String,
input: Seq[Seq[(String, Int)]],
expectedOutput: Seq[Seq[(String, Int)]],
- windowTime: Time = Seconds(2),
- slideTime: Time = Seconds(1)
+ windowDuration: Duration = Seconds(2),
+ slideDuration: Duration = Seconds(1)
) {
test("reduceByKeyAndWindowInv - " + name) {
- val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
+ val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
- s.reduceByKeyAndWindow(_ + _, _ - _, windowTime, slideTime)
+ s.reduceByKeyAndWindow(_ + _, _ - _, windowDuration, slideDuration)
.persist()
.checkpoint(Seconds(100)) // Large value to avoid effect of RDD checkpointing
}