diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2014-12-04 03:22:20 +0100 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2014-12-04 03:22:20 +0100 |
commit | 3b063022d5c18418f9da82482311c67d486727d4 (patch) | |
tree | 276774baff16e2ef42d62a1fd6e4362f1f26e4ae /kamon-core/src/main/scala | |
parent | d4de68cc71b66866b763dd356783e49edd893909 (diff) | |
parent | 0858ddfc94853e603975712c16f6945c01288f9a (diff) | |
download | Kamon-3b063022d5c18418f9da82482311c67d486727d4.tar.gz Kamon-3b063022d5c18418f9da82482311c67d486727d4.tar.bz2 Kamon-3b063022d5c18418f9da82482311c67d486727d4.zip |
Merge branch 'wip/simple-tracing-implementation'
Diffstat (limited to 'kamon-core/src/main/scala')
11 files changed, 493 insertions, 118 deletions
diff --git a/kamon-core/src/main/scala/kamon/TimeUnits.scala b/kamon-core/src/main/scala/kamon/TimeUnits.scala new file mode 100644 index 00000000..44f5b4c3 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/TimeUnits.scala @@ -0,0 +1,57 @@ +package kamon + +/** + * Epoch time stamp in milliseconds. + */ +class MilliTimestamp(val millis: Long) extends AnyVal { + override def toString: String = String.valueOf(millis) + ".millis" +} + +object MilliTimestamp { + def now: MilliTimestamp = new MilliTimestamp(System.currentTimeMillis()) +} + +/** + * Epoch time stamp in nanoseconds. + * + * NOTE: This doesn't have any better precision than MilliTimestamp, it is just a convenient way to get a epoch + * timestamp in nanoseconds. + */ +class NanoTimestamp(val nanos: Long) extends AnyVal { + override def toString: String = String.valueOf(nanos) + ".nanos" +} + +object NanoTimestamp { + def now: NanoTimestamp = new NanoTimestamp(System.currentTimeMillis() * 1000000) +} + +/** + * Number of nanoseconds between a arbitrary origin timestamp provided by the JVM via System.nanoTime() + */ +class RelativeNanoTimestamp(val nanos: Long) extends AnyVal { + override def toString: String = String.valueOf(nanos) + ".nanos" +} + +object RelativeNanoTimestamp { + def now: RelativeNanoTimestamp = new RelativeNanoTimestamp(System.nanoTime()) + def relativeTo(milliTimestamp: MilliTimestamp): RelativeNanoTimestamp = + new RelativeNanoTimestamp(now.nanos - (MilliTimestamp.now.millis - milliTimestamp.millis) * 1000000) +} + +/** + * Number of nanoseconds that passed between two points in time. + */ +class NanoInterval(val nanos: Long) extends AnyVal { + def <(that: NanoInterval): Boolean = this.nanos < that.nanos + def >(that: NanoInterval): Boolean = this.nanos > that.nanos + def ==(that: NanoInterval): Boolean = this.nanos == that.nanos + def >=(that: NanoInterval): Boolean = this.nanos >= that.nanos + def <=(that: NanoInterval): Boolean = this.nanos <= that.nanos + + override def toString: String = String.valueOf(nanos) + ".nanos" +} + +object NanoInterval { + def default: NanoInterval = new NanoInterval(0L) + def since(relative: RelativeNanoTimestamp): NanoInterval = new NanoInterval(System.nanoTime() - relative.nanos) +} diff --git a/kamon-core/src/main/scala/kamon/trace/Incubator.scala b/kamon-core/src/main/scala/kamon/trace/Incubator.scala new file mode 100644 index 00000000..df51f411 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/Incubator.scala @@ -0,0 +1,80 @@ +package kamon.trace + +import java.util.concurrent.TimeUnit + +import akka.actor.{ ActorLogging, Props, Actor, ActorRef } +import kamon.{ NanoInterval, RelativeNanoTimestamp } +import kamon.trace.Incubator.{ CheckForCompletedTraces, IncubatingTrace } +import scala.annotation.tailrec +import scala.collection.immutable.Queue +import scala.concurrent.duration._ + +class Incubator(subscriptions: ActorRef) extends Actor with ActorLogging { + import context.dispatcher + val config = context.system.settings.config.getConfig("kamon.trace.incubator") + + val minIncubationTime = new NanoInterval(config.getDuration("min-incubation-time", TimeUnit.NANOSECONDS)) + val maxIncubationTime = new NanoInterval(config.getDuration("max-incubation-time", TimeUnit.NANOSECONDS)) + val checkInterval = config.getDuration("check-interval", TimeUnit.MILLISECONDS) + + val checkSchedule = context.system.scheduler.schedule(checkInterval.millis, checkInterval.millis, self, CheckForCompletedTraces) + var waitingForMinimumIncubation = Queue.empty[IncubatingTrace] + var waitingForIncubationFinish = List.empty[IncubatingTrace] + + def receive = { + case tc: TracingContext ⇒ incubate(tc) + case CheckForCompletedTraces ⇒ + checkWaitingForMinimumIncubation() + checkWaitingForIncubationFinish() + } + + def incubate(tc: TracingContext): Unit = + waitingForMinimumIncubation = waitingForMinimumIncubation.enqueue(IncubatingTrace(tc, RelativeNanoTimestamp.now)) + + @tailrec private def checkWaitingForMinimumIncubation(): Unit = { + if (waitingForMinimumIncubation.nonEmpty) { + val it = waitingForMinimumIncubation.head + if (NanoInterval.since(it.incubationStart) >= minIncubationTime) { + waitingForMinimumIncubation = waitingForMinimumIncubation.tail + + if (it.tc.shouldIncubate) + waitingForIncubationFinish = it :: waitingForIncubationFinish + else + dispatchTraceInfo(it.tc) + + checkWaitingForMinimumIncubation() + } + } + } + + private def checkWaitingForIncubationFinish(): Unit = { + waitingForIncubationFinish = waitingForIncubationFinish.filter { + case IncubatingTrace(context, incubationStart) ⇒ + if (!context.shouldIncubate) { + dispatchTraceInfo(context) + false + } else { + if (NanoInterval.since(incubationStart) >= maxIncubationTime) { + log.warning("Trace [{}] with token [{}] has reached the maximum incubation time, will be reported as is.", context.name, context.token) + dispatchTraceInfo(context); + false + } else true + } + } + } + + def dispatchTraceInfo(tc: TracingContext): Unit = subscriptions ! tc.generateTraceInfo + + override def postStop(): Unit = { + super.postStop() + checkSchedule.cancel() + } +} + +object Incubator { + + def props(subscriptions: ActorRef): Props = Props(new Incubator(subscriptions)) + + case object CheckForCompletedTraces + case class IncubatingTrace(tc: TracingContext, incubationStart: RelativeNanoTimestamp) +} diff --git a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala new file mode 100644 index 00000000..f478d971 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala @@ -0,0 +1,105 @@ +package kamon.trace + +import java.util.concurrent.ConcurrentLinkedQueue + +import akka.actor.ActorSystem +import akka.event.LoggingAdapter +import kamon.{ RelativeNanoTimestamp, NanoInterval } +import kamon.metric.TraceMetrics.TraceMetricRecorder +import kamon.metric.{ MetricsExtension, TraceMetrics } + +import scala.annotation.tailrec + +private[trace] class MetricsOnlyContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail, val origin: TraceContextOrigin, + val startRelativeTimestamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension, val system: ActorSystem) + extends TraceContext { + + @volatile private var _name = traceName + @volatile private var _isOpen = izOpen + @volatile protected var _elapsedTime = NanoInterval.default + + private val _finishedSegments = new ConcurrentLinkedQueue[SegmentLatencyData]() + private val _traceLocalStorage = new TraceLocalStorage + + def rename(newName: String): Unit = + if (isOpen) + _name = newName + else if (log.isWarningEnabled) + log.warning("Can't rename trace from [{}] to [{}] because the trace is already closed.", name, newName) + + def name: String = _name + def isEmpty: Boolean = false + def isOpen: Boolean = _isOpen + def addMetadata(key: String, value: String): Unit = {} + + def finish(): Unit = { + _isOpen = false + val traceElapsedTime = NanoInterval.since(startRelativeTimestamp) + _elapsedTime = traceElapsedTime + + val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory) + metricRecorder.map { traceMetrics ⇒ + traceMetrics.elapsedTime.record(traceElapsedTime.nanos) + drainFinishedSegments(traceMetrics) + } + } + + def startSegment(segmentName: String, category: String, library: String): Segment = + new MetricsOnlySegment(segmentName, category, library) + + @tailrec private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = { + val segment = _finishedSegments.poll() + if (segment != null) { + metricRecorder.segmentRecorder(segment.identity).record(segment.duration.nanos) + drainFinishedSegments(metricRecorder) + } + } + + protected def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval): Unit = { + _finishedSegments.add(SegmentLatencyData(SegmentMetricIdentity(segmentName, category, library), duration)) + + if (isClosed) { + metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).map { traceMetrics ⇒ + drainFinishedSegments(traceMetrics) + } + } + } + + // Should only be used by the TraceLocal utilities. + def traceLocalStorage: TraceLocalStorage = _traceLocalStorage + + // Handle with care and make sure that the trace is closed before calling this method, otherwise NanoInterval.default + // will be returned. + def elapsedTime: NanoInterval = _elapsedTime + + class MetricsOnlySegment(segmentName: String, val category: String, val library: String) extends Segment { + private val _startTimestamp = RelativeNanoTimestamp.now + @volatile private var _segmentName = segmentName + @volatile private var _elapsedTime = NanoInterval.default + @volatile private var _isOpen = true + + def name: String = _segmentName + def isEmpty: Boolean = false + def addMetadata(key: String, value: String): Unit = {} + def isOpen: Boolean = _isOpen + + def rename(newName: String): Unit = + if (isOpen) + _segmentName = newName + else if (log.isWarningEnabled) + log.warning("Can't rename segment from [{}] to [{}] because the segment is already closed.", name, newName) + + def finish: Unit = { + _isOpen = false + val segmentElapsedTime = NanoInterval.since(_startTimestamp) + _elapsedTime = segmentElapsedTime + + finishSegment(name, category, library, segmentElapsedTime) + } + + // Handle with care and make sure that the segment is closed before calling this method, otherwise + // NanoInterval.default will be returned. + def elapsedTime: NanoInterval = _elapsedTime + def startTimestamp: RelativeNanoTimestamp = _startTimestamp + } +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/Sampler.scala b/kamon-core/src/main/scala/kamon/trace/Sampler.scala new file mode 100644 index 00000000..650592a5 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/Sampler.scala @@ -0,0 +1,46 @@ +package kamon.trace + +import java.util.concurrent.atomic.AtomicLong + +import kamon.NanoInterval +import scala.concurrent.forkjoin.ThreadLocalRandom + +trait Sampler { + def shouldTrace: Boolean + def shouldReport(traceElapsedTime: NanoInterval): Boolean +} + +object NoSampling extends Sampler { + def shouldTrace: Boolean = false + def shouldReport(traceElapsedTime: NanoInterval): Boolean = false +} + +object SampleAll extends Sampler { + def shouldTrace: Boolean = true + def shouldReport(traceElapsedTime: NanoInterval): Boolean = true +} + +class RandomSampler(chance: Int) extends Sampler { + require(chance > 0, "kamon.trace.random-sampler.chance cannot be <= 0") + require(chance <= 100, "kamon.trace.random-sampler.chance cannot be > 100") + + def shouldTrace: Boolean = ThreadLocalRandom.current().nextInt(100) <= chance + def shouldReport(traceElapsedTime: NanoInterval): Boolean = true +} + +class OrderedSampler(interval: Int) extends Sampler { + require(interval > 0, "kamon.trace.ordered-sampler.interval cannot be <= 0") + + private val counter = new AtomicLong(0L) + def shouldTrace: Boolean = counter.incrementAndGet() % interval == 0 + // TODO: find a more efficient way to do this, protect from long overflow. + def shouldReport(traceElapsedTime: NanoInterval): Boolean = true +} + +class ThresholdSampler(thresholdInNanoseconds: Long) extends Sampler { + require(thresholdInNanoseconds > 0, "kamon.trace.threshold-sampler.minimum-elapsed-time cannot be <= 0") + + def shouldTrace: Boolean = true + def shouldReport(traceElapsedTime: NanoInterval): Boolean = traceElapsedTime.nanos >= thresholdInNanoseconds +} + diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index 5b74e6b2..466ceafd 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -17,128 +17,71 @@ package kamon.trace import java.io.ObjectStreamException - import akka.actor.ActorSystem -import kamon.Kamon +import kamon._ import kamon.metric._ -import java.util.concurrent.ConcurrentLinkedQueue import kamon.trace.TraceContextAware.DefaultTraceContextAware -import kamon.metric.TraceMetrics.TraceMetricRecorder - -import scala.annotation.tailrec -sealed trait TraceContext { +trait TraceContext { def name: String def token: String - def rename(name: String): Unit - def finish(): Unit def origin: TraceContextOrigin - def isOpen: Boolean - def isClosed: Boolean = !isOpen def isEmpty: Boolean def nonEmpty: Boolean = !isEmpty + def isOpen: Boolean + def isClosed: Boolean = !isOpen + def system: ActorSystem + + def finish(): Unit + def rename(newName: String): Unit def startSegment(segmentName: String, category: String, library: String): Segment - def nanoTimestamp: Long + def addMetadata(key: String, value: String) + def startRelativeTimestamp: RelativeNanoTimestamp } -sealed trait Segment { +trait Segment { def name: String - def rename(newName: String): Unit def category: String def library: String - def finish(): Unit def isEmpty: Boolean + def nonEmpty: Boolean = !isEmpty + def isOpen: Boolean + def isClosed: Boolean = !isOpen + + def finish(): Unit + def rename(newName: String): Unit + def addMetadata(key: String, value: String) } case object EmptyTraceContext extends TraceContext { def name: String = "empty-trace" def token: String = "" - def rename(name: String): Unit = {} - def finish(): Unit = {} def origin: TraceContextOrigin = TraceContextOrigin.Local - def isOpen: Boolean = false def isEmpty: Boolean = true + def isOpen: Boolean = false + def system: ActorSystem = sys.error("Can't obtain a ActorSystem from a EmptyTraceContext.") + + def finish(): Unit = {} + def rename(name: String): Unit = {} def startSegment(segmentName: String, category: String, library: String): Segment = EmptySegment - def nanoTimestamp: Long = 0L + def addMetadata(key: String, value: String): Unit = {} + def startRelativeTimestamp = new RelativeNanoTimestamp(0L) case object EmptySegment extends Segment { val name: String = "empty-segment" val category: String = "empty-category" val library: String = "empty-library" def isEmpty: Boolean = true - def rename(newName: String): Unit = {} - def finish: Unit = {} - } -} - -class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail, - val origin: TraceContextOrigin, nanoTimeztamp: Long, val system: ActorSystem) extends TraceContext { - - val isEmpty: Boolean = false - @volatile private var _name = traceName - @volatile private var _isOpen = izOpen - - private val _nanoTimestamp = nanoTimeztamp - private val finishedSegments = new ConcurrentLinkedQueue[SegmentData]() - private val metricsExtension = Kamon(Metrics)(system) - private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage - - def name: String = _name - def rename(newName: String): Unit = - if (isOpen) _name = newName // TODO: log a warning about renaming a closed trace. - - def isOpen: Boolean = _isOpen - def nanoTimestamp: Long = _nanoTimestamp - - def finish(): Unit = { - _isOpen = false - val elapsedNanoTime = System.nanoTime() - _nanoTimestamp - val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory) - - metricRecorder.map { traceMetrics ⇒ - traceMetrics.elapsedTime.record(elapsedNanoTime) - drainFinishedSegments(traceMetrics) - } - } - - def startSegment(segmentName: String, category: String, library: String): Segment = new DefaultSegment(segmentName, category, library) + def isOpen: Boolean = false - @tailrec private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = { - val segment = finishedSegments.poll() - if (segment != null) { - metricRecorder.segmentRecorder(segment.identity).record(segment.duration) - drainFinishedSegments(metricRecorder) - } - } - - private def finishSegment(segmentName: String, category: String, library: String, duration: Long): Unit = { - finishedSegments.add(SegmentData(SegmentMetricIdentity(segmentName, category, library), duration)) - - if (isClosed) { - metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).map { traceMetrics ⇒ - drainFinishedSegments(traceMetrics) - } - } - } - - class DefaultSegment(segmentName: String, val category: String, val library: String) extends Segment { - private val _segmentStartNanoTime = System.nanoTime() - @volatile private var _segmentName = segmentName - @volatile private var _isOpen = true - - def name: String = _segmentName - def rename(newName: String): Unit = _segmentName = newName - def isEmpty: Boolean = false - - def finish: Unit = { - val segmentFinishNanoTime = System.nanoTime() - finishSegment(name, category, library, (segmentFinishNanoTime - _segmentStartNanoTime)) - } + def finish: Unit = {} + def rename(newName: String): Unit = {} + def addMetadata(key: String, value: String): Unit = {} } } case class SegmentMetricIdentity(name: String, category: String, library: String) extends MetricIdentity -case class SegmentData(identity: SegmentMetricIdentity, duration: Long) +case class SegmentLatencyData(identity: SegmentMetricIdentity, duration: NanoInterval) object SegmentCategory { val HttpClient = "http-client" @@ -146,7 +89,7 @@ object SegmentCategory { sealed trait LevelOfDetail object LevelOfDetail { - case object OnlyMetrics extends LevelOfDetail + case object MetricsOnly extends LevelOfDetail case object SimpleTrace extends LevelOfDetail case object FullTrace extends LevelOfDetail } diff --git a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala index a59abc18..8bd9384a 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala @@ -16,14 +16,63 @@ package kamon.trace -import akka.actor.{ ExtendedActorSystem, ExtensionIdProvider, ExtensionId } +import akka.actor._ import akka.actor +import akka.event.Logging +import kamon._ +import kamon.metric.Metrics import kamon.util.GlobPathFilter -import kamon.Kamon class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension { val config = system.settings.config.getConfig("kamon.trace") val enableAskPatternTracing = config.getBoolean("ask-pattern-tracing") + + val detailLevel: LevelOfDetail = config.getString("level") match { + case "metrics-only" ⇒ LevelOfDetail.MetricsOnly + case "simple-trace" ⇒ LevelOfDetail.SimpleTrace + case other ⇒ sys.error(s"Unknown tracing level $other present in the configuration file.") + } + + val sampler: Sampler = + if (detailLevel == LevelOfDetail.MetricsOnly) NoSampling + else config.getString("sampling") match { + case "all" ⇒ SampleAll + case "random" ⇒ new RandomSampler(config.getInt("random-sampler.chance")) + case "ordered" ⇒ new OrderedSampler(config.getInt("ordered-sampler.interval")) + case "threshold" ⇒ new RandomSampler(config.getInt("threshold-sampler.threshold")) + } + + val log = Logging(system, "TraceExtension") + val subscriptions = system.actorOf(Props[TraceSubscriptions], "trace-subscriptions") + val incubator = system.actorOf(Incubator.props(subscriptions)) + val metricsExtension = Kamon(Metrics)(system) + + def newTraceContext(traceName: String, token: String, origin: TraceContextOrigin, system: ActorSystem): TraceContext = + newTraceContext(traceName, token, true, origin, RelativeNanoTimestamp.now, system) + + def newTraceContext(traceName: String, token: String, isOpen: Boolean, origin: TraceContextOrigin, + startTimestamp: RelativeNanoTimestamp, system: ActorSystem): TraceContext = { + def newMetricsOnlyContext = new MetricsOnlyContext(traceName, token, isOpen, detailLevel, origin, startTimestamp, log, metricsExtension, system) + + if (detailLevel == LevelOfDetail.MetricsOnly || origin == TraceContextOrigin.Remote) + newMetricsOnlyContext + else { + if (!sampler.shouldTrace) + newMetricsOnlyContext + else + new TracingContext(traceName, token, true, detailLevel, origin, startTimestamp, log, metricsExtension, this, system) + } + } + + def subscribe(subscriber: ActorRef): Unit = subscriptions ! TraceSubscriptions.Subscribe(subscriber) + def unsubscribe(subscriber: ActorRef): Unit = subscriptions ! TraceSubscriptions.Unsubscribe(subscriber) + + private[kamon] def dispatchTracingContext(trace: TracingContext): Unit = + if (sampler.shouldReport(trace.elapsedTime)) + if (trace.shouldIncubate) + incubator ! trace + else + subscriptions ! trace.generateTraceInfo } object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { @@ -34,3 +83,6 @@ object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name)) } } + +case class TraceInfo(name: String, token: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String], segments: List[SegmentInfo]) +case class SegmentInfo(name: String, category: String, library: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String])
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala index c5fb100c..84e234f3 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala @@ -43,13 +43,13 @@ object TraceLocal { object HttpContextKey extends TraceLocal.TraceLocalKey { type ValueType = HttpContext } def store(key: TraceLocalKey)(value: key.ValueType): Unit = TraceRecorder.currentContext match { - case ctx: DefaultTraceContext ⇒ ctx.traceLocalStorage.store(key)(value) - case EmptyTraceContext ⇒ // Can't store in the empty context. + case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.store(key)(value) + case EmptyTraceContext ⇒ // Can't store in the empty context. } def retrieve(key: TraceLocalKey): Option[key.ValueType] = TraceRecorder.currentContext match { - case ctx: DefaultTraceContext ⇒ ctx.traceLocalStorage.retrieve(key) - case EmptyTraceContext ⇒ None // Can't retrieve anything from the empty context. + case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.retrieve(key) + case EmptyTraceContext ⇒ None // Can't retrieve anything from the empty context. } def storeForMdc(key: String, value: String): Unit = store(AvailableToMdc.fromKey(key))(value) diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala index 8da187cb..703896c3 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala @@ -16,6 +16,8 @@ package kamon.trace +import kamon.{ MilliTimestamp, RelativeNanoTimestamp, Kamon } + import scala.language.experimental.macros import java.util.concurrent.atomic.AtomicLong import kamon.macros.InlineTraceContextMacro @@ -34,27 +36,12 @@ object TraceRecorder { def newToken: String = hostnamePrefix + "-" + String.valueOf(tokenCounter.incrementAndGet()) - private def newTraceContext(name: String, token: Option[String], system: ActorSystem): TraceContext = { - new DefaultTraceContext( - name, - token.getOrElse(newToken), - izOpen = true, - LevelOfDetail.OnlyMetrics, - TraceContextOrigin.Local, - nanoTimeztamp = System.nanoTime, - system) - } + private def newTraceContext(name: String, token: Option[String], system: ActorSystem): TraceContext = + Kamon(Trace)(system).newTraceContext(name, token.getOrElse(newToken), TraceContextOrigin.Local, system) - def joinRemoteTraceContext(traceName: String, traceToken: String, startMilliTime: Long, isOpen: Boolean, system: ActorSystem): TraceContext = { - val equivalentNanotime = System.nanoTime() - ((System.currentTimeMillis() - startMilliTime) * 1000000) - new DefaultTraceContext( - traceName, - traceToken, - isOpen, - LevelOfDetail.OnlyMetrics, - TraceContextOrigin.Remote, - equivalentNanotime, - system) + def joinRemoteTraceContext(traceName: String, traceToken: String, startTimestamp: MilliTimestamp, isOpen: Boolean, system: ActorSystem): TraceContext = { + val equivalentStartTimestamp = RelativeNanoTimestamp.relativeTo(startTimestamp) + Kamon(Trace)(system).newTraceContext(traceName, traceToken, isOpen, TraceContextOrigin.Remote, equivalentStartTimestamp, system) } def setContext(context: TraceContext): Unit = traceContextStorage.set(context) @@ -81,8 +68,8 @@ object TraceRecorder { } def withTraceContextAndSystem[T](thunk: (TraceContext, ActorSystem) ⇒ T): Option[T] = currentContext match { - case ctx: DefaultTraceContext ⇒ Some(thunk(ctx, ctx.system)) - case EmptyTraceContext ⇒ None + case ctx: MetricsOnlyContext ⇒ Some(thunk(ctx, ctx.system)) + case EmptyTraceContext ⇒ None } def withInlineTraceContextReplacement[T](traceCtx: TraceContext)(thunk: ⇒ T): T = macro InlineTraceContextMacro.withInlineTraceContextImpl[T, TraceContext] diff --git a/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala b/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala new file mode 100644 index 00000000..d533a344 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala @@ -0,0 +1,29 @@ +package kamon.trace + +import akka.actor.{ Terminated, ActorRef, Actor } + +class TraceSubscriptions extends Actor { + import TraceSubscriptions._ + + var subscribers: List[ActorRef] = Nil + + def receive = { + case Subscribe(newSubscriber) ⇒ + if (!subscribers.contains(newSubscriber)) + subscribers = context.watch(newSubscriber) :: subscribers + + case Unsubscribe(leavingSubscriber) ⇒ + subscribers = subscribers.filter(_ == leavingSubscriber) + + case Terminated(terminatedSubscriber) ⇒ + subscribers = subscribers.filter(_ == terminatedSubscriber) + + case trace: TraceInfo ⇒ + subscribers.foreach(_ ! trace) + } +} + +object TraceSubscriptions { + case class Subscribe(subscriber: ActorRef) + case class Unsubscribe(subscriber: ActorRef) +} diff --git a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala new file mode 100644 index 00000000..6a8cb1c6 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala @@ -0,0 +1,76 @@ +package kamon.trace + +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicInteger + +import akka.actor.ActorSystem +import akka.event.LoggingAdapter +import kamon.{ NanoInterval, NanoTimestamp, RelativeNanoTimestamp } +import kamon.metric.MetricsExtension + +import scala.collection.concurrent.TrieMap + +private[trace] class TracingContext(traceName: String, token: String, izOpen: Boolean, levelOfDetail: LevelOfDetail, origin: TraceContextOrigin, + startTimeztamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension, traceExtension: TraceExtension, system: ActorSystem) + extends MetricsOnlyContext(traceName, token, izOpen, levelOfDetail, origin, startTimeztamp, log, metricsExtension, system) { + + private val _openSegments = new AtomicInteger(0) + private val _startTimestamp = NanoTimestamp.now + private val _allSegments = new ConcurrentLinkedQueue[TracingSegment]() + private val _metadata = TrieMap.empty[String, String] + + override def addMetadata(key: String, value: String): Unit = _metadata.put(key, value) + + override def startSegment(segmentName: String, category: String, library: String): Segment = { + _openSegments.incrementAndGet() + val newSegment = new TracingSegment(segmentName, category, library) + _allSegments.add(newSegment) + newSegment + } + + override def finish(): Unit = { + super.finish() + traceExtension.dispatchTracingContext(this) + } + + override def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval): Unit = { + _openSegments.decrementAndGet() + super.finishSegment(segmentName, category, library, duration) + } + + def shouldIncubate: Boolean = isOpen || _openSegments.get() > 0 + + // Handle with care, should only be used after a trace is finished. + def generateTraceInfo: TraceInfo = { + require(isClosed, "Can't generated a TraceInfo if the Trace has not closed yet.") + + val currentSegments = _allSegments.iterator() + var segmentsInfo = List.newBuilder[SegmentInfo] + + while (currentSegments.hasNext()) { + val segment = currentSegments.next() + if (segment.isClosed) + segmentsInfo += segment.createSegmentInfo(_startTimestamp, startRelativeTimestamp) + else + log.warning("Segment [{}] will be left out of TraceInfo because it was still open.", segment.name) + } + + TraceInfo(name, token, _startTimestamp, elapsedTime, _metadata.toMap, segmentsInfo.result()) + } + + class TracingSegment(segmentName: String, category: String, library: String) extends MetricsOnlySegment(segmentName, category, library) { + private val metadata = TrieMap.empty[String, String] + override def addMetadata(key: String, value: String): Unit = metadata.put(key, value) + + // Handle with care, should only be used after the segment has finished. + def createSegmentInfo(traceStartTimestamp: NanoTimestamp, traceRelativeTimestamp: RelativeNanoTimestamp): SegmentInfo = { + require(isClosed, "Can't generated a SegmentInfo if the Segment has not closed yet.") + + // We don't have a epoch-based timestamp for the segments because calling System.currentTimeMillis() is both + // expensive and inaccurate, but we can do that once for the trace and calculate all the segments relative to it. + val segmentStartTimestamp = new NanoTimestamp((this.startTimestamp.nanos - traceRelativeTimestamp.nanos) + traceStartTimestamp.nanos) + + SegmentInfo(this.name, category, library, segmentStartTimestamp, this.elapsedTime, metadata.toMap) + } + } +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala b/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala index d79a3ab6..4f4efa4d 100644 --- a/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala +++ b/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala @@ -17,7 +17,7 @@ package kamon.trace.logging import kamon.trace.TraceLocal.AvailableToMdc -import kamon.trace.{ EmptyTraceContext, DefaultTraceContext, TraceContext, TraceRecorder } +import kamon.trace.{ EmptyTraceContext, MetricsOnlyContext, TraceContext, TraceRecorder } import org.slf4j.MDC @@ -29,7 +29,7 @@ trait MdcKeysSupport { } private[this] def copyToMdc(traceContext: TraceContext): Iterable[String] = traceContext match { - case ctx: DefaultTraceContext ⇒ + case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.underlyingStorage.collect { case (available: AvailableToMdc, value) ⇒ Map(available.mdcKey -> String.valueOf(value)) }.map { value ⇒ value.map { case (k, v) ⇒ MDC.put(k, v); k } }.flatten |