From 432fb45952c587bcebf81d718188e7067572cf49 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Thu, 4 Dec 2014 03:20:41 +0100 Subject: + core: cleanup the simple trace implementation --- kamon-core/src/main/resources/reference.conf | 15 +++++ kamon-core/src/main/scala/kamon/TimeUnits.scala | 57 +++++++++++++++++ .../src/main/scala/kamon/trace/Incubator.scala | 71 ++++++++++++++++------ .../scala/kamon/trace/MetricsOnlyContext.scala | 70 +++++++++++---------- .../src/main/scala/kamon/trace/Sampler.scala | 13 ++-- .../src/main/scala/kamon/trace/TraceContext.scala | 48 +++++++-------- .../main/scala/kamon/trace/TraceExtension.scala | 31 +++++----- .../src/main/scala/kamon/trace/TraceRecorder.scala | 10 +-- .../main/scala/kamon/trace/TracingContext.scala | 70 +++++++++++---------- .../test/scala/kamon/trace/SimpleTraceSpec.scala | 14 +++-- 10 files changed, 259 insertions(+), 140 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/TimeUnits.scala (limited to 'kamon-core/src') diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index 78d0900e..7c4b4ecb 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -156,6 +156,21 @@ kamon { threshold = 1 second } + incubator { + # Minimum time to stay in the trace incubator before checking if the trace should not be incubated anymore. No + # checks are made at least until this period has passed. + min-incubation-time = 5 seconds + + # Time to wait between incubation checks. After min-incubation-time, a trace is checked using this interval and if + # if shouldn't be incubated anymore, the TraceInfo is collected and reported for it. + check-interval = 1 second + + # Max amount of time that a trace can be in the incubator. If this time is reached for a given trace then it will + # be reported with whatever information is available at the moment, logging a warning for each segment that remains + # open after this point. + max-incubation-time = 20 seconds + } + # If ask-pattern-tracing is enabled, a WARN level log message will be generated if a future generated by the `ask` # pattern fails with a `AskTimeoutException` and the log message will contain a stack trace captured at the moment # the future was created. diff --git a/kamon-core/src/main/scala/kamon/TimeUnits.scala b/kamon-core/src/main/scala/kamon/TimeUnits.scala new file mode 100644 index 00000000..44f5b4c3 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/TimeUnits.scala @@ -0,0 +1,57 @@ +package kamon + +/** + * Epoch time stamp in milliseconds. + */ +class MilliTimestamp(val millis: Long) extends AnyVal { + override def toString: String = String.valueOf(millis) + ".millis" +} + +object MilliTimestamp { + def now: MilliTimestamp = new MilliTimestamp(System.currentTimeMillis()) +} + +/** + * Epoch time stamp in nanoseconds. + * + * NOTE: This doesn't have any better precision than MilliTimestamp, it is just a convenient way to get a epoch + * timestamp in nanoseconds. + */ +class NanoTimestamp(val nanos: Long) extends AnyVal { + override def toString: String = String.valueOf(nanos) + ".nanos" +} + +object NanoTimestamp { + def now: NanoTimestamp = new NanoTimestamp(System.currentTimeMillis() * 1000000) +} + +/** + * Number of nanoseconds between a arbitrary origin timestamp provided by the JVM via System.nanoTime() + */ +class RelativeNanoTimestamp(val nanos: Long) extends AnyVal { + override def toString: String = String.valueOf(nanos) + ".nanos" +} + +object RelativeNanoTimestamp { + def now: RelativeNanoTimestamp = new RelativeNanoTimestamp(System.nanoTime()) + def relativeTo(milliTimestamp: MilliTimestamp): RelativeNanoTimestamp = + new RelativeNanoTimestamp(now.nanos - (MilliTimestamp.now.millis - milliTimestamp.millis) * 1000000) +} + +/** + * Number of nanoseconds that passed between two points in time. + */ +class NanoInterval(val nanos: Long) extends AnyVal { + def <(that: NanoInterval): Boolean = this.nanos < that.nanos + def >(that: NanoInterval): Boolean = this.nanos > that.nanos + def ==(that: NanoInterval): Boolean = this.nanos == that.nanos + def >=(that: NanoInterval): Boolean = this.nanos >= that.nanos + def <=(that: NanoInterval): Boolean = this.nanos <= that.nanos + + override def toString: String = String.valueOf(nanos) + ".nanos" +} + +object NanoInterval { + def default: NanoInterval = new NanoInterval(0L) + def since(relative: RelativeNanoTimestamp): NanoInterval = new NanoInterval(System.nanoTime() - relative.nanos) +} diff --git a/kamon-core/src/main/scala/kamon/trace/Incubator.scala b/kamon-core/src/main/scala/kamon/trace/Incubator.scala index d363d771..df51f411 100644 --- a/kamon-core/src/main/scala/kamon/trace/Incubator.scala +++ b/kamon-core/src/main/scala/kamon/trace/Incubator.scala @@ -1,32 +1,70 @@ package kamon.trace -import akka.actor.{ Props, Actor, ActorRef } +import java.util.concurrent.TimeUnit + +import akka.actor.{ ActorLogging, Props, Actor, ActorRef } +import kamon.{ NanoInterval, RelativeNanoTimestamp } import kamon.trace.Incubator.{ CheckForCompletedTraces, IncubatingTrace } import scala.annotation.tailrec import scala.collection.immutable.Queue import scala.concurrent.duration._ -class Incubator(subscriptions: ActorRef, maxIncubationNanoTime: Long) extends Actor { +class Incubator(subscriptions: ActorRef) extends Actor with ActorLogging { import context.dispatcher - val checkSchedule = context.system.scheduler.schedule(100 millis, 100 millis, self, CheckForCompletedTraces) - var incubating = Queue.empty[IncubatingTrace] + val config = context.system.settings.config.getConfig("kamon.trace.incubator") + + val minIncubationTime = new NanoInterval(config.getDuration("min-incubation-time", TimeUnit.NANOSECONDS)) + val maxIncubationTime = new NanoInterval(config.getDuration("max-incubation-time", TimeUnit.NANOSECONDS)) + val checkInterval = config.getDuration("check-interval", TimeUnit.MILLISECONDS) + + val checkSchedule = context.system.scheduler.schedule(checkInterval.millis, checkInterval.millis, self, CheckForCompletedTraces) + var waitingForMinimumIncubation = Queue.empty[IncubatingTrace] + var waitingForIncubationFinish = List.empty[IncubatingTrace] def receive = { - case CheckForCompletedTraces ⇒ dispatchCompleted() - case tc: TracingContext ⇒ incubating = incubating.enqueue(IncubatingTrace(tc)) + case tc: TracingContext ⇒ incubate(tc) + case CheckForCompletedTraces ⇒ + checkWaitingForMinimumIncubation() + checkWaitingForIncubationFinish() } - @tailrec private def dispatchCompleted(): Unit = { - if (incubating.nonEmpty) { - val it = incubating.head - if (!it.tc.shouldIncubate || it.incubationNanoTime >= maxIncubationNanoTime) { - it.tc.generateTraceInfo.map(subscriptions ! _) - incubating = incubating.tail - dispatchCompleted() + def incubate(tc: TracingContext): Unit = + waitingForMinimumIncubation = waitingForMinimumIncubation.enqueue(IncubatingTrace(tc, RelativeNanoTimestamp.now)) + + @tailrec private def checkWaitingForMinimumIncubation(): Unit = { + if (waitingForMinimumIncubation.nonEmpty) { + val it = waitingForMinimumIncubation.head + if (NanoInterval.since(it.incubationStart) >= minIncubationTime) { + waitingForMinimumIncubation = waitingForMinimumIncubation.tail + + if (it.tc.shouldIncubate) + waitingForIncubationFinish = it :: waitingForIncubationFinish + else + dispatchTraceInfo(it.tc) + + checkWaitingForMinimumIncubation() } } } + private def checkWaitingForIncubationFinish(): Unit = { + waitingForIncubationFinish = waitingForIncubationFinish.filter { + case IncubatingTrace(context, incubationStart) ⇒ + if (!context.shouldIncubate) { + dispatchTraceInfo(context) + false + } else { + if (NanoInterval.since(incubationStart) >= maxIncubationTime) { + log.warning("Trace [{}] with token [{}] has reached the maximum incubation time, will be reported as is.", context.name, context.token) + dispatchTraceInfo(context); + false + } else true + } + } + } + + def dispatchTraceInfo(tc: TracingContext): Unit = subscriptions ! tc.generateTraceInfo + override def postStop(): Unit = { super.postStop() checkSchedule.cancel() @@ -35,11 +73,8 @@ class Incubator(subscriptions: ActorRef, maxIncubationNanoTime: Long) extends Ac object Incubator { - def props(subscriptions: ActorRef, maxIncubationNanoTime: Long): Props = Props(new Incubator(subscriptions, maxIncubationNanoTime)) + def props(subscriptions: ActorRef): Props = Props(new Incubator(subscriptions)) case object CheckForCompletedTraces - case class IncubatingTrace(tc: TracingContext) { - private val incubationStartNanoTime = System.nanoTime() - def incubationNanoTime: Long = System.nanoTime() - incubationStartNanoTime - } + case class IncubatingTrace(tc: TracingContext, incubationStart: RelativeNanoTimestamp) } diff --git a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala index 04e61407..f478d971 100644 --- a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala @@ -4,50 +4,42 @@ import java.util.concurrent.ConcurrentLinkedQueue import akka.actor.ActorSystem import akka.event.LoggingAdapter -import kamon.Kamon +import kamon.{ RelativeNanoTimestamp, NanoInterval } import kamon.metric.TraceMetrics.TraceMetricRecorder -import kamon.metric.{ MetricsExtension, TraceMetrics, Metrics } +import kamon.metric.{ MetricsExtension, TraceMetrics } import scala.annotation.tailrec -class MetricsOnlyContext( - traceName: String, - val token: String, - izOpen: Boolean, - val levelOfDetail: LevelOfDetail, - val origin: TraceContextOrigin, - nanoTimeztamp: Long, - log: LoggingAdapter, - metricsExtension: MetricsExtension, - val system: ActorSystem) +private[trace] class MetricsOnlyContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail, val origin: TraceContextOrigin, + val startRelativeTimestamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension, val system: ActorSystem) extends TraceContext { @volatile private var _name = traceName @volatile private var _isOpen = izOpen - @volatile protected var _elapsedNanoTime = 0L + @volatile protected var _elapsedTime = NanoInterval.default - private val _nanoTimestamp = nanoTimeztamp private val _finishedSegments = new ConcurrentLinkedQueue[SegmentLatencyData]() - private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage + private val _traceLocalStorage = new TraceLocalStorage def rename(newName: String): Unit = - if (isOpen) _name = newName - else if (log.isWarningEnabled) log.warning("Can't rename trace from [{}] to [{}] because the trace is already closed.", name, newName) + if (isOpen) + _name = newName + else if (log.isWarningEnabled) + log.warning("Can't rename trace from [{}] to [{}] because the trace is already closed.", name, newName) def name: String = _name def isEmpty: Boolean = false def isOpen: Boolean = _isOpen - def nanoTimestamp: Long = _nanoTimestamp - def elapsedNanoTime: Long = _elapsedNanoTime def addMetadata(key: String, value: String): Unit = {} def finish(): Unit = { _isOpen = false - _elapsedNanoTime = System.nanoTime() - _nanoTimestamp - val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory) + val traceElapsedTime = NanoInterval.since(startRelativeTimestamp) + _elapsedTime = traceElapsedTime + val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory) metricRecorder.map { traceMetrics ⇒ - traceMetrics.elapsedTime.record(elapsedNanoTime) + traceMetrics.elapsedTime.record(traceElapsedTime.nanos) drainFinishedSegments(traceMetrics) } } @@ -58,12 +50,12 @@ class MetricsOnlyContext( @tailrec private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = { val segment = _finishedSegments.poll() if (segment != null) { - metricRecorder.segmentRecorder(segment.identity).record(segment.duration) + metricRecorder.segmentRecorder(segment.identity).record(segment.duration.nanos) drainFinishedSegments(metricRecorder) } } - protected def finishSegment(segmentName: String, category: String, library: String, duration: Long): Unit = { + protected def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval): Unit = { _finishedSegments.add(SegmentLatencyData(SegmentMetricIdentity(segmentName, category, library), duration)) if (isClosed) { @@ -73,27 +65,41 @@ class MetricsOnlyContext( } } + // Should only be used by the TraceLocal utilities. + def traceLocalStorage: TraceLocalStorage = _traceLocalStorage + + // Handle with care and make sure that the trace is closed before calling this method, otherwise NanoInterval.default + // will be returned. + def elapsedTime: NanoInterval = _elapsedTime + class MetricsOnlySegment(segmentName: String, val category: String, val library: String) extends Segment { - protected val segmentStartNanoTime = System.nanoTime() + private val _startTimestamp = RelativeNanoTimestamp.now @volatile private var _segmentName = segmentName - @volatile private var _elapsedNanoTime = 0L - @volatile protected var _isOpen = true + @volatile private var _elapsedTime = NanoInterval.default + @volatile private var _isOpen = true def name: String = _segmentName def isEmpty: Boolean = false def addMetadata(key: String, value: String): Unit = {} def isOpen: Boolean = _isOpen - def elapsedNanoTime: Long = _elapsedNanoTime def rename(newName: String): Unit = - if (isOpen) _segmentName = newName - else if (log.isWarningEnabled) log.warning("Can't rename segment from [{}] to [{}] because the segment is already closed.", name, newName) + if (isOpen) + _segmentName = newName + else if (log.isWarningEnabled) + log.warning("Can't rename segment from [{}] to [{}] because the segment is already closed.", name, newName) def finish: Unit = { _isOpen = false - _elapsedNanoTime = System.nanoTime() - segmentStartNanoTime + val segmentElapsedTime = NanoInterval.since(_startTimestamp) + _elapsedTime = segmentElapsedTime - finishSegment(name, category, library, elapsedNanoTime) + finishSegment(name, category, library, segmentElapsedTime) } + + // Handle with care and make sure that the segment is closed before calling this method, otherwise + // NanoInterval.default will be returned. + def elapsedTime: NanoInterval = _elapsedTime + def startTimestamp: RelativeNanoTimestamp = _startTimestamp } } \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/Sampler.scala b/kamon-core/src/main/scala/kamon/trace/Sampler.scala index 60a400f8..650592a5 100644 --- a/kamon-core/src/main/scala/kamon/trace/Sampler.scala +++ b/kamon-core/src/main/scala/kamon/trace/Sampler.scala @@ -2,21 +2,22 @@ package kamon.trace import java.util.concurrent.atomic.AtomicLong +import kamon.NanoInterval import scala.concurrent.forkjoin.ThreadLocalRandom trait Sampler { def shouldTrace: Boolean - def shouldReport(traceElapsedNanoTime: Long): Boolean + def shouldReport(traceElapsedTime: NanoInterval): Boolean } object NoSampling extends Sampler { def shouldTrace: Boolean = false - def shouldReport(traceElapsedNanoTime: Long): Boolean = false + def shouldReport(traceElapsedTime: NanoInterval): Boolean = false } object SampleAll extends Sampler { def shouldTrace: Boolean = true - def shouldReport(traceElapsedNanoTime: Long): Boolean = true + def shouldReport(traceElapsedTime: NanoInterval): Boolean = true } class RandomSampler(chance: Int) extends Sampler { @@ -24,7 +25,7 @@ class RandomSampler(chance: Int) extends Sampler { require(chance <= 100, "kamon.trace.random-sampler.chance cannot be > 100") def shouldTrace: Boolean = ThreadLocalRandom.current().nextInt(100) <= chance - def shouldReport(traceElapsedNanoTime: Long): Boolean = true + def shouldReport(traceElapsedTime: NanoInterval): Boolean = true } class OrderedSampler(interval: Int) extends Sampler { @@ -33,13 +34,13 @@ class OrderedSampler(interval: Int) extends Sampler { private val counter = new AtomicLong(0L) def shouldTrace: Boolean = counter.incrementAndGet() % interval == 0 // TODO: find a more efficient way to do this, protect from long overflow. - def shouldReport(traceElapsedNanoTime: Long): Boolean = true + def shouldReport(traceElapsedTime: NanoInterval): Boolean = true } class ThresholdSampler(thresholdInNanoseconds: Long) extends Sampler { require(thresholdInNanoseconds > 0, "kamon.trace.threshold-sampler.minimum-elapsed-time cannot be <= 0") def shouldTrace: Boolean = true - def shouldReport(traceElapsedNanoTime: Long): Boolean = traceElapsedNanoTime >= thresholdInNanoseconds + def shouldReport(traceElapsedTime: NanoInterval): Boolean = traceElapsedTime.nanos >= thresholdInNanoseconds } diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index 2c0b38bf..466ceafd 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -17,59 +17,55 @@ package kamon.trace import java.io.ObjectStreamException - import akka.actor.ActorSystem -import kamon.Kamon +import kamon._ import kamon.metric._ -import java.util.concurrent.ConcurrentLinkedQueue import kamon.trace.TraceContextAware.DefaultTraceContextAware -import kamon.metric.TraceMetrics.TraceMetricRecorder - -import scala.annotation.tailrec trait TraceContext { def name: String def token: String - def rename(name: String): Unit - def finish(): Unit def origin: TraceContextOrigin - def isOpen: Boolean - def isClosed: Boolean = !isOpen def isEmpty: Boolean def nonEmpty: Boolean = !isEmpty + def isOpen: Boolean + def isClosed: Boolean = !isOpen + def system: ActorSystem + + def finish(): Unit + def rename(newName: String): Unit def startSegment(segmentName: String, category: String, library: String): Segment - def nanoTimestamp: Long def addMetadata(key: String, value: String) - - protected def elapsedNanoTime: Long + def startRelativeTimestamp: RelativeNanoTimestamp } trait Segment { def name: String - def rename(newName: String): Unit def category: String def library: String - def finish(): Unit + def isEmpty: Boolean + def nonEmpty: Boolean = !isEmpty def isOpen: Boolean def isClosed: Boolean = !isOpen - def isEmpty: Boolean - def addMetadata(key: String, value: String) - protected def elapsedNanoTime: Long + def finish(): Unit + def rename(newName: String): Unit + def addMetadata(key: String, value: String) } case object EmptyTraceContext extends TraceContext { def name: String = "empty-trace" def token: String = "" - def rename(name: String): Unit = {} - def finish(): Unit = {} def origin: TraceContextOrigin = TraceContextOrigin.Local - def isOpen: Boolean = false def isEmpty: Boolean = true + def isOpen: Boolean = false + def system: ActorSystem = sys.error("Can't obtain a ActorSystem from a EmptyTraceContext.") + + def finish(): Unit = {} + def rename(name: String): Unit = {} def startSegment(segmentName: String, category: String, library: String): Segment = EmptySegment - def nanoTimestamp: Long = 0L def addMetadata(key: String, value: String): Unit = {} - def elapsedNanoTime: Long = 0L + def startRelativeTimestamp = new RelativeNanoTimestamp(0L) case object EmptySegment extends Segment { val name: String = "empty-segment" @@ -77,15 +73,15 @@ case object EmptyTraceContext extends TraceContext { val library: String = "empty-library" def isEmpty: Boolean = true def isOpen: Boolean = false - def rename(newName: String): Unit = {} + def finish: Unit = {} + def rename(newName: String): Unit = {} def addMetadata(key: String, value: String): Unit = {} - def elapsedNanoTime: Long = 0L } } case class SegmentMetricIdentity(name: String, category: String, library: String) extends MetricIdentity -case class SegmentLatencyData(identity: SegmentMetricIdentity, duration: Long) +case class SegmentLatencyData(identity: SegmentMetricIdentity, duration: NanoInterval) object SegmentCategory { val HttpClient = "http-client" diff --git a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala index e5fbb15e..8bd9384a 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala @@ -19,9 +19,9 @@ package kamon.trace import akka.actor._ import akka.actor import akka.event.Logging +import kamon._ import kamon.metric.Metrics import kamon.util.GlobPathFilter -import kamon.Kamon class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension { val config = system.settings.config.getConfig("kamon.trace") @@ -44,11 +44,15 @@ class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension { val log = Logging(system, "TraceExtension") val subscriptions = system.actorOf(Props[TraceSubscriptions], "trace-subscriptions") - val incubator = system.actorOf(Incubator.props(subscriptions, 10000000000L)) + val incubator = system.actorOf(Incubator.props(subscriptions)) val metricsExtension = Kamon(Metrics)(system) - def newTraceContext(traceName: String, token: String, isOpen: Boolean, origin: TraceContextOrigin, nanoTimestamp: Long, system: ActorSystem): TraceContext = { - def newMetricsOnlyContext = new MetricsOnlyContext(traceName, token, true, detailLevel, origin, nanoTimestamp, log, metricsExtension, system) + def newTraceContext(traceName: String, token: String, origin: TraceContextOrigin, system: ActorSystem): TraceContext = + newTraceContext(traceName, token, true, origin, RelativeNanoTimestamp.now, system) + + def newTraceContext(traceName: String, token: String, isOpen: Boolean, origin: TraceContextOrigin, + startTimestamp: RelativeNanoTimestamp, system: ActorSystem): TraceContext = { + def newMetricsOnlyContext = new MetricsOnlyContext(traceName, token, isOpen, detailLevel, origin, startTimestamp, log, metricsExtension, system) if (detailLevel == LevelOfDetail.MetricsOnly || origin == TraceContextOrigin.Remote) newMetricsOnlyContext @@ -56,20 +60,19 @@ class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension { if (!sampler.shouldTrace) newMetricsOnlyContext else - new TracingContext(traceName, token, true, detailLevel, origin, nanoTimestamp, log, this, metricsExtension, system) + new TracingContext(traceName, token, true, detailLevel, origin, startTimestamp, log, metricsExtension, this, system) } } - def report(trace: TracingContext): Unit = if (sampler.shouldReport(trace.elapsedNanoTime)) { - if (trace.shouldIncubate) - incubator ! trace - else - trace.generateTraceInfo.map(subscriptions ! _) - } - def subscribe(subscriber: ActorRef): Unit = subscriptions ! TraceSubscriptions.Subscribe(subscriber) def unsubscribe(subscriber: ActorRef): Unit = subscriptions ! TraceSubscriptions.Unsubscribe(subscriber) + private[kamon] def dispatchTracingContext(trace: TracingContext): Unit = + if (sampler.shouldReport(trace.elapsedTime)) + if (trace.shouldIncubate) + incubator ! trace + else + subscriptions ! trace.generateTraceInfo } object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { @@ -81,5 +84,5 @@ object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { } } -case class TraceInfo(name: String, token: String, startMilliTime: Long, startNanoTime: Long, elapsedNanoTime: Long, metadata: Map[String, String], segments: List[SegmentInfo]) -case class SegmentInfo(name: String, category: String, library: String, startNanoTime: Long, elapsedNanoTime: Long, metadata: Map[String, String]) \ No newline at end of file +case class TraceInfo(name: String, token: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String], segments: List[SegmentInfo]) +case class SegmentInfo(name: String, category: String, library: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String]) \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala index af47bf3c..703896c3 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala @@ -16,7 +16,7 @@ package kamon.trace -import kamon.Kamon +import kamon.{ MilliTimestamp, RelativeNanoTimestamp, Kamon } import scala.language.experimental.macros import java.util.concurrent.atomic.AtomicLong @@ -37,11 +37,11 @@ object TraceRecorder { def newToken: String = hostnamePrefix + "-" + String.valueOf(tokenCounter.incrementAndGet()) private def newTraceContext(name: String, token: Option[String], system: ActorSystem): TraceContext = - Kamon(Trace)(system).newTraceContext(name, token.getOrElse(newToken), true, TraceContextOrigin.Local, System.nanoTime(), system) + Kamon(Trace)(system).newTraceContext(name, token.getOrElse(newToken), TraceContextOrigin.Local, system) - def joinRemoteTraceContext(traceName: String, traceToken: String, startMilliTime: Long, isOpen: Boolean, system: ActorSystem): TraceContext = { - val equivalentNanoTime = System.nanoTime() - ((System.currentTimeMillis() - startMilliTime) * 1000000) - Kamon(Trace)(system).newTraceContext(traceName, traceToken, isOpen, TraceContextOrigin.Remote, equivalentNanoTime, system) + def joinRemoteTraceContext(traceName: String, traceToken: String, startTimestamp: MilliTimestamp, isOpen: Boolean, system: ActorSystem): TraceContext = { + val equivalentStartTimestamp = RelativeNanoTimestamp.relativeTo(startTimestamp) + Kamon(Trace)(system).newTraceContext(traceName, traceToken, isOpen, TraceContextOrigin.Remote, equivalentStartTimestamp, system) } def setContext(context: TraceContext): Unit = traceContextStorage.set(context) diff --git a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala index c9cbc754..6a8cb1c6 100644 --- a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala @@ -1,72 +1,76 @@ package kamon.trace import java.util.concurrent.ConcurrentLinkedQueue -import java.util.concurrent.atomic.{ AtomicInteger, AtomicLongFieldUpdater } +import java.util.concurrent.atomic.AtomicInteger import akka.actor.ActorSystem import akka.event.LoggingAdapter -import kamon.Kamon -import kamon.metric.TraceMetrics.TraceMetricRecorder -import kamon.metric.{ MetricsExtension, Metrics, TraceMetrics } +import kamon.{ NanoInterval, NanoTimestamp, RelativeNanoTimestamp } +import kamon.metric.MetricsExtension -import scala.annotation.tailrec import scala.collection.concurrent.TrieMap -class TracingContext(traceName: String, token: String, izOpen: Boolean, levelOfDetail: LevelOfDetail, origin: TraceContextOrigin, - nanoTimeztamp: Long, log: LoggingAdapter, traceExtension: TraceExtension, metricsExtension: MetricsExtension, system: ActorSystem) - extends MetricsOnlyContext(traceName, token, izOpen, levelOfDetail, origin, nanoTimeztamp, log, metricsExtension, system) { +private[trace] class TracingContext(traceName: String, token: String, izOpen: Boolean, levelOfDetail: LevelOfDetail, origin: TraceContextOrigin, + startTimeztamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension, traceExtension: TraceExtension, system: ActorSystem) + extends MetricsOnlyContext(traceName, token, izOpen, levelOfDetail, origin, startTimeztamp, log, metricsExtension, system) { - val openSegments = new AtomicInteger(0) - private val startMilliTime = System.currentTimeMillis() - private val allSegments = new ConcurrentLinkedQueue[TracingSegment]() - private val metadata = TrieMap.empty[String, String] + private val _openSegments = new AtomicInteger(0) + private val _startTimestamp = NanoTimestamp.now + private val _allSegments = new ConcurrentLinkedQueue[TracingSegment]() + private val _metadata = TrieMap.empty[String, String] - override def addMetadata(key: String, value: String): Unit = metadata.put(key, value) + override def addMetadata(key: String, value: String): Unit = _metadata.put(key, value) override def startSegment(segmentName: String, category: String, library: String): Segment = { - openSegments.incrementAndGet() + _openSegments.incrementAndGet() val newSegment = new TracingSegment(segmentName, category, library) - allSegments.add(newSegment) + _allSegments.add(newSegment) newSegment } override def finish(): Unit = { super.finish() - traceExtension.report(this) + traceExtension.dispatchTracingContext(this) } - override def finishSegment(segmentName: String, category: String, library: String, duration: Long): Unit = { - openSegments.decrementAndGet() + override def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval): Unit = { + _openSegments.decrementAndGet() super.finishSegment(segmentName, category, library, duration) } - def shouldIncubate: Boolean = isOpen || openSegments.get() > 0 + def shouldIncubate: Boolean = isOpen || _openSegments.get() > 0 - def generateTraceInfo: Option[TraceInfo] = if (isOpen) None else { - val currentSegments = allSegments.iterator() - var segmentsInfo: List[SegmentInfo] = Nil + // Handle with care, should only be used after a trace is finished. + def generateTraceInfo: TraceInfo = { + require(isClosed, "Can't generated a TraceInfo if the Trace has not closed yet.") + + val currentSegments = _allSegments.iterator() + var segmentsInfo = List.newBuilder[SegmentInfo] while (currentSegments.hasNext()) { val segment = currentSegments.next() - segment.createSegmentInfo match { - case Some(si) ⇒ segmentsInfo = si :: segmentsInfo - case None ⇒ log.warning("Segment [{}] will be left out of TraceInfo because it was still open.", segment.name) - } + if (segment.isClosed) + segmentsInfo += segment.createSegmentInfo(_startTimestamp, startRelativeTimestamp) + else + log.warning("Segment [{}] will be left out of TraceInfo because it was still open.", segment.name) } - Some(TraceInfo(traceName, token, startMilliTime, nanoTimeztamp, elapsedNanoTime, metadata.toMap, segmentsInfo)) + TraceInfo(name, token, _startTimestamp, elapsedTime, _metadata.toMap, segmentsInfo.result()) } class TracingSegment(segmentName: String, category: String, library: String) extends MetricsOnlySegment(segmentName, category, library) { private val metadata = TrieMap.empty[String, String] override def addMetadata(key: String, value: String): Unit = metadata.put(key, value) - override def finish: Unit = { - super.finish() - } + // Handle with care, should only be used after the segment has finished. + def createSegmentInfo(traceStartTimestamp: NanoTimestamp, traceRelativeTimestamp: RelativeNanoTimestamp): SegmentInfo = { + require(isClosed, "Can't generated a SegmentInfo if the Segment has not closed yet.") - def createSegmentInfo: Option[SegmentInfo] = - if (isOpen) None - else Some(SegmentInfo(this.name, category, library, segmentStartNanoTime, elapsedNanoTime, metadata.toMap)) + // We don't have a epoch-based timestamp for the segments because calling System.currentTimeMillis() is both + // expensive and inaccurate, but we can do that once for the trace and calculate all the segments relative to it. + val segmentStartTimestamp = new NanoTimestamp((this.startTimestamp.nanos - traceRelativeTimestamp.nanos) + traceStartTimestamp.nanos) + + SegmentInfo(this.name, category, library, segmentStartTimestamp, this.elapsedTime, metadata.toMap) + } } } \ No newline at end of file diff --git a/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala b/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala index a263ff7f..cda9cad7 100644 --- a/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala +++ b/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala @@ -88,13 +88,15 @@ class SimpleTraceSpec extends TestKitBase with WordSpecLike with Matchers with I expectNoMsg(2 seconds) secondSegment.finish() - val traceInfo = expectMsgType[TraceInfo] - Kamon(Trace)(system).unsubscribe(testActor) + within(10 seconds) { + val traceInfo = expectMsgType[TraceInfo] + Kamon(Trace)(system).unsubscribe(testActor) - traceInfo.name should be("simple-trace-without-segments") - traceInfo.segments.size should be(2) - traceInfo.segments.find(_.name == "segment-one") should be('defined) - traceInfo.segments.find(_.name == "segment-two") should be('defined) + traceInfo.name should be("simple-trace-without-segments") + traceInfo.segments.size should be(2) + traceInfo.segments.find(_.name == "segment-one") should be('defined) + traceInfo.segments.find(_.name == "segment-two") should be('defined) + } } } -- cgit v1.2.3