diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2017-04-04 23:20:17 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2017-04-04 23:20:17 -0700 |
commit | dad499f324c6a93650aecfeb8cde10a405372930 (patch) | |
tree | 31508e8552960892cdb0d230ef9460188bae970d /sql/core/src/main/scala/org/apache | |
parent | b6e71032d92a072b7c951e5ea641e9454b5e70ed (diff) | |
download | spark-dad499f324c6a93650aecfeb8cde10a405372930.tar.gz spark-dad499f324c6a93650aecfeb8cde10a405372930.tar.bz2 spark-dad499f324c6a93650aecfeb8cde10a405372930.zip |
[SPARK-20209][SS] Execute next trigger immediately if previous batch took longer than trigger interval
## What changes were proposed in this pull request?
For large trigger intervals (e.g. 10 minutes), if a batch takes 11 minutes, then it will wait for 9 mins before starting the next batch. This does not make sense. The processing-time-based trigger policy should be to process batches as fast as possible, but no faster than one per trigger interval. If batches are already taking longer than the trigger interval, then there is no point in waiting an extra trigger interval.
In this PR, I modified the ProcessingTimeExecutor to do so. Another minor change was to extract our StreamManualClock into a separate class so that it can be used outside subclasses of StreamTest. For example, ProcessingTimeExecutorSuite does not need to create any context for testing; it just needs the StreamManualClock.
## How was this patch tested?
Added new unit tests to comprehensively test this behavior.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #17525 from tdas/SPARK-20209.
Diffstat (limited to 'sql/core/src/main/scala/org/apache')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala | 17 |
1 files changed, 9 insertions, 8 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala index 02996ac854..d188566f82 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala @@ -47,21 +47,22 @@ case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock = extends TriggerExecutor with Logging { private val intervalMs = processingTime.intervalMs + require(intervalMs >= 0) - override def execute(batchRunner: () => Boolean): Unit = { + override def execute(triggerHandler: () => Boolean): Unit = { while (true) { - val batchStartTimeMs = clock.getTimeMillis() - val terminated = !batchRunner() + val triggerTimeMs = clock.getTimeMillis + val nextTriggerTimeMs = nextBatchTime(triggerTimeMs) + val terminated = !triggerHandler() if (intervalMs > 0) { - val batchEndTimeMs = clock.getTimeMillis() - val batchElapsedTimeMs = batchEndTimeMs - batchStartTimeMs + val batchElapsedTimeMs = clock.getTimeMillis - triggerTimeMs if (batchElapsedTimeMs > intervalMs) { notifyBatchFallingBehind(batchElapsedTimeMs) } if (terminated) { return } - clock.waitTillTime(nextBatchTime(batchEndTimeMs)) + clock.waitTillTime(nextTriggerTimeMs) } else { if (terminated) { return @@ -70,7 +71,7 @@ case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock = } } - /** Called when a batch falls behind. Expose for test only */ + /** Called when a batch falls behind */ def notifyBatchFallingBehind(realElapsedTimeMs: Long): Unit = { logWarning("Current batch is falling behind. 
The trigger interval is " + s"${intervalMs} milliseconds, but spent ${realElapsedTimeMs} milliseconds") @@ -83,6 +84,6 @@ case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock = * an interval of `100 ms`, `nextBatchTime(nextBatchTime(0)) = 200` rather than `0`). */ def nextBatchTime(now: Long): Long = { - now / intervalMs * intervalMs + intervalMs + if (intervalMs == 0) now else now / intervalMs * intervalMs + intervalMs } } |