aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2017-08-21 11:32:06 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2017-08-21 11:58:20 +0200
commit865025d199ca9430851e512e99780c065dee635e (patch)
tree52981336e71c153806e25669654f64c48152ff2a
parenta152a3098b564ed43766a857b32b7c7d7445f9ce (diff)
downloadKamon-865025d199ca9430851e512e99780c065dee635e.tar.gz
Kamon-865025d199ca9430851e512e99780c065dee635e.tar.bz2
Kamon-865025d199ca9430851e512e99780c065dee635e.zip
optimistic tick alignment for metrics ticker
-rw-r--r--kamon-core/src/main/resources/reference.conf16
-rw-r--r--kamon-core/src/main/scala/kamon/ReporterRegistry.scala26
2 files changed, 36 insertions, 6 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index e0002194..e140db83 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -1,6 +1,7 @@
kamon {
environment {
+
# Identifier for this service.
service = "kamon-application"
@@ -21,9 +22,19 @@ kamon {
# through Kamon.scheduler()
scheduler-pool-size = 2
+
metric {
+
+ # Interval at which metric snapshots will be collected and sent to all metric reporters.
tick-interval = 60 seconds
+ # When optimistic tick alignment is enabled the metrics ticker will try to schedule the ticks to happen as close as
+ # possible to round tick-interval units. E.g. if the tick-interval is set to 60 seconds then Kamon will try to
+ # schedule the ticks at the beginning of each minute; if the tick-interval is set to 20 seconds then Kamon will try
+ # to schedule the ticks at 0, 20, and 40 seconds of each minute. The alignment is not meant to be perfect, just to
+ # improve the ability to correlate the timestamp reported in ticks with logs.
+ optimistic-tick-alignment = yes
+
# Thread pool size used by the metrics refresh scheduler. This pool is only used to periodically sampling
# min-max-counter values.
refresh-scheduler-pool-size = 2
@@ -74,6 +85,7 @@ kamon {
}
}
+
trace {
# Interval at which sampled finished spans will be flushed to SpanReporters.
@@ -125,6 +137,7 @@ kamon {
}
}
+
context {
# Codecs are used to encode/decode Context keys when a Context must be propagated either through HTTP headers or
@@ -148,6 +161,7 @@ kamon {
}
}
+
util {
filters {
@@ -155,7 +169,7 @@ kamon {
# not. E.g. If there are no filter sections for the "jdbc-datasource" category and `accept-unmatched-categories`
# is set to true, all entities for that category will be accepted, otherwise all will be rejected.
#
- # NOTE: Using entity filters is a commodity for modules that might potentially track thousands of unnecessary
+ # NOTE: Using entity fil`ters is a commodity for modules that might potentially track thousands of unnecessary
# entities, but not all modules are required to use filters, check the your module's documentation to
# determine whether setting up filters make sense or not.
accept-unmatched = true
diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
index ff135f60..eedec830 100644
--- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
+++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
@@ -20,11 +20,10 @@ import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import java.util.concurrent._
import com.typesafe.config.Config
-import kamon.ReporterRegistry.SpanSink
import kamon.metric._
import kamon.trace.Span
import kamon.trace.Span.FinishedSpan
-import kamon.util.{CallingThreadExecutionContext, DynamicAccess, Registration}
+import kamon.util.{DynamicAccess, Registration}
import org.slf4j.LoggerFactory
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
@@ -32,6 +31,7 @@ import scala.util.Try
import scala.util.control.NonFatal
import scala.collection.JavaConverters._
import scala.collection.concurrent.TrieMap
+import scala.concurrent.duration._
sealed trait Reporter {
def start(): Unit
@@ -59,6 +59,7 @@ trait ReporterRegistry {
}
object ReporterRegistry {
+
private[kamon] trait SpanSink {
def reportSpan(finishedSpan: FinishedSpan): Unit
}
@@ -200,8 +201,22 @@ object ReporterRegistry {
currentMetricTicker.cancel(false)
metricReporterTickerSchedule.set {
+ val initialDelay =
+ if(registryConfiguration.optimisticMetricTickAlignment) {
+ val minimumInitialDelay = 2.seconds.toMillis
+ val currentTimestamp = System.currentTimeMillis()
+ val roundCurrentTick = Math.floor(currentTimestamp.toDouble / tickIntervalMillis.toDouble).toLong
+ val roundCurrentTimestamp = roundCurrentTick * tickIntervalMillis
+
+ if(roundCurrentTimestamp - currentTimestamp >= minimumInitialDelay)
+ roundCurrentTimestamp - currentTimestamp
+ else
+ (roundCurrentTimestamp + tickIntervalMillis) - currentTimestamp
+
+ } else tickIntervalMillis
+
registryExecutionContext.scheduleAtFixedRate(
- new MetricReporterTicker(metrics, metricReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS
+ new MetricReporterTicker(metrics, metricReporters), initialDelay, tickIntervalMillis, TimeUnit.MILLISECONDS
)
}
}
@@ -310,13 +325,14 @@ object ReporterRegistry {
private def readRegistryConfiguration(config: Config): Configuration =
Configuration(
metricTickInterval = config.getDuration("kamon.metric.tick-interval"),
+ optimisticMetricTickAlignment = config.getBoolean("kamon.metric.optimistic-tick-alignment"),
traceTickInterval = config.getDuration("kamon.trace.tick-interval"),
traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size"),
configuredReporters = config.getStringList("kamon.reporters").asScala
)
- private case class Configuration(metricTickInterval: Duration, traceTickInterval: Duration,
- traceReporterQueueSize: Int, configuredReporters: Seq[String])
+ private case class Configuration(metricTickInterval: Duration, optimisticMetricTickAlignment: Boolean,
+ traceTickInterval: Duration, traceReporterQueueSize: Int, configuredReporters: Seq[String])
}
}