aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-09-23 01:28:02 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-09-23 01:28:02 -0700
commit44c28abf120754c0175c65ffd3d4587a350b3798 (patch)
treeeacddd5f023c1e42838b45b754d3a05f4e4c59ac /streaming/src/main
parent5548a254755bb84edae2768b94ab1816e1b49b91 (diff)
downloadspark-44c28abf120754c0175c65ffd3d4587a350b3798.tar.gz
spark-44c28abf120754c0175c65ffd3d4587a350b3798.tar.bz2
spark-44c28abf120754c0175c65ffd3d4587a350b3798.zip
[SPARK-10224] [STREAMING] Fix the issue that blockIntervalTimer won't call updateCurrentBuffer when stopping
`blockIntervalTimer.stop(interruptTimer = false)` doesn't guarantee calling `updateCurrentBuffer`. So it's possible that `blockIntervalTimer` will exit when `updateCurrentBuffer` is not empty. Then the data in `currentBuffer` will be lost. To reproduce it, you can add `Thread.sleep(200)` in this line (https://github.com/apache/spark/blob/69c9c177160e32a2fbc9b36ecc52156077fca6fc/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala#L100) and run `StreamingContexSuite`. I cannot write a unit test to reproduce it because I cannot find an approach to force `RecurringTimer` suspend at this line for a few milliseconds. There was a failure in Jenkins here: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/41455/console This PR updates RecurringTimer to make sure `stop(interruptTimer = false)` will call `callback` at least once after the `stop` method is called. Author: zsxwing <zsxwing@gmail.com> Closes #8417 from zsxwing/SPARK-10224.
Diffstat (limited to 'streaming/src/main')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala19
1 files changed, 13 insertions, 6 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
index dd32ad5ad8..0148cb51c6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
@@ -72,8 +72,10 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
/**
* Stop the timer, and return the last time the callback was made.
- * interruptTimer = true will interrupt the callback
+ * - interruptTimer = true will interrupt the callback
* if it is in progress (not guaranteed to give correct time in this case).
+ * - interruptTimer = false guarantees that there will be at least one callback after `stop` has
+ * been called.
*/
def stop(interruptTimer: Boolean): Long = synchronized {
if (!stopped) {
@@ -87,18 +89,23 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
prevTime
}
+ private def triggerActionForNextInterval(): Unit = {
+ clock.waitTillTime(nextTime)
+ callback(nextTime)
+ prevTime = nextTime
+ nextTime += period
+ logDebug("Callback for " + name + " called at time " + prevTime)
+ }
+
/**
* Repeatedly call the callback every interval.
*/
private def loop() {
try {
while (!stopped) {
- clock.waitTillTime(nextTime)
- callback(nextTime)
- prevTime = nextTime
- nextTime += period
- logDebug("Callback for " + name + " called at time " + prevTime)
+ triggerActionForNextInterval()
}
+ triggerActionForNextInterval()
} catch {
case e: InterruptedException =>
}