aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2017-08-21 11:32:06 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2017-08-21 11:58:20 +0200
commit865025d199ca9430851e512e99780c065dee635e (patch)
tree52981336e71c153806e25669654f64c48152ff2a /kamon-core/src/main/scala/kamon/ReporterRegistry.scala
parenta152a3098b564ed43766a857b32b7c7d7445f9ce (diff)
downloadKamon-865025d199ca9430851e512e99780c065dee635e.tar.gz
Kamon-865025d199ca9430851e512e99780c065dee635e.tar.bz2
Kamon-865025d199ca9430851e512e99780c065dee635e.zip
optimistic tick alignment for metrics ticker
Diffstat (limited to 'kamon-core/src/main/scala/kamon/ReporterRegistry.scala')
-rw-r--r--kamon-core/src/main/scala/kamon/ReporterRegistry.scala26
1 files changed, 21 insertions, 5 deletions
diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
index ff135f60..eedec830 100644
--- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
+++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
@@ -20,11 +20,10 @@ import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import java.util.concurrent._
import com.typesafe.config.Config
-import kamon.ReporterRegistry.SpanSink
import kamon.metric._
import kamon.trace.Span
import kamon.trace.Span.FinishedSpan
-import kamon.util.{CallingThreadExecutionContext, DynamicAccess, Registration}
+import kamon.util.{DynamicAccess, Registration}
import org.slf4j.LoggerFactory
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
@@ -32,6 +31,7 @@ import scala.util.Try
import scala.util.control.NonFatal
import scala.collection.JavaConverters._
import scala.collection.concurrent.TrieMap
+import scala.concurrent.duration._
sealed trait Reporter {
def start(): Unit
@@ -59,6 +59,7 @@ trait ReporterRegistry {
}
object ReporterRegistry {
+
private[kamon] trait SpanSink {
def reportSpan(finishedSpan: FinishedSpan): Unit
}
@@ -200,8 +201,22 @@ object ReporterRegistry {
currentMetricTicker.cancel(false)
metricReporterTickerSchedule.set {
+ val initialDelay =
+ if(registryConfiguration.optimisticMetricTickAlignment) {
+ val minimumInitialDelay = 2.seconds.toMillis
+ val currentTimestamp = System.currentTimeMillis()
+ val roundCurrentTick = Math.floor(currentTimestamp.toDouble / tickIntervalMillis.toDouble).toLong
+ val roundCurrentTimestamp = roundCurrentTick * tickIntervalMillis
+
+ if(roundCurrentTimestamp - currentTimestamp >= minimumInitialDelay)
+ roundCurrentTimestamp - currentTimestamp
+ else
+ (roundCurrentTimestamp + tickIntervalMillis) - currentTimestamp
+
+ } else tickIntervalMillis
+
registryExecutionContext.scheduleAtFixedRate(
- new MetricReporterTicker(metrics, metricReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS
+ new MetricReporterTicker(metrics, metricReporters), initialDelay, tickIntervalMillis, TimeUnit.MILLISECONDS
)
}
}
@@ -310,13 +325,14 @@ object ReporterRegistry {
private def readRegistryConfiguration(config: Config): Configuration =
Configuration(
metricTickInterval = config.getDuration("kamon.metric.tick-interval"),
+ optimisticMetricTickAlignment = config.getBoolean("kamon.metric.optimistic-tick-alignment"),
traceTickInterval = config.getDuration("kamon.trace.tick-interval"),
traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size"),
configuredReporters = config.getStringList("kamon.reporters").asScala
)
- private case class Configuration(metricTickInterval: Duration, traceTickInterval: Duration,
- traceReporterQueueSize: Int, configuredReporters: Seq[String])
+ private case class Configuration(metricTickInterval: Duration, optimisticMetricTickAlignment: Boolean,
+ traceTickInterval: Duration, traceReporterQueueSize: Int, configuredReporters: Seq[String])
}
}