From 216687a130d9f68aacc67d7fe932fb4007288291 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Wed, 3 Dec 2014 02:10:46 +0100 Subject: = core: first simple approach to providing traces and a subscription mechanism. --- kamon-core/src/main/resources/reference.conf | 29 ++++++ .../src/main/scala/kamon/trace/Incubator.scala | 45 +++++++++ .../scala/kamon/trace/MetricsOnlyContext.scala | 99 ++++++++++++++++++++ .../src/main/scala/kamon/trace/Sampler.scala | 45 +++++++++ .../src/main/scala/kamon/trace/TraceContext.scala | 87 ++++-------------- .../main/scala/kamon/trace/TraceExtension.scala | 51 ++++++++++- .../src/main/scala/kamon/trace/TraceLocal.scala | 8 +- .../src/main/scala/kamon/trace/TraceRecorder.scala | 25 ++--- .../scala/kamon/trace/TraceSubscriptions.scala | 29 ++++++ .../main/scala/kamon/trace/TracingContext.scala | 72 +++++++++++++++ .../scala/kamon/trace/logging/MdcKeysSupport.scala | 4 +- .../test/scala/kamon/trace/SimpleTraceSpec.scala | 101 +++++++++++++++++++++ 12 files changed, 503 insertions(+), 92 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/trace/Incubator.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/Sampler.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/TracingContext.scala create mode 100644 kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala (limited to 'kamon-core') diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index 639d4aba..78d0900e 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -127,6 +127,35 @@ kamon { trace { + # Level of detail used when recording trace information. The posible values are: + # - metrics-only: metrics for all included traces and all segments are recorded, but no Trace messages will be sent + # to the subscriptors of trace data. + # - simple-trace: metrics for all included traces and all segments are recorded and additionally a Trace message + # containing the trace and segments details and metadata. + level = metrics-only + + # Sampling strategy to apply when the tracing level is set to `simple-trace`. The options are: all, random, ordered + # and threshold. The details of each sampler are bellow. + sampling = random + + # Use a ThreadLocalRandom to generate numbers between 1 and 100, if the random number is less or equal to .chance + # then tracing information will be gathered and reported for the current trace. + random-sampler { + chance = 10 + } + + # Use a AtomicLong to ensure that every .sample-interval number of requests tracing information will be gathered and + # reported. + ordered-sampler { + sample-interval = 10 + } + + # Gather tracing information for all traces but only report those whose elapsed-time is equal or greated to the + # .minimum-elapsed-time setting. + threshold-sampler { + threshold = 1 second + } + # If ask-pattern-tracing is enabled, a WARN level log message will be generated if a future generated by the `ask` # pattern fails with a `AskTimeoutException` and the log message will contain a stack trace captured at the moment # the future was created. diff --git a/kamon-core/src/main/scala/kamon/trace/Incubator.scala b/kamon-core/src/main/scala/kamon/trace/Incubator.scala new file mode 100644 index 00000000..d363d771 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/Incubator.scala @@ -0,0 +1,45 @@ +package kamon.trace + +import akka.actor.{ Props, Actor, ActorRef } +import kamon.trace.Incubator.{ CheckForCompletedTraces, IncubatingTrace } +import scala.annotation.tailrec +import scala.collection.immutable.Queue +import scala.concurrent.duration._ + +class Incubator(subscriptions: ActorRef, maxIncubationNanoTime: Long) extends Actor { + import context.dispatcher + val checkSchedule = context.system.scheduler.schedule(100 millis, 100 millis, self, CheckForCompletedTraces) + var incubating = Queue.empty[IncubatingTrace] + + def receive = { + case CheckForCompletedTraces ⇒ dispatchCompleted() + case tc: TracingContext ⇒ incubating = incubating.enqueue(IncubatingTrace(tc)) + } + + @tailrec private def dispatchCompleted(): Unit = { + if (incubating.nonEmpty) { + val it = incubating.head + if (!it.tc.shouldIncubate || it.incubationNanoTime >= maxIncubationNanoTime) { + it.tc.generateTraceInfo.map(subscriptions ! _) + incubating = incubating.tail + dispatchCompleted() + } + } + } + + override def postStop(): Unit = { + super.postStop() + checkSchedule.cancel() + } +} + +object Incubator { + + def props(subscriptions: ActorRef, maxIncubationNanoTime: Long): Props = Props(new Incubator(subscriptions, maxIncubationNanoTime)) + + case object CheckForCompletedTraces + case class IncubatingTrace(tc: TracingContext) { + private val incubationStartNanoTime = System.nanoTime() + def incubationNanoTime: Long = System.nanoTime() - incubationStartNanoTime + } +} diff --git a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala new file mode 100644 index 00000000..04e61407 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala @@ -0,0 +1,99 @@ +package kamon.trace + +import java.util.concurrent.ConcurrentLinkedQueue + +import akka.actor.ActorSystem +import akka.event.LoggingAdapter +import kamon.Kamon +import kamon.metric.TraceMetrics.TraceMetricRecorder +import kamon.metric.{ MetricsExtension, TraceMetrics, Metrics } + +import scala.annotation.tailrec + +class MetricsOnlyContext( + traceName: String, + val token: String, + izOpen: Boolean, + val levelOfDetail: LevelOfDetail, + val origin: TraceContextOrigin, + nanoTimeztamp: Long, + log: LoggingAdapter, + metricsExtension: MetricsExtension, + val system: ActorSystem) + extends TraceContext { + + @volatile private var _name = traceName + @volatile private var _isOpen = izOpen + @volatile protected var _elapsedNanoTime = 0L + + private val _nanoTimestamp = nanoTimeztamp + private val _finishedSegments = new ConcurrentLinkedQueue[SegmentLatencyData]() + private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage + + def rename(newName: String): Unit = + if (isOpen) _name = newName + else if (log.isWarningEnabled) log.warning("Can't rename trace from [{}] to [{}] because the trace is already closed.", name, newName) + + def name: String = _name + def isEmpty: Boolean = false + def isOpen: Boolean = _isOpen + def nanoTimestamp: Long = _nanoTimestamp + def elapsedNanoTime: Long = _elapsedNanoTime + def addMetadata(key: String, value: String): Unit = {} + + def finish(): Unit = { + _isOpen = false + _elapsedNanoTime = System.nanoTime() - _nanoTimestamp + val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory) + + metricRecorder.map { traceMetrics ⇒ + traceMetrics.elapsedTime.record(elapsedNanoTime) + drainFinishedSegments(traceMetrics) + } + } + + def startSegment(segmentName: String, category: String, library: String): Segment = + new MetricsOnlySegment(segmentName, category, library) + + @tailrec private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = { + val segment = _finishedSegments.poll() + if (segment != null) { + metricRecorder.segmentRecorder(segment.identity).record(segment.duration) + drainFinishedSegments(metricRecorder) + } + } + + protected def finishSegment(segmentName: String, category: String, library: String, duration: Long): Unit = { + _finishedSegments.add(SegmentLatencyData(SegmentMetricIdentity(segmentName, category, library), duration)) + + if (isClosed) { + metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).map { traceMetrics ⇒ + drainFinishedSegments(traceMetrics) + } + } + } + + class MetricsOnlySegment(segmentName: String, val category: String, val library: String) extends Segment { + protected val segmentStartNanoTime = System.nanoTime() + @volatile private var _segmentName = segmentName + @volatile private var _elapsedNanoTime = 0L + @volatile protected var _isOpen = true + + def name: String = _segmentName + def isEmpty: Boolean = false + def addMetadata(key: String, value: String): Unit = {} + def isOpen: Boolean = _isOpen + def elapsedNanoTime: Long = _elapsedNanoTime + + def rename(newName: String): Unit = + if (isOpen) _segmentName = newName + else if (log.isWarningEnabled) log.warning("Can't rename segment from [{}] to [{}] because the segment is already closed.", name, newName) + + def finish: Unit = { + _isOpen = false + _elapsedNanoTime = System.nanoTime() - segmentStartNanoTime + + finishSegment(name, category, library, elapsedNanoTime) + } + } +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/Sampler.scala b/kamon-core/src/main/scala/kamon/trace/Sampler.scala new file mode 100644 index 00000000..60a400f8 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/Sampler.scala @@ -0,0 +1,45 @@ +package kamon.trace + +import java.util.concurrent.atomic.AtomicLong + +import scala.concurrent.forkjoin.ThreadLocalRandom + +trait Sampler { + def shouldTrace: Boolean + def shouldReport(traceElapsedNanoTime: Long): Boolean +} + +object NoSampling extends Sampler { + def shouldTrace: Boolean = false + def shouldReport(traceElapsedNanoTime: Long): Boolean = false +} + +object SampleAll extends Sampler { + def shouldTrace: Boolean = true + def shouldReport(traceElapsedNanoTime: Long): Boolean = true +} + +class RandomSampler(chance: Int) extends Sampler { + require(chance > 0, "kamon.trace.random-sampler.chance cannot be <= 0") + require(chance <= 100, "kamon.trace.random-sampler.chance cannot be > 100") + + def shouldTrace: Boolean = ThreadLocalRandom.current().nextInt(100) <= chance + def shouldReport(traceElapsedNanoTime: Long): Boolean = true +} + +class OrderedSampler(interval: Int) extends Sampler { + require(interval > 0, "kamon.trace.ordered-sampler.interval cannot be <= 0") + + private val counter = new AtomicLong(0L) + def shouldTrace: Boolean = counter.incrementAndGet() % interval == 0 + // TODO: find a more efficient way to do this, protect from long overflow. + def shouldReport(traceElapsedNanoTime: Long): Boolean = true +} + +class ThresholdSampler(thresholdInNanoseconds: Long) extends Sampler { + require(thresholdInNanoseconds > 0, "kamon.trace.threshold-sampler.minimum-elapsed-time cannot be <= 0") + + def shouldTrace: Boolean = true + def shouldReport(traceElapsedNanoTime: Long): Boolean = traceElapsedNanoTime >= thresholdInNanoseconds +} + diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index 5b74e6b2..2c0b38bf 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -27,7 +27,7 @@ import kamon.metric.TraceMetrics.TraceMetricRecorder import scala.annotation.tailrec -sealed trait TraceContext { +trait TraceContext { def name: String def token: String def rename(name: String): Unit @@ -39,15 +39,23 @@ sealed trait TraceContext { def nonEmpty: Boolean = !isEmpty def startSegment(segmentName: String, category: String, library: String): Segment def nanoTimestamp: Long + def addMetadata(key: String, value: String) + + protected def elapsedNanoTime: Long } -sealed trait Segment { +trait Segment { def name: String def rename(newName: String): Unit def category: String def library: String def finish(): Unit + def isOpen: Boolean + def isClosed: Boolean = !isOpen def isEmpty: Boolean + def addMetadata(key: String, value: String) + + protected def elapsedNanoTime: Long } case object EmptyTraceContext extends TraceContext { @@ -60,85 +68,24 @@ case object EmptyTraceContext extends TraceContext { def isEmpty: Boolean = true def startSegment(segmentName: String, category: String, library: String): Segment = EmptySegment def nanoTimestamp: Long = 0L + def addMetadata(key: String, value: String): Unit = {} + def elapsedNanoTime: Long = 0L case object EmptySegment extends Segment { val name: String = "empty-segment" val category: String = "empty-category" val library: String = "empty-library" def isEmpty: Boolean = true + def isOpen: Boolean = false def rename(newName: String): Unit = {} def finish: Unit = {} - } -} - -class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail, - val origin: TraceContextOrigin, nanoTimeztamp: Long, val system: ActorSystem) extends TraceContext { - - val isEmpty: Boolean = false - @volatile private var _name = traceName - @volatile private var _isOpen = izOpen - - private val _nanoTimestamp = nanoTimeztamp - private val finishedSegments = new ConcurrentLinkedQueue[SegmentData]() - private val metricsExtension = Kamon(Metrics)(system) - private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage - - def name: String = _name - def rename(newName: String): Unit = - if (isOpen) _name = newName // TODO: log a warning about renaming a closed trace. - - def isOpen: Boolean = _isOpen - def nanoTimestamp: Long = _nanoTimestamp - - def finish(): Unit = { - _isOpen = false - val elapsedNanoTime = System.nanoTime() - _nanoTimestamp - val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory) - - metricRecorder.map { traceMetrics ⇒ - traceMetrics.elapsedTime.record(elapsedNanoTime) - drainFinishedSegments(traceMetrics) - } - } - - def startSegment(segmentName: String, category: String, library: String): Segment = new DefaultSegment(segmentName, category, library) - - @tailrec private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = { - val segment = finishedSegments.poll() - if (segment != null) { - metricRecorder.segmentRecorder(segment.identity).record(segment.duration) - drainFinishedSegments(metricRecorder) - } - } - - private def finishSegment(segmentName: String, category: String, library: String, duration: Long): Unit = { - finishedSegments.add(SegmentData(SegmentMetricIdentity(segmentName, category, library), duration)) - - if (isClosed) { - metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).map { traceMetrics ⇒ - drainFinishedSegments(traceMetrics) - } - } - } - - class DefaultSegment(segmentName: String, val category: String, val library: String) extends Segment { - private val _segmentStartNanoTime = System.nanoTime() - @volatile private var _segmentName = segmentName - @volatile private var _isOpen = true - - def name: String = _segmentName - def rename(newName: String): Unit = _segmentName = newName - def isEmpty: Boolean = false - - def finish: Unit = { - val segmentFinishNanoTime = System.nanoTime() - finishSegment(name, category, library, (segmentFinishNanoTime - _segmentStartNanoTime)) - } + def addMetadata(key: String, value: String): Unit = {} + def elapsedNanoTime: Long = 0L } } case class SegmentMetricIdentity(name: String, category: String, library: String) extends MetricIdentity -case class SegmentData(identity: SegmentMetricIdentity, duration: Long) +case class SegmentLatencyData(identity: SegmentMetricIdentity, duration: Long) object SegmentCategory { val HttpClient = "http-client" @@ -146,7 +93,7 @@ object SegmentCategory { sealed trait LevelOfDetail object LevelOfDetail { - case object OnlyMetrics extends LevelOfDetail + case object MetricsOnly extends LevelOfDetail case object SimpleTrace extends LevelOfDetail case object FullTrace extends LevelOfDetail } diff --git a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala index a59abc18..a80a4321 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala @@ -16,14 +16,60 @@ package kamon.trace -import akka.actor.{ ExtendedActorSystem, ExtensionIdProvider, ExtensionId } +import akka.actor._ import akka.actor +import akka.event.Logging +import kamon.metric.Metrics import kamon.util.GlobPathFilter import kamon.Kamon class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension { val config = system.settings.config.getConfig("kamon.trace") val enableAskPatternTracing = config.getBoolean("ask-pattern-tracing") + + val detailLevel: LevelOfDetail = config.getString("level") match { + case "metrics-only" ⇒ LevelOfDetail.MetricsOnly + case "simple-trace" ⇒ LevelOfDetail.SimpleTrace + case other ⇒ sys.error(s"Unknown tracing level $other present in the configuration file.") + } + + val sampler: Sampler = + if (detailLevel == LevelOfDetail.MetricsOnly) NoSampling + else config.getString("sampling") match { + case "all" ⇒ SampleAll + case "random" ⇒ new RandomSampler(config.getInt("random-sampler.chance")) + case "ordered" ⇒ new OrderedSampler(config.getInt("ordered-sampler.interval")) + case "threshold" ⇒ new RandomSampler(config.getInt("threshold-sampler.threshold")) + } + + val log = Logging(system, "TraceExtension") + val subscriptions = system.actorOf(Props[TraceSubscriptions], "trace-subscriptions") + val incubator = system.actorOf(Incubator.props(subscriptions, 10000000000L)) + val metricsExtension = Kamon(Metrics)(system) + + def newTraceContext(traceName: String, token: String, isOpen: Boolean, origin: TraceContextOrigin, nanoTimestamp: Long, system: ActorSystem): TraceContext = { + def newMetricsOnlyContext = new MetricsOnlyContext(traceName, token, true, detailLevel, origin, nanoTimestamp, log, metricsExtension, system) + + if (detailLevel == LevelOfDetail.MetricsOnly) + newMetricsOnlyContext + else { + if (!sampler.shouldTrace) + newMetricsOnlyContext + else + new TracingContext(traceName, token, true, detailLevel, origin, nanoTimestamp, log, this, metricsExtension, system) + } + } + + def report(trace: TracingContext): Unit = if (sampler.shouldReport(trace.elapsedNanoTime)) { + if (trace.shouldIncubate) + incubator ! trace + else + trace.generateTraceInfo.map(subscriptions ! _) + } + + def subscribe(subscriber: ActorRef): Unit = subscriptions ! TraceSubscriptions.Subscribe(subscriber) + def unsubscribe(subscriber: ActorRef): Unit = subscriptions ! TraceSubscriptions.Unsubscribe(subscriber) + } object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { @@ -34,3 +80,6 @@ object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name)) } } + +case class TraceInfo(name: String, token: String, startMilliTime: Long, startNanoTime: Long, elapsedNanoTime: Long, metadata: Map[String, String], segments: List[SegmentInfo]) +case class SegmentInfo(name: String, category: String, library: String, startNanoTime: Long, elapsedNanoTime: Long, metadata: Map[String, String]) \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala index c5fb100c..84e234f3 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala @@ -43,13 +43,13 @@ object TraceLocal { object HttpContextKey extends TraceLocal.TraceLocalKey { type ValueType = HttpContext } def store(key: TraceLocalKey)(value: key.ValueType): Unit = TraceRecorder.currentContext match { - case ctx: DefaultTraceContext ⇒ ctx.traceLocalStorage.store(key)(value) - case EmptyTraceContext ⇒ // Can't store in the empty context. + case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.store(key)(value) + case EmptyTraceContext ⇒ // Can't store in the empty context. } def retrieve(key: TraceLocalKey): Option[key.ValueType] = TraceRecorder.currentContext match { - case ctx: DefaultTraceContext ⇒ ctx.traceLocalStorage.retrieve(key) - case EmptyTraceContext ⇒ None // Can't retrieve anything from the empty context. + case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.retrieve(key) + case EmptyTraceContext ⇒ None // Can't retrieve anything from the empty context. } def storeForMdc(key: String, value: String): Unit = store(AvailableToMdc.fromKey(key))(value) diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala index 8da187cb..572d94e5 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala @@ -16,6 +16,8 @@ package kamon.trace +import kamon.Kamon + import scala.language.experimental.macros import java.util.concurrent.atomic.AtomicLong import kamon.macros.InlineTraceContextMacro @@ -34,27 +36,20 @@ object TraceRecorder { def newToken: String = hostnamePrefix + "-" + String.valueOf(tokenCounter.incrementAndGet()) - private def newTraceContext(name: String, token: Option[String], system: ActorSystem): TraceContext = { - new DefaultTraceContext( - name, - token.getOrElse(newToken), - izOpen = true, - LevelOfDetail.OnlyMetrics, - TraceContextOrigin.Local, - nanoTimeztamp = System.nanoTime, - system) - } + private def newTraceContext(name: String, token: Option[String], system: ActorSystem): TraceContext = + Kamon(Trace)(system).newTraceContext(name, token.getOrElse(newToken), true, TraceContextOrigin.Local, System.nanoTime(), system) def joinRemoteTraceContext(traceName: String, traceToken: String, startMilliTime: Long, isOpen: Boolean, system: ActorSystem): TraceContext = { val equivalentNanotime = System.nanoTime() - ((System.currentTimeMillis() - startMilliTime) * 1000000) - new DefaultTraceContext( + /*new MetricsOnlyContext( traceName, traceToken, isOpen, - LevelOfDetail.OnlyMetrics, + LevelOfDetail.MetricsOnly, TraceContextOrigin.Remote, equivalentNanotime, - system) + system)*/ + ??? } def setContext(context: TraceContext): Unit = traceContextStorage.set(context) @@ -81,8 +76,8 @@ object TraceRecorder { } def withTraceContextAndSystem[T](thunk: (TraceContext, ActorSystem) ⇒ T): Option[T] = currentContext match { - case ctx: DefaultTraceContext ⇒ Some(thunk(ctx, ctx.system)) - case EmptyTraceContext ⇒ None + case ctx: MetricsOnlyContext ⇒ Some(thunk(ctx, ctx.system)) + case EmptyTraceContext ⇒ None } def withInlineTraceContextReplacement[T](traceCtx: TraceContext)(thunk: ⇒ T): T = macro InlineTraceContextMacro.withInlineTraceContextImpl[T, TraceContext] diff --git a/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala b/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala new file mode 100644 index 00000000..d533a344 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala @@ -0,0 +1,29 @@ +package kamon.trace + +import akka.actor.{ Terminated, ActorRef, Actor } + +class TraceSubscriptions extends Actor { + import TraceSubscriptions._ + + var subscribers: List[ActorRef] = Nil + + def receive = { + case Subscribe(newSubscriber) ⇒ + if (!subscribers.contains(newSubscriber)) + subscribers = context.watch(newSubscriber) :: subscribers + + case Unsubscribe(leavingSubscriber) ⇒ + subscribers = subscribers.filter(_ == leavingSubscriber) + + case Terminated(terminatedSubscriber) ⇒ + subscribers = subscribers.filter(_ == terminatedSubscriber) + + case trace: TraceInfo ⇒ + subscribers.foreach(_ ! trace) + } +} + +object TraceSubscriptions { + case class Subscribe(subscriber: ActorRef) + case class Unsubscribe(subscriber: ActorRef) +} diff --git a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala new file mode 100644 index 00000000..c9cbc754 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala @@ -0,0 +1,72 @@ +package kamon.trace + +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.{ AtomicInteger, AtomicLongFieldUpdater } + +import akka.actor.ActorSystem +import akka.event.LoggingAdapter +import kamon.Kamon +import kamon.metric.TraceMetrics.TraceMetricRecorder +import kamon.metric.{ MetricsExtension, Metrics, TraceMetrics } + +import scala.annotation.tailrec +import scala.collection.concurrent.TrieMap + +class TracingContext(traceName: String, token: String, izOpen: Boolean, levelOfDetail: LevelOfDetail, origin: TraceContextOrigin, + nanoTimeztamp: Long, log: LoggingAdapter, traceExtension: TraceExtension, metricsExtension: MetricsExtension, system: ActorSystem) + extends MetricsOnlyContext(traceName, token, izOpen, levelOfDetail, origin, nanoTimeztamp, log, metricsExtension, system) { + + val openSegments = new AtomicInteger(0) + private val startMilliTime = System.currentTimeMillis() + private val allSegments = new ConcurrentLinkedQueue[TracingSegment]() + private val metadata = TrieMap.empty[String, String] + + override def addMetadata(key: String, value: String): Unit = metadata.put(key, value) + + override def startSegment(segmentName: String, category: String, library: String): Segment = { + openSegments.incrementAndGet() + val newSegment = new TracingSegment(segmentName, category, library) + allSegments.add(newSegment) + newSegment + } + + override def finish(): Unit = { + super.finish() + traceExtension.report(this) + } + + override def finishSegment(segmentName: String, category: String, library: String, duration: Long): Unit = { + openSegments.decrementAndGet() + super.finishSegment(segmentName, category, library, duration) + } + + def shouldIncubate: Boolean = isOpen || openSegments.get() > 0 + + def generateTraceInfo: Option[TraceInfo] = if (isOpen) None else { + val currentSegments = allSegments.iterator() + var segmentsInfo: List[SegmentInfo] = Nil + + while (currentSegments.hasNext()) { + val segment = currentSegments.next() + segment.createSegmentInfo match { + case Some(si) ⇒ segmentsInfo = si :: segmentsInfo + case None ⇒ log.warning("Segment [{}] will be left out of TraceInfo because it was still open.", segment.name) + } + } + + Some(TraceInfo(traceName, token, startMilliTime, nanoTimeztamp, elapsedNanoTime, metadata.toMap, segmentsInfo)) + } + + class TracingSegment(segmentName: String, category: String, library: String) extends MetricsOnlySegment(segmentName, category, library) { + private val metadata = TrieMap.empty[String, String] + override def addMetadata(key: String, value: String): Unit = metadata.put(key, value) + + override def finish: Unit = { + super.finish() + } + + def createSegmentInfo: Option[SegmentInfo] = + if (isOpen) None + else Some(SegmentInfo(this.name, category, library, segmentStartNanoTime, elapsedNanoTime, metadata.toMap)) + } +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala b/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala index d79a3ab6..4f4efa4d 100644 --- a/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala +++ b/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala @@ -17,7 +17,7 @@ package kamon.trace.logging import kamon.trace.TraceLocal.AvailableToMdc -import kamon.trace.{ EmptyTraceContext, DefaultTraceContext, TraceContext, TraceRecorder } +import kamon.trace.{ EmptyTraceContext, MetricsOnlyContext, TraceContext, TraceRecorder } import org.slf4j.MDC @@ -29,7 +29,7 @@ trait MdcKeysSupport { } private[this] def copyToMdc(traceContext: TraceContext): Iterable[String] = traceContext match { - case ctx: DefaultTraceContext ⇒ + case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.underlyingStorage.collect { case (available: AvailableToMdc, value) ⇒ Map(available.mdcKey -> String.valueOf(value)) }.map { value ⇒ value.map { case (k, v) ⇒ MDC.put(k, v); k } }.flatten diff --git a/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala b/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala new file mode 100644 index 00000000..a263ff7f --- /dev/null +++ b/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala @@ -0,0 +1,101 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import akka.actor.ActorSystem +import akka.testkit.{ ImplicitSender, TestKitBase } +import com.typesafe.config.ConfigFactory +import kamon.Kamon +import org.scalatest.{ Matchers, WordSpecLike } +import scala.concurrent.duration._ + +class SimpleTraceSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { + implicit lazy val system: ActorSystem = ActorSystem("simple-trace-spec", ConfigFactory.parseString( + """ + |kamon.metrics { + | tick-interval = 1 hour + | filters = [ + | { + | trace { + | includes = [ "*" ] + | excludes = [ "non-tracked-trace"] + | } + | } + | ] + | precision { + | default-histogram-precision { + | highest-trackable-value = 3600000000000 + | significant-value-digits = 2 + | } + | + | default-min-max-counter-precision { + | refresh-interval = 1 second + | highest-trackable-value = 999999999 + | significant-value-digits = 2 + | } + | } + |} + | + |kamon.trace { + | level = simple-trace + | sampling = all + |} + """.stripMargin)) + + "the simple tracing" should { + "send a TraceInfo when the trace has finished and all segments are finished" in { + Kamon(Trace)(system).subscribe(testActor) + + TraceRecorder.withNewTraceContext("simple-trace-without-segments") { + TraceRecorder.currentContext.startSegment("segment-one", "test-segment", "test").finish() + TraceRecorder.currentContext.startSegment("segment-two", "test-segment", "test").finish() + TraceRecorder.finish() + } + + val traceInfo = expectMsgType[TraceInfo] + Kamon(Trace)(system).unsubscribe(testActor) + + traceInfo.name should be("simple-trace-without-segments") + traceInfo.segments.size should be(2) + traceInfo.segments.find(_.name == "segment-one") should be('defined) + traceInfo.segments.find(_.name == "segment-two") should be('defined) + } + + "incubate the tracing context if there are open segments after finishing" in { + Kamon(Trace)(system).subscribe(testActor) + + val secondSegment = TraceRecorder.withNewTraceContext("simple-trace-without-segments") { + TraceRecorder.currentContext.startSegment("segment-one", "test-segment", "test").finish() + val segment = TraceRecorder.currentContext.startSegment("segment-two", "test-segment", "test") + TraceRecorder.finish() + segment + } + + expectNoMsg(2 seconds) + secondSegment.finish() + + val traceInfo = expectMsgType[TraceInfo] + Kamon(Trace)(system).unsubscribe(testActor) + + traceInfo.name should be("simple-trace-without-segments") + traceInfo.segments.size should be(2) + traceInfo.segments.find(_.name == "segment-one") should be('defined) + traceInfo.segments.find(_.name == "segment-two") should be('defined) + } + + } +} -- cgit v1.2.3