From 865025d199ca9430851e512e99780c065dee635e Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 21 Aug 2017 11:32:06 +0200 Subject: optimistic tick alignment for metrics ticker --- kamon-core/src/main/resources/reference.conf | 16 ++++++++++++- .../src/main/scala/kamon/ReporterRegistry.scala | 26 +++++++++++++++++----- 2 files changed, 36 insertions(+), 6 deletions(-) (limited to 'kamon-core') diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index e0002194..e140db83 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -1,6 +1,7 @@ kamon { environment { + # Identifier for this service. service = "kamon-application" @@ -21,9 +22,19 @@ kamon { # through Kamon.scheduler() scheduler-pool-size = 2 + metric { + + # Interval at which metric snapshots will be collected and sent to all metric reporters. tick-interval = 60 seconds + # When optimistic tick alignment is enabled the metrics ticker will try to schedule the ticks to happen as close as + # possible to round tick-interval units. E.g. if the tick-interval is set to 60 seconds then Kamon will try to + # schedule the ticks at the beginning of each minute; if the tick-interval is set to 20 seconds then Kamon will try + # to schedule the ticks at 0, 20, and 40 seconds of each minute. The alignment is not meant to be perfect, just to + # improve the ability to correlate the timestamp reported in ticks with logs. + optimistic-tick-alignment = yes + # Thread pool size used by the metrics refresh scheduler. This pool is only used to periodically sampling # min-max-counter values. refresh-scheduler-pool-size = 2 @@ -74,6 +85,7 @@ kamon { } } + trace { # Interval at which sampled finished spans will be flushed to SpanReporters. @@ -125,6 +137,7 @@ kamon { } } + context { # Codecs are used to encode/decode Context keys when a Context must be propagated either through HTTP headers or @@ -148,6 +161,7 @@ kamon { } } + util { filters { @@ -155,7 +169,7 @@ kamon { # not. E.g. If there are no filter sections for the "jdbc-datasource" category and `accept-unmatched-categories` # is set to true, all entities for that category will be accepted, otherwise all will be rejected. # - # NOTE: Using entity filters is a commodity for modules that might potentially track thousands of unnecessary + # NOTE: Using entity fil`ters is a commodity for modules that might potentially track thousands of unnecessary # entities, but not all modules are required to use filters, check the your module's documentation to # determine whether setting up filters make sense or not. accept-unmatched = true diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala index ff135f60..eedec830 100644 --- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala +++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala @@ -20,11 +20,10 @@ import java.util.concurrent.atomic.{AtomicLong, AtomicReference} import java.util.concurrent._ import com.typesafe.config.Config -import kamon.ReporterRegistry.SpanSink import kamon.metric._ import kamon.trace.Span import kamon.trace.Span.FinishedSpan -import kamon.util.{CallingThreadExecutionContext, DynamicAccess, Registration} +import kamon.util.{DynamicAccess, Registration} import org.slf4j.LoggerFactory import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future} @@ -32,6 +31,7 @@ import scala.util.Try import scala.util.control.NonFatal import scala.collection.JavaConverters._ import scala.collection.concurrent.TrieMap +import scala.concurrent.duration._ sealed trait Reporter { def start(): Unit @@ -59,6 +59,7 @@ trait ReporterRegistry { } object ReporterRegistry { + private[kamon] trait SpanSink { def reportSpan(finishedSpan: FinishedSpan): Unit } @@ -200,8 +201,22 @@ object ReporterRegistry { currentMetricTicker.cancel(false) metricReporterTickerSchedule.set { + val initialDelay = + if(registryConfiguration.optimisticMetricTickAlignment) { + val minimumInitialDelay = 2.seconds.toMillis + val currentTimestamp = System.currentTimeMillis() + val roundCurrentTick = Math.floor(currentTimestamp.toDouble / tickIntervalMillis.toDouble).toLong + val roundCurrentTimestamp = roundCurrentTick * tickIntervalMillis + + if(roundCurrentTimestamp - currentTimestamp >= minimumInitialDelay) + roundCurrentTimestamp - currentTimestamp + else + (roundCurrentTimestamp + tickIntervalMillis) - currentTimestamp + + } else tickIntervalMillis + registryExecutionContext.scheduleAtFixedRate( - new MetricReporterTicker(metrics, metricReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS + new MetricReporterTicker(metrics, metricReporters), initialDelay, tickIntervalMillis, TimeUnit.MILLISECONDS ) } } @@ -310,13 +325,14 @@ object ReporterRegistry { private def readRegistryConfiguration(config: Config): Configuration = Configuration( metricTickInterval = config.getDuration("kamon.metric.tick-interval"), + optimisticMetricTickAlignment = config.getBoolean("kamon.metric.optimistic-tick-alignment"), traceTickInterval = config.getDuration("kamon.trace.tick-interval"), traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size"), configuredReporters = config.getStringList("kamon.reporters").asScala ) - private case class Configuration(metricTickInterval: Duration, traceTickInterval: Duration, - traceReporterQueueSize: Int, configuredReporters: Seq[String]) + private case class Configuration(metricTickInterval: Duration, optimisticMetricTickAlignment: Boolean, + traceTickInterval: Duration, traceReporterQueueSize: Int, configuredReporters: Seq[String]) } } -- cgit v1.2.3