aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala5
1 files changed, 3 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
index 1a616a0434..c8eef833eb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming.util
import org.apache.spark.Logging
+import org.apache.spark.util.{Clock, SystemClock}
private[streaming]
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
@@ -38,7 +39,7 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
* current system time.
*/
def getStartTime(): Long = {
- (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period
+ (math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period
}
/**
@@ -48,7 +49,7 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
* more than current time.
*/
def getRestartTime(originalStartTime: Long): Long = {
- val gap = clock.currentTime - originalStartTime
+ val gap = clock.getTimeMillis() - originalStartTime
(math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
}