From 83edd204079e98a5e4d0248c29d84b6421693b6e Mon Sep 17 00:00:00 2001 From: Diego Date: Fri, 1 Apr 2016 16:58:31 -0300 Subject: Add tags for traces and closes #327 --- .../scala/kamon/trace/MetricsOnlyContext.scala | 39 ++++++++++++------ .../src/main/scala/kamon/trace/TraceContext.scala | 46 +++++++++++++++++----- .../src/main/scala/kamon/trace/TracerModule.scala | 2 +- .../main/scala/kamon/trace/TracingContext.scala | 19 +++++---- .../test/scala/kamon/testkit/BaseKamonSpec.scala | 6 +-- 5 files changed, 79 insertions(+), 33 deletions(-) diff --git a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala index c45a49d7..7ef3ac14 100644 --- a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala @@ -20,12 +20,13 @@ import java.util.concurrent.ConcurrentLinkedQueue import akka.event.LoggingAdapter import kamon.Kamon -import kamon.metric.{ SegmentMetrics, MetricsModule, TraceMetrics } +import kamon.metric.{ SegmentMetrics, TraceMetrics } import kamon.util.{ NanoInterval, RelativeNanoTimestamp } import scala.annotation.tailrec +import scala.collection.concurrent.TrieMap -private[kamon] class MetricsOnlyContext(traceName: String, val token: String, tags: Map[String, String], izOpen: Boolean, val levelOfDetail: LevelOfDetail, +private[kamon] class MetricsOnlyContext(traceName: String, val token: String, traceTags: Map[String, String], izOpen: Boolean, val levelOfDetail: LevelOfDetail, val startTimestamp: RelativeNanoTimestamp, log: LoggingAdapter) extends TraceContext { @@ -35,6 +36,7 @@ private[kamon] class MetricsOnlyContext(traceName: String, val token: String, ta private val _finishedSegments = new ConcurrentLinkedQueue[SegmentLatencyData]() private val _traceLocalStorage = new TraceLocalStorage + private val _tags = TrieMap.empty[String, String] ++= traceTags def rename(newName: String): Unit = if (isOpen) @@ -45,37 +47,44 @@ private[kamon] class MetricsOnlyContext(traceName: String, val token: String, ta def name: String = _name def isEmpty: Boolean = false def isOpen: Boolean = _isOpen + def addMetadata(key: String, value: String): Unit = {} + def addTag(key: String, value: String): Unit = _tags.put(key, value) + def removeTag(key: String, value: String): Boolean = _tags.remove(key, value) + def finish(): Unit = { _isOpen = false val traceElapsedTime = NanoInterval.since(startTimestamp) _elapsedTime = traceElapsedTime if (Kamon.metrics.shouldTrack(name, TraceMetrics.category)) - Kamon.metrics.entity(TraceMetrics, name, tags).elapsedTime.record(traceElapsedTime.nanos) + Kamon.metrics.entity(TraceMetrics, name, _tags.toMap).elapsedTime.record(traceElapsedTime.nanos) drainFinishedSegments() } def startSegment(segmentName: String, category: String, library: String): Segment = - new MetricsOnlySegment(segmentName, category, library) + startSegment(segmentName, category, library, Map.empty[String, String]) + + def startSegment(segmentName: String, category: String, library: String, tags: Map[String, String]): Segment = + new MetricsOnlySegment(segmentName, category, library, tags) @tailrec private def drainFinishedSegments(): Unit = { val segment = _finishedSegments.poll() if (segment != null) { - val segmentTags = Map( + val deefaultTags = Map( "trace" -> name, "category" -> segment.category, "library" -> segment.library) if (Kamon.metrics.shouldTrack(segment.name, SegmentMetrics.category)) - Kamon.metrics.entity(SegmentMetrics, segment.name, segmentTags).elapsedTime.record(segment.duration.nanos) + Kamon.metrics.entity(SegmentMetrics, segment.name, deefaultTags ++ segment.tags).elapsedTime.record(segment.duration.nanos) drainFinishedSegments() } } - protected def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval): Unit = { - _finishedSegments.add(SegmentLatencyData(segmentName, category, library, duration)) + protected def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval, segmentTags: Map[String, String]): Unit = { + _finishedSegments.add(SegmentLatencyData(segmentName, category, library, duration, segmentTags)) if (isClosed) { drainFinishedSegments() @@ -89,17 +98,23 @@ private[kamon] class MetricsOnlyContext(traceName: String, val token: String, ta // will be returned. def elapsedTime: NanoInterval = _elapsedTime - class MetricsOnlySegment(segmentName: String, val category: String, val library: String) extends Segment { + class MetricsOnlySegment(segmentName: String, val category: String, val library: String, segmentTags: Map[String, String]) extends Segment { private val _startTimestamp = RelativeNanoTimestamp.now + protected val _tags = TrieMap.empty[String, String] ++= segmentTags + @volatile private var _segmentName = segmentName @volatile private var _elapsedTime = NanoInterval.default @volatile private var _isOpen = true def name: String = _segmentName def isEmpty: Boolean = false - def addMetadata(key: String, value: String): Unit = {} def isOpen: Boolean = _isOpen + def addMetadata(key: String, value: String): Unit = {} + + def addTag(key: String, value: String): Unit = _tags.put(key, value) + def removeTag(key: String, value: String): Boolean = _tags.remove(key, value) + def rename(newName: String): Unit = if (isOpen) _segmentName = newName @@ -111,7 +126,7 @@ private[kamon] class MetricsOnlyContext(traceName: String, val token: String, ta val segmentElapsedTime = NanoInterval.since(_startTimestamp) _elapsedTime = segmentElapsedTime - finishSegment(name, category, library, segmentElapsedTime) + finishSegment(name, category, library, segmentElapsedTime, _tags.toMap) } // Handle with care and make sure that the segment is closed before calling this method, otherwise @@ -121,4 +136,4 @@ private[kamon] class MetricsOnlyContext(traceName: String, val token: String, ta } } -case class SegmentLatencyData(name: String, category: String, library: String, duration: NanoInterval) +case class SegmentLatencyData(name: String, category: String, library: String, duration: NanoInterval, tags: Map[String, String]) diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index 9642233a..eee2ec14 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -1,6 +1,6 @@ /* * ========================================================================================= - * Copyright © 2013 the kamon project + * Copyright © 2013-2016 the kamon project * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at @@ -17,8 +17,10 @@ package kamon.trace import java.io.ObjectStreamException +import java.util + import kamon.trace.TraceContextAware.DefaultTraceContextAware -import kamon.util.{ SameThreadExecutionContext, Supplier, Function, RelativeNanoTimestamp } +import kamon.util.{ Function, RelativeNanoTimestamp, SameThreadExecutionContext, Supplier } import scala.concurrent.Future @@ -34,8 +36,12 @@ trait TraceContext { def rename(newName: String): Unit def startSegment(segmentName: String, category: String, library: String): Segment + def startSegment(segmentName: String, category: String, library: String, tags: Map[String, String]): Segment def addMetadata(key: String, value: String) + def addTag(key: String, value: String): Unit + def removeTag(key: String, value: String): Boolean + def startTimestamp: RelativeNanoTimestamp def collect[T](f: TraceContext ⇒ T): Option[T] = @@ -49,21 +55,33 @@ trait TraceContext { else None def withNewSegment[T](segmentName: String, category: String, library: String)(code: ⇒ T): T = { - val segment = startSegment(segmentName, category, library) - try code finally segment.finish() + withNewSegment(segmentName, category, library, Map.empty[String, String])(code) } - // Java variant. - def withNewSegment[T](segmentName: String, category: String, library: String, code: Supplier[T]): T = - withNewSegment(segmentName, category, library)(code.get) + def withNewSegment[T](segmentName: String, category: String, library: String, tags: Map[String, String])(code: ⇒ T): T = { + val segment = startSegment(segmentName, category, library, tags) + try code finally segment.finish() + } def withNewAsyncSegment[T](segmentName: String, category: String, library: String)(code: ⇒ Future[T]): Future[T] = { - val segment = startSegment(segmentName, category, library) + withNewAsyncSegment(segmentName, category, library, Map.empty[String, String])(code) + } + + def withNewAsyncSegment[T](segmentName: String, category: String, library: String, tags: Map[String, String])(code: ⇒ Future[T]): Future[T] = { + val segment = startSegment(segmentName, category, library, tags) val result = code result.onComplete(_ ⇒ segment.finish())(SameThreadExecutionContext) result } + // Java variant. + def withNewSegment[T](segmentName: String, category: String, library: String, code: Supplier[T]): T = + withNewSegment(segmentName, category, library)(code.get) + + def withNewSegment[T](segmentName: String, category: String, library: String, tags: util.Map[String, String], code: Supplier[T]): T = { + import scala.collection.JavaConverters._ + withNewSegment(segmentName, category, library, tags.asScala.toMap)(code.get) + } } trait Segment { @@ -77,7 +95,10 @@ trait Segment { def finish(): Unit def rename(newName: String): Unit - def addMetadata(key: String, value: String) + def addMetadata(key: String, value: String): Unit + + def addTag(key: String, value: String): Unit + def removeTag(key: String, value: String): Boolean } case object EmptyTraceContext extends TraceContext { @@ -89,8 +110,13 @@ case object EmptyTraceContext extends TraceContext { def finish(): Unit = {} def rename(name: String): Unit = {} def startSegment(segmentName: String, category: String, library: String): Segment = EmptySegment + def startSegment(segmentName: String, category: String, library: String, tags: Map[String, String]): Segment = EmptySegment def addMetadata(key: String, value: String): Unit = {} def startTimestamp = new RelativeNanoTimestamp(0L) + def addTags(tags: Map[String, String]): Unit = {} + def addTag(key: String, value: String): Unit = {} + def removeTags(tags: Map[String, String]): Unit = {} + def removeTag(key: String, value: String): Boolean = false case object EmptySegment extends Segment { val name: String = "empty-segment" @@ -102,6 +128,8 @@ case object EmptyTraceContext extends TraceContext { def finish: Unit = {} def rename(newName: String): Unit = {} def addMetadata(key: String, value: String): Unit = {} + def addTag(key: String, value: String): Unit = {} + def removeTag(key: String, value: String): Boolean = false } } diff --git a/kamon-core/src/main/scala/kamon/trace/TracerModule.scala b/kamon-core/src/main/scala/kamon/trace/TracerModule.scala index 3ff32751..60187729 100644 --- a/kamon-core/src/main/scala/kamon/trace/TracerModule.scala +++ b/kamon-core/src/main/scala/kamon/trace/TracerModule.scala @@ -194,4 +194,4 @@ private[kamon] object TracerModuleImpl { } case class TraceInfo(name: String, token: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String], segments: List[SegmentInfo]) -case class SegmentInfo(name: String, category: String, library: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String]) \ No newline at end of file +case class SegmentInfo(name: String, category: String, library: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String], tags: Map[String, String]) \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala index 1c735fbe..085b4b09 100644 --- a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala @@ -20,8 +20,7 @@ import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicInteger import akka.event.LoggingAdapter -import kamon.util.{ NanoInterval, RelativeNanoTimestamp, NanoTimestamp } -import kamon.metric.MetricsModule +import kamon.util.{ NanoInterval, NanoTimestamp, RelativeNanoTimestamp } import scala.collection.concurrent.TrieMap @@ -37,8 +36,12 @@ private[trace] class TracingContext(traceName: String, token: String, tags: Map[ override def addMetadata(key: String, value: String): Unit = _metadata.put(key, value) override def startSegment(segmentName: String, category: String, library: String): Segment = { + startSegment(segmentName, category, library, Map.empty[String, String]) + } + + override def startSegment(segmentName: String, category: String, library: String, tags: Map[String, String]): Segment = { _openSegments.incrementAndGet() - val newSegment = new TracingSegment(segmentName, category, library) + val newSegment = new TracingSegment(segmentName, category, library, tags) _allSegments.add(newSegment) newSegment } @@ -48,9 +51,9 @@ private[trace] class TracingContext(traceName: String, token: String, tags: Map[ traceInfoSink(this) } - override def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval): Unit = { + override def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval, tags: Map[String, String]): Unit = { _openSegments.decrementAndGet() - super.finishSegment(segmentName, category, library, duration) + super.finishSegment(segmentName, category, library, duration, tags) } def shouldIncubate: Boolean = isOpen || _openSegments.get() > 0 @@ -62,7 +65,7 @@ private[trace] class TracingContext(traceName: String, token: String, tags: Map[ val currentSegments = _allSegments.iterator() var segmentsInfo = List.newBuilder[SegmentInfo] - while (currentSegments.hasNext()) { + while (currentSegments.hasNext) { val segment = currentSegments.next() if (segment.isClosed) segmentsInfo += segment.createSegmentInfo(_startTimestamp, startTimestamp) @@ -73,7 +76,7 @@ private[trace] class TracingContext(traceName: String, token: String, tags: Map[ TraceInfo(name, token, _startTimestamp, elapsedTime, _metadata.toMap, segmentsInfo.result()) } - class TracingSegment(segmentName: String, category: String, library: String) extends MetricsOnlySegment(segmentName, category, library) { + class TracingSegment(segmentName: String, category: String, library: String, tags: Map[String, String]) extends MetricsOnlySegment(segmentName, category, library, tags) { private val metadata = TrieMap.empty[String, String] override def addMetadata(key: String, value: String): Unit = metadata.put(key, value) @@ -85,7 +88,7 @@ private[trace] class TracingContext(traceName: String, token: String, tags: Map[ // expensive and inaccurate, but we can do that once for the trace and calculate all the segments relative to it. val segmentStartTimestamp = new NanoTimestamp((this.startTimestamp.nanos - traceRelativeTimestamp.nanos) + traceStartTimestamp.nanos) - SegmentInfo(this.name, category, library, segmentStartTimestamp, this.elapsedTime, metadata.toMap) + SegmentInfo(this.name, category, library, segmentStartTimestamp, this.elapsedTime, metadata.toMap, _tags.toMap) } } } \ No newline at end of file diff --git a/kamon-core/src/test/scala/kamon/testkit/BaseKamonSpec.scala b/kamon-core/src/test/scala/kamon/testkit/BaseKamonSpec.scala index 497cb77d..78c8607b 100644 --- a/kamon-core/src/test/scala/kamon/testkit/BaseKamonSpec.scala +++ b/kamon-core/src/test/scala/kamon/testkit/BaseKamonSpec.scala @@ -16,12 +16,12 @@ package kamon.testkit -import akka.testkit.{ ImplicitSender, TestKitBase } import akka.actor.ActorSystem -import com.typesafe.config.{ Config, ConfigFactory } +import akka.testkit.{ ImplicitSender, TestKitBase } +import com.typesafe.config.Config import kamon.Kamon import kamon.metric.{ Entity, EntitySnapshot, SubscriptionsDispatcher } -import kamon.trace.{ TraceContext, Tracer } +import kamon.trace.TraceContext import kamon.util.LazyActorRef import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } -- cgit v1.2.3