aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2017-04-04 23:20:17 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2017-04-04 23:20:17 -0700
commitdad499f324c6a93650aecfeb8cde10a405372930 (patch)
tree31508e8552960892cdb0d230ef9460188bae970d /external
parentb6e71032d92a072b7c951e5ea641e9454b5e70ed (diff)
downloadspark-dad499f324c6a93650aecfeb8cde10a405372930.tar.gz
spark-dad499f324c6a93650aecfeb8cde10a405372930.tar.bz2
spark-dad499f324c6a93650aecfeb8cde10a405372930.zip
[SPARK-20209][SS] Execute next trigger immediately if previous batch took longer than trigger interval
## What changes were proposed in this pull request? For large trigger intervals (e.g. 10 minutes), if a batch takes 11 minutes, then it will wait for 9 mins before starting the next batch. This does not make sense. The processing time based trigger policy should be to do process batches as fast as possible, but no faster than 1 in every trigger interval. If batches are taking longer than trigger interval anyways, then no point waiting extra trigger interval. In this PR, I modified the ProcessingTimeExecutor to do so. Another minor change I did was to extract our StreamManualClock into a separate class so that it can be used outside subclasses of StreamTest. For example, ProcessingTimeExecutorSuite does not need to create any context for testing, just needs the StreamManualClock. ## How was this patch tested? Added new unit tests to comprehensively test this behavior. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #17525 from tdas/SPARK-20209.
Diffstat (limited to 'external')
-rw-r--r--external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala1
1 files changed, 1 insertions, 0 deletions
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 6391d6269c..0046ba7e43 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
+import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
import org.apache.spark.util.Utils