diff options
Diffstat (limited to 'streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala | 5 |
1 files changed, 3 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala index 1a616a0434..c8eef833eb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming.util import org.apache.spark.Logging +import org.apache.spark.util.{Clock, SystemClock} private[streaming] class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String) @@ -38,7 +39,7 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: * current system time. */ def getStartTime(): Long = { - (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period + (math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period } /** @@ -48,7 +49,7 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: * more than current time. */ def getRestartTime(originalStartTime: Long): Long = { - val gap = clock.currentTime - originalStartTime + val gap = clock.getTimeMillis() - originalStartTime (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime } |