From 865025d199ca9430851e512e99780c065dee635e Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 21 Aug 2017 11:32:06 +0200 Subject: optimistic tick alignment for metrics ticker --- .../src/main/scala/kamon/ReporterRegistry.scala | 26 +++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) (limited to 'kamon-core/src/main/scala') diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala index ff135f60..eedec830 100644 --- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala +++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala @@ -20,11 +20,10 @@ import java.util.concurrent.atomic.{AtomicLong, AtomicReference} import java.util.concurrent._ import com.typesafe.config.Config -import kamon.ReporterRegistry.SpanSink import kamon.metric._ import kamon.trace.Span import kamon.trace.Span.FinishedSpan -import kamon.util.{CallingThreadExecutionContext, DynamicAccess, Registration} +import kamon.util.{DynamicAccess, Registration} import org.slf4j.LoggerFactory import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future} @@ -32,6 +31,7 @@ import scala.util.Try import scala.util.control.NonFatal import scala.collection.JavaConverters._ import scala.collection.concurrent.TrieMap +import scala.concurrent.duration._ sealed trait Reporter { def start(): Unit @@ -59,6 +59,7 @@ trait ReporterRegistry { } object ReporterRegistry { + private[kamon] trait SpanSink { def reportSpan(finishedSpan: FinishedSpan): Unit } @@ -200,8 +201,22 @@ object ReporterRegistry { currentMetricTicker.cancel(false) metricReporterTickerSchedule.set { + val initialDelay = + if(registryConfiguration.optimisticMetricTickAlignment) { + val minimumInitialDelay = 2.seconds.toMillis + val currentTimestamp = System.currentTimeMillis() + val roundCurrentTick = Math.floor(currentTimestamp.toDouble / tickIntervalMillis.toDouble).toLong + val roundCurrentTimestamp = roundCurrentTick * tickIntervalMillis + + if(roundCurrentTimestamp - currentTimestamp >= minimumInitialDelay) + roundCurrentTimestamp - currentTimestamp + else + (roundCurrentTimestamp + tickIntervalMillis) - currentTimestamp + + } else tickIntervalMillis + registryExecutionContext.scheduleAtFixedRate( - new MetricReporterTicker(metrics, metricReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS + new MetricReporterTicker(metrics, metricReporters), initialDelay, tickIntervalMillis, TimeUnit.MILLISECONDS ) } } @@ -310,13 +325,14 @@ object ReporterRegistry { private def readRegistryConfiguration(config: Config): Configuration = Configuration( metricTickInterval = config.getDuration("kamon.metric.tick-interval"), + optimisticMetricTickAlignment = config.getBoolean("kamon.metric.optimistic-tick-alignment"), traceTickInterval = config.getDuration("kamon.trace.tick-interval"), traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size"), configuredReporters = config.getStringList("kamon.reporters").asScala ) - private case class Configuration(metricTickInterval: Duration, traceTickInterval: Duration, - traceReporterQueueSize: Int, configuredReporters: Seq[String]) + private case class Configuration(metricTickInterval: Duration, optimisticMetricTickAlignment: Boolean, + traceTickInterval: Duration, traceReporterQueueSize: Int, configuredReporters: Seq[String]) } } -- cgit v1.2.3