diff options
author | zsxwing <zsxwing@gmail.com> | 2015-09-23 01:28:02 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-09-23 01:28:02 -0700 |
commit | 44c28abf120754c0175c65ffd3d4587a350b3798 (patch) | |
tree | eacddd5f023c1e42838b45b754d3a05f4e4c59ac /streaming/src/main | |
parent | 5548a254755bb84edae2768b94ab1816e1b49b91 (diff) | |
download | spark-44c28abf120754c0175c65ffd3d4587a350b3798.tar.gz spark-44c28abf120754c0175c65ffd3d4587a350b3798.tar.bz2 spark-44c28abf120754c0175c65ffd3d4587a350b3798.zip |
[SPARK-10224] [STREAMING] Fix the issue that blockIntervalTimer won't call updateCurrentBuffer when stopping
`blockIntervalTimer.stop(interruptTimer = false)` doesn't guarantee calling `updateCurrentBuffer`. So it's possible that `blockIntervalTimer` will exit when `updateCurrentBuffer` is not empty. Then the data in `currentBuffer` will be lost.
To reproduce it, you can add `Thread.sleep(200)` in this line (https://github.com/apache/spark/blob/69c9c177160e32a2fbc9b36ecc52156077fca6fc/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala#L100) and run `StreamingContexSuite`.
I cannot write a unit test to reproduce it because I cannot find an approach to force `RecurringTimer` suspend at this line for a few milliseconds.
There was a failure in Jenkins here: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/41455/console
This PR updates RecurringTimer to make sure `stop(interruptTimer = false)` will call `callback` at least once after the `stop` method is called.
Author: zsxwing <zsxwing@gmail.com>
Closes #8417 from zsxwing/SPARK-10224.
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala | 19 |
1 files changed, 13 insertions, 6 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala index dd32ad5ad8..0148cb51c6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala @@ -72,8 +72,10 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: /** * Stop the timer, and return the last time the callback was made. - * interruptTimer = true will interrupt the callback + * - interruptTimer = true will interrupt the callback * if it is in progress (not guaranteed to give correct time in this case). + * - interruptTimer = false guarantees that there will be at least one callback after `stop` has + * been called. */ def stop(interruptTimer: Boolean): Long = synchronized { if (!stopped) { @@ -87,18 +89,23 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: prevTime } + private def triggerActionForNextInterval(): Unit = { + clock.waitTillTime(nextTime) + callback(nextTime) + prevTime = nextTime + nextTime += period + logDebug("Callback for " + name + " called at time " + prevTime) + } + /** * Repeatedly call the callback every interval. */ private def loop() { try { while (!stopped) { - clock.waitTillTime(nextTime) - callback(nextTime) - prevTime = nextTime - nextTime += period - logDebug("Callback for " + name + " called at time " + prevTime) + triggerActionForNextInterval() } + triggerActionForNextInterval() } catch { case e: InterruptedException => } |