diff options
Diffstat (limited to 'kamon-core/src/main')
5 files changed, 127 insertions, 55 deletions
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 9f6e58ee..6ac91356 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -63,14 +63,12 @@ object Kamon { private def tryLoadAutoweaveModule(): Unit = { val color = (msg: String) ⇒ s"""\u001B[32m${msg}\u001B[0m""" - Try { + log.info("Trying to load kamon-autoweave...") - log.info("Trying to load kamon-autoweave...") + Try { val autoweave = Class.forName("kamon.autoweave.Autoweave") - val instance = autoweave.newInstance() - - autoweave.getDeclaredMethod("attach").invoke(instance) + autoweave.getDeclaredMethod("attach").invoke(autoweave.newInstance()) } match { case Success(_) ⇒ diff --git a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala index c044719c..34b68f38 100644 --- a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala @@ -20,12 +20,13 @@ import java.util.concurrent.ConcurrentLinkedQueue import akka.event.LoggingAdapter import kamon.Kamon -import kamon.metric.{ SegmentMetrics, MetricsModule, TraceMetrics } +import kamon.metric.{ SegmentMetrics, TraceMetrics } import kamon.util.{ NanoInterval, RelativeNanoTimestamp } import scala.annotation.tailrec +import scala.collection.concurrent.TrieMap -private[kamon] class MetricsOnlyContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail, +private[kamon] class MetricsOnlyContext(traceName: String, val token: String, traceTags: Map[String, String], izOpen: Boolean, val levelOfDetail: LevelOfDetail, val startTimestamp: RelativeNanoTimestamp, log: LoggingAdapter) extends TraceContext { @@ -35,6 +36,7 @@ private[kamon] class MetricsOnlyContext(traceName: String, val token: String, iz private val _finishedSegments = new ConcurrentLinkedQueue[SegmentLatencyData]() private val _traceLocalStorage = new TraceLocalStorage + private val _tags = TrieMap.empty[String, String] ++= traceTags def rename(newName: String): Unit = if (isOpen) @@ -45,37 +47,44 @@ private[kamon] class MetricsOnlyContext(traceName: String, val token: String, iz def name: String = _name def isEmpty: Boolean = false def isOpen: Boolean = _isOpen + def addMetadata(key: String, value: String): Unit = {} + def addTag(key: String, value: String): Unit = _tags.put(key, value) + def removeTag(key: String, value: String): Boolean = _tags.remove(key, value) + def finish(): Unit = { _isOpen = false val traceElapsedTime = NanoInterval.since(startTimestamp) _elapsedTime = traceElapsedTime if (Kamon.metrics.shouldTrack(name, TraceMetrics.category)) - Kamon.metrics.entity(TraceMetrics, name).elapsedTime.record(traceElapsedTime.nanos) + Kamon.metrics.entity(TraceMetrics, name, _tags.toMap).elapsedTime.record(traceElapsedTime.nanos) drainFinishedSegments() } def startSegment(segmentName: String, category: String, library: String): Segment = - new MetricsOnlySegment(segmentName, category, library) + startSegment(segmentName, category, library, Map.empty[String, String]) + + def startSegment(segmentName: String, category: String, library: String, tags: Map[String, String]): Segment = + new MetricsOnlySegment(segmentName, category, library, tags) @tailrec private def drainFinishedSegments(): Unit = { val segment = _finishedSegments.poll() if (segment != null) { - val segmentTags = Map( + val defaultTags = Map( "trace" -> name, "category" -> segment.category, "library" -> segment.library) if (Kamon.metrics.shouldTrack(segment.name, SegmentMetrics.category)) - Kamon.metrics.entity(SegmentMetrics, segment.name, segmentTags).elapsedTime.record(segment.duration.nanos) + Kamon.metrics.entity(SegmentMetrics, segment.name, defaultTags ++ segment.tags).elapsedTime.record(segment.duration.nanos) drainFinishedSegments() } } - protected def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval): Unit = { - _finishedSegments.add(SegmentLatencyData(segmentName, category, library, duration)) + protected def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval, segmentTags: Map[String, String]): Unit = { + _finishedSegments.add(SegmentLatencyData(segmentName, category, library, duration, segmentTags)) if (isClosed) { drainFinishedSegments() @@ -89,17 +98,23 @@ private[kamon] class MetricsOnlyContext(traceName: String, val token: String, iz // will be returned. def elapsedTime: NanoInterval = _elapsedTime - class MetricsOnlySegment(segmentName: String, val category: String, val library: String) extends Segment { + class MetricsOnlySegment(segmentName: String, val category: String, val library: String, segmentTags: Map[String, String]) extends Segment { private val _startTimestamp = RelativeNanoTimestamp.now + protected val _tags = TrieMap.empty[String, String] ++= segmentTags + @volatile private var _segmentName = segmentName @volatile private var _elapsedTime = NanoInterval.default @volatile private var _isOpen = true def name: String = _segmentName def isEmpty: Boolean = false - def addMetadata(key: String, value: String): Unit = {} def isOpen: Boolean = _isOpen + def addMetadata(key: String, value: String): Unit = {} + + def addTag(key: String, value: String): Unit = _tags.put(key, value) + def removeTag(key: String, value: String): Boolean = _tags.remove(key, value) + def rename(newName: String): Unit = if (isOpen) _segmentName = newName @@ -111,7 +126,7 @@ private[kamon] class MetricsOnlyContext(traceName: String, val token: String, iz val segmentElapsedTime = NanoInterval.since(_startTimestamp) _elapsedTime = segmentElapsedTime - finishSegment(name, category, library, segmentElapsedTime) + finishSegment(name, category, library, segmentElapsedTime, _tags.toMap) } // Handle with care and make sure that the segment is closed before calling this method, otherwise @@ -121,4 +136,4 @@ private[kamon] class MetricsOnlyContext(traceName: String, val token: String, iz } } -case class SegmentLatencyData(name: String, category: String, library: String, duration: NanoInterval) +case class SegmentLatencyData(name: String, category: String, library: String, duration: NanoInterval, tags: Map[String, String]) diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index 9642233a..eee2ec14 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -1,6 +1,6 @@ /* * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> + * Copyright © 2013-2016 the kamon project <http://kamon.io/> * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at @@ -17,8 +17,10 @@ package kamon.trace import java.io.ObjectStreamException +import java.util + import kamon.trace.TraceContextAware.DefaultTraceContextAware -import kamon.util.{ SameThreadExecutionContext, Supplier, Function, RelativeNanoTimestamp } +import kamon.util.{ Function, RelativeNanoTimestamp, SameThreadExecutionContext, Supplier } import scala.concurrent.Future @@ -34,8 +36,12 @@ trait TraceContext { def rename(newName: String): Unit def startSegment(segmentName: String, category: String, library: String): Segment + def startSegment(segmentName: String, category: String, library: String, tags: Map[String, String]): Segment def addMetadata(key: String, value: String) + def addTag(key: String, value: String): Unit + def removeTag(key: String, value: String): Boolean + def startTimestamp: RelativeNanoTimestamp def collect[T](f: TraceContext ⇒ T): Option[T] = @@ -49,21 +55,33 @@ trait TraceContext { else None def withNewSegment[T](segmentName: String, category: String, library: String)(code: ⇒ T): T = { - val segment = startSegment(segmentName, category, library) - try code finally segment.finish() + withNewSegment(segmentName, category, library, Map.empty[String, String])(code) } - // Java variant. - def withNewSegment[T](segmentName: String, category: String, library: String, code: Supplier[T]): T = - withNewSegment(segmentName, category, library)(code.get) + def withNewSegment[T](segmentName: String, category: String, library: String, tags: Map[String, String])(code: ⇒ T): T = { + val segment = startSegment(segmentName, category, library, tags) + try code finally segment.finish() + } def withNewAsyncSegment[T](segmentName: String, category: String, library: String)(code: ⇒ Future[T]): Future[T] = { - val segment = startSegment(segmentName, category, library) + withNewAsyncSegment(segmentName, category, library, Map.empty[String, String])(code) + } + + def withNewAsyncSegment[T](segmentName: String, category: String, library: String, tags: Map[String, String])(code: ⇒ Future[T]): Future[T] = { + val segment = startSegment(segmentName, category, library, tags) val result = code result.onComplete(_ ⇒ segment.finish())(SameThreadExecutionContext) result } + // Java variant. + def withNewSegment[T](segmentName: String, category: String, library: String, code: Supplier[T]): T = + withNewSegment(segmentName, category, library)(code.get) + + def withNewSegment[T](segmentName: String, category: String, library: String, tags: util.Map[String, String], code: Supplier[T]): T = { + import scala.collection.JavaConverters._ + withNewSegment(segmentName, category, library, tags.asScala.toMap)(code.get) + } } trait Segment { @@ -77,7 +95,10 @@ trait Segment { def finish(): Unit def rename(newName: String): Unit - def addMetadata(key: String, value: String) + def addMetadata(key: String, value: String): Unit + + def addTag(key: String, value: String): Unit + def removeTag(key: String, value: String): Boolean } case object EmptyTraceContext extends TraceContext { @@ -89,8 +110,13 @@ case object EmptyTraceContext extends TraceContext { def finish(): Unit = {} def rename(name: String): Unit = {} def startSegment(segmentName: String, category: String, library: String): Segment = EmptySegment + def startSegment(segmentName: String, category: String, library: String, tags: Map[String, String]): Segment = EmptySegment def addMetadata(key: String, value: String): Unit = {} def startTimestamp = new RelativeNanoTimestamp(0L) + def addTags(tags: Map[String, String]): Unit = {} + def addTag(key: String, value: String): Unit = {} + def removeTags(tags: Map[String, String]): Unit = {} + def removeTag(key: String, value: String): Boolean = false case object EmptySegment extends Segment { val name: String = "empty-segment" @@ -102,6 +128,8 @@ case object EmptyTraceContext extends TraceContext { def finish: Unit = {} def rename(newName: String): Unit = {} def addMetadata(key: String, value: String): Unit = {} + def addTag(key: String, value: String): Unit = {} + def removeTag(key: String, value: String): Boolean = false } } diff --git a/kamon-core/src/main/scala/kamon/trace/TracerModule.scala b/kamon-core/src/main/scala/kamon/trace/TracerModule.scala index 06286cae..60187729 100644 --- a/kamon-core/src/main/scala/kamon/trace/TracerModule.scala +++ b/kamon-core/src/main/scala/kamon/trace/TracerModule.scala @@ -1,6 +1,6 @@ /* * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> + * Copyright © 2013-2016 the kamon project <http://kamon.io/> * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at @@ -16,17 +16,21 @@ package kamon.trace +import java.util + import akka.actor._ -import akka.event.{ LoggingAdapter, Logging } +import akka.event.{ Logging, LoggingAdapter } import com.typesafe.config.Config import kamon.Kamon import kamon.metric.MetricsModule import kamon.util._ +import scala.collection.JavaConverters._ trait TracerModule { def newContext(name: String): TraceContext def newContext(name: String, token: Option[String]): TraceContext - def newContext(name: String, token: Option[String], timestamp: RelativeNanoTimestamp, isOpen: Boolean, isLocal: Boolean): TraceContext + def newContext(name: String, token: Option[String], tags: Map[String, String]): TraceContext + def newContext(name: String, token: Option[String], tags: Map[String, String], timestamp: RelativeNanoTimestamp, isOpen: Boolean, isLocal: Boolean): TraceContext def subscribe(subscriber: ActorRef): Unit def unsubscribe(subscriber: ActorRef): Unit @@ -57,8 +61,8 @@ object Tracer { def withContext[T](context: TraceContext, code: Supplier[T]): T = withContext(context)(code.get) - def withNewContext[T](traceName: String, traceToken: Option[String], autoFinish: Boolean)(code: ⇒ T): T = { - withContext(Kamon.tracer.newContext(traceName, traceToken)) { + def withNewContext[T](traceName: String, traceToken: Option[String], tags: Map[String, String], autoFinish: Boolean)(code: ⇒ T): T = { + withContext(Kamon.tracer.newContext(traceName, traceToken, tags)) { val codeResult = code if (autoFinish) currentContext.finish() @@ -68,26 +72,47 @@ object Tracer { } def withNewContext[T](traceName: String)(code: ⇒ T): T = - withNewContext(traceName, None, false)(code) + withNewContext(traceName, None)(code) + + def withNewContext[T](traceName: String, tags: Map[String, String])(code: ⇒ T): T = + withNewContext(traceName, None, tags)(code) def withNewContext[T](traceName: String, traceToken: Option[String])(code: ⇒ T): T = - withNewContext(traceName, traceToken, false)(code) + withNewContext(traceName, traceToken, Map.empty[String, String])(code) + + def withNewContext[T](traceName: String, traceToken: Option[String], tags: Map[String, String])(code: ⇒ T): T = + withNewContext(traceName, traceToken, tags, autoFinish = false)(code) def withNewContext[T](traceName: String, autoFinish: Boolean)(code: ⇒ T): T = - withNewContext(traceName, None, autoFinish)(code) + withNewContext(traceName, None, Map.empty[String, String], autoFinish)(code) + + def withNewContext[T](traceName: String, tags: Map[String, String], autoFinish: Boolean)(code: ⇒ T): T = + withNewContext(traceName, None, tags, autoFinish)(code) // Java variants. def withNewContext[T](traceName: String, traceToken: Option[String], autoFinish: Boolean, code: Supplier[T]): T = - withNewContext(traceName, traceToken, autoFinish)(code.get) + withNewContext(traceName, traceToken, Map.empty[String, String], autoFinish)(code.get) + + def withNewContext[T](traceName: String, traceToken: Option[String], tags: util.Map[String, String], autoFinish: Boolean, code: Supplier[T]): T = + withNewContext(traceName, traceToken, tags.asScala.toMap, autoFinish)(code.get) def withNewContext[T](traceName: String, code: Supplier[T]): T = - withNewContext(traceName, None, false)(code.get) + withNewContext(traceName, None, Map.empty[String, String], autoFinish = false)(code.get) + + def withNewContext[T](traceName: String, tags: util.Map[String, String], code: Supplier[T]): T = + withNewContext(traceName, None, tags.asScala.toMap, autoFinish = false)(code.get) def withNewContext[T](traceName: String, traceToken: Option[String], code: Supplier[T]): T = - withNewContext(traceName, traceToken, false)(code.get) + withNewContext(traceName, traceToken, Map.empty[String, String], autoFinish = false)(code.get) + + def withNewContext[T](traceName: String, traceToken: Option[String], tags: util.Map[String, String], code: Supplier[T]): T = + withNewContext(traceName, traceToken, tags.asScala.toMap, autoFinish = false)(code.get) def withNewContext[T](traceName: String, autoFinish: Boolean, code: Supplier[T]): T = - withNewContext(traceName, None, autoFinish)(code.get) + withNewContext(traceName, None, Map.empty[String, String], autoFinish)(code.get) + + def withNewContext[T](traceName: String, tags: util.Map[String, String], autoFinish: Boolean, code: Supplier[T]): T = + withNewContext(traceName, None, tags.asScala.toMap, autoFinish)(code.get) } private[kamon] class TracerModuleImpl(metricsExtension: MetricsModule, config: Config) extends TracerModule { @@ -96,7 +121,7 @@ private[kamon] class TracerModuleImpl(metricsExtension: MetricsModule, config: C private val _subscriptions = new LazyActorRef private val _incubator = new LazyActorRef private val _dynamic = new akka.actor.ReflectiveDynamicAccess(getClass.getClassLoader) - private val _tokenGenerator = _dynamic.createInstanceFor[Function0[String]](_settings.tokenGeneratorFQN, Nil).get // let's bubble up any problems. + private val _tokenGenerator = _dynamic.createInstanceFor[() ⇒ String](_settings.tokenGeneratorFQN, Nil).get // let's bubble up any problems. private def newToken: String = _tokenGenerator() @@ -106,14 +131,17 @@ private[kamon] class TracerModuleImpl(metricsExtension: MetricsModule, config: C def newContext(name: String, token: Option[String]): TraceContext = createTraceContext(name, token) - def newContext(name: String, token: Option[String], timestamp: RelativeNanoTimestamp, isOpen: Boolean, isLocal: Boolean): TraceContext = - createTraceContext(name, token, timestamp, isOpen, isLocal) + def newContext(name: String, token: Option[String], tags: Map[String, String]): TraceContext = + createTraceContext(name, token, tags) + + def newContext(name: String, token: Option[String], tags: Map[String, String], timestamp: RelativeNanoTimestamp, isOpen: Boolean, isLocal: Boolean): TraceContext = + createTraceContext(name, token, tags, timestamp, isOpen, isLocal) - private def createTraceContext(traceName: String, token: Option[String], startTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.now, + private def createTraceContext(traceName: String, token: Option[String], tags: Map[String, String] = Map.empty, startTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.now, isOpen: Boolean = true, isLocal: Boolean = true): TraceContext = { def newMetricsOnlyContext(token: String): TraceContext = - new MetricsOnlyContext(traceName, token, isOpen, _settings.levelOfDetail, startTimestamp, _logger) + new MetricsOnlyContext(traceName, token, tags, isOpen, _settings.levelOfDetail, startTimestamp, _logger) val traceToken = token.getOrElse(newToken) @@ -123,7 +151,7 @@ private[kamon] class TracerModuleImpl(metricsExtension: MetricsModule, config: C case _ if !isLocal || !_settings.sampler.shouldTrace ⇒ newMetricsOnlyContext(traceToken) case _ ⇒ - new TracingContext(traceName, traceToken, true, _settings.levelOfDetail, isLocal, startTimestamp, _logger, dispatchTracingContext) + new TracingContext(traceName, traceToken, tags, izOpen = true, _settings.levelOfDetail, isLocal, startTimestamp, _logger, dispatchTracingContext) } } @@ -166,4 +194,4 @@ private[kamon] object TracerModuleImpl { } case class TraceInfo(name: String, token: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String], segments: List[SegmentInfo]) -case class SegmentInfo(name: String, category: String, library: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String])
\ No newline at end of file +case class SegmentInfo(name: String, category: String, library: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String], tags: Map[String, String])
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala index 9269a99e..085b4b09 100644 --- a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala @@ -20,14 +20,13 @@ import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicInteger import akka.event.LoggingAdapter -import kamon.util.{ NanoInterval, RelativeNanoTimestamp, NanoTimestamp } -import kamon.metric.MetricsModule +import kamon.util.{ NanoInterval, NanoTimestamp, RelativeNanoTimestamp } import scala.collection.concurrent.TrieMap -private[trace] class TracingContext(traceName: String, token: String, izOpen: Boolean, levelOfDetail: LevelOfDetail, +private[trace] class TracingContext(traceName: String, token: String, tags: Map[String, String], izOpen: Boolean, levelOfDetail: LevelOfDetail, isLocal: Boolean, startTimeztamp: RelativeNanoTimestamp, log: LoggingAdapter, traceInfoSink: TracingContext ⇒ Unit) - extends MetricsOnlyContext(traceName, token, izOpen, levelOfDetail, startTimeztamp, log) { + extends MetricsOnlyContext(traceName, token, tags, izOpen, levelOfDetail, startTimeztamp, log) { private val _openSegments = new AtomicInteger(0) private val _startTimestamp = NanoTimestamp.now @@ -37,8 +36,12 @@ private[trace] class TracingContext(traceName: String, token: String, izOpen: Bo override def addMetadata(key: String, value: String): Unit = _metadata.put(key, value) override def startSegment(segmentName: String, category: String, library: String): Segment = { + startSegment(segmentName, category, library, Map.empty[String, String]) + } + + override def startSegment(segmentName: String, category: String, library: String, tags: Map[String, String]): Segment = { _openSegments.incrementAndGet() - val newSegment = new TracingSegment(segmentName, category, library) + val newSegment = new TracingSegment(segmentName, category, library, tags) _allSegments.add(newSegment) newSegment } @@ -48,9 +51,9 @@ private[trace] class TracingContext(traceName: String, token: String, izOpen: Bo traceInfoSink(this) } - override def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval): Unit = { + override def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval, tags: Map[String, String]): Unit = { _openSegments.decrementAndGet() - super.finishSegment(segmentName, category, library, duration) + super.finishSegment(segmentName, category, library, duration, tags) } def shouldIncubate: Boolean = isOpen || _openSegments.get() > 0 @@ -62,7 +65,7 @@ private[trace] class TracingContext(traceName: String, token: String, izOpen: Bo val currentSegments = _allSegments.iterator() var segmentsInfo = List.newBuilder[SegmentInfo] - while (currentSegments.hasNext()) { + while (currentSegments.hasNext) { val segment = currentSegments.next() if (segment.isClosed) segmentsInfo += segment.createSegmentInfo(_startTimestamp, startTimestamp) @@ -73,7 +76,7 @@ private[trace] class TracingContext(traceName: String, token: String, izOpen: Bo TraceInfo(name, token, _startTimestamp, elapsedTime, _metadata.toMap, segmentsInfo.result()) } - class TracingSegment(segmentName: String, category: String, library: String) extends MetricsOnlySegment(segmentName, category, library) { + class TracingSegment(segmentName: String, category: String, library: String, tags: Map[String, String]) extends MetricsOnlySegment(segmentName, category, library, tags) { private val metadata = TrieMap.empty[String, String] override def addMetadata(key: String, value: String): Unit = metadata.put(key, value) @@ -85,7 +88,7 @@ private[trace] class TracingContext(traceName: String, token: String, izOpen: Bo // expensive and inaccurate, but we can do that once for the trace and calculate all the segments relative to it. val segmentStartTimestamp = new NanoTimestamp((this.startTimestamp.nanos - traceRelativeTimestamp.nanos) + traceStartTimestamp.nanos) - SegmentInfo(this.name, category, library, segmentStartTimestamp, this.elapsedTime, metadata.toMap) + SegmentInfo(this.name, category, library, segmentStartTimestamp, this.elapsedTime, metadata.toMap, _tags.toMap) } } }
\ No newline at end of file |