author     Tathagata Das <tathagata.das1565@gmail.com>    2012-10-21 10:41:25 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>    2012-10-21 10:41:25 -0700
commit     c4a2b6f636040bacd3d4b443e65cc33dafd0aa7e (patch)
tree       56fe995533a1da113f2c9e54acbb25fd73c00ee6 /streaming
parent     6d5eb4b40ccad150c967fee8557a4e5d5664b4bd (diff)
Fixed some bugs in the tests for forgetting RDDs, and made sure that using the manual clock leads to a zeroTime of 0 in the DStreams (more intuitive).
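
For illustration, a minimal sketch of how the remember duration introduced by this commit is configured (it mirrors the updated test in DStreamBasicSuite; setupStreams and runStreams are helpers from DStreamSuiteBase, and the 3-second value is simply what that test uses):

    // Keep each generated RDD for at least 3 seconds before
    // forgetOldRDDs() removes it from generatedRDDs.
    val ssc = setupStreams(input, operation _)
    ssc.setRememberDuration(Seconds(3))
    runStreams[(Int, Int)](ssc, input.size, input.size / 2)

If no remember duration is set explicitly, each DStream defaults to its slideTime, and window-based DStreams ask their parents to remember an extra windowTime on top of that.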
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/Checkpoint.scala             |  2
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala                | 40
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStreamGraph.scala           | 17
-rw-r--r--  streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala | 10
-rw-r--r--  streaming/src/main/scala/spark/streaming/Scheduler.scala              |  4
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala       |  4
-rw-r--r--  streaming/src/main/scala/spark/streaming/WindowedDStream.scala        |  2
-rw-r--r--  streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala      | 83
-rw-r--r--  streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala       |  2
-rw-r--r--  streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala     | 25
10 files changed, 100 insertions, 89 deletions
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index 23fd0f2434..ebff9bdb51 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -33,7 +33,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) ext
if (fs.exists(path)) {
val bkPath = new Path(path.getParent, path.getName + ".bk")
FileUtil.copy(fs, path, fs, bkPath, true, true, conf)
- println("Moved existing checkpoint file to " + bkPath)
+ //logInfo("Moved existing checkpoint file to " + bkPath)
}
val fos = fs.create(path)
val oos = new ObjectOutputStream(fos)
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 645636b603..f6cd135e59 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -30,7 +30,7 @@ extends Serializable with Logging {
// List of parent DStreams on which this DStream depends on
def dependencies: List[DStream[_]]
- // Key method that computes RDD for a valid time
+ // Key method that computes RDD for a valid time
def compute (validTime: Time): Option[RDD[T]]
/**
@@ -45,8 +45,8 @@ extends Serializable with Logging {
// Time zero for the DStream
protected var zeroTime: Time = null
- // Time after which RDDs will be forgotten
- protected var forgetTime: Time = null
+ // Duration for which the DStream will remember each RDD created
+ protected var rememberDuration: Time = null
// Storage level of the RDDs in the stream
protected var storageLevel: StorageLevel = StorageLevel.NONE
@@ -60,8 +60,8 @@ extends Serializable with Logging {
def isInitialized = (zeroTime != null)
- // Time gap for forgetting old RDDs (i.e. removing them from generatedRDDs)
- def parentForgetTime = forgetTime
+ // Duration for which the DStream requires its parent DStream to remember each RDD created
+ def parentRememberDuration = rememberDuration
// Change this RDD's storage level
def persist(
@@ -118,23 +118,24 @@ extends Serializable with Logging {
dependencies.foreach(_.setGraph(graph))
}
- protected[streaming] def setForgetTime(time: Time = slideTime) {
- if (time == null) {
- throw new Exception("Time gap for forgetting RDDs cannot be set to null for " + this)
- } else if (forgetTime != null && time < forgetTime) {
- throw new Exception("Time gap for forgetting RDDs cannot be reduced from " + forgetTime
- + " to " + time + " for " + this)
+ protected[streaming] def setRememberDuration(duration: Time = slideTime) {
+ if (duration == null) {
+ throw new Exception("Duration for remembering RDDs cannot be set to null for " + this)
+ } else if (rememberDuration != null && duration < rememberDuration) {
+ logWarning("Duration for remembering RDDs cannot be reduced from " + rememberDuration
+ + " to " + duration + " for " + this)
+ } else {
+ rememberDuration = duration
+ dependencies.foreach(_.setRememberDuration(parentRememberDuration))
+ logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this)
}
- forgetTime = time
- dependencies.foreach(_.setForgetTime(parentForgetTime))
- logInfo("Time gap for forgetting RDDs set to " + forgetTime + " for " + this)
}
/** This method checks whether the 'time' is valid wrt slideTime for generating RDD */
protected def isTimeValid(time: Time): Boolean = {
if (!isInitialized) {
throw new Exception (this + " has not been initialized")
- } else if (time < zeroTime || ! (time - zeroTime).isMultipleOf(slideTime)) {
+ } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideTime)) {
false
} else {
true
@@ -143,7 +144,7 @@ extends Serializable with Logging {
/**
* This method either retrieves a precomputed RDD of this DStream,
- * or computes the RDD (if the time is valid)
+ * or computes the RDD (if the time is valid)
*/
def getOrCompute(time: Time): Option[RDD[T]] = {
// If this DStream was not initialized (i.e., zeroTime not set), then do it
@@ -154,7 +155,7 @@ extends Serializable with Logging {
// probably all RDDs in this DStream will be reused and hence should be cached
case Some(oldRDD) => Some(oldRDD)
- // if RDD was not generated, and if the time is valid
+ // if RDD was not generated, and if the time is valid
// (based on sliding time of this DStream), then generate the RDD
case None => {
if (isTimeValid(time)) {
@@ -199,9 +200,8 @@ extends Serializable with Logging {
def forgetOldRDDs(time: Time) {
val keys = generatedRDDs.keys
var numForgotten = 0
-
keys.foreach(t => {
- if (t < (time - forgetTime)) {
+ if (t <= (time - rememberDuration)) {
generatedRDDs.remove(t)
numForgotten += 1
//logInfo("Forgot RDD of time " + t + " from " + this)
@@ -530,7 +530,7 @@ class UnifiedDStream[T: ClassManifest](parents: Array[DStream[T]])
val rdds = new ArrayBuffer[RDD[T]]()
parents.map(_.getOrCompute(validTime)).foreach(_ match {
case Some(rdd) => rdds += rdd
- case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime)
+ case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime)
})
if (rdds.size > 0) {
Some(new UnionRDD(ssc.sc, rdds))
diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
index 964c8a26a0..ac44d7a2a6 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
@@ -12,6 +12,7 @@ final class DStreamGraph extends Serializable with Logging {
private[streaming] var zeroTime: Time = null
private[streaming] var batchDuration: Time = null
+ private[streaming] var rememberDuration: Time = null
private[streaming] var checkpointInProgress = false
def start(time: Time) {
@@ -21,7 +22,11 @@ final class DStreamGraph extends Serializable with Logging {
}
zeroTime = time
outputStreams.foreach(_.initialize(zeroTime))
- outputStreams.foreach(_.setForgetTime())
+ outputStreams.foreach(_.setRememberDuration()) // first set the rememberDuration to default values
+ if (rememberDuration != null) {
+ // if custom rememberDuration has been provided, set the rememberDuration
+ outputStreams.foreach(_.setRememberDuration(rememberDuration))
+ }
inputStreams.par.foreach(_.start())
}
}
@@ -48,6 +53,16 @@ final class DStreamGraph extends Serializable with Logging {
batchDuration = duration
}
+ def setRememberDuration(duration: Time) {
+ this.synchronized {
+ if (rememberDuration != null) {
+ throw new Exception("Batch duration already set as " + batchDuration +
+ ". cannot set it again.")
+ }
+ }
+ rememberDuration = duration
+ }
+
def addInputStream(inputStream: InputDStream[_]) {
this.synchronized {
inputStream.setGraph(this)
diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
index f3e95c9e2b..fcf57aced7 100644
--- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
@@ -38,7 +38,7 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
override def slideTime: Time = _slideTime
//TODO: This is wrong. This should depend on the checkpointInterval
- override def parentForgetTime: Time = forgetTime + windowTime
+ override def parentRememberDuration: Time = rememberDuration + windowTime
override def persist(
storageLevel: StorageLevel,
@@ -49,10 +49,10 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
this
}
- protected[streaming] override def setForgetTime(time: Time) {
- if (forgetTime == null || forgetTime < time) {
- forgetTime = time
- dependencies.foreach(_.setForgetTime(forgetTime + windowTime))
+ protected[streaming] override def setRememberDuration(time: Time) {
+ if (rememberDuration == null || rememberDuration < time) {
+ rememberDuration = time
+ dependencies.foreach(_.setRememberDuration(rememberDuration + windowTime))
}
}
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index 99e30b6110..7d52e2eddf 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -37,8 +37,8 @@ extends Logging {
timer.restart(graph.zeroTime.milliseconds)
logInfo("Scheduler's timer restarted")
} else {
- val zeroTime = Time(timer.start())
- graph.start(zeroTime)
+ val firstTime = Time(timer.start())
+ graph.start(firstTime - ssc.graph.batchDuration)
logInfo("Scheduler's timer started")
}
logInfo("Scheduler started")
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index b5f4571798..7022056f7c 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -71,6 +71,10 @@ class StreamingContext (
graph.setBatchDuration(duration)
}
+ def setRememberDuration(duration: Time) {
+ graph.setRememberDuration(duration)
+ }
+
def setCheckpointDetails(file: String, interval: Time) {
checkpointFile = file
checkpointInterval = interval
diff --git a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
index 2984f88284..b90e22351b 100644
--- a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
@@ -24,7 +24,7 @@ class WindowedDStream[T: ClassManifest](
override def slideTime: Time = _slideTime
- override def parentForgetTime: Time = forgetTime + windowTime
+ override def parentRememberDuration: Time = rememberDuration + windowTime
override def compute(validTime: Time): Option[RDD[T]] = {
val currentWindow = Interval(validTime - windowTime + parent.slideTime, validTime)
diff --git a/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala
index 5dd8b675b1..28bbb152ca 100644
--- a/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala
@@ -41,7 +41,7 @@ class DStreamBasicSuite extends DStreamSuiteBase {
)
}
- test("stateful operations") {
+ test("updateStateByKey") {
val inputData =
Seq(
Seq("a"),
@@ -75,63 +75,54 @@ class DStreamBasicSuite extends DStreamSuiteBase {
testOperation(inputData, updateStateOperation, outputData, true)
}
- test("forgetting of RDDs") {
+ test("forgetting of RDDs - map and window operations") {
assert(batchDuration === Seconds(1), "Batch duration has changed from 1 second")
- val input = Seq(1 to 4, 5 to 8, 9 to 12, 13 to 16, 17 to 20, 21 to 24, 25 to 28, 29 to 32)
+ val input = (0 until 10).map(x => Seq(x, x + 1)).toSeq
+ val rememberDuration = Seconds(3)
- assert(input.size % 4 === 0, "Number of inputs should be a multiple of 4")
+ assert(input.size === 10, "Number of inputs have changed")
def operation(s: DStream[Int]): DStream[(Int, Int)] = {
s.map(x => (x % 10, 1))
.window(Seconds(2), Seconds(1))
- .reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(1))
+ .window(Seconds(4), Seconds(2))
}
val ssc = setupStreams(input, operation _)
- runStreams[(Int, Int)](ssc, input.size, input.size)
+ ssc.setRememberDuration(rememberDuration)
+ runStreams[(Int, Int)](ssc, input.size, input.size / 2)
- val reducedWindowedStream = ssc.graph.getOutputStreams().head.dependencies.head
- .asInstanceOf[ReducedWindowedDStream[Int, Int]]
- val windowedStream = reducedWindowedStream.dependencies.head.dependencies.head
- .asInstanceOf[WindowedDStream[(Int, Int)]]
- val mappedStream = windowedStream.dependencies.head
+ val windowedStream2 = ssc.graph.getOutputStreams().head.dependencies.head
+ val windowedStream1 = windowedStream2.dependencies.head
+ val mappedStream = windowedStream1.dependencies.head
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- val finalTime = Seconds(7)
- //assert(clock.time === finalTime.milliseconds)
-
- // ReducedWindowedStream should remember the last RDD created
- assert(reducedWindowedStream.generatedRDDs.contains(finalTime))
-
- // ReducedWindowedStream should have forgotten the previous to last RDD created
- assert(!reducedWindowedStream.generatedRDDs.contains(finalTime - reducedWindowedStream.slideTime))
-
- // WindowedStream should remember the last RDD created
- assert(windowedStream.generatedRDDs.contains(finalTime))
-
- // WindowedStream should still remember the previous to last RDD created
- // as the last RDD of ReducedWindowedStream requires that RDD
- assert(windowedStream.generatedRDDs.contains(finalTime - windowedStream.slideTime))
-
- // WindowedStream should have forgotten this RDD as the last RDD of
- // ReducedWindowedStream DOES NOT require this RDD
- assert(!windowedStream.generatedRDDs.contains(finalTime - windowedStream.slideTime - reducedWindowedStream.windowTime))
-
- // MappedStream should remember the last RDD created
- assert(mappedStream.generatedRDDs.contains(finalTime))
-
- // MappedStream should still remember the previous to last RDD created
- // as the last RDD of WindowedStream requires that RDD
- assert(mappedStream.generatedRDDs.contains(finalTime - mappedStream.slideTime))
-
- // MappedStream should still remember this RDD as the last RDD of
- // ReducedWindowedStream requires that RDD (even though the last RDD of
- // WindowedStream does not need it)
- assert(mappedStream.generatedRDDs.contains(finalTime - windowedStream.windowTime))
-
- // MappedStream should have forgotten this RDD as the last RDD of
- // ReducedWindowedStream DOES NOT require this RDD
- assert(!mappedStream.generatedRDDs.contains(finalTime - mappedStream.slideTime - windowedStream.windowTime - reducedWindowedStream.windowTime))
+ assert(clock.time === Seconds(10).milliseconds)
+
+ // IDEALLY
+ // WindowedStream2 should remember till 7 seconds: 10, 8,
+ // WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5
+ // MappedStream should remember till 7 seconds: 10, 9, 8, 7, 6, 5, 4, 3,
+
+ // IN THIS TEST
+ // WindowedStream2 should remember till 7 seconds: 10, 8,
+ // WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5, 4
+ // MappedStream should remember till 7 seconds: 10, 9, 8, 7, 6, 5, 4, 3, 2
+
+ // WindowedStream2
+ assert(windowedStream2.generatedRDDs.contains(Seconds(10)))
+ assert(windowedStream2.generatedRDDs.contains(Seconds(8)))
+ assert(!windowedStream2.generatedRDDs.contains(Seconds(6)))
+
+ // WindowedStream1
+ assert(windowedStream1.generatedRDDs.contains(Seconds(10)))
+ assert(windowedStream1.generatedRDDs.contains(Seconds(4)))
+ assert(!windowedStream1.generatedRDDs.contains(Seconds(3)))
+
+ // MappedStream
+ assert(mappedStream.generatedRDDs.contains(Seconds(10)))
+ assert(mappedStream.generatedRDDs.contains(Seconds(2)))
+ assert(!mappedStream.generatedRDDs.contains(Seconds(1)))
}
}
diff --git a/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala b/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala
index 6e5a7a58bb..59fa5a6f22 100644
--- a/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala
@@ -104,6 +104,8 @@ trait DStreamSuiteBase extends FunSuite with Logging {
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
assert(output.size === numExpectedOutput, "Unexpected number of outputs generated")
+
+ Thread.sleep(500) // Give some time for the forgetting old RDDs to complete
} catch {
case e: Exception => e.printStackTrace(); throw e;
} finally {
diff --git a/streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala
index 8dd18f491a..cfcab6298d 100644
--- a/streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala
@@ -9,25 +9,24 @@ class DStreamWindowSuite extends DStreamSuiteBase {
override def maxWaitTimeMillis() = 20000
val largerSlideInput = Seq(
- Seq(("a", 1)), // 1st window from here
- Seq(("a", 2)),
- Seq(("a", 3)), // 2nd window from here
- Seq(("a", 4)),
- Seq(("a", 5)), // 3rd window from here
- Seq(("a", 6)),
- Seq(), // 4th window from here
+ Seq(("a", 1)),
+ Seq(("a", 2)), // 1st window from here
+ Seq(("a", 3)),
+ Seq(("a", 4)), // 2nd window from here
+ Seq(("a", 5)),
+ Seq(("a", 6)), // 3rd window from here
Seq(),
- Seq() // 5th window from here
+ Seq() // 4th window from here
)
val largerSlideOutput = Seq(
- Seq(("a", 1)),
- Seq(("a", 6)),
- Seq(("a", 14)),
- Seq(("a", 15)),
- Seq(("a", 6))
+ Seq(("a", 3)),
+ Seq(("a", 10)),
+ Seq(("a", 18)),
+ Seq(("a", 11))
)
+
val bigInput = Seq(
Seq(("a", 1)),
Seq(("a", 1), ("b", 1)),