aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2017-12-12 15:00:59 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2017-12-13 00:48:38 +0100
commit686040cb569283667b967494be4a7088ef35964c (patch)
tree95873e59bad8c859a14f55a1cc42758c87901d26
parent66cf166772712267aac922c51ded90c7dec3bdd5 (diff)
downloadKamon-686040cb569283667b967494be4a7088ef35964c.tar.gz
Kamon-686040cb569283667b967494be4a7088ef35964c.tar.bz2
Kamon-686040cb569283667b967494be4a7088ef35964c.zip
use a specialized java.time.Clock exposed through the Kamon companion object
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala10
-rw-r--r--kamon-core/src/main/scala/kamon/ReporterRegistry.scala14
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Span.scala17
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Tracer.scala25
-rw-r--r--kamon-core/src/main/scala/kamon/util/Clock.scala36
-rw-r--r--kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala4
6 files changed, 71 insertions, 35 deletions
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index 83a9a3d5..7ea9fa64 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -22,7 +22,7 @@ import com.typesafe.config.{Config, ConfigFactory}
import kamon.context.{Codecs, Context, Key, Storage}
import kamon.metric._
import kamon.trace._
-import kamon.util.{Filters, Registration}
+import kamon.util.{Filters, Registration, Clock}
import org.slf4j.LoggerFactory
import scala.concurrent.Future
@@ -36,10 +36,11 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer {
@volatile private var _environment = Environment.fromConfig(_config)
@volatile private var _filters = Filters.fromConfig(_config)
+ private val _clock = new Clock.Default()
private val _scheduler = Executors.newScheduledThreadPool(schedulerPoolSize(_config), numberedThreadFactory("kamon-scheduler"))
private val _metrics = new MetricRegistry(_config, _scheduler)
- private val _reporterRegistry = new ReporterRegistry.Default(_metrics, _config)
- private val _tracer = Tracer.Default(Kamon, _reporterRegistry, _config)
+ private val _reporterRegistry = new ReporterRegistry.Default(_metrics, _config, _clock)
+ private val _tracer = Tracer.Default(Kamon, _reporterRegistry, _config, _clock)
private val _contextStorage = Storage.ThreadLocal()
private val _contextCodec = new Codecs(_config)
private var _onReconfigureHooks = Seq.empty[OnReconfigureHook]
@@ -163,6 +164,9 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer {
def filter(filterName: String, pattern: String): Boolean =
_filters.accept(filterName, pattern)
+ def clock(): Clock =
+ _clock
+
/**
* Register a reconfigure hook that will be run when the a call to Kamon.reconfigure(config) is performed. All
* registered hooks will run sequentially in the same Thread that calls Kamon.reconfigure(config).
diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
index 89bafa36..55733294 100644
--- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
+++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
@@ -23,7 +23,7 @@ import com.typesafe.config.Config
import kamon.metric._
import kamon.trace.Span
import kamon.trace.Span.FinishedSpan
-import kamon.util.{DynamicAccess, Registration}
+import kamon.util.{Clock, DynamicAccess, Registration}
import org.slf4j.LoggerFactory
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
@@ -64,7 +64,7 @@ object ReporterRegistry {
def reportSpan(finishedSpan: FinishedSpan): Unit
}
- private[kamon] class Default(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry with SpanSink {
+ private[kamon] class Default(metrics: MetricsSnapshotGenerator, initialConfig: Config, clock: Clock) extends ReporterRegistry with SpanSink {
private val logger = LoggerFactory.getLogger(classOf[ReporterRegistry])
private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry"))
private val reporterCounter = new AtomicLong(0L)
@@ -247,7 +247,7 @@ object ReporterRegistry {
} else tickIntervalMillis
registryExecutionContext.scheduleAtFixedRate(
- new MetricReporterTicker(metrics, metricReporters), initialDelay, tickIntervalMillis, TimeUnit.MILLISECONDS
+ new MetricReporterTicker(metrics, metricReporters, clock), initialDelay, tickIntervalMillis, TimeUnit.MILLISECONDS
)
}
}
@@ -319,12 +319,14 @@ object ReporterRegistry {
val buffer = new ArrayBlockingQueue[Span.FinishedSpan](bufferCapacity)
}
- private class MetricReporterTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: TrieMap[Long, MetricReporterEntry]) extends Runnable {
+ private class MetricReporterTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: TrieMap[Long, MetricReporterEntry],
+ clock: Clock) extends Runnable {
+
val logger = LoggerFactory.getLogger(classOf[MetricReporterTicker])
- var lastInstant = Instant.now()
+ var lastInstant = Instant.now(clock)
def run(): Unit = try {
- val currentInstant = Instant.now()
+ val currentInstant = Instant.now(clock)
val tickSnapshot = PeriodSnapshot(
from = lastInstant,
to = currentInstant,
diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala
index d8d11589..545a3f64 100644
--- a/kamon-core/src/main/scala/kamon/trace/Span.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Span.scala
@@ -57,9 +57,7 @@ sealed abstract class Span {
def finish(finishTimestampMicros: Long): Unit
- def finish(): Unit =
- finish(Clock.microTimestamp())
-
+ def finish(): Unit
}
object Span {
@@ -81,6 +79,7 @@ object Span {
override def setOperationName(name: String): Span = this
override def enableMetrics(): Span = this
override def disableMetrics(): Span = this
+ override def finish(): Unit = {}
override def finish(finishTimestampMicros: Long): Unit = {}
}
@@ -93,7 +92,7 @@ object Span {
* @param spanSink
*/
final class Local(spanContext: SpanContext, parent: Option[Span], initialOperationName: String, initialSpanTags: Map[String, Span.TagValue],
- initialMetricTags: Map[String, String], startTimestampMicros: Long, spanSink: SpanSink, trackMetrics: Boolean, scopeSpanMetrics: Boolean) extends Span {
+ initialMetricTags: Map[String, String], startTimestampMicros: Long, spanSink: SpanSink, trackMetrics: Boolean, scopeSpanMetrics: Boolean, clock: Clock) extends Span {
private var collectMetrics: Boolean = trackMetrics
private var open: Boolean = true
@@ -139,7 +138,7 @@ object Span {
}
override def mark(key: String): Span = {
- mark(Clock.microTimestamp(), key)
+ mark(clock.micros(), key)
}
override def mark(timestampMicros: Long, key: String): Span = synchronized {
@@ -188,6 +187,9 @@ object Span {
this
}
+ override def finish(): Unit =
+ finish(clock.micros())
+
override def finish(finishMicros: Long): Unit = synchronized {
if (open) {
open = false
@@ -222,8 +224,8 @@ object Span {
object Local {
def apply(spanContext: SpanContext, parent: Option[Span], initialOperationName: String, initialSpanTags: Map[String, Span.TagValue],
initialMetricTags: Map[String, String], startTimestampMicros: Long, spanSink: SpanSink,
- trackMetrics: Boolean, scopeSpanMetrics: Boolean): Local =
- new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags, startTimestampMicros, spanSink, trackMetrics, scopeSpanMetrics)
+ trackMetrics: Boolean, scopeSpanMetrics: Boolean, clock: Clock): Local =
+ new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags, startTimestampMicros, spanSink, trackMetrics, scopeSpanMetrics, clock)
}
@@ -241,6 +243,7 @@ object Span {
override def setOperationName(name: String): Span = this
override def enableMetrics(): Span = this
override def disableMetrics(): Span = this
+ override def finish(): Unit = {}
override def finish(finishTimestampMicros: Long): Unit = {}
}
diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
index addeddf6..4950a700 100644
--- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
@@ -35,7 +35,7 @@ trait Tracer {
object Tracer {
- final class Default(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config) extends Tracer {
+ final class Default(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config, clock: Clock) extends Tracer {
private val logger = LoggerFactory.getLogger(classOf[Tracer])
private[Tracer] val tracerMetrics = new TracerMetrics(metrics)
@@ -47,7 +47,7 @@ object Tracer {
reconfigure(initialConfig)
override def buildSpan(operationName: String): SpanBuilder =
- new SpanBuilder(operationName, this, spanSink)
+ new SpanBuilder(operationName, this, spanSink, clock)
override def identityProvider: IdentityProvider =
this._identityProvider
@@ -85,11 +85,11 @@ object Tracer {
}
object Default {
- def apply(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config): Default =
- new Default(metrics, spanSink, initialConfig)
+ def apply(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config, clock: Clock): Default =
+ new Default(metrics, spanSink, initialConfig, clock)
}
- final class SpanBuilder(operationName: String, tracer: Tracer.Default, spanSink: SpanSink) {
+ final class SpanBuilder(operationName: String, tracer: Tracer.Default, spanSink: SpanSink, clock: Clock) {
private var parentSpan: Span = _
private var initialOperationName: String = operationName
private var startTimestamp = 0L
@@ -158,7 +158,7 @@ object Tracer {
def start(): Span = {
- val startTimestampMicros = if(startTimestamp != 0L) startTimestamp else Clock.microTimestamp()
+ val startTimestampMicros = if(startTimestamp != 0L) startTimestamp else clock.micros()
val parentSpan: Option[Span] = Option(this.parentSpan)
.orElse(if(useParentFromContext) Some(Kamon.currentContext().get(Span.ContextKey)) else None)
@@ -177,7 +177,18 @@ object Tracer {
}
tracer.tracerMetrics.createdSpans.increment()
- Span.Local(spanContext, nonRemoteParent, initialOperationName, initialSpanTags, initialMetricTags, startTimestampMicros, spanSink, trackMetrics, tracer.scopeSpanMetrics)
+ Span.Local(
+ spanContext,
+ nonRemoteParent,
+ initialOperationName,
+ initialSpanTags,
+ initialMetricTags,
+ startTimestampMicros,
+ spanSink,
+ trackMetrics,
+ tracer.scopeSpanMetrics,
+ clock
+ )
}
private def joinParentContext(parent: Span, samplingDecision: SamplingDecision): SpanContext =
diff --git a/kamon-core/src/main/scala/kamon/util/Clock.scala b/kamon-core/src/main/scala/kamon/util/Clock.scala
index 8765ec69..4f4561b5 100644
--- a/kamon-core/src/main/scala/kamon/util/Clock.scala
+++ b/kamon-core/src/main/scala/kamon/util/Clock.scala
@@ -15,20 +15,34 @@
package kamon.util
+import java.time.{Instant, ZoneId, Clock => JavaClock}
+
+abstract class Clock extends JavaClock {
+ def micros(): Long
+ def relativeNanos(): Long
+}
+
object Clock {
- private val startTimeMillis = System.currentTimeMillis()
- private val startNanoTime = System.nanoTime()
- private val startMicroTime = startTimeMillis * 1000L
- def microTimestamp(): Long =
- startMicroTime + ((System.nanoTime() - startNanoTime) / 1000L)
+ class Default extends Clock {
+ private val systemClock = JavaClock.systemUTC()
+ private val startTimeMillis = System.currentTimeMillis()
+ private val startNanoTime = System.nanoTime()
+ private val startMicroTime = startTimeMillis * 1000L
+
+ override def micros(): Long =
+ startMicroTime + ((System.nanoTime() - startNanoTime) / 1000L)
+
+ override def relativeNanos(): Long =
+ System.nanoTime()
- def milliTimestamp(): Long =
- System.currentTimeMillis()
+ override def instant(): Instant =
+ systemClock.instant()
- def relativeNanoTimestamp(): Long =
- System.nanoTime()
+ override def withZone(zone: ZoneId): JavaClock =
+ systemClock.withZone(zone)
- def toMicroTimestamp(nanoTime: Long): Long =
- startMicroTime + (startNanoTime - nanoTime / 1000L)
+ override def getZone: ZoneId =
+ systemClock.getZone()
+ }
} \ No newline at end of file
diff --git a/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala b/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala
index b43b7509..6ff6a0a0 100644
--- a/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala
+++ b/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala
@@ -15,6 +15,7 @@
package kamon.testkit
+import kamon.Kamon
import kamon.trace.{Span, SpanContext}
import kamon.trace.Span.FinishedSpan
import kamon.util.Clock
@@ -34,9 +35,10 @@ object SpanInspection {
private val (realSpan, spanData) = Try {
val realSpan = span match {
case _: Span.Local => span
+ case other => sys.error(s"Only Span.Local can be inspected but got [${other.getClass.getName}]" )
}
- val spanData = invoke[Span.Local, FinishedSpan](realSpan, "toFinishedSpan", classOf[Long] -> Long.box(Clock.microTimestamp()))
+ val spanData = invoke[Span.Local, FinishedSpan](realSpan, "toFinishedSpan", classOf[Long] -> Long.box(Kamon.clock().micros()))
(realSpan, spanData)
}.getOrElse((null, null))