diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2017-12-12 15:00:59 +0100 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2017-12-13 00:48:38 +0100 |
commit | 686040cb569283667b967494be4a7088ef35964c (patch) | |
tree | 95873e59bad8c859a14f55a1cc42758c87901d26 /kamon-core/src/main | |
parent | 66cf166772712267aac922c51ded90c7dec3bdd5 (diff) | |
download | Kamon-686040cb569283667b967494be4a7088ef35964c.tar.gz Kamon-686040cb569283667b967494be4a7088ef35964c.tar.bz2 Kamon-686040cb569283667b967494be4a7088ef35964c.zip |
use a specialized java.time.Clock exposed through the Kamon companion object
Diffstat (limited to 'kamon-core/src/main')
-rw-r--r-- | kamon-core/src/main/scala/kamon/Kamon.scala | 10 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/ReporterRegistry.scala | 14 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/Span.scala | 17 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/Tracer.scala | 25 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/util/Clock.scala | 36 |
5 files changed, 68 insertions, 34 deletions
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 83a9a3d5..7ea9fa64 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -22,7 +22,7 @@ import com.typesafe.config.{Config, ConfigFactory} import kamon.context.{Codecs, Context, Key, Storage} import kamon.metric._ import kamon.trace._ -import kamon.util.{Filters, Registration} +import kamon.util.{Filters, Registration, Clock} import org.slf4j.LoggerFactory import scala.concurrent.Future @@ -36,10 +36,11 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { @volatile private var _environment = Environment.fromConfig(_config) @volatile private var _filters = Filters.fromConfig(_config) + private val _clock = new Clock.Default() private val _scheduler = Executors.newScheduledThreadPool(schedulerPoolSize(_config), numberedThreadFactory("kamon-scheduler")) private val _metrics = new MetricRegistry(_config, _scheduler) - private val _reporterRegistry = new ReporterRegistry.Default(_metrics, _config) - private val _tracer = Tracer.Default(Kamon, _reporterRegistry, _config) + private val _reporterRegistry = new ReporterRegistry.Default(_metrics, _config, _clock) + private val _tracer = Tracer.Default(Kamon, _reporterRegistry, _config, _clock) private val _contextStorage = Storage.ThreadLocal() private val _contextCodec = new Codecs(_config) private var _onReconfigureHooks = Seq.empty[OnReconfigureHook] @@ -163,6 +164,9 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { def filter(filterName: String, pattern: String): Boolean = _filters.accept(filterName, pattern) + def clock(): Clock = + _clock + /** * Register a reconfigure hook that will be run when the a call to Kamon.reconfigure(config) is performed. All * registered hooks will run sequentially in the same Thread that calls Kamon.reconfigure(config). diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala index 89bafa36..55733294 100644 --- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala +++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala @@ -23,7 +23,7 @@ import com.typesafe.config.Config import kamon.metric._ import kamon.trace.Span import kamon.trace.Span.FinishedSpan -import kamon.util.{DynamicAccess, Registration} +import kamon.util.{Clock, DynamicAccess, Registration} import org.slf4j.LoggerFactory import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future} @@ -64,7 +64,7 @@ object ReporterRegistry { def reportSpan(finishedSpan: FinishedSpan): Unit } - private[kamon] class Default(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry with SpanSink { + private[kamon] class Default(metrics: MetricsSnapshotGenerator, initialConfig: Config, clock: Clock) extends ReporterRegistry with SpanSink { private val logger = LoggerFactory.getLogger(classOf[ReporterRegistry]) private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry")) private val reporterCounter = new AtomicLong(0L) @@ -247,7 +247,7 @@ object ReporterRegistry { } else tickIntervalMillis registryExecutionContext.scheduleAtFixedRate( - new MetricReporterTicker(metrics, metricReporters), initialDelay, tickIntervalMillis, TimeUnit.MILLISECONDS + new MetricReporterTicker(metrics, metricReporters, clock), initialDelay, tickIntervalMillis, TimeUnit.MILLISECONDS ) } } @@ -319,12 +319,14 @@ object ReporterRegistry { val buffer = new ArrayBlockingQueue[Span.FinishedSpan](bufferCapacity) } - private class MetricReporterTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: TrieMap[Long, MetricReporterEntry]) extends Runnable { + private class MetricReporterTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: TrieMap[Long, MetricReporterEntry], + clock: Clock) extends Runnable { + val logger = LoggerFactory.getLogger(classOf[MetricReporterTicker]) - var lastInstant = Instant.now() + var lastInstant = Instant.now(clock) def run(): Unit = try { - val currentInstant = Instant.now() + val currentInstant = Instant.now(clock) val tickSnapshot = PeriodSnapshot( from = lastInstant, to = currentInstant, diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala index d8d11589..545a3f64 100644 --- a/kamon-core/src/main/scala/kamon/trace/Span.scala +++ b/kamon-core/src/main/scala/kamon/trace/Span.scala @@ -57,9 +57,7 @@ sealed abstract class Span { def finish(finishTimestampMicros: Long): Unit - def finish(): Unit = - finish(Clock.microTimestamp()) - + def finish(): Unit } object Span { @@ -81,6 +79,7 @@ object Span { override def setOperationName(name: String): Span = this override def enableMetrics(): Span = this override def disableMetrics(): Span = this + override def finish(): Unit = {} override def finish(finishTimestampMicros: Long): Unit = {} } @@ -93,7 +92,7 @@ object Span { * @param spanSink */ final class Local(spanContext: SpanContext, parent: Option[Span], initialOperationName: String, initialSpanTags: Map[String, Span.TagValue], - initialMetricTags: Map[String, String], startTimestampMicros: Long, spanSink: SpanSink, trackMetrics: Boolean, scopeSpanMetrics: Boolean) extends Span { + initialMetricTags: Map[String, String], startTimestampMicros: Long, spanSink: SpanSink, trackMetrics: Boolean, scopeSpanMetrics: Boolean, clock: Clock) extends Span { private var collectMetrics: Boolean = trackMetrics private var open: Boolean = true @@ -139,7 +138,7 @@ object Span { } override def mark(key: String): Span = { - mark(Clock.microTimestamp(), key) + mark(clock.micros(), key) } override def mark(timestampMicros: Long, key: String): Span = synchronized { @@ -188,6 +187,9 @@ object Span { this } + override def finish(): Unit = + finish(clock.micros()) + override def finish(finishMicros: Long): Unit = synchronized { if (open) { open = false @@ -222,8 +224,8 @@ object Span { object Local { def apply(spanContext: SpanContext, parent: Option[Span], initialOperationName: String, initialSpanTags: Map[String, Span.TagValue], initialMetricTags: Map[String, String], startTimestampMicros: Long, spanSink: SpanSink, - trackMetrics: Boolean, scopeSpanMetrics: Boolean): Local = - new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags, startTimestampMicros, spanSink, trackMetrics, scopeSpanMetrics) + trackMetrics: Boolean, scopeSpanMetrics: Boolean, clock: Clock): Local = + new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags, startTimestampMicros, spanSink, trackMetrics, scopeSpanMetrics, clock) } @@ -241,6 +243,7 @@ object Span { override def setOperationName(name: String): Span = this override def enableMetrics(): Span = this override def disableMetrics(): Span = this + override def finish(): Unit = {} override def finish(finishTimestampMicros: Long): Unit = {} } diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala index addeddf6..4950a700 100644 --- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala +++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala @@ -35,7 +35,7 @@ trait Tracer { object Tracer { - final class Default(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config) extends Tracer { + final class Default(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config, clock: Clock) extends Tracer { private val logger = LoggerFactory.getLogger(classOf[Tracer]) private[Tracer] val tracerMetrics = new TracerMetrics(metrics) @@ -47,7 +47,7 @@ object Tracer { reconfigure(initialConfig) override def buildSpan(operationName: String): SpanBuilder = - new SpanBuilder(operationName, this, spanSink) + new SpanBuilder(operationName, this, spanSink, clock) override def identityProvider: IdentityProvider = this._identityProvider @@ -85,11 +85,11 @@ object Tracer { } object Default { - def apply(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config): Default = - new Default(metrics, spanSink, initialConfig) + def apply(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config, clock: Clock): Default = + new Default(metrics, spanSink, initialConfig, clock) } - final class SpanBuilder(operationName: String, tracer: Tracer.Default, spanSink: SpanSink) { + final class SpanBuilder(operationName: String, tracer: Tracer.Default, spanSink: SpanSink, clock: Clock) { private var parentSpan: Span = _ private var initialOperationName: String = operationName private var startTimestamp = 0L @@ -158,7 +158,7 @@ object Tracer { def start(): Span = { - val startTimestampMicros = if(startTimestamp != 0L) startTimestamp else Clock.microTimestamp() + val startTimestampMicros = if(startTimestamp != 0L) startTimestamp else clock.micros() val parentSpan: Option[Span] = Option(this.parentSpan) .orElse(if(useParentFromContext) Some(Kamon.currentContext().get(Span.ContextKey)) else None) @@ -177,7 +177,18 @@ object Tracer { } tracer.tracerMetrics.createdSpans.increment() - Span.Local(spanContext, nonRemoteParent, initialOperationName, initialSpanTags, initialMetricTags, startTimestampMicros, spanSink, trackMetrics, tracer.scopeSpanMetrics) + Span.Local( + spanContext, + nonRemoteParent, + initialOperationName, + initialSpanTags, + initialMetricTags, + startTimestampMicros, + spanSink, + trackMetrics, + tracer.scopeSpanMetrics, + clock + ) } private def joinParentContext(parent: Span, samplingDecision: SamplingDecision): SpanContext = diff --git a/kamon-core/src/main/scala/kamon/util/Clock.scala b/kamon-core/src/main/scala/kamon/util/Clock.scala index 8765ec69..4f4561b5 100644 --- a/kamon-core/src/main/scala/kamon/util/Clock.scala +++ b/kamon-core/src/main/scala/kamon/util/Clock.scala @@ -15,20 +15,34 @@ package kamon.util +import java.time.{Instant, ZoneId, Clock => JavaClock} + +abstract class Clock extends JavaClock { + def micros(): Long + def relativeNanos(): Long +} + object Clock { - private val startTimeMillis = System.currentTimeMillis() - private val startNanoTime = System.nanoTime() - private val startMicroTime = startTimeMillis * 1000L - def microTimestamp(): Long = - startMicroTime + ((System.nanoTime() - startNanoTime) / 1000L) + class Default extends Clock { + private val systemClock = JavaClock.systemUTC() + private val startTimeMillis = System.currentTimeMillis() + private val startNanoTime = System.nanoTime() + private val startMicroTime = startTimeMillis * 1000L + + override def micros(): Long = + startMicroTime + ((System.nanoTime() - startNanoTime) / 1000L) + + override def relativeNanos(): Long = + System.nanoTime() - def milliTimestamp(): Long = - System.currentTimeMillis() + override def instant(): Instant = + systemClock.instant() - def relativeNanoTimestamp(): Long = - System.nanoTime() + override def withZone(zone: ZoneId): JavaClock = + systemClock.withZone(zone) - def toMicroTimestamp(nanoTime: Long): Long = - startMicroTime + (startNanoTime - nanoTime / 1000L) + override def getZone: ZoneId = + systemClock.getZone() + } }
\ No newline at end of file |