From 594c7a1729789eae7037918cde7287bdc4111b70 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Wed, 3 Dec 2014 02:10:46 +0100 Subject: = core: first simple approach to providing traces and a subscription mechanism. --- .../src/main/scala/kamon/trace/Incubator.scala | 45 ++++++++++ .../scala/kamon/trace/MetricsOnlyContext.scala | 99 ++++++++++++++++++++++ .../src/main/scala/kamon/trace/Sampler.scala | 45 ++++++++++ .../src/main/scala/kamon/trace/TraceContext.scala | 87 ++++--------------- .../main/scala/kamon/trace/TraceExtension.scala | 51 ++++++++++- .../src/main/scala/kamon/trace/TraceLocal.scala | 8 +- .../src/main/scala/kamon/trace/TraceRecorder.scala | 25 +++--- .../scala/kamon/trace/TraceSubscriptions.scala | 29 +++++++ .../main/scala/kamon/trace/TracingContext.scala | 72 ++++++++++++++++ .../scala/kamon/trace/logging/MdcKeysSupport.scala | 4 +- 10 files changed, 373 insertions(+), 92 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/trace/Incubator.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/Sampler.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/TracingContext.scala (limited to 'kamon-core/src/main/scala/kamon') diff --git a/kamon-core/src/main/scala/kamon/trace/Incubator.scala b/kamon-core/src/main/scala/kamon/trace/Incubator.scala new file mode 100644 index 00000000..d363d771 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/Incubator.scala @@ -0,0 +1,45 @@ +package kamon.trace + +import akka.actor.{ Props, Actor, ActorRef } +import kamon.trace.Incubator.{ CheckForCompletedTraces, IncubatingTrace } +import scala.annotation.tailrec +import scala.collection.immutable.Queue +import scala.concurrent.duration._ + +class Incubator(subscriptions: ActorRef, maxIncubationNanoTime: Long) extends Actor { + import context.dispatcher + val checkSchedule = context.system.scheduler.schedule(100 millis, 100 millis, self, CheckForCompletedTraces) + var incubating = Queue.empty[IncubatingTrace] + + def receive = { + case CheckForCompletedTraces ⇒ dispatchCompleted() + case tc: TracingContext ⇒ incubating = incubating.enqueue(IncubatingTrace(tc)) + } + + @tailrec private def dispatchCompleted(): Unit = { + if (incubating.nonEmpty) { + val it = incubating.head + if (!it.tc.shouldIncubate || it.incubationNanoTime >= maxIncubationNanoTime) { + it.tc.generateTraceInfo.map(subscriptions ! _) + incubating = incubating.tail + dispatchCompleted() + } + } + } + + override def postStop(): Unit = { + super.postStop() + checkSchedule.cancel() + } +} + +object Incubator { + + def props(subscriptions: ActorRef, maxIncubationNanoTime: Long): Props = Props(new Incubator(subscriptions, maxIncubationNanoTime)) + + case object CheckForCompletedTraces + case class IncubatingTrace(tc: TracingContext) { + private val incubationStartNanoTime = System.nanoTime() + def incubationNanoTime: Long = System.nanoTime() - incubationStartNanoTime + } +} diff --git a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala new file mode 100644 index 00000000..04e61407 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala @@ -0,0 +1,99 @@ +package kamon.trace + +import java.util.concurrent.ConcurrentLinkedQueue + +import akka.actor.ActorSystem +import akka.event.LoggingAdapter +import kamon.Kamon +import kamon.metric.TraceMetrics.TraceMetricRecorder +import kamon.metric.{ MetricsExtension, TraceMetrics, Metrics } + +import scala.annotation.tailrec + +class MetricsOnlyContext( + traceName: String, + val token: String, + izOpen: Boolean, + val levelOfDetail: LevelOfDetail, + val origin: TraceContextOrigin, + nanoTimeztamp: Long, + log: LoggingAdapter, + metricsExtension: MetricsExtension, + val system: ActorSystem) + extends TraceContext { + + @volatile private var _name = traceName + @volatile private var _isOpen = izOpen + @volatile protected var _elapsedNanoTime = 0L + + private val _nanoTimestamp = nanoTimeztamp + private val _finishedSegments = new ConcurrentLinkedQueue[SegmentLatencyData]() + private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage + + def rename(newName: String): Unit = + if (isOpen) _name = newName + else if (log.isWarningEnabled) log.warning("Can't rename trace from [{}] to [{}] because the trace is already closed.", name, newName) + + def name: String = _name + def isEmpty: Boolean = false + def isOpen: Boolean = _isOpen + def nanoTimestamp: Long = _nanoTimestamp + def elapsedNanoTime: Long = _elapsedNanoTime + def addMetadata(key: String, value: String): Unit = {} + + def finish(): Unit = { + _isOpen = false + _elapsedNanoTime = System.nanoTime() - _nanoTimestamp + val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory) + + metricRecorder.map { traceMetrics ⇒ + traceMetrics.elapsedTime.record(elapsedNanoTime) + drainFinishedSegments(traceMetrics) + } + } + + def startSegment(segmentName: String, category: String, library: String): Segment = + new MetricsOnlySegment(segmentName, category, library) + + @tailrec private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = { + val segment = _finishedSegments.poll() + if (segment != null) { + metricRecorder.segmentRecorder(segment.identity).record(segment.duration) + drainFinishedSegments(metricRecorder) + } + } + + protected def finishSegment(segmentName: String, category: String, library: String, duration: Long): Unit = { + _finishedSegments.add(SegmentLatencyData(SegmentMetricIdentity(segmentName, category, library), duration)) + + if (isClosed) { + metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).map { traceMetrics ⇒ + drainFinishedSegments(traceMetrics) + } + } + } + + class MetricsOnlySegment(segmentName: String, val category: String, val library: String) extends Segment { + protected val segmentStartNanoTime = System.nanoTime() + @volatile private var _segmentName = segmentName + @volatile private var _elapsedNanoTime = 0L + @volatile protected var _isOpen = true + + def name: String = _segmentName + def isEmpty: Boolean = false + def addMetadata(key: String, value: String): Unit = {} + def isOpen: Boolean = _isOpen + def elapsedNanoTime: Long = _elapsedNanoTime + + def rename(newName: String): Unit = + if (isOpen) _segmentName = newName + else if (log.isWarningEnabled) log.warning("Can't rename segment from [{}] to [{}] because the segment is already closed.", name, newName) + + def finish: Unit = { + _isOpen = false + _elapsedNanoTime = System.nanoTime() - segmentStartNanoTime + + finishSegment(name, category, library, elapsedNanoTime) + } + } +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/Sampler.scala b/kamon-core/src/main/scala/kamon/trace/Sampler.scala new file mode 100644 index 00000000..60a400f8 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/Sampler.scala @@ -0,0 +1,45 @@ +package kamon.trace + +import java.util.concurrent.atomic.AtomicLong + +import scala.concurrent.forkjoin.ThreadLocalRandom + +trait Sampler { + def shouldTrace: Boolean + def shouldReport(traceElapsedNanoTime: Long): Boolean +} + +object NoSampling extends Sampler { + def shouldTrace: Boolean = false + def shouldReport(traceElapsedNanoTime: Long): Boolean = false +} + +object SampleAll extends Sampler { + def shouldTrace: Boolean = true + def shouldReport(traceElapsedNanoTime: Long): Boolean = true +} + +class RandomSampler(chance: Int) extends Sampler { + require(chance > 0, "kamon.trace.random-sampler.chance cannot be <= 0") + require(chance <= 100, "kamon.trace.random-sampler.chance cannot be > 100") + + def shouldTrace: Boolean = ThreadLocalRandom.current().nextInt(100) <= chance + def shouldReport(traceElapsedNanoTime: Long): Boolean = true +} + +class OrderedSampler(interval: Int) extends Sampler { + require(interval > 0, "kamon.trace.ordered-sampler.interval cannot be <= 0") + + private val counter = new AtomicLong(0L) + def shouldTrace: Boolean = counter.incrementAndGet() % interval == 0 + // TODO: find a more efficient way to do this, protect from long overflow. + def shouldReport(traceElapsedNanoTime: Long): Boolean = true +} + +class ThresholdSampler(thresholdInNanoseconds: Long) extends Sampler { + require(thresholdInNanoseconds > 0, "kamon.trace.threshold-sampler.minimum-elapsed-time cannot be <= 0") + + def shouldTrace: Boolean = true + def shouldReport(traceElapsedNanoTime: Long): Boolean = traceElapsedNanoTime >= thresholdInNanoseconds +} + diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index 5b74e6b2..2c0b38bf 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -27,7 +27,7 @@ import kamon.metric.TraceMetrics.TraceMetricRecorder import scala.annotation.tailrec -sealed trait TraceContext { +trait TraceContext { def name: String def token: String def rename(name: String): Unit @@ -39,15 +39,23 @@ sealed trait TraceContext { def nonEmpty: Boolean = !isEmpty def startSegment(segmentName: String, category: String, library: String): Segment def nanoTimestamp: Long + def addMetadata(key: String, value: String) + + protected def elapsedNanoTime: Long } -sealed trait Segment { +trait Segment { def name: String def rename(newName: String): Unit def category: String def library: String def finish(): Unit + def isOpen: Boolean + def isClosed: Boolean = !isOpen def isEmpty: Boolean + def addMetadata(key: String, value: String) + + protected def elapsedNanoTime: Long } case object EmptyTraceContext extends TraceContext { @@ -60,85 +68,24 @@ case object EmptyTraceContext extends TraceContext { def isEmpty: Boolean = true def startSegment(segmentName: String, category: String, library: String): Segment = EmptySegment def nanoTimestamp: Long = 0L + def addMetadata(key: String, value: String): Unit = {} + def elapsedNanoTime: Long = 0L case object EmptySegment extends Segment { val name: String = "empty-segment" val category: String = "empty-category" val library: String = "empty-library" def isEmpty: Boolean = true + def isOpen: Boolean = false def rename(newName: String): Unit = {} def finish: Unit = {} - } -} - -class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail, - val origin: TraceContextOrigin, nanoTimeztamp: Long, val system: ActorSystem) extends TraceContext { - - val isEmpty: Boolean = false - @volatile private var _name = traceName - @volatile private var _isOpen = izOpen - - private val _nanoTimestamp = nanoTimeztamp - private val finishedSegments = new ConcurrentLinkedQueue[SegmentData]() - private val metricsExtension = Kamon(Metrics)(system) - private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage - - def name: String = _name - def rename(newName: String): Unit = - if (isOpen) _name = newName // TODO: log a warning about renaming a closed trace. - - def isOpen: Boolean = _isOpen - def nanoTimestamp: Long = _nanoTimestamp - - def finish(): Unit = { - _isOpen = false - val elapsedNanoTime = System.nanoTime() - _nanoTimestamp - val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory) - - metricRecorder.map { traceMetrics ⇒ - traceMetrics.elapsedTime.record(elapsedNanoTime) - drainFinishedSegments(traceMetrics) - } - } - - def startSegment(segmentName: String, category: String, library: String): Segment = new DefaultSegment(segmentName, category, library) - - @tailrec private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = { - val segment = finishedSegments.poll() - if (segment != null) { - metricRecorder.segmentRecorder(segment.identity).record(segment.duration) - drainFinishedSegments(metricRecorder) - } - } - - private def finishSegment(segmentName: String, category: String, library: String, duration: Long): Unit = { - finishedSegments.add(SegmentData(SegmentMetricIdentity(segmentName, category, library), duration)) - - if (isClosed) { - metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).map { traceMetrics ⇒ - drainFinishedSegments(traceMetrics) - } - } - } - - class DefaultSegment(segmentName: String, val category: String, val library: String) extends Segment { - private val _segmentStartNanoTime = System.nanoTime() - @volatile private var _segmentName = segmentName - @volatile private var _isOpen = true - - def name: String = _segmentName - def rename(newName: String): Unit = _segmentName = newName - def isEmpty: Boolean = false - - def finish: Unit = { - val segmentFinishNanoTime = System.nanoTime() - finishSegment(name, category, library, (segmentFinishNanoTime - _segmentStartNanoTime)) - } + def addMetadata(key: String, value: String): Unit = {} + def elapsedNanoTime: Long = 0L } } case class SegmentMetricIdentity(name: String, category: String, library: String) extends MetricIdentity -case class SegmentData(identity: SegmentMetricIdentity, duration: Long) +case class SegmentLatencyData(identity: SegmentMetricIdentity, duration: Long) object SegmentCategory { val HttpClient = "http-client" @@ -146,7 +93,7 @@ object SegmentCategory { sealed trait LevelOfDetail object LevelOfDetail { - case object OnlyMetrics extends LevelOfDetail + case object MetricsOnly extends LevelOfDetail case object SimpleTrace extends LevelOfDetail case object FullTrace extends LevelOfDetail } diff --git a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala index a59abc18..a80a4321 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala @@ -16,14 +16,60 @@ package kamon.trace -import akka.actor.{ ExtendedActorSystem, ExtensionIdProvider, ExtensionId } +import akka.actor._ import akka.actor +import akka.event.Logging +import kamon.metric.Metrics import kamon.util.GlobPathFilter import kamon.Kamon class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension { val config = system.settings.config.getConfig("kamon.trace") val enableAskPatternTracing = config.getBoolean("ask-pattern-tracing") + + val detailLevel: LevelOfDetail = config.getString("level") match { + case "metrics-only" ⇒ LevelOfDetail.MetricsOnly + case "simple-trace" ⇒ LevelOfDetail.SimpleTrace + case other ⇒ sys.error(s"Unknown tracing level $other present in the configuration file.") + } + + val sampler: Sampler = + if (detailLevel == LevelOfDetail.MetricsOnly) NoSampling + else config.getString("sampling") match { + case "all" ⇒ SampleAll + case "random" ⇒ new RandomSampler(config.getInt("random-sampler.chance")) + case "ordered" ⇒ new OrderedSampler(config.getInt("ordered-sampler.interval")) + case "threshold" ⇒ new RandomSampler(config.getInt("threshold-sampler.threshold")) + } + + val log = Logging(system, "TraceExtension") + val subscriptions = system.actorOf(Props[TraceSubscriptions], "trace-subscriptions") + val incubator = system.actorOf(Incubator.props(subscriptions, 10000000000L)) + val metricsExtension = Kamon(Metrics)(system) + + def newTraceContext(traceName: String, token: String, isOpen: Boolean, origin: TraceContextOrigin, nanoTimestamp: Long, system: ActorSystem): TraceContext = { + def newMetricsOnlyContext = new MetricsOnlyContext(traceName, token, true, detailLevel, origin, nanoTimestamp, log, metricsExtension, system) + + if (detailLevel == LevelOfDetail.MetricsOnly) + newMetricsOnlyContext + else { + if (!sampler.shouldTrace) + newMetricsOnlyContext + else + new TracingContext(traceName, token, true, detailLevel, origin, nanoTimestamp, log, this, metricsExtension, system) + } + } + + def report(trace: TracingContext): Unit = if (sampler.shouldReport(trace.elapsedNanoTime)) { + if (trace.shouldIncubate) + incubator ! trace + else + trace.generateTraceInfo.map(subscriptions ! _) + } + + def subscribe(subscriber: ActorRef): Unit = subscriptions ! TraceSubscriptions.Subscribe(subscriber) + def unsubscribe(subscriber: ActorRef): Unit = subscriptions ! TraceSubscriptions.Unsubscribe(subscriber) + } object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { @@ -34,3 +80,6 @@ object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name)) } } + +case class TraceInfo(name: String, token: String, startMilliTime: Long, startNanoTime: Long, elapsedNanoTime: Long, metadata: Map[String, String], segments: List[SegmentInfo]) +case class SegmentInfo(name: String, category: String, library: String, startNanoTime: Long, elapsedNanoTime: Long, metadata: Map[String, String]) \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala index c5fb100c..84e234f3 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala @@ -43,13 +43,13 @@ object TraceLocal { object HttpContextKey extends TraceLocal.TraceLocalKey { type ValueType = HttpContext } def store(key: TraceLocalKey)(value: key.ValueType): Unit = TraceRecorder.currentContext match { - case ctx: DefaultTraceContext ⇒ ctx.traceLocalStorage.store(key)(value) - case EmptyTraceContext ⇒ // Can't store in the empty context. + case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.store(key)(value) + case EmptyTraceContext ⇒ // Can't store in the empty context. } def retrieve(key: TraceLocalKey): Option[key.ValueType] = TraceRecorder.currentContext match { - case ctx: DefaultTraceContext ⇒ ctx.traceLocalStorage.retrieve(key) - case EmptyTraceContext ⇒ None // Can't retrieve anything from the empty context. + case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.retrieve(key) + case EmptyTraceContext ⇒ None // Can't retrieve anything from the empty context. } def storeForMdc(key: String, value: String): Unit = store(AvailableToMdc.fromKey(key))(value) diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala index 8da187cb..572d94e5 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala @@ -16,6 +16,8 @@ package kamon.trace +import kamon.Kamon + import scala.language.experimental.macros import java.util.concurrent.atomic.AtomicLong import kamon.macros.InlineTraceContextMacro @@ -34,27 +36,20 @@ object TraceRecorder { def newToken: String = hostnamePrefix + "-" + String.valueOf(tokenCounter.incrementAndGet()) - private def newTraceContext(name: String, token: Option[String], system: ActorSystem): TraceContext = { - new DefaultTraceContext( - name, - token.getOrElse(newToken), - izOpen = true, - LevelOfDetail.OnlyMetrics, - TraceContextOrigin.Local, - nanoTimeztamp = System.nanoTime, - system) - } + private def newTraceContext(name: String, token: Option[String], system: ActorSystem): TraceContext = + Kamon(Trace)(system).newTraceContext(name, token.getOrElse(newToken), true, TraceContextOrigin.Local, System.nanoTime(), system) def joinRemoteTraceContext(traceName: String, traceToken: String, startMilliTime: Long, isOpen: Boolean, system: ActorSystem): TraceContext = { val equivalentNanotime = System.nanoTime() - ((System.currentTimeMillis() - startMilliTime) * 1000000) - new DefaultTraceContext( + /*new MetricsOnlyContext( traceName, traceToken, isOpen, - LevelOfDetail.OnlyMetrics, + LevelOfDetail.MetricsOnly, TraceContextOrigin.Remote, equivalentNanotime, - system) + system)*/ + ??? } def setContext(context: TraceContext): Unit = traceContextStorage.set(context) @@ -81,8 +76,8 @@ object TraceRecorder { } def withTraceContextAndSystem[T](thunk: (TraceContext, ActorSystem) ⇒ T): Option[T] = currentContext match { - case ctx: DefaultTraceContext ⇒ Some(thunk(ctx, ctx.system)) - case EmptyTraceContext ⇒ None + case ctx: MetricsOnlyContext ⇒ Some(thunk(ctx, ctx.system)) + case EmptyTraceContext ⇒ None } def withInlineTraceContextReplacement[T](traceCtx: TraceContext)(thunk: ⇒ T): T = macro InlineTraceContextMacro.withInlineTraceContextImpl[T, TraceContext] diff --git a/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala b/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala new file mode 100644 index 00000000..d533a344 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala @@ -0,0 +1,29 @@ +package kamon.trace + +import akka.actor.{ Terminated, ActorRef, Actor } + +class TraceSubscriptions extends Actor { + import TraceSubscriptions._ + + var subscribers: List[ActorRef] = Nil + + def receive = { + case Subscribe(newSubscriber) ⇒ + if (!subscribers.contains(newSubscriber)) + subscribers = context.watch(newSubscriber) :: subscribers + + case Unsubscribe(leavingSubscriber) ⇒ + subscribers = subscribers.filter(_ == leavingSubscriber) + + case Terminated(terminatedSubscriber) ⇒ + subscribers = subscribers.filter(_ == terminatedSubscriber) + + case trace: TraceInfo ⇒ + subscribers.foreach(_ ! trace) + } +} + +object TraceSubscriptions { + case class Subscribe(subscriber: ActorRef) + case class Unsubscribe(subscriber: ActorRef) +} diff --git a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala new file mode 100644 index 00000000..c9cbc754 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala @@ -0,0 +1,72 @@ +package kamon.trace + +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.{ AtomicInteger, AtomicLongFieldUpdater } + +import akka.actor.ActorSystem +import akka.event.LoggingAdapter +import kamon.Kamon +import kamon.metric.TraceMetrics.TraceMetricRecorder +import kamon.metric.{ MetricsExtension, Metrics, TraceMetrics } + +import scala.annotation.tailrec +import scala.collection.concurrent.TrieMap + +class TracingContext(traceName: String, token: String, izOpen: Boolean, levelOfDetail: LevelOfDetail, origin: TraceContextOrigin, + nanoTimeztamp: Long, log: LoggingAdapter, traceExtension: TraceExtension, metricsExtension: MetricsExtension, system: ActorSystem) + extends MetricsOnlyContext(traceName, token, izOpen, levelOfDetail, origin, nanoTimeztamp, log, metricsExtension, system) { + + val openSegments = new AtomicInteger(0) + private val startMilliTime = System.currentTimeMillis() + private val allSegments = new ConcurrentLinkedQueue[TracingSegment]() + private val metadata = TrieMap.empty[String, String] + + override def addMetadata(key: String, value: String): Unit = metadata.put(key, value) + + override def startSegment(segmentName: String, category: String, library: String): Segment = { + openSegments.incrementAndGet() + val newSegment = new TracingSegment(segmentName, category, library) + allSegments.add(newSegment) + newSegment + } + + override def finish(): Unit = { + super.finish() + traceExtension.report(this) + } + + override def finishSegment(segmentName: String, category: String, library: String, duration: Long): Unit = { + openSegments.decrementAndGet() + super.finishSegment(segmentName, category, library, duration) + } + + def shouldIncubate: Boolean = isOpen || openSegments.get() > 0 + + def generateTraceInfo: Option[TraceInfo] = if (isOpen) None else { + val currentSegments = allSegments.iterator() + var segmentsInfo: List[SegmentInfo] = Nil + + while (currentSegments.hasNext()) { + val segment = currentSegments.next() + segment.createSegmentInfo match { + case Some(si) ⇒ segmentsInfo = si :: segmentsInfo + case None ⇒ log.warning("Segment [{}] will be left out of TraceInfo because it was still open.", segment.name) + } + } + + Some(TraceInfo(traceName, token, startMilliTime, nanoTimeztamp, elapsedNanoTime, metadata.toMap, segmentsInfo)) + } + + class TracingSegment(segmentName: String, category: String, library: String) extends MetricsOnlySegment(segmentName, category, library) { + private val metadata = TrieMap.empty[String, String] + override def addMetadata(key: String, value: String): Unit = metadata.put(key, value) + + override def finish: Unit = { + super.finish() + } + + def createSegmentInfo: Option[SegmentInfo] = + if (isOpen) None + else Some(SegmentInfo(this.name, category, library, segmentStartNanoTime, elapsedNanoTime, metadata.toMap)) + } +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala b/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala index d79a3ab6..4f4efa4d 100644 --- a/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala +++ b/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala @@ -17,7 +17,7 @@ package kamon.trace.logging import kamon.trace.TraceLocal.AvailableToMdc -import kamon.trace.{ EmptyTraceContext, DefaultTraceContext, TraceContext, TraceRecorder } +import kamon.trace.{ EmptyTraceContext, MetricsOnlyContext, TraceContext, TraceRecorder } import org.slf4j.MDC @@ -29,7 +29,7 @@ trait MdcKeysSupport { } private[this] def copyToMdc(traceContext: TraceContext): Iterable[String] = traceContext match { - case ctx: DefaultTraceContext ⇒ + case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.underlyingStorage.collect { case (available: AvailableToMdc, value) ⇒ Map(available.mdcKey -> String.valueOf(value)) }.map { value ⇒ value.map { case (k, v) ⇒ MDC.put(k, v); k } }.flatten -- cgit v1.2.3 From 46d823ec5ab0265edacf7f704ad0e0c8a61609d1 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Wed, 3 Dec 2014 17:31:53 +0100 Subject: = core: use MetricsOnly contexts for remote contexts --- kamon-core/src/main/scala/kamon/trace/TraceExtension.scala | 2 +- kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala | 12 ++---------- 2 files changed, 3 insertions(+), 11 deletions(-) (limited to 'kamon-core/src/main/scala/kamon') diff --git a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala index a80a4321..e5fbb15e 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala @@ -50,7 +50,7 @@ class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension { def newTraceContext(traceName: String, token: String, isOpen: Boolean, origin: TraceContextOrigin, nanoTimestamp: Long, system: ActorSystem): TraceContext = { def newMetricsOnlyContext = new MetricsOnlyContext(traceName, token, true, detailLevel, origin, nanoTimestamp, log, metricsExtension, system) - if (detailLevel == LevelOfDetail.MetricsOnly) + if (detailLevel == LevelOfDetail.MetricsOnly || origin == TraceContextOrigin.Remote) newMetricsOnlyContext else { if (!sampler.shouldTrace) diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala index 572d94e5..af47bf3c 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala @@ -40,16 +40,8 @@ object TraceRecorder { Kamon(Trace)(system).newTraceContext(name, token.getOrElse(newToken), true, TraceContextOrigin.Local, System.nanoTime(), system) def joinRemoteTraceContext(traceName: String, traceToken: String, startMilliTime: Long, isOpen: Boolean, system: ActorSystem): TraceContext = { - val equivalentNanotime = System.nanoTime() - ((System.currentTimeMillis() - startMilliTime) * 1000000) - /*new MetricsOnlyContext( - traceName, - traceToken, - isOpen, - LevelOfDetail.MetricsOnly, - TraceContextOrigin.Remote, - equivalentNanotime, - system)*/ - ??? + val equivalentNanoTime = System.nanoTime() - ((System.currentTimeMillis() - startMilliTime) * 1000000) + Kamon(Trace)(system).newTraceContext(traceName, traceToken, isOpen, TraceContextOrigin.Remote, equivalentNanoTime, system) } def setContext(context: TraceContext): Unit = traceContextStorage.set(context) -- cgit v1.2.3 From 432fb45952c587bcebf81d718188e7067572cf49 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Thu, 4 Dec 2014 03:20:41 +0100 Subject: + core: cleanup the simple trace implementation --- kamon-core/src/main/resources/reference.conf | 15 +++++ kamon-core/src/main/scala/kamon/TimeUnits.scala | 57 +++++++++++++++++ .../src/main/scala/kamon/trace/Incubator.scala | 71 ++++++++++++++++------ .../scala/kamon/trace/MetricsOnlyContext.scala | 70 +++++++++++---------- .../src/main/scala/kamon/trace/Sampler.scala | 13 ++-- .../src/main/scala/kamon/trace/TraceContext.scala | 48 +++++++-------- .../main/scala/kamon/trace/TraceExtension.scala | 31 +++++----- .../src/main/scala/kamon/trace/TraceRecorder.scala | 10 +-- .../main/scala/kamon/trace/TracingContext.scala | 70 +++++++++++---------- .../test/scala/kamon/trace/SimpleTraceSpec.scala | 14 +++-- 10 files changed, 259 insertions(+), 140 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/TimeUnits.scala (limited to 'kamon-core/src/main/scala/kamon') diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index 78d0900e..7c4b4ecb 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -156,6 +156,21 @@ kamon { threshold = 1 second } + incubator { + # Minimum time to stay in the trace incubator before checking if the trace should not be incubated anymore. No + # checks are made at least until this period has passed. + min-incubation-time = 5 seconds + + # Time to wait between incubation checks. After min-incubation-time, a trace is checked using this interval and if + # if shouldn't be incubated anymore, the TraceInfo is collected and reported for it. + check-interval = 1 second + + # Max amount of time that a trace can be in the incubator. If this time is reached for a given trace then it will + # be reported with whatever information is available at the moment, logging a warning for each segment that remains + # open after this point. + max-incubation-time = 20 seconds + } + # If ask-pattern-tracing is enabled, a WARN level log message will be generated if a future generated by the `ask` # pattern fails with a `AskTimeoutException` and the log message will contain a stack trace captured at the moment # the future was created. diff --git a/kamon-core/src/main/scala/kamon/TimeUnits.scala b/kamon-core/src/main/scala/kamon/TimeUnits.scala new file mode 100644 index 00000000..44f5b4c3 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/TimeUnits.scala @@ -0,0 +1,57 @@ +package kamon + +/** + * Epoch time stamp in milliseconds. + */ +class MilliTimestamp(val millis: Long) extends AnyVal { + override def toString: String = String.valueOf(millis) + ".millis" +} + +object MilliTimestamp { + def now: MilliTimestamp = new MilliTimestamp(System.currentTimeMillis()) +} + +/** + * Epoch time stamp in nanoseconds. + * + * NOTE: This doesn't have any better precision than MilliTimestamp, it is just a convenient way to get a epoch + * timestamp in nanoseconds. + */ +class NanoTimestamp(val nanos: Long) extends AnyVal { + override def toString: String = String.valueOf(nanos) + ".nanos" +} + +object NanoTimestamp { + def now: NanoTimestamp = new NanoTimestamp(System.currentTimeMillis() * 1000000) +} + +/** + * Number of nanoseconds between a arbitrary origin timestamp provided by the JVM via System.nanoTime() + */ +class RelativeNanoTimestamp(val nanos: Long) extends AnyVal { + override def toString: String = String.valueOf(nanos) + ".nanos" +} + +object RelativeNanoTimestamp { + def now: RelativeNanoTimestamp = new RelativeNanoTimestamp(System.nanoTime()) + def relativeTo(milliTimestamp: MilliTimestamp): RelativeNanoTimestamp = + new RelativeNanoTimestamp(now.nanos - (MilliTimestamp.now.millis - milliTimestamp.millis) * 1000000) +} + +/** + * Number of nanoseconds that passed between two points in time. + */ +class NanoInterval(val nanos: Long) extends AnyVal { + def <(that: NanoInterval): Boolean = this.nanos < that.nanos + def >(that: NanoInterval): Boolean = this.nanos > that.nanos + def ==(that: NanoInterval): Boolean = this.nanos == that.nanos + def >=(that: NanoInterval): Boolean = this.nanos >= that.nanos + def <=(that: NanoInterval): Boolean = this.nanos <= that.nanos + + override def toString: String = String.valueOf(nanos) + ".nanos" +} + +object NanoInterval { + def default: NanoInterval = new NanoInterval(0L) + def since(relative: RelativeNanoTimestamp): NanoInterval = new NanoInterval(System.nanoTime() - relative.nanos) +} diff --git a/kamon-core/src/main/scala/kamon/trace/Incubator.scala b/kamon-core/src/main/scala/kamon/trace/Incubator.scala index d363d771..df51f411 100644 --- a/kamon-core/src/main/scala/kamon/trace/Incubator.scala +++ b/kamon-core/src/main/scala/kamon/trace/Incubator.scala @@ -1,32 +1,70 @@ package kamon.trace -import akka.actor.{ Props, Actor, ActorRef } +import java.util.concurrent.TimeUnit + +import akka.actor.{ ActorLogging, Props, Actor, ActorRef } +import kamon.{ NanoInterval, RelativeNanoTimestamp } import kamon.trace.Incubator.{ CheckForCompletedTraces, IncubatingTrace } import scala.annotation.tailrec import scala.collection.immutable.Queue import scala.concurrent.duration._ -class Incubator(subscriptions: ActorRef, maxIncubationNanoTime: Long) extends Actor { +class Incubator(subscriptions: ActorRef) extends Actor with ActorLogging { import context.dispatcher - val checkSchedule = context.system.scheduler.schedule(100 millis, 100 millis, self, CheckForCompletedTraces) - var incubating = Queue.empty[IncubatingTrace] + val config = context.system.settings.config.getConfig("kamon.trace.incubator") + + val minIncubationTime = new NanoInterval(config.getDuration("min-incubation-time", TimeUnit.NANOSECONDS)) + val maxIncubationTime = new NanoInterval(config.getDuration("max-incubation-time", TimeUnit.NANOSECONDS)) + val checkInterval = config.getDuration("check-interval", TimeUnit.MILLISECONDS) + + val checkSchedule = context.system.scheduler.schedule(checkInterval.millis, checkInterval.millis, self, CheckForCompletedTraces) + var waitingForMinimumIncubation = Queue.empty[IncubatingTrace] + var waitingForIncubationFinish = List.empty[IncubatingTrace] def receive = { - case CheckForCompletedTraces ⇒ dispatchCompleted() - case tc: TracingContext ⇒ incubating = incubating.enqueue(IncubatingTrace(tc)) + case tc: TracingContext ⇒ incubate(tc) + case CheckForCompletedTraces ⇒ + checkWaitingForMinimumIncubation() + checkWaitingForIncubationFinish() } - @tailrec private def dispatchCompleted(): Unit = { - if (incubating.nonEmpty) { - val it = incubating.head - if (!it.tc.shouldIncubate || it.incubationNanoTime >= maxIncubationNanoTime) { - it.tc.generateTraceInfo.map(subscriptions ! _) - incubating = incubating.tail - dispatchCompleted() + def incubate(tc: TracingContext): Unit = + waitingForMinimumIncubation = waitingForMinimumIncubation.enqueue(IncubatingTrace(tc, RelativeNanoTimestamp.now)) + + @tailrec private def checkWaitingForMinimumIncubation(): Unit = { + if (waitingForMinimumIncubation.nonEmpty) { + val it = waitingForMinimumIncubation.head + if (NanoInterval.since(it.incubationStart) >= minIncubationTime) { + waitingForMinimumIncubation = waitingForMinimumIncubation.tail + + if (it.tc.shouldIncubate) + waitingForIncubationFinish = it :: waitingForIncubationFinish + else + dispatchTraceInfo(it.tc) + + checkWaitingForMinimumIncubation() } } } + private def checkWaitingForIncubationFinish(): Unit = { + waitingForIncubationFinish = waitingForIncubationFinish.filter { + case IncubatingTrace(context, incubationStart) ⇒ + if (!context.shouldIncubate) { + dispatchTraceInfo(context) + false + } else { + if (NanoInterval.since(incubationStart) >= maxIncubationTime) { + log.warning("Trace [{}] with token [{}] has reached the maximum incubation time, will be reported as is.", context.name, context.token) + dispatchTraceInfo(context); + false + } else true + } + } + } + + def dispatchTraceInfo(tc: TracingContext): Unit = subscriptions ! tc.generateTraceInfo + override def postStop(): Unit = { super.postStop() checkSchedule.cancel() @@ -35,11 +73,8 @@ class Incubator(subscriptions: ActorRef, maxIncubationNanoTime: Long) extends Ac object Incubator { - def props(subscriptions: ActorRef, maxIncubationNanoTime: Long): Props = Props(new Incubator(subscriptions, maxIncubationNanoTime)) + def props(subscriptions: ActorRef): Props = Props(new Incubator(subscriptions)) case object CheckForCompletedTraces - case class IncubatingTrace(tc: TracingContext) { - private val incubationStartNanoTime = System.nanoTime() - def incubationNanoTime: Long = System.nanoTime() - incubationStartNanoTime - } + case class IncubatingTrace(tc: TracingContext, incubationStart: RelativeNanoTimestamp) } diff --git a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala index 04e61407..f478d971 100644 --- a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala @@ -4,50 +4,42 @@ import java.util.concurrent.ConcurrentLinkedQueue import akka.actor.ActorSystem import akka.event.LoggingAdapter -import kamon.Kamon +import kamon.{ RelativeNanoTimestamp, NanoInterval } import kamon.metric.TraceMetrics.TraceMetricRecorder -import kamon.metric.{ MetricsExtension, TraceMetrics, Metrics } +import kamon.metric.{ MetricsExtension, TraceMetrics } import scala.annotation.tailrec -class MetricsOnlyContext( - traceName: String, - val token: String, - izOpen: Boolean, - val levelOfDetail: LevelOfDetail, - val origin: TraceContextOrigin, - nanoTimeztamp: Long, - log: LoggingAdapter, - metricsExtension: MetricsExtension, - val system: ActorSystem) +private[trace] class MetricsOnlyContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail, val origin: TraceContextOrigin, + val startRelativeTimestamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension, val system: ActorSystem) extends TraceContext { @volatile private var _name = traceName @volatile private var _isOpen = izOpen - @volatile protected var _elapsedNanoTime = 0L + @volatile protected var _elapsedTime = NanoInterval.default - private val _nanoTimestamp = nanoTimeztamp private val _finishedSegments = new ConcurrentLinkedQueue[SegmentLatencyData]() - private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage + private val _traceLocalStorage = new TraceLocalStorage def rename(newName: String): Unit = - if (isOpen) _name = newName - else if (log.isWarningEnabled) log.warning("Can't rename trace from [{}] to [{}] because the trace is already closed.", name, newName) + if (isOpen) + _name = newName + else if (log.isWarningEnabled) + log.warning("Can't rename trace from [{}] to [{}] because the trace is already closed.", name, newName) def name: String = _name def isEmpty: Boolean = false def isOpen: Boolean = _isOpen - def nanoTimestamp: Long = _nanoTimestamp - def elapsedNanoTime: Long = _elapsedNanoTime def addMetadata(key: String, value: String): Unit = {} def finish(): Unit = { _isOpen = false - _elapsedNanoTime = System.nanoTime() - _nanoTimestamp - val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory) + val traceElapsedTime = NanoInterval.since(startRelativeTimestamp) + _elapsedTime = traceElapsedTime + val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory) metricRecorder.map { traceMetrics ⇒ - traceMetrics.elapsedTime.record(elapsedNanoTime) + traceMetrics.elapsedTime.record(traceElapsedTime.nanos) drainFinishedSegments(traceMetrics) } } @@ -58,12 +50,12 @@ class MetricsOnlyContext( @tailrec private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = { val segment = _finishedSegments.poll() if (segment != null) { - metricRecorder.segmentRecorder(segment.identity).record(segment.duration) + metricRecorder.segmentRecorder(segment.identity).record(segment.duration.nanos) drainFinishedSegments(metricRecorder) } } - protected def finishSegment(segmentName: String, category: String, library: String, duration: Long): Unit = { + protected def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval): Unit = { _finishedSegments.add(SegmentLatencyData(SegmentMetricIdentity(segmentName, category, library), duration)) if (isClosed) { @@ -73,27 +65,41 @@ class MetricsOnlyContext( } } + // Should only be used by the TraceLocal utilities. + def traceLocalStorage: TraceLocalStorage = _traceLocalStorage + + // Handle with care and make sure that the trace is closed before calling this method, otherwise NanoInterval.default + // will be returned. + def elapsedTime: NanoInterval = _elapsedTime + class MetricsOnlySegment(segmentName: String, val category: String, val library: String) extends Segment { - protected val segmentStartNanoTime = System.nanoTime() + private val _startTimestamp = RelativeNanoTimestamp.now @volatile private var _segmentName = segmentName - @volatile private var _elapsedNanoTime = 0L - @volatile protected var _isOpen = true + @volatile private var _elapsedTime = NanoInterval.default + @volatile private var _isOpen = true def name: String = _segmentName def isEmpty: Boolean = false def addMetadata(key: String, value: String): Unit = {} def isOpen: Boolean = _isOpen - def elapsedNanoTime: Long = _elapsedNanoTime def rename(newName: String): Unit = - if (isOpen) _segmentName = newName - else if (log.isWarningEnabled) log.warning("Can't rename segment from [{}] to [{}] because the segment is already closed.", name, newName) + if (isOpen) + _segmentName = newName + else if (log.isWarningEnabled) + log.warning("Can't rename segment from [{}] to [{}] because the segment is already closed.", name, newName) def finish: Unit = { _isOpen = false - _elapsedNanoTime = System.nanoTime() - segmentStartNanoTime + val segmentElapsedTime = NanoInterval.since(_startTimestamp) + _elapsedTime = segmentElapsedTime - finishSegment(name, category, library, elapsedNanoTime) + finishSegment(name, category, library, segmentElapsedTime) } + + // Handle with care and make sure that the segment is closed before calling this method, otherwise + // NanoInterval.default will be returned. + def elapsedTime: NanoInterval = _elapsedTime + def startTimestamp: RelativeNanoTimestamp = _startTimestamp } } \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/Sampler.scala b/kamon-core/src/main/scala/kamon/trace/Sampler.scala index 60a400f8..650592a5 100644 --- a/kamon-core/src/main/scala/kamon/trace/Sampler.scala +++ b/kamon-core/src/main/scala/kamon/trace/Sampler.scala @@ -2,21 +2,22 @@ package kamon.trace import java.util.concurrent.atomic.AtomicLong +import kamon.NanoInterval import scala.concurrent.forkjoin.ThreadLocalRandom trait Sampler { def shouldTrace: Boolean - def shouldReport(traceElapsedNanoTime: Long): Boolean + def shouldReport(traceElapsedTime: NanoInterval): Boolean } object NoSampling extends Sampler { def shouldTrace: Boolean = false - def shouldReport(traceElapsedNanoTime: Long): Boolean = false + def shouldReport(traceElapsedTime: NanoInterval): Boolean = false } object SampleAll extends Sampler { def shouldTrace: Boolean = true - def shouldReport(traceElapsedNanoTime: Long): Boolean = true + def shouldReport(traceElapsedTime: NanoInterval): Boolean = true } class RandomSampler(chance: Int) extends Sampler { @@ -24,7 +25,7 @@ class RandomSampler(chance: Int) extends Sampler { require(chance <= 100, "kamon.trace.random-sampler.chance cannot be > 100") def shouldTrace: Boolean = ThreadLocalRandom.current().nextInt(100) <= chance - def shouldReport(traceElapsedNanoTime: Long): Boolean = true + def shouldReport(traceElapsedTime: NanoInterval): Boolean = true } class OrderedSampler(interval: Int) extends Sampler { @@ -33,13 +34,13 @@ class OrderedSampler(interval: Int) extends Sampler { private val counter = new AtomicLong(0L) def shouldTrace: Boolean = counter.incrementAndGet() % interval == 0 // TODO: find a more efficient way to do this, protect from long overflow. - def shouldReport(traceElapsedNanoTime: Long): Boolean = true + def shouldReport(traceElapsedTime: NanoInterval): Boolean = true } class ThresholdSampler(thresholdInNanoseconds: Long) extends Sampler { require(thresholdInNanoseconds > 0, "kamon.trace.threshold-sampler.minimum-elapsed-time cannot be <= 0") def shouldTrace: Boolean = true - def shouldReport(traceElapsedNanoTime: Long): Boolean = traceElapsedNanoTime >= thresholdInNanoseconds + def shouldReport(traceElapsedTime: NanoInterval): Boolean = traceElapsedTime.nanos >= thresholdInNanoseconds } diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index 2c0b38bf..466ceafd 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -17,59 +17,55 @@ package kamon.trace import java.io.ObjectStreamException - import akka.actor.ActorSystem -import kamon.Kamon +import kamon._ import kamon.metric._ -import java.util.concurrent.ConcurrentLinkedQueue import kamon.trace.TraceContextAware.DefaultTraceContextAware -import kamon.metric.TraceMetrics.TraceMetricRecorder - -import scala.annotation.tailrec trait TraceContext { def name: String def token: String - def rename(name: String): Unit - def finish(): Unit def origin: TraceContextOrigin - def isOpen: Boolean - def isClosed: Boolean = !isOpen def isEmpty: Boolean def nonEmpty: Boolean = !isEmpty + def isOpen: Boolean + def isClosed: Boolean = !isOpen + def system: ActorSystem + + def finish(): Unit + def rename(newName: String): Unit def startSegment(segmentName: String, category: String, library: String): Segment - def nanoTimestamp: Long def addMetadata(key: String, value: String) - - protected def elapsedNanoTime: Long + def startRelativeTimestamp: RelativeNanoTimestamp } trait Segment { def name: String - def rename(newName: String): Unit def category: String def library: String - def finish(): Unit + def isEmpty: Boolean + def nonEmpty: Boolean = !isEmpty def isOpen: Boolean def isClosed: Boolean = !isOpen - def isEmpty: Boolean - def addMetadata(key: String, value: String) - protected def elapsedNanoTime: Long + def finish(): Unit + def rename(newName: String): Unit + def addMetadata(key: String, value: String) } case object EmptyTraceContext extends TraceContext { def name: String = "empty-trace" def token: String = "" - def rename(name: String): Unit = {} - def finish(): Unit = {} def origin: TraceContextOrigin = TraceContextOrigin.Local - def isOpen: Boolean = false def isEmpty: Boolean = true + def isOpen: Boolean = false + def system: ActorSystem = sys.error("Can't obtain a ActorSystem from a EmptyTraceContext.") + + def finish(): Unit = {} + def rename(name: String): Unit = {} def startSegment(segmentName: String, category: String, library: String): Segment = EmptySegment - def nanoTimestamp: Long = 0L def addMetadata(key: String, value: String): Unit = {} - def elapsedNanoTime: Long = 0L + def startRelativeTimestamp = new RelativeNanoTimestamp(0L) case object EmptySegment extends Segment { val name: String = "empty-segment" @@ -77,15 +73,15 @@ case object EmptyTraceContext extends TraceContext { val library: String = "empty-library" def isEmpty: Boolean = true def isOpen: Boolean = false - def rename(newName: String): Unit = {} + def finish: Unit = {} + def rename(newName: String): Unit = {} def addMetadata(key: String, value: String): Unit = {} - def elapsedNanoTime: Long = 0L } } case class SegmentMetricIdentity(name: String, category: String, library: String) extends MetricIdentity -case class SegmentLatencyData(identity: SegmentMetricIdentity, duration: Long) +case class SegmentLatencyData(identity: SegmentMetricIdentity, duration: NanoInterval) object SegmentCategory { val HttpClient = "http-client" diff --git a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala index e5fbb15e..8bd9384a 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala @@ -19,9 +19,9 @@ package kamon.trace import akka.actor._ import akka.actor import akka.event.Logging +import kamon._ import kamon.metric.Metrics import kamon.util.GlobPathFilter -import kamon.Kamon class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension { val config = system.settings.config.getConfig("kamon.trace") @@ -44,11 +44,15 @@ class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension { val log = Logging(system, "TraceExtension") val subscriptions = system.actorOf(Props[TraceSubscriptions], "trace-subscriptions") - val incubator = system.actorOf(Incubator.props(subscriptions, 10000000000L)) + val incubator = system.actorOf(Incubator.props(subscriptions)) val metricsExtension = Kamon(Metrics)(system) - def newTraceContext(traceName: String, token: String, isOpen: Boolean, origin: TraceContextOrigin, nanoTimestamp: Long, system: ActorSystem): TraceContext = { - def newMetricsOnlyContext = new MetricsOnlyContext(traceName, token, true, detailLevel, origin, nanoTimestamp, log, metricsExtension, system) + def newTraceContext(traceName: String, token: String, origin: TraceContextOrigin, system: ActorSystem): TraceContext = + newTraceContext(traceName, token, true, origin, RelativeNanoTimestamp.now, system) + + def newTraceContext(traceName: String, token: String, isOpen: Boolean, origin: TraceContextOrigin, + startTimestamp: RelativeNanoTimestamp, system: ActorSystem): TraceContext = { + def newMetricsOnlyContext = new MetricsOnlyContext(traceName, token, isOpen, detailLevel, origin, startTimestamp, log, metricsExtension, system) if (detailLevel == LevelOfDetail.MetricsOnly || origin == TraceContextOrigin.Remote) newMetricsOnlyContext @@ -56,20 +60,19 @@ class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension { if (!sampler.shouldTrace) newMetricsOnlyContext else - new TracingContext(traceName, token, true, detailLevel, origin, nanoTimestamp, log, this, metricsExtension, system) + new TracingContext(traceName, token, true, detailLevel, origin, startTimestamp, log, metricsExtension, this, system) } } - def report(trace: TracingContext): Unit = if (sampler.shouldReport(trace.elapsedNanoTime)) { - if (trace.shouldIncubate) - incubator ! trace - else - trace.generateTraceInfo.map(subscriptions ! _) - } - def subscribe(subscriber: ActorRef): Unit = subscriptions ! TraceSubscriptions.Subscribe(subscriber) def unsubscribe(subscriber: ActorRef): Unit = subscriptions ! TraceSubscriptions.Unsubscribe(subscriber) + private[kamon] def dispatchTracingContext(trace: TracingContext): Unit = + if (sampler.shouldReport(trace.elapsedTime)) + if (trace.shouldIncubate) + incubator ! trace + else + subscriptions ! trace.generateTraceInfo } object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { @@ -81,5 +84,5 @@ object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { } } -case class TraceInfo(name: String, token: String, startMilliTime: Long, startNanoTime: Long, elapsedNanoTime: Long, metadata: Map[String, String], segments: List[SegmentInfo]) -case class SegmentInfo(name: String, category: String, library: String, startNanoTime: Long, elapsedNanoTime: Long, metadata: Map[String, String]) \ No newline at end of file +case class TraceInfo(name: String, token: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String], segments: List[SegmentInfo]) +case class SegmentInfo(name: String, category: String, library: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String]) \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala index af47bf3c..703896c3 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala @@ -16,7 +16,7 @@ package kamon.trace -import kamon.Kamon +import kamon.{ MilliTimestamp, RelativeNanoTimestamp, Kamon } import scala.language.experimental.macros import java.util.concurrent.atomic.AtomicLong @@ -37,11 +37,11 @@ object TraceRecorder { def newToken: String = hostnamePrefix + "-" + String.valueOf(tokenCounter.incrementAndGet()) private def newTraceContext(name: String, token: Option[String], system: ActorSystem): TraceContext = - Kamon(Trace)(system).newTraceContext(name, token.getOrElse(newToken), true, TraceContextOrigin.Local, System.nanoTime(), system) + Kamon(Trace)(system).newTraceContext(name, token.getOrElse(newToken), TraceContextOrigin.Local, system) - def joinRemoteTraceContext(traceName: String, traceToken: String, startMilliTime: Long, isOpen: Boolean, system: ActorSystem): TraceContext = { - val equivalentNanoTime = System.nanoTime() - ((System.currentTimeMillis() - startMilliTime) * 1000000) - Kamon(Trace)(system).newTraceContext(traceName, traceToken, isOpen, TraceContextOrigin.Remote, equivalentNanoTime, system) + def joinRemoteTraceContext(traceName: String, traceToken: String, startTimestamp: MilliTimestamp, isOpen: Boolean, system: ActorSystem): TraceContext = { + val equivalentStartTimestamp = RelativeNanoTimestamp.relativeTo(startTimestamp) + Kamon(Trace)(system).newTraceContext(traceName, traceToken, isOpen, TraceContextOrigin.Remote, equivalentStartTimestamp, system) } def setContext(context: TraceContext): Unit = traceContextStorage.set(context) diff --git a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala index c9cbc754..6a8cb1c6 100644 --- a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala @@ -1,72 +1,76 @@ package kamon.trace import java.util.concurrent.ConcurrentLinkedQueue -import java.util.concurrent.atomic.{ AtomicInteger, AtomicLongFieldUpdater } +import java.util.concurrent.atomic.AtomicInteger import akka.actor.ActorSystem import akka.event.LoggingAdapter -import kamon.Kamon -import kamon.metric.TraceMetrics.TraceMetricRecorder -import kamon.metric.{ MetricsExtension, Metrics, TraceMetrics } +import kamon.{ NanoInterval, NanoTimestamp, RelativeNanoTimestamp } +import kamon.metric.MetricsExtension -import scala.annotation.tailrec import scala.collection.concurrent.TrieMap -class TracingContext(traceName: String, token: String, izOpen: Boolean, levelOfDetail: LevelOfDetail, origin: TraceContextOrigin, - nanoTimeztamp: Long, log: LoggingAdapter, traceExtension: TraceExtension, metricsExtension: MetricsExtension, system: ActorSystem) - extends MetricsOnlyContext(traceName, token, izOpen, levelOfDetail, origin, nanoTimeztamp, log, metricsExtension, system) { +private[trace] class TracingContext(traceName: String, token: String, izOpen: Boolean, levelOfDetail: LevelOfDetail, origin: TraceContextOrigin, + startTimeztamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension, traceExtension: TraceExtension, system: ActorSystem) + extends MetricsOnlyContext(traceName, token, izOpen, levelOfDetail, origin, startTimeztamp, log, metricsExtension, system) { - val openSegments = new AtomicInteger(0) - private val startMilliTime = System.currentTimeMillis() - private val allSegments = new ConcurrentLinkedQueue[TracingSegment]() - private val metadata = TrieMap.empty[String, String] + private val _openSegments = new AtomicInteger(0) + private val _startTimestamp = NanoTimestamp.now + private val _allSegments = new ConcurrentLinkedQueue[TracingSegment]() + private val _metadata = TrieMap.empty[String, String] - override def addMetadata(key: String, value: String): Unit = metadata.put(key, value) + override def addMetadata(key: String, value: String): Unit = _metadata.put(key, value) override def startSegment(segmentName: String, category: String, library: String): Segment = { - openSegments.incrementAndGet() + _openSegments.incrementAndGet() val newSegment = new TracingSegment(segmentName, category, library) - allSegments.add(newSegment) + _allSegments.add(newSegment) newSegment } override def finish(): Unit = { super.finish() - traceExtension.report(this) + traceExtension.dispatchTracingContext(this) } - override def finishSegment(segmentName: String, category: String, library: String, duration: Long): Unit = { - openSegments.decrementAndGet() + override def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval): Unit = { + _openSegments.decrementAndGet() super.finishSegment(segmentName, category, library, duration) } - def shouldIncubate: Boolean = isOpen || openSegments.get() > 0 + def shouldIncubate: Boolean = isOpen || _openSegments.get() > 0 - def generateTraceInfo: Option[TraceInfo] = if (isOpen) None else { - val currentSegments = allSegments.iterator() - var segmentsInfo: List[SegmentInfo] = Nil + // Handle with care, should only be used after a trace is finished. + def generateTraceInfo: TraceInfo = { + require(isClosed, "Can't generated a TraceInfo if the Trace has not closed yet.") + + val currentSegments = _allSegments.iterator() + var segmentsInfo = List.newBuilder[SegmentInfo] while (currentSegments.hasNext()) { val segment = currentSegments.next() - segment.createSegmentInfo match { - case Some(si) ⇒ segmentsInfo = si :: segmentsInfo - case None ⇒ log.warning("Segment [{}] will be left out of TraceInfo because it was still open.", segment.name) - } + if (segment.isClosed) + segmentsInfo += segment.createSegmentInfo(_startTimestamp, startRelativeTimestamp) + else + log.warning("Segment [{}] will be left out of TraceInfo because it was still open.", segment.name) } - Some(TraceInfo(traceName, token, startMilliTime, nanoTimeztamp, elapsedNanoTime, metadata.toMap, segmentsInfo)) + TraceInfo(name, token, _startTimestamp, elapsedTime, _metadata.toMap, segmentsInfo.result()) } class TracingSegment(segmentName: String, category: String, library: String) extends MetricsOnlySegment(segmentName, category, library) { private val metadata = TrieMap.empty[String, String] override def addMetadata(key: String, value: String): Unit = metadata.put(key, value) - override def finish: Unit = { - super.finish() - } + // Handle with care, should only be used after the segment has finished. + def createSegmentInfo(traceStartTimestamp: NanoTimestamp, traceRelativeTimestamp: RelativeNanoTimestamp): SegmentInfo = { + require(isClosed, "Can't generated a SegmentInfo if the Segment has not closed yet.") - def createSegmentInfo: Option[SegmentInfo] = - if (isOpen) None - else Some(SegmentInfo(this.name, category, library, segmentStartNanoTime, elapsedNanoTime, metadata.toMap)) + // We don't have a epoch-based timestamp for the segments because calling System.currentTimeMillis() is both + // expensive and inaccurate, but we can do that once for the trace and calculate all the segments relative to it. + val segmentStartTimestamp = new NanoTimestamp((this.startTimestamp.nanos - traceRelativeTimestamp.nanos) + traceStartTimestamp.nanos) + + SegmentInfo(this.name, category, library, segmentStartTimestamp, this.elapsedTime, metadata.toMap) + } } } \ No newline at end of file diff --git a/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala b/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala index a263ff7f..cda9cad7 100644 --- a/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala +++ b/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala @@ -88,13 +88,15 @@ class SimpleTraceSpec extends TestKitBase with WordSpecLike with Matchers with I expectNoMsg(2 seconds) secondSegment.finish() - val traceInfo = expectMsgType[TraceInfo] - Kamon(Trace)(system).unsubscribe(testActor) + within(10 seconds) { + val traceInfo = expectMsgType[TraceInfo] + Kamon(Trace)(system).unsubscribe(testActor) - traceInfo.name should be("simple-trace-without-segments") - traceInfo.segments.size should be(2) - traceInfo.segments.find(_.name == "segment-one") should be('defined) - traceInfo.segments.find(_.name == "segment-two") should be('defined) + traceInfo.name should be("simple-trace-without-segments") + traceInfo.segments.size should be(2) + traceInfo.segments.find(_.name == "segment-one") should be('defined) + traceInfo.segments.find(_.name == "segment-two") should be('defined) + } } } -- cgit v1.2.3