From 622c8d12735c1a8de3716984686e52bc33368004 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Wed, 13 Dec 2017 23:32:30 +0100 Subject: use java.time.Instant with nanoseconds precision in the Tracer --- .../src/test/scala/kamon/trace/LocalSpanSpec.scala | 22 ++++---- .../src/test/scala/kamon/trace/TracerSpec.scala | 6 ++- kamon-core/src/main/scala/kamon/trace/Span.scala | 59 ++++++++++------------ kamon-core/src/main/scala/kamon/trace/Tracer.scala | 12 +++-- kamon-core/src/main/scala/kamon/util/Clock.scala | 59 ++++++++++++++++++---- .../main/scala/kamon/testkit/SpanInspection.scala | 9 ++-- 6 files changed, 105 insertions(+), 62 deletions(-) diff --git a/kamon-core-tests/src/test/scala/kamon/trace/LocalSpanSpec.scala b/kamon-core-tests/src/test/scala/kamon/trace/LocalSpanSpec.scala index 21ad93b3..c73e0cb5 100644 --- a/kamon-core-tests/src/test/scala/kamon/trace/LocalSpanSpec.scala +++ b/kamon-core-tests/src/test/scala/kamon/trace/LocalSpanSpec.scala @@ -15,6 +15,8 @@ package kamon.trace +import java.time.Instant + import kamon.testkit.{MetricInspection, Reconfigure, TestSpanReporter} import kamon.util.Registration import kamon.Kamon @@ -31,17 +33,17 @@ class LocalSpanSpec extends WordSpec with Matchers with BeforeAndAfterAll with E "be sent to the Span reporters" in { Kamon.buildSpan("test-span") .withTag("test", "value") - .withStartTimestamp(100) + .withFrom(Instant.EPOCH.plusSeconds(1)) .disableMetrics() .enableMetrics() .start() - .finish(200) + .finish(Instant.EPOCH.plusSeconds(10)) eventually(timeout(2 seconds)) { val finishedSpan = reporter.nextSpan().value finishedSpan.operationName shouldBe("test-span") - finishedSpan.startTimestampMicros shouldBe 100 - finishedSpan.endTimestampMicros shouldBe 200 + finishedSpan.from shouldBe Instant.EPOCH.plusSeconds(1) + finishedSpan.to shouldBe Instant.EPOCH.plusSeconds(10) finishedSpan.tags should contain("test" -> TagValue.String("value")) } } @@ -52,22 +54,22 @@ class LocalSpanSpec extends WordSpec with Matchers with BeforeAndAfterAll with E .withTag("builder-boolean-tag-true", true) .withTag("builder-boolean-tag-false", false) .withTag("builder-number-tag", 42) - .withStartTimestamp(100) + .withFrom(Instant.EPOCH.plusSeconds(1)) .start() .tag("span-string-tag", "value") .tag("span-boolean-tag-true", true) .tag("span-boolean-tag-false", false) .tag("span-number-tag", 42) .mark("my-mark") - .mark(100, "my-custom-timetamp-mark") + .mark(Instant.EPOCH.plusSeconds(4), "my-custom-timetamp-mark") .setOperationName("fully-populated-span") - .finish(200) + .finish(Instant.EPOCH.plusSeconds(10)) eventually(timeout(2 seconds)) { val finishedSpan = reporter.nextSpan().value finishedSpan.operationName shouldBe ("fully-populated-span") - finishedSpan.startTimestampMicros shouldBe 100 - finishedSpan.endTimestampMicros shouldBe 200 + finishedSpan.from shouldBe Instant.EPOCH.plusSeconds(1) + finishedSpan.to shouldBe Instant.EPOCH.plusSeconds(10) finishedSpan.tags should contain allOf( "builder-string-tag" -> TagValue.String("value"), "builder-boolean-tag-true" -> TagValue.True, @@ -82,7 +84,7 @@ class LocalSpanSpec extends WordSpec with Matchers with BeforeAndAfterAll with E "my-mark", "my-custom-timetamp-mark" ) - finishedSpan.marks.find(_.key == "my-custom-timetamp-mark").value.timestampMicros should be(100) + finishedSpan.marks.find(_.key == "my-custom-timetamp-mark").value.instant should be(Instant.EPOCH.plusSeconds(4)) } } diff --git a/kamon-core-tests/src/test/scala/kamon/trace/TracerSpec.scala b/kamon-core-tests/src/test/scala/kamon/trace/TracerSpec.scala index eec5b428..a9a4ec10 100644 --- a/kamon-core-tests/src/test/scala/kamon/trace/TracerSpec.scala +++ b/kamon-core-tests/src/test/scala/kamon/trace/TracerSpec.scala @@ -15,6 +15,8 @@ package kamon.trace +import java.time.Instant + import com.typesafe.config.ConfigFactory import kamon.Kamon import kamon.context.Context @@ -85,9 +87,9 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe } "allow overriding the start timestamp for a Span" in { - val span = tracer.buildSpan("myOperation").withStartTimestamp(100).start() + val span = tracer.buildSpan("myOperation").withFrom(Instant.EPOCH.plusMillis(321)).start() val spanData = inspect(span) - spanData.startTimestamp() shouldBe 100 + spanData.from() shouldBe Instant.EPOCH.plusMillis(321) } "preserve the same Span and Parent identifier when creating a Span with a remote parent if join-remote-parents-with-same-span-id is enabled" in { diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala index 545a3f64..d773d46e 100644 --- a/kamon-core/src/main/scala/kamon/trace/Span.scala +++ b/kamon-core/src/main/scala/kamon/trace/Span.scala @@ -16,6 +16,8 @@ package kamon package trace +import java.time.Instant + import kamon.ReporterRegistry.SpanSink import kamon.context.Key import kamon.metric.MeasurementUnit @@ -43,7 +45,7 @@ sealed abstract class Span { def mark(key: String): Span - def mark(timestampMicros: Long, key: String): Span + def mark(at: Instant, key: String): Span def addError(error: String): Span @@ -55,7 +57,7 @@ sealed abstract class Span { def disableMetrics(): Span - def finish(finishTimestampMicros: Long): Unit + def finish(at: Instant): Unit def finish(): Unit } @@ -73,26 +75,19 @@ object Span { override def tag(key: String, value: Boolean): Span = this override def tagMetric(key: String, value: String): Span = this override def mark(key: String): Span = this - override def mark(timestampMicros: Long, key: String): Span = this + override def mark(at: Instant, key: String): Span = this override def addError(error: String): Span = this override def addError(error: String, throwable: Throwable): Span = this override def setOperationName(name: String): Span = this override def enableMetrics(): Span = this override def disableMetrics(): Span = this override def finish(): Unit = {} - override def finish(finishTimestampMicros: Long): Unit = {} + override def finish(at: Instant): Unit = {} } - /** - * - * @param spanContext - * @param initialOperationName - * @param initialSpanTags - * @param startTimestampMicros - * @param spanSink - */ + final class Local(spanContext: SpanContext, parent: Option[Span], initialOperationName: String, initialSpanTags: Map[String, Span.TagValue], - initialMetricTags: Map[String, String], startTimestampMicros: Long, spanSink: SpanSink, trackMetrics: Boolean, scopeSpanMetrics: Boolean, clock: Clock) extends Span { + initialMetricTags: Map[String, String], from: Instant, spanSink: SpanSink, trackMetrics: Boolean, scopeSpanMetrics: Boolean, clock: Clock) extends Span { private var collectMetrics: Boolean = trackMetrics private var open: Boolean = true @@ -138,11 +133,11 @@ object Span { } override def mark(key: String): Span = { - mark(clock.micros(), key) + mark(clock.instant(), key) } - override def mark(timestampMicros: Long, key: String): Span = synchronized { - this.marks = Mark(timestampMicros, key) :: this.marks + override def mark(at: Instant, key: String): Span = synchronized { + this.marks = Mark(at, key) :: this.marks this } @@ -188,25 +183,25 @@ object Span { } override def finish(): Unit = - finish(clock.micros()) + finish(clock.instant()) - override def finish(finishMicros: Long): Unit = synchronized { + override def finish(to: Instant): Unit = synchronized { if (open) { open = false if(collectMetrics) - recordSpanMetrics(finishMicros) + recordSpanMetrics(to) if(sampled) - spanSink.reportSpan(toFinishedSpan(finishMicros)) + spanSink.reportSpan(toFinishedSpan(to)) } } - private def toFinishedSpan(endTimestampMicros: Long): Span.FinishedSpan = - Span.FinishedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, spanTags, marks) + private def toFinishedSpan(to: Instant): Span.FinishedSpan = + Span.FinishedSpan(spanContext, operationName, from, to, spanTags, marks) - private def recordSpanMetrics(endTimestampMicros: Long): Unit = { - val elapsedTime = endTimestampMicros - startTimestampMicros + private def recordSpanMetrics(to: Instant): Unit = { + val elapsedTime = clock.nanosBetween(from, to) val isErrorText = if(hasError) TagValue.True.text else TagValue.False.text if(scopeSpanMetrics) @@ -223,9 +218,9 @@ object Span { object Local { def apply(spanContext: SpanContext, parent: Option[Span], initialOperationName: String, initialSpanTags: Map[String, Span.TagValue], - initialMetricTags: Map[String, String], startTimestampMicros: Long, spanSink: SpanSink, + initialMetricTags: Map[String, String], from: Instant, spanSink: SpanSink, trackMetrics: Boolean, scopeSpanMetrics: Boolean, clock: Clock): Local = - new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags, startTimestampMicros, spanSink, trackMetrics, scopeSpanMetrics, clock) + new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags, from, spanSink, trackMetrics, scopeSpanMetrics, clock) } @@ -237,14 +232,14 @@ object Span { override def tag(key: String, value: Boolean): Span = this override def tagMetric(key: String, value: String): Span = this override def mark(key: String): Span = this - override def mark(timestampMicros: Long, key: String): Span = this + override def mark(at: Instant, key: String): Span = this override def addError(error: String): Span = this override def addError(error: String, throwable: Throwable): Span = this override def setOperationName(name: String): Span = this override def enableMetrics(): Span = this override def disableMetrics(): Span = this override def finish(): Unit = {} - override def finish(finishTimestampMicros: Long): Unit = {} + override def finish(at: Instant): Unit = {} } object Remote { @@ -273,17 +268,17 @@ object Span { object Metrics { - val ProcessingTime = Kamon.histogram("span.processing-time", MeasurementUnit.time.microseconds) + val ProcessingTime = Kamon.histogram("span.processing-time", MeasurementUnit.time.nanoseconds) val SpanErrorCount = Kamon.counter("span.error-count") } - case class Mark(timestampMicros: Long, key: String) + case class Mark(instant: Instant, key: String) case class FinishedSpan( context: SpanContext, operationName: String, - startTimestampMicros: Long, - endTimestampMicros: Long, + from: Instant, + to: Instant, tags: Map[String, Span.TagValue], marks: Seq[Span.Mark] ) diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala index 4950a700..3e857f00 100644 --- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala +++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala @@ -15,6 +15,8 @@ package kamon.trace +import java.time.Instant + import com.typesafe.config.Config import kamon.ReporterRegistry.SpanSink import kamon.Kamon @@ -92,7 +94,7 @@ object Tracer { final class SpanBuilder(operationName: String, tracer: Tracer.Default, spanSink: SpanSink, clock: Clock) { private var parentSpan: Span = _ private var initialOperationName: String = operationName - private var startTimestamp = 0L + private var from: Instant = Instant.EPOCH private var initialSpanTags = Map.empty[String, Span.TagValue] private var initialMetricTags = Map.empty[String, String] private var useParentFromContext = true @@ -125,8 +127,8 @@ object Tracer { this } - def withStartTimestamp(microseconds: Long): SpanBuilder = { - this.startTimestamp = microseconds + def withFrom(from: Instant): SpanBuilder = { + this.from = from this } @@ -158,7 +160,7 @@ object Tracer { def start(): Span = { - val startTimestampMicros = if(startTimestamp != 0L) startTimestamp else clock.micros() + val spanFrom = if(from == Instant.EPOCH) clock.instant() else from val parentSpan: Option[Span] = Option(this.parentSpan) .orElse(if(useParentFromContext) Some(Kamon.currentContext().get(Span.ContextKey)) else None) @@ -183,7 +185,7 @@ object Tracer { initialOperationName, initialSpanTags, initialMetricTags, - startTimestampMicros, + spanFrom, spanSink, trackMetrics, tracer.scopeSpanMetrics, diff --git a/kamon-core/src/main/scala/kamon/util/Clock.scala b/kamon-core/src/main/scala/kamon/util/Clock.scala index 4f4561b5..48a88968 100644 --- a/kamon-core/src/main/scala/kamon/util/Clock.scala +++ b/kamon-core/src/main/scala/kamon/util/Clock.scala @@ -18,26 +18,67 @@ package kamon.util import java.time.{Instant, ZoneId, Clock => JavaClock} abstract class Clock extends JavaClock { - def micros(): Long - def relativeNanos(): Long + def nanos(): Long + def nanosBetween(left: Instant, right: Instant): Long + def toInstant(nanos: Long): Instant } object Clock { + private val MillisInSecond = 1000L + private val MicrosInSecond = 1000000L + private val NanosInSecond = 1000000000L + class Default extends Clock { private val systemClock = JavaClock.systemUTC() - private val startTimeMillis = System.currentTimeMillis() - private val startNanoTime = System.nanoTime() - private val startMicroTime = startTimeMillis * 1000L + private val (startTimeMillis, startNanoTime) = { + var calibrationIterations = 1000 + var millis = System.currentTimeMillis() + var nanos = System.nanoTime() + var isCandidate = false + + while(calibrationIterations > 0) { + val currentMillis = System.currentTimeMillis() + val currentNanos = System.nanoTime() + + if(isCandidate && millis != currentMillis) { + millis = currentMillis + nanos = currentNanos + calibrationIterations = 0 + } else { + if(millis == currentMillis) { + isCandidate = true + } else { + millis = currentMillis + nanos = currentNanos + } + } + + calibrationIterations -= 1 + } - override def micros(): Long = - startMicroTime + ((System.nanoTime() - startNanoTime) / 1000L) + (millis, nanos) + } - override def relativeNanos(): Long = + private val startSecondTime = Math.floorDiv(startTimeMillis, MillisInSecond) + private val startSecondNanoOffset = Math.multiplyExact(Math.floorMod(startTimeMillis, MillisInSecond), MicrosInSecond) + + override def nanos(): Long = System.nanoTime() + override def toInstant(nanos: Long): Instant = { + val nanoOffset = nanos - startNanoTime + startSecondNanoOffset + Instant.ofEpochSecond(startSecondTime, nanoOffset) + } + override def instant(): Instant = - systemClock.instant() + toInstant(System.nanoTime()) + + override def nanosBetween(left: Instant, right: Instant): Long = { + val secsDiff = Math.subtractExact(right.getEpochSecond, left.getEpochSecond) + val totalNanos = Math.multiplyExact(secsDiff, NanosInSecond) + return Math.addExact(totalNanos, right.getNano - left.getNano) + } override def withZone(zone: ZoneId): JavaClock = systemClock.withZone(zone) diff --git a/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala b/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala index 6ff6a0a0..fbfdc7c3 100644 --- a/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala +++ b/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala @@ -15,10 +15,11 @@ package kamon.testkit +import java.time.Instant + import kamon.Kamon import kamon.trace.{Span, SpanContext} import kamon.trace.Span.FinishedSpan -import kamon.util.Clock import scala.reflect.ClassTag import scala.util.Try @@ -38,7 +39,7 @@ object SpanInspection { case other => sys.error(s"Only Span.Local can be inspected but got [${other.getClass.getName}]" ) } - val spanData = invoke[Span.Local, FinishedSpan](realSpan, "toFinishedSpan", classOf[Long] -> Long.box(Kamon.clock().micros())) + val spanData = invoke[Span.Local, FinishedSpan](realSpan, "toFinishedSpan", classOf[Instant] -> Kamon.clock().instant()) (realSpan, spanData) }.getOrElse((null, null)) @@ -54,8 +55,8 @@ object SpanInspection { def metricTags(): Map[String, String] = getField[Span.Local, Map[String, String]](realSpan, "customMetricTags") - def startTimestamp(): Long = - getField[Span.Local, Long](realSpan, "startTimestampMicros") + def from(): Instant = + getField[Span.Local, Instant](realSpan, "from") def context(): SpanContext = spanData.context -- cgit v1.2.3