From 34010efc7b273e50d805a277646f14aa96aaa8b2 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Fri, 14 Jul 2017 14:12:47 +0200 Subject: wip --- kamon-core/src/main/scala/kamon/Kamon.scala | 31 +-- .../src/main/scala/kamon/ReporterRegistry.scala | 8 +- .../src/main/scala/kamon/trace/ActiveSpan.scala | 70 ++++++ .../src/main/scala/kamon/trace/Continuation.scala | 39 ++++ .../scala/kamon/trace/IdentifierGenerator.scala | 57 +++++ .../src/main/scala/kamon/trace/Sampler.scala | 19 +- kamon-core/src/main/scala/kamon/trace/Span.scala | 247 +++++++++++---------- .../src/main/scala/kamon/trace/SpanContext.scala | 92 ++++++-- .../main/scala/kamon/trace/SpanContextCodec.scala | 170 +++++++++----- kamon-core/src/main/scala/kamon/trace/Tracer.scala | 217 ++++++++++-------- .../src/main/scala/kamon/util/BaggageOnMDC.scala | 96 ++++---- kamon-core/src/main/scala/kamon/util/Mixin.scala | 4 +- 12 files changed, 697 insertions(+), 353 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/trace/ActiveSpan.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/Continuation.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/IdentifierGenerator.scala (limited to 'kamon-core') diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index ecbc796e..5c7f9e53 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -16,23 +16,21 @@ package kamon import com.typesafe.config.{Config, ConfigFactory} -import io.opentracing.propagation.Format -import io.opentracing.{ActiveSpan, Span, SpanContext} import kamon.metric._ -import kamon.trace.Tracer +import kamon.trace.{ActiveSpan, Span, SpanContext, Tracer, Continuation} import kamon.util.{Filters, MeasurementUnit, Registration} import scala.concurrent.Future import java.time.Duration import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledThreadPoolExecutor} -import io.opentracing.ActiveSpan.Continuation +import kamon.trace.SpanContextCodec.Format import org.slf4j.LoggerFactory import scala.util.Try -object Kamon extends MetricLookup with ReporterRegistry with io.opentracing.Tracer { +object Kamon extends MetricLookup with ReporterRegistry { private val logger = LoggerFactory.getLogger("kamon.Kamon") @volatile private var _config = ConfigFactory.load() @volatile private var _environment = Environment.fromConfig(_config) @@ -41,7 +39,7 @@ object Kamon extends MetricLookup with ReporterRegistry with io.opentracing.Trac private val _scheduler = Executors.newScheduledThreadPool(schedulerPoolSize(_config), numberedThreadFactory("kamon-scheduler")) private val _metrics = new MetricRegistry(_config, _scheduler) private val _reporters = new ReporterRegistryImpl(_metrics, _config) - private val _tracer = new Tracer(Kamon, _reporters, _config) + private val _tracer = new Tracer.Default(Kamon, _reporters, _config) private var _onReconfigureHooks = Seq.empty[OnReconfigureHook] def environment: Environment = @@ -90,19 +88,19 @@ object Kamon extends MetricLookup with ReporterRegistry with io.opentracing.Trac def tracer: Tracer = _tracer - override def buildSpan(operationName: String): io.opentracing.Tracer.SpanBuilder = + def buildSpan(operationName: String): Tracer.SpanBuilder = _tracer.buildSpan(operationName) - override def extract[C](format: Format[C], carrier: C): SpanContext = + def extract[C](format: Format[C], carrier: C): Option[SpanContext] = _tracer.extract(format, carrier) - override def inject[C](spanContext: SpanContext, format: Format[C], carrier: C): Unit = + def inject[C](spanContext: SpanContext, format: Format[C], carrier: C): Unit = _tracer.inject(spanContext, format, carrier) - override def activeSpan(): ActiveSpan = + def activeSpan(): ActiveSpan = _tracer.activeSpan() - override def makeActive(span: Span): ActiveSpan = + def makeActive(span: Span): ActiveSpan = _tracer.makeActive(span) @@ -133,13 +131,8 @@ object Kamon extends MetricLookup with ReporterRegistry with io.opentracing.Trac /** * Captures a continuation from the currently active Span (if any). */ - def activeSpanContinuation(): Continuation = { - val activeSpan = Kamon.activeSpan() - if(activeSpan == null) - null - else - activeSpan.capture() - } + def activeSpanContinuation(): Continuation = + activeSpan().capture() /** * Runs the provided closure with the currently active Span (if any). @@ -155,7 +148,7 @@ object Kamon extends MetricLookup with ReporterRegistry with io.opentracing.Trac * was no active Span then the provided fallback value */ def fromActiveSpan[T](code: ActiveSpan => T): Option[T] = - Option(activeSpan()).map(code) + None//activeSpan().map(code) override def loadReportersFromConfig(): Unit = diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala index 5f46edf6..8a36a7c7 100644 --- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala +++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala @@ -53,7 +53,7 @@ trait MetricReporter extends Reporter { } trait SpanReporter extends Reporter { - def reportSpans(spans: Seq[Span.CompletedSpan]): Unit + def reportSpans(spans: Seq[Span.FinishedSpan]): Unit } class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry { @@ -212,7 +212,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con } } - private[kamon] def reportSpan(span: Span.CompletedSpan): Unit = { + private[kamon] def reportSpan(span: Span.FinishedSpan): Unit = { spanReporters.foreach { case (_, reporterEntry) => if(reporterEntry.isActive) reporterEntry.buffer.offer(span) @@ -251,7 +251,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con val bufferCapacity: Int, val executionContext: ExecutionContextExecutorService ) { - val buffer = new ArrayBlockingQueue[Span.CompletedSpan](bufferCapacity) + val buffer = new ArrayBlockingQueue[Span.FinishedSpan](bufferCapacity) } private class MetricReporterTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: TrieMap[Long, MetricReporterEntry]) extends Runnable { @@ -290,7 +290,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con spanReporters.foreach { case (_, entry) => - val spanBatch = new java.util.ArrayList[Span.CompletedSpan](entry.bufferCapacity) + val spanBatch = new java.util.ArrayList[Span.FinishedSpan](entry.bufferCapacity) entry.buffer.drainTo(spanBatch, entry.bufferCapacity) Future { diff --git a/kamon-core/src/main/scala/kamon/trace/ActiveSpan.scala b/kamon-core/src/main/scala/kamon/trace/ActiveSpan.scala new file mode 100644 index 00000000..3a46d94f --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/ActiveSpan.scala @@ -0,0 +1,70 @@ +package kamon.trace + +/** + * Wraps a [[kamon.trace.Span]] that has been activated in the current Thread. By activated we really mean, it is + * stored in a ThreadLocal value inside the tracer until [[kamon.trace.ActiveSpan#deactivate()]] is called. + * + * When a [[kamon.trace.Span]] is activated it will keep a reference to the previously active Span on the current + * Thread, take it's place as the currently active Span and put the original one once this ActiveSpan gets deactivated. + * + */ +trait ActiveSpan extends Span { + + /** + * Sets the currently active Span to whatever Span was active when this Span was activated. + * + */ + def deactivate(): Unit +} + +object ActiveSpan { + + final class Default(wrappedSpan: Span, restoreOnDeactivate: ActiveSpan, tl: ThreadLocal[ActiveSpan]) + extends ActiveSpan { + + override def deactivate(): Unit = + tl.set(restoreOnDeactivate) + + // + // Forward all other members to the wrapped Span. + // + + override def annotate(annotation: Span.Annotation): Span = + wrappedSpan.annotate(annotation) + + override def addSpanTag(key: String, value: String): Span = + wrappedSpan.addSpanTag(key, value) + + override def addMetricTag(key: String, value: String): Span = + wrappedSpan.addMetricTag(key, value) + + override def addBaggage(key: String, value: String): Span = + wrappedSpan.addBaggage(key, value) + + override def getBaggage(key: String): Option[String] = + wrappedSpan.getBaggage(key) + + override def disableMetricsCollection(): Span = + wrappedSpan.disableMetricsCollection() + + override def context(): SpanContext = + wrappedSpan.context() + + override def setOperationName(operationName: String): Span = + wrappedSpan.setOperationName(operationName) + + override def finish(): Unit = + wrappedSpan.finish() + + override def finish(finishMicros: Long): Unit = + wrappedSpan.finish(finishMicros) + + override def capture(): Continuation = + wrappedSpan.capture() + } + + object Default { + def apply(wrappedSpan: Span, restoreOnDeactivate: ActiveSpan, tl: ThreadLocal[ActiveSpan]): Default = + new Default(wrappedSpan, restoreOnDeactivate, tl) + } +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/Continuation.scala b/kamon-core/src/main/scala/kamon/trace/Continuation.scala new file mode 100644 index 00000000..72d77597 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/Continuation.scala @@ -0,0 +1,39 @@ +/* ========================================================================================= + * Copyright © 2013-2017 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + + + +trait Continuation { + def activate(): ActiveSpan +} + +object Continuation { + + /** + * + * @param span + * @param tracer + */ + final class Default(span: Span, tracer: Tracer) extends Continuation { + override def activate(): ActiveSpan = + tracer.makeActive(span) + } + + object Default { + def apply(span: Span, tracer: Tracer): Default = new Default(span, tracer) + } +} diff --git a/kamon-core/src/main/scala/kamon/trace/IdentifierGenerator.scala b/kamon-core/src/main/scala/kamon/trace/IdentifierGenerator.scala new file mode 100644 index 00000000..ea23227a --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/IdentifierGenerator.scala @@ -0,0 +1,57 @@ +package kamon.trace + +import java.nio.ByteBuffer +import java.util.concurrent.ThreadLocalRandom + +import kamon.util.HexCodec + +import scala.util.Try + +trait IdentityProvider { + def traceIdentifierGenerator(): IdentityProvider.Generator + def spanIdentifierGenerator(): IdentityProvider.Generator +} + +object IdentityProvider { + case class Identifier(string: String, bytes: Array[Byte]) + + val NoIdentifier = Identifier("", new Array[Byte](0)) + + + trait Generator { + def generate(): Identifier + def from(string: String): Identifier + def from(bytes: Array[Byte]): Identifier + } + + + class Default extends IdentityProvider { + private val generator = new Generator { + override def generate(): Identifier = { + val data = ByteBuffer.wrap(new Array[Byte](8)) + val random = ThreadLocalRandom.current().nextLong() + data.putLong(random) + + Identifier(HexCodec.toLowerHex(random), data.array()) + } + + override def from(string: String): Identifier = Try { + val identifierLong = HexCodec.lowerHexToUnsignedLong(string) + val data = ByteBuffer.allocate(8) + data.putLong(identifierLong) + + Identifier(string, data.array()) + } getOrElse(IdentityProvider.NoIdentifier) + + override def from(bytes: Array[Byte]): Identifier = Try { + val buffer = ByteBuffer.wrap(bytes) + val identifierLong = buffer.getLong + + Identifier(HexCodec.toLowerHex(identifierLong), bytes) + } getOrElse(IdentityProvider.NoIdentifier) + } + + override def traceIdentifierGenerator(): Generator = generator + override def spanIdentifierGenerator(): Generator = generator + } +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/Sampler.scala b/kamon-core/src/main/scala/kamon/trace/Sampler.scala index 0347a151..f478bbd1 100644 --- a/kamon-core/src/main/scala/kamon/trace/Sampler.scala +++ b/kamon-core/src/main/scala/kamon/trace/Sampler.scala @@ -15,13 +15,16 @@ package kamon.trace +import java.util.concurrent.ThreadLocalRandom +import kamon.trace.SpanContext.SamplingDecision + trait Sampler { - def decide(spanID: Long): Boolean + def decide(operationName: String, builderTags: Map[String, String]): SamplingDecision } object Sampler { - val always = new Constant(true) - val never = new Constant(false) + val always = new Constant(SamplingDecision.Sample) + val never = new Constant(SamplingDecision.DoNotSample) def random(chance: Double): Sampler = { assert(chance >= 0D && chance <= 1.0D, "Change should be >= 0 and <= 1.0") @@ -33,8 +36,8 @@ object Sampler { } } - class Constant(decision: Boolean) extends Sampler { - override def decide(spanID: Long): Boolean = decision + class Constant(decision: SamplingDecision) extends Sampler { + override def decide(operationName: String, builderTags: Map[String, String]): SamplingDecision = decision override def toString: String = s"Sampler.Constant(decision = $decision)" @@ -44,8 +47,10 @@ object Sampler { val upperBoundary = Long.MaxValue * chance val lowerBoundary = -upperBoundary - override def decide(spanID: Long): Boolean = - spanID >= lowerBoundary && spanID <= upperBoundary + override def decide(operationName: String, builderTags: Map[String, String]): SamplingDecision = { + val random = ThreadLocalRandom.current().nextLong() + if(random >= lowerBoundary && random <= upperBoundary) SamplingDecision.Sample else SamplingDecision.DoNotSample + } override def toString: String = s"Sampler.Random(chance = $chance)" diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala index 8149be74..a113b9bc 100644 --- a/kamon-core/src/main/scala/kamon/trace/Span.scala +++ b/kamon-core/src/main/scala/kamon/trace/Span.scala @@ -16,155 +16,178 @@ package kamon package trace +import kamon.trace.SpanContext.SamplingDecision import scala.collection.JavaConverters._ import kamon.util.{Clock, MeasurementUnit} -class Span(spanContext: SpanContext, initialOperationName: String, initialTags: Map[String, String], startTimestampMicros: Long, - reporterRegistry: ReporterRegistryImpl) extends io.opentracing.Span { +trait Span extends BaseSpan { - private var isOpen: Boolean = true - private val sampled: Boolean = spanContext.sampled - private var operationName: String = initialOperationName - private var endTimestampMicros: Long = 0 + def annotate(name: String): Span = + annotate(Span.Annotation(Clock.microTimestamp(), name, Map.empty)) - private var tags = initialTags - private var logs = List.empty[Span.LogEntry] - private var additionalMetricTags = Map.empty[String, String] + def annotate(name: String, fields: Map[String, String]): Span = + annotate(Span.Annotation(Clock.microTimestamp(), name, fields)) + def annotate(timestampMicroseconds: Long, name: String, fields: Map[String, String]): Span = + annotate(Span.Annotation(timestampMicroseconds, name, fields)) - override def log(fields: java.util.Map[String, _]): Span = - log(fields.asScala.asInstanceOf[Map[String, _]]) - def log(fields: Map[String, _]): Span = synchronized { - if (sampled && isOpen) - logs = Span.LogEntry(Clock.microTimestamp(), fields) :: logs - this - } +} - def log(timestampMicroseconds: Long, fields: Map[String, _]): Span = synchronized { - if(sampled && isOpen) - logs = Span.LogEntry(timestampMicroseconds, fields) :: logs - this - } +trait BaseSpan { - override def log(timestampMicroseconds: Long, fields: java.util.Map[String, _]): Span = - log(timestampMicroseconds, fields.asScala.asInstanceOf[Map[String, _]]) + def context(): SpanContext - override def log(event: String): Span = synchronized { - if(sampled && isOpen) - logs = Span.LogEntry(Clock.microTimestamp(), Map("event" -> event)) :: logs - this - } + def capture(): Continuation - override def log(timestampMicroseconds: Long, event: String): Span = synchronized { - if(sampled && isOpen) - logs = Span.LogEntry(timestampMicroseconds, Map("event" -> event)) :: logs - this - } + def annotate(annotation: Span.Annotation): Span - override def log(eventName: String, payload: scala.Any): Span = synchronized { - if(sampled && isOpen) - logs = Span.LogEntry(Clock.microTimestamp(), Map(eventName -> payload)) :: logs - this - } + def addSpanTag(key: String, value: String): Span + + def addMetricTag(key: String, value: String): Span + + def addBaggage(key: String, value: String): Span + + def getBaggage(key: String): Option[String] + + def setOperationName(name: String): Span - override def log(timestampMicroseconds: Long, eventName: String, payload: scala.Any): Span = synchronized { - if(sampled && isOpen) - logs = Span.LogEntry(timestampMicroseconds, Map(eventName -> payload)) :: logs - this + def disableMetricsCollection(): Span + + def finish(): Unit + + def finish(finishTimestampMicros: Long): Unit + +} + +object Span { + + final class Empty(tracer: Tracer) extends Span { + override val context: SpanContext = SpanContext.EmptySpanContext + override def capture(): Continuation = Continuation.Default(this, tracer) + + override def annotate(annotation: Annotation): Span = this + override def addSpanTag(key: String, value: String): Span = this + override def addMetricTag(key: String, value: String): Span = this + override def addBaggage(key: String, value: String): Span = this + override def getBaggage(key: String): Option[String] = None + override def setOperationName(name: String): Span = this + override def disableMetricsCollection(): Span = this + override def finish(): Unit = {} + override def finish(finishTimestampMicros: Long): Unit = {} } - override def getBaggageItem(key: String): String = - spanContext.getBaggage(key) + object Empty { + def apply(tracer: Tracer): Empty = new Empty(tracer) + } - override def context(): SpanContext = - spanContext + /** + * + * @param spanContext + * @param initialOperationName + * @param initialTags + * @param startTimestampMicros + * @param reporterRegistry + */ + final class Real(spanContext: SpanContext, initialOperationName: String, initialTags: Map[String, String], + startTimestampMicros: Long, reporterRegistry: ReporterRegistryImpl, tracer: Tracer) extends Span { + + private var collectMetrics: Boolean = true + private var isOpen: Boolean = true + private val sampled: Boolean = spanContext.samplingDecision == SamplingDecision.Sample + private var operationName: String = initialOperationName + private var endTimestampMicros: Long = 0 + + private var spanTags = initialTags + private var customMetricTags = Map.empty[String, String] + private var annotations = List.empty[Span.Annotation] + + def annotate(annotation: Annotation): Span = synchronized { + if(sampled && isOpen) + annotations = annotation :: annotations + this + } - override def setTag(key: String, value: String): Span = synchronized { - if (isOpen) { - extractMetricTag(key, value) - if(sampled) - tags = tags ++ Map(key -> value) + override def addSpanTag(key: String, value: String): Span = synchronized { + if(sampled && isOpen) + spanTags = spanTags + (key -> value) + this } - this - } - override def setTag(key: String, value: Boolean): Span = { - if (isOpen) { - val tagValue = if(value) Span.BooleanTagTrueValue else Span.BooleanTagFalseValue - extractMetricTag(key, tagValue) - if(sampled) - tags = tags + (key -> tagValue) + override def addMetricTag(key: String, value: String): Span = synchronized { + if(sampled && isOpen && collectMetrics) + customMetricTags = customMetricTags + (key -> value) + this } - this - } - override def setTag(key: String, value: Number): Span = { - if (isOpen) { - val tagValue = String.valueOf(value) - extractMetricTag(key, tagValue) - if(sampled) - tags = tags + (key -> tagValue) + override def addBaggage(key: String, value: String): Span = { + spanContext.baggage.add(key, value) + this } - this - } - def setMetricTag(key: String, value: String): Span = synchronized { - if (isOpen) - additionalMetricTags = additionalMetricTags ++ Map(key -> value) - this - } + override def getBaggage(key: String): Option[String] = + spanContext.baggage.get(key) - override def setBaggageItem(key: String, value: String): Span = synchronized { - if (isOpen) - spanContext.addBaggageItem(key, value) - this - } + override def disableMetricsCollection(): Span = synchronized { + collectMetrics = false + this + } - override def setOperationName(operationName: String): Span = synchronized { - if(isOpen) - this.operationName = operationName - this - } + override def context(): SpanContext = + spanContext + + override def setOperationName(operationName: String): Span = synchronized { + if(isOpen) + this.operationName = operationName + this + } - private def extractMetricTag(tag: String, value: String): Unit = - if(tag.startsWith(Span.MetricTagPrefix)) - additionalMetricTags = additionalMetricTags ++ Map(tag.substring(Span.MetricTagPrefix.length) -> value) + override def finish(): Unit = + finish(Clock.microTimestamp()) - override def finish(): Unit = - finish(Clock.microTimestamp()) + override def finish(finishMicros: Long): Unit = synchronized { + if (isOpen) { + isOpen = false + endTimestampMicros = finishMicros - override def finish(finishMicros: Long): Unit = synchronized { - if (isOpen) { - isOpen = false - endTimestampMicros = finishMicros - recordSpanMetrics() + if(collectMetrics) + recordSpanMetrics() - if(sampled) - reporterRegistry.reportSpan(completedSpan) + if(sampled) + reporterRegistry.reportSpan(completedSpan) + } } - } - private def completedSpan: Span.CompletedSpan = - Span.CompletedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, tags, logs) + override def capture(): Continuation = + Continuation.Default(this, tracer) + + private def completedSpan: Span.FinishedSpan = + Span.FinishedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, spanTags, annotations) - private def recordSpanMetrics(): Unit = { - val elapsedTime = endTimestampMicros - startTimestampMicros - val metricTags = Map("operation" -> operationName) ++ additionalMetricTags + private def recordSpanMetrics(): Unit = { + val elapsedTime = endTimestampMicros - startTimestampMicros + val metricTags = Map("operation" -> operationName) ++ customMetricTags - val latencyHistogram = Span.Metrics.SpanProcessingTimeMetric.refine(metricTags) - latencyHistogram.record(elapsedTime) + val latencyHistogram = Span.Metrics.SpanProcessingTimeMetric.refine(metricTags) + latencyHistogram.record(elapsedTime) - tags.get("error").foreach { errorTag => - if(errorTag != null && errorTag.equals(Span.BooleanTagTrueValue)) { - Span.Metrics.SpanErrorCount.refine(metricTags).increment() + spanTags.get("error").foreach { errorTag => + if(errorTag != null && errorTag.equals(Span.BooleanTagTrueValue)) { + Span.Metrics.SpanErrorCount.refine(metricTags).increment() + } } } } -} -object Span { + object Real { + def apply(spanContext: SpanContext, initialOperationName: String, initialTags: Map[String, String], + startTimestampMicros: Long, reporterRegistry: ReporterRegistryImpl, tracer: Tracer): Real = + new Real(spanContext, initialOperationName, initialTags, startTimestampMicros, reporterRegistry, tracer) + } + + + object Metrics { val SpanProcessingTimeMetric = Kamon.histogram("span.processing-time", MeasurementUnit.time.microseconds) val SpanErrorCount = Kamon.counter("span.error-count") @@ -174,14 +197,14 @@ object Span { val BooleanTagTrueValue = "1" val BooleanTagFalseValue = "0" - case class LogEntry(timestamp: Long, fields: Map[String, _]) + case class Annotation(timestamp: Long, name: String, fields: Map[String, String]) - case class CompletedSpan( + case class FinishedSpan( context: SpanContext, operationName: String, startTimestampMicros: Long, endTimestampMicros: Long, tags: Map[String, String], - logs: Seq[LogEntry] + annotations: Seq[Annotation] ) } \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/SpanContext.scala b/kamon-core/src/main/scala/kamon/trace/SpanContext.scala index b37e208b..ae92f46d 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanContext.scala @@ -15,26 +15,92 @@ package kamon.trace -import java.lang -import java.util.{Map => JavaMap} +import kamon.trace.IdentityProvider.Identifier +import kamon.trace.SpanContext.{Baggage, SamplingDecision, Source} -import scala.collection.JavaConverters._ +/** + * + * @param traceID + * @param spanID + * @param parentID + * @param samplingDecision + * @param baggage + */ +case class SpanContext(traceID: Identifier, spanID: Identifier, parentID: Identifier, samplingDecision: SamplingDecision, baggage: Baggage, source: Source) { -class SpanContext(val traceID: Long, val spanID: Long, val parentID: Long, val sampled: Boolean, - private var baggage: Map[String, String]) extends io.opentracing.SpanContext { + def createChild(childSpanID: Identifier, samplingDecision: SamplingDecision): SpanContext = + this.copy(parentID = this.spanID, spanID = childSpanID) +} + +object SpanContext { - private[kamon] def addBaggageItem(key: String, value: String): Unit = synchronized { - baggage = baggage + (key -> value) + sealed trait Source + object Source { + case object Local extends Source + case object Remote extends Source } - private[kamon] def getBaggage(key: String): String = synchronized { - baggage.get(key).getOrElse(null) + val EmptySpanContext = SpanContext( + traceID = IdentityProvider.NoIdentifier, + spanID = IdentityProvider.NoIdentifier, + parentID = IdentityProvider.NoIdentifier, + samplingDecision = SamplingDecision.DoNotSample, + baggage = Baggage.EmptyBaggage, + source = Source.Local + ) + + + sealed trait SamplingDecision + object SamplingDecision { + + /** + * The Trace is sampled, all child Spans should be sampled as well. + */ + case object Sample extends SamplingDecision + + /** + * The Trace is not sampled, none of the child Spans should be sampled. + */ + case object DoNotSample extends SamplingDecision + + /** + * The sampling decision has not been taken yet, the Tracer is free to decide when creating a Span. + */ + case object Unknown extends SamplingDecision } - private[kamon] def baggageMap: Map[String, String] = - baggage + /** + * + */ + + sealed trait Baggage { + def add(key: String, value:String): Unit + def get(key: String): Option[String] + def getAll(): Map[String, String] + } + + object Baggage { + def apply(): Baggage = new DefaultBaggage() + + case object EmptyBaggage extends Baggage { + override def add(key: String, value: String): Unit = {} + override def get(key: String): Option[String] = None + override def getAll: Map[String, String] = Map.empty + } + + + final class DefaultBaggage extends Baggage { + private var baggage: Map[String, String] = Map.empty + + def add(key: String, value: String): Unit = synchronized { + baggage = baggage + (key -> value) + } + + def get(key: String): Option[String] = + baggage.get(key) - override def baggageItems(): lang.Iterable[JavaMap.Entry[String, String]] = synchronized { - baggage.asJava.entrySet() + def getAll: Map[String, String] = + baggage + } } } diff --git a/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala index 8e3a446b..23eb40db 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala @@ -17,82 +17,137 @@ package kamon.trace import java.net.{URLDecoder, URLEncoder} +import java.nio.ByteBuffer import java.util.concurrent.ThreadLocalRandom +import kamon.trace.SpanContext.{Baggage, SamplingDecision, Source} + import scala.collection.JavaConverters._ -import io.opentracing.propagation.TextMap import kamon.util.HexCodec trait SpanContextCodec[T] { def inject(spanContext: SpanContext, carrier: T): Unit - def extract(carrier: T, sampler: Sampler): SpanContext + def extract(carrier: T): Option[SpanContext] +} + +trait TextMap { + def get(key: String): Option[String] + def put(key: String, value: String): Unit + def values: Iterator[(String, String)] } + object SpanContextCodec { - val TextMap: SpanContextCodec[TextMap] = new TextMapSpanCodec( - traceIDKey = "TRACE_ID", - parentIDKey = "PARENT_ID", - spanIDKey = "SPAN_ID", - sampledKey = "SAMPLED", - baggagePrefix = "BAGGAGE_", - baggageValueEncoder = identity, - baggageValueDecoder = identity - ) - - val ZipkinB3: SpanContextCodec[TextMap] = new TextMapSpanCodec( - traceIDKey = "X-B3-TraceId", - parentIDKey = "X-B3-ParentSpanId", - spanIDKey = "X-B3-SpanId", - sampledKey = "X-B3-Sampled", - baggagePrefix = "X-B3-Baggage-", - baggageValueEncoder = urlEncode, - baggageValueDecoder = urlDecode - ) + trait Format[C] + object Format { + case object TextMap extends Format[TextMap] + case object HttpHeaders extends Format[TextMap] + case object Binary extends Format[ByteBuffer] + } + +// val ExtendedB3: SpanContextCodec[TextMap] = new TextMapSpanCodec( +// traceIDKey = "X-B3-TraceId", +// parentIDKey = "X-B3-ParentSpanId", +// spanIDKey = "X-B3-SpanId", +// sampledKey = "X-B3-Sampled", +// baggageKey = "X-Kamon-Baggage-", +// baggageValueEncoder = urlEncode, +// baggageValueDecoder = urlDecode +// ) private def urlEncode(s: String): String = URLEncoder.encode(s, "UTF-8") private def urlDecode(s: String): String = URLDecoder.decode(s, "UTF-8") - private class TextMapSpanCodec(traceIDKey: String, parentIDKey: String, spanIDKey: String, sampledKey: String, baggagePrefix: String, - baggageValueEncoder: String => String, baggageValueDecoder: String => String) extends SpanContextCodec[TextMap] { + private class ExtendedB3(identityProvider: IdentityProvider) extends SpanContextCodec[TextMap] { + import ExtendedB3.Headers + override def inject(spanContext: SpanContext, carrier: TextMap): Unit = { - carrier.put(traceIDKey, encodeLong(spanContext.traceID)) - carrier.put(parentIDKey, encodeLong(spanContext.parentID)) - carrier.put(spanIDKey, encodeLong(spanContext.spanID)) + carrier.put(Headers.TraceIdentifier, baggageValueEncoder(spanContext.traceID.string)) + carrier.put(parentIDKey, baggageValueEncoder(spanContext.parentID.string)) + carrier.put(spanIDKey, baggageValueEncoder(spanContext.spanID.string)) - spanContext.baggageItems().iterator().asScala.foreach { entry => - carrier.put(baggagePrefix + entry.getKey, baggageValueEncoder(entry.getValue)) + spanContext.baggage.getAll().foreach { + case (key, value) => carrier.put(baggageKey + key, baggageValueEncoder(value)) } } - override def extract(carrier: TextMap, sampler: Sampler): SpanContext = { - var traceID: String = null - var parentID: String = null - var spanID: String = null - var sampled: String = null - var baggage: Map[String, String] = Map.empty - - carrier.iterator().asScala.foreach { entry => - if(entry.getKey.equals(traceIDKey)) - traceID = baggageValueDecoder(entry.getValue) - else if(entry.getKey.equals(parentIDKey)) - parentID = baggageValueDecoder(entry.getValue) - else if(entry.getKey.equals(spanIDKey)) - spanID = baggageValueDecoder(entry.getValue) - else if(entry.getKey.equals(sampledKey)) - sampled = entry.getValue - else if(entry.getKey.startsWith(baggagePrefix)) - baggage = baggage + (entry.getKey.substring(baggagePrefix.length) -> baggageValueDecoder(entry.getValue)) - } + override def extract(carrier: TextMap): Option[SpanContext] = { + val traceID = carrier.get(Headers.TraceIdentifier) + .map(identityProvider.traceIdentifierGenerator().from) + .getOrElse(IdentityProvider.NoIdentifier) + + val spanID = carrier.get(Headers.SpanIdentifier) + .map(identityProvider.spanIdentifierGenerator().from) + .getOrElse(IdentityProvider.NoIdentifier) + + if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) { + val parentID = carrier.get(Headers.ParentSpanIdentifier) + .map(identityProvider.spanIdentifierGenerator().from) + .getOrElse(IdentityProvider.NoIdentifier) + + val samplingDecision = carrier.get(Headers.Flags).orElse(carrier.get(Headers.Sampled)) match { + case Some(sampled) if sampled == "1" => SamplingDecision.Sample + case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample + case _ => SamplingDecision.Unknown + } + + + + + Some(SpanContext(traceID, spanID, parentID, samplingDecision, ???, Source.Remote)) + + } else None + + val minimalSpanContext = + for { + traceID <- carrier.get(traceIDKey).map(identityProvider.traceIdentifierGenerator().from) + spanID <- carrier.get(spanIDKey).map(identityProvider.spanIdentifierGenerator().from) + } yield { + + + } + + + +// var traceID: String = null +// var parentID: String = null +// var spanID: String = null +// var sampled: String = null +// var baggage: Map[String, String] = Map.empty +// +// carrier.iterator().asScala.foreach { entry => +// if(entry.getKey.equals(traceIDKey)) +// traceID = baggageValueDecoder(entry.getValue) +// else if(entry.getKey.equals(parentIDKey)) +// parentID = baggageValueDecoder(entry.getValue) +// else if(entry.getKey.equals(spanIDKey)) +// spanID = baggageValueDecoder(entry.getValue) +// else if(entry.getKey.equals(sampledKey)) +// sampled = entry.getValue +// else if(entry.getKey.startsWith(baggagePrefix)) +// baggage = baggage + (entry.getKey.substring(baggagePrefix.length) -> baggageValueDecoder(entry.getValue)) +// } +// +// if(traceID != null && spanID != null) { +// val actualParent = if(parentID == null) 0L else decodeLong(parentID) +// val isSampled = if(sampled == null) sampler.decide(ThreadLocalRandom.current().nextLong()) else sampled.equals("1") +// +// new SpanContext(decodeLong(traceID), decodeLong(spanID), actualParent, isSampled, baggage) +// } else null +// +// None + } - if(traceID != null && spanID != null) { - val actualParent = if(parentID == null) 0L else decodeLong(parentID) - val isSampled = if(sampled == null) sampler.decide(ThreadLocalRandom.current().nextLong()) else sampled.equals("1") + private def encodeBaggage(baggage: Baggage): String = { + if(baggage.getAll().nonEmpty) { - new SpanContext(decodeLong(traceID), decodeLong(spanID), actualParent, isSampled, baggage) - } else null + baggage.getAll().foreach { + case (key, value) => + } + } else "" } private def decodeLong(input: String): Long = @@ -102,4 +157,15 @@ object SpanContextCodec { HexCodec.toLowerHex(input) } + + object ExtendedB3 { + object Headers { + val TraceIdentifier = "X-B3-TraceId" + val ParentSpanIdentifier = "X-B3-ParentSpanId" + val SpanIdentifier = "X-B3-SpanId" + val Sampled = "X-B3-Sampled" + val Flags = "X-B3-Flags" + val Baggage = "X-B3-Extra-Baggage" + } + } } diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala index 19067f5e..1aec8d7c 100644 --- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala +++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala @@ -16,148 +16,173 @@ package kamon.trace -import java.util.concurrent.ThreadLocalRandom +import java.nio.ByteBuffer import com.typesafe.config.Config -import io.opentracing.propagation.{Format, TextMap} -import io.opentracing.propagation.Format.Builtin.{BINARY, HTTP_HEADERS, TEXT_MAP} -import io.opentracing.util.ThreadLocalActiveSpanSource import kamon.ReporterRegistryImpl import kamon.metric.MetricLookup +import kamon.trace.SpanContext.{SamplingDecision, Source} +import kamon.trace.Tracer.SpanBuilder import kamon.util.Clock import org.slf4j.LoggerFactory -class Tracer(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config) - extends ThreadLocalActiveSpanSource with io.opentracing.Tracer { +trait Tracer { + def buildSpan(operationName: String): SpanBuilder + def activeSpan(): ActiveSpan + def makeActive(span: Span): ActiveSpan - private val logger = LoggerFactory.getLogger(classOf[Tracer]) - private val tracerMetrics = new TracerMetrics(metrics) + def extract[C](format: SpanContextCodec.Format[C], carrier: C): Option[SpanContext] + def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C], carrier: C): Unit - @volatile private var configuredSampler: Sampler = Sampler.never - @volatile private var textMapSpanContextCodec = SpanContextCodec.TextMap - @volatile private var httpHeaderSpanContextCodec = SpanContextCodec.ZipkinB3 - reconfigure(initialConfig) - override def buildSpan(operationName: String): io.opentracing.Tracer.SpanBuilder = - new SpanBuilder(operationName) + // + // Configuration Utilities + // - override def extract[C](format: Format[C], carrier: C): io.opentracing.SpanContext = format match { - case HTTP_HEADERS => httpHeaderSpanContextCodec.extract(carrier.asInstanceOf[TextMap], configuredSampler) - case TEXT_MAP => textMapSpanContextCodec.extract(carrier.asInstanceOf[TextMap], configuredSampler) - case BINARY => null // TODO: Implement Binary Encoding - case _ => null - } + def setTextMapSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit + def setHttpHeaderSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit +} - override def inject[C](spanContext: io.opentracing.SpanContext, format: Format[C], carrier: C): Unit = format match { - case HTTP_HEADERS => httpHeaderSpanContextCodec.inject(spanContext.asInstanceOf[SpanContext], carrier.asInstanceOf[TextMap]) - case TEXT_MAP => textMapSpanContextCodec.inject(spanContext.asInstanceOf[SpanContext], carrier.asInstanceOf[TextMap]) - case BINARY => - case _ => - } +object Tracer { - def sampler: Sampler = - configuredSampler + final class Default(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config) extends Tracer { + private val logger = LoggerFactory.getLogger(classOf[Tracer]) + private val emptySpan = Span.Empty(this) + private val activeSpanStorage: ThreadLocal[ActiveSpan] = new ThreadLocal[ActiveSpan] { + override def initialValue(): ActiveSpan = ActiveSpan.Default(emptySpan, null, activeSpanStorage) + } - def setTextMapSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit = - this.textMapSpanContextCodec = codec + private[Tracer] val tracerMetrics = new TracerMetrics(metrics) + @volatile private[Tracer] var joinRemoteSpansWithSameID: Boolean = false + @volatile private[Tracer] var configuredSampler: Sampler = Sampler.never + @volatile private[Tracer] var idGenerator: IdentifierGenerator = IdentifierGenerator.RandomLong() + @volatile private[Tracer] var textMapSpanContextCodec: SpanContextCodec[TextMap] = SpanContextCodec.TextMap + @volatile private[Tracer] var httpHeaderSpanContextCodec: SpanContextCodec[TextMap] = SpanContextCodec.ZipkinB3 - def setHttpHeaderSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit = - this.httpHeaderSpanContextCodec = codec + reconfigure(initialConfig) - private class SpanBuilder(operationName: String) extends io.opentracing.Tracer.SpanBuilder { - private var parentContext: SpanContext = _ - private var startTimestamp = 0L - private var initialTags = Map.empty[String, String] - private var useActiveSpanAsParent = true + def buildSpan(operationName: String): SpanBuilder = + new SpanBuilder(operationName, this, reporterRegistry) + + def extract[C](format: SpanContextCodec.Format[C], carrier: C): Option[SpanContext] = format match { + case SpanContextCodec.Format.HttpHeaders => httpHeaderSpanContextCodec.extract(carrier.asInstanceOf[TextMap]) + case SpanContextCodec.Format.TextMap => textMapSpanContextCodec.extract(carrier.asInstanceOf[TextMap]) + case SpanContextCodec.Format.Binary => None + case _ => None + } - override def asChildOf(parent: io.opentracing.SpanContext): io.opentracing.Tracer.SpanBuilder = parent match { - case spanContext: kamon.trace.SpanContext => - this.parentContext = spanContext - this - case null => this - case _ => logger.error("Can't extract the parent ID from a non-Kamon SpanContext"); this + def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C], carrier: C): Unit = format match { + case SpanContextCodec.Format.HttpHeaders => httpHeaderSpanContextCodec.inject(spanContext, carrier.asInstanceOf[TextMap]) + case SpanContextCodec.Format.TextMap => textMapSpanContextCodec.inject(spanContext, carrier.asInstanceOf[TextMap]) + case SpanContextCodec.Format.Binary => + case _ => } - override def asChildOf(parent: io.opentracing.BaseSpan[_]): io.opentracing.Tracer.SpanBuilder = - asChildOf(parent.context()) + def activeSpan(): ActiveSpan = + activeSpanStorage.get() - override def addReference(referenceType: String, referencedContext: io.opentracing.SpanContext): io.opentracing.Tracer.SpanBuilder = { - if(referenceType != null && referenceType.equals(io.opentracing.References.CHILD_OF)) { - asChildOf(referencedContext) - } else this + def makeActive(span: Span): ActiveSpan = { + val currentlyActiveSpan = activeSpanStorage.get() + val newActiveSpan = ActiveSpan.Default(span, currentlyActiveSpan, activeSpanStorage) + activeSpanStorage.set(newActiveSpan) + newActiveSpan } - override def withTag(key: String, value: String): io.opentracing.Tracer.SpanBuilder = { - this.initialTags = this.initialTags + (key -> value) - this + def sampler: Sampler = + configuredSampler + + def setTextMapSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit = + this.textMapSpanContextCodec = codec + + def setHttpHeaderSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit = + this.httpHeaderSpanContextCodec = codec + + + private[kamon] def reconfigure(config: Config): Unit = synchronized { + val traceConfig = config.getConfig("kamon.trace") + + configuredSampler = traceConfig.getString("sampler") match { + case "always" => Sampler.always + case "never" => Sampler.never + case "random" => Sampler.random(traceConfig.getDouble("sampler-random.chance")) + case other => sys.error(s"Unexpected sampler name $other.") + } + } + + private final class TracerMetrics(metricLookup: MetricLookup) { + val createdSpans = metricLookup.counter("tracer.spans-created") } + } + + final class SpanBuilder(operationName: String, tracer: Tracer.Default, reporterRegistry: ReporterRegistryImpl) { + private var parentContext: SpanContext = _ + private var startTimestamp = 0L + private var initialTags = Map.empty[String, String] + private var useActiveSpanAsParent = true - override def withTag(key: String, value: Boolean): io.opentracing.Tracer.SpanBuilder = { - this.initialTags = this.initialTags + (key -> value.toString) + def asChildOf(parentContext: SpanContext): SpanBuilder = { + this.parentContext = parentContext this } - override def withTag(key: String, value: Number): io.opentracing.Tracer.SpanBuilder = { - this.initialTags = this.initialTags + (key -> value.toString) + def asChildOf(parentSpan: Span): SpanBuilder = + asChildOf(parentSpan.context()) + + def withSpanTag(key: String, value: String): SpanBuilder = { + this.initialTags = this.initialTags + (key -> value) this } - override def withStartTimestamp(microseconds: Long): io.opentracing.Tracer.SpanBuilder = { + def withStartTimestamp(microseconds: Long): SpanBuilder = { this.startTimestamp = microseconds this } - override def ignoreActiveSpan(): io.opentracing.Tracer.SpanBuilder = { + def ignoreActiveSpan(): SpanBuilder = { this.useActiveSpanAsParent = false this } - override def start(): io.opentracing.Span = - startManual() - - override def startActive(): io.opentracing.ActiveSpan = - makeActive(startManual()) - - override def startManual(): Span = { + def start(): Span = { val startTimestampMicros = if(startTimestamp != 0L) startTimestamp else Clock.microTimestamp() - if(parentContext == null && useActiveSpanAsParent) { - val possibleParent = activeSpan() - if(possibleParent != null) - parentContext = possibleParent.context().asInstanceOf[SpanContext] - } + val parentSpanContext: Option[SpanContext] = Option(parentContext) + .orElse(if(useActiveSpanAsParent) Some(tracer.activeSpan().context()) else None) + .filter(spanContext => spanContext != SpanContext.EmptySpanContext) - val spanContext = - if(parentContext != null) - new SpanContext(parentContext.traceID, createID(), parentContext.spanID, parentContext.sampled, parentContext.baggageMap) - else { - val traceID = createID() - new SpanContext(traceID, traceID, 0L, configuredSampler.decide(traceID), Map.empty) - } + val samplingDecision: SamplingDecision = parentSpanContext + .map(_.samplingDecision) + .filter(_ != SamplingDecision.Unknown) + .getOrElse(tracer.sampler.decide(operationName, initialTags)) - tracerMetrics.createdSpans.increment() - new Span(spanContext, operationName, initialTags, startTimestampMicros, reporterRegistry) - } - - private def createID(): Long = - ThreadLocalRandom.current().nextLong() - } - - - private[kamon] def reconfigure(config: Config): Unit = synchronized { - val traceConfig = config.getConfig("kamon.trace") + val spanContext = parentSpanContext match { + case Some(parent) => joinParentContext(parent, samplingDecision) + case None => newSpanContext(samplingDecision) + } - configuredSampler = traceConfig.getString("sampler") match { - case "always" => Sampler.always - case "never" => Sampler.never - case "random" => Sampler.random(traceConfig.getDouble("sampler-random.chance")) - case other => sys.error(s"Unexpected sampler name $other.") + tracer.tracerMetrics.createdSpans.increment() + Span.Real(spanContext, operationName, initialTags, startTimestampMicros, reporterRegistry, tracer) } - } - private final class TracerMetrics(metricLookup: MetricLookup) { - val createdSpans = metricLookup.counter("tracer.spans-created") + private def joinParentContext(parent: SpanContext, samplingDecision: SamplingDecision): SpanContext = + if(parent.source == Source.Remote && tracer.joinRemoteSpansWithSameID) + parent.copy(samplingDecision = samplingDecision) + else + parent.createChild(tracer.idGenerator.generateSpanID(), samplingDecision) + + private def newSpanContext(samplingDecision: SamplingDecision): SpanContext = + SpanContext( + traceID = tracer.idGenerator.generateTraceID(), + spanID = tracer.idGenerator.generateSpanID(), + parentID = tracer.idGenerator.generateEmptyID(), + samplingDecision = samplingDecision, + baggage = SpanContext.Baggage(), + source = Source.Local + ) + + def startActive(): ActiveSpan = + tracer.makeActive(start()) } } diff --git a/kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala b/kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala index 885a73d9..83027cc5 100644 --- a/kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala +++ b/kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala @@ -22,51 +22,51 @@ import kamon.Kamon import org.slf4j.MDC import scala.collection.JavaConverters._ - -object BaggageOnMDC { - - /** - * Copy all baggage keys into SLF4J's MDC, evaluates the provided piece of code and removes the baggage keys - * afterwards, only when there is a currently active span. Optionally adds the Trace ID as well. - * - */ - def withBaggageOnMDC[T](includeTraceID: Boolean, code: => T): T = { - val activeSpan = Kamon.activeSpan() - if(activeSpan == null) - code - else { - val baggageItems = activeSpan.context().baggageItems().asScala - baggageItems.foreach(entry => MDC.put(entry.getKey, entry.getValue)) - if(includeTraceID) - addTraceIDToMDC(activeSpan.context()) - - val evaluatedCode = code - - baggageItems.foreach(entry => MDC.remove(entry.getKey)) - if(includeTraceID) - removeTraceIDFromMDC() - - evaluatedCode - - } - } - - def withBaggageOnMDC[T](code: Supplier[T]): T = - withBaggageOnMDC(true, code.get()) - - def withBaggageOnMDC[T](includeTraceID: Boolean, code: Supplier[T]): T = - withBaggageOnMDC(includeTraceID, code.get()) - - def withBaggageOnMDC[T](code: => T): T = - withBaggageOnMDC(true, code) - - private val TraceIDKey = "trace_id" - - private def addTraceIDToMDC(context: io.opentracing.SpanContext): Unit = context match { - case ctx: KamonSpanContext => MDC.put(TraceIDKey, HexCodec.toLowerHex(ctx.traceID)) - case _ => - } - - private def removeTraceIDFromMDC(): Unit = - MDC.remove(TraceIDKey) -} +// +//object BaggageOnMDC { +// +// /** +// * Copy all baggage keys into SLF4J's MDC, evaluates the provided piece of code and removes the baggage keys +// * afterwards, only when there is a currently active span. Optionally adds the Trace ID as well. +// * +// */ +// def withBaggageOnMDC[T](includeTraceID: Boolean, code: => T): T = { +// val activeSpan = Kamon.activeSpan() +// if(activeSpan == null) +// code +// else { +// val baggageItems = activeSpan.context().baggageItems().asScala +// baggageItems.foreach(entry => MDC.put(entry.getKey, entry.getValue)) +// if(includeTraceID) +// addTraceIDToMDC(activeSpan.context()) +// +// val evaluatedCode = code +// +// baggageItems.foreach(entry => MDC.remove(entry.getKey)) +// if(includeTraceID) +// removeTraceIDFromMDC() +// +// evaluatedCode +// +// } +// } +// +// def withBaggageOnMDC[T](code: Supplier[T]): T = +// withBaggageOnMDC(true, code.get()) +// +// def withBaggageOnMDC[T](includeTraceID: Boolean, code: Supplier[T]): T = +// withBaggageOnMDC(includeTraceID, code.get()) +// +// def withBaggageOnMDC[T](code: => T): T = +// withBaggageOnMDC(true, code) +// +// private val TraceIDKey = "trace_id" +// +// private def addTraceIDToMDC(context: io.opentracing.SpanContext): Unit = context match { +// case ctx: KamonSpanContext => MDC.put(TraceIDKey, HexCodec.toLowerHex(ctx.traceID)) +// case _ => +// } +// +// private def removeTraceIDFromMDC(): Unit = +// MDC.remove(TraceIDKey) +//} diff --git a/kamon-core/src/main/scala/kamon/util/Mixin.scala b/kamon-core/src/main/scala/kamon/util/Mixin.scala index 348b34f1..318679c1 100644 --- a/kamon-core/src/main/scala/kamon/util/Mixin.scala +++ b/kamon-core/src/main/scala/kamon/util/Mixin.scala @@ -16,8 +16,8 @@ package kamon package util -import io.opentracing.ActiveSpan -import io.opentracing.ActiveSpan.Continuation +import kamon.trace.{ActiveSpan, Continuation} + /** * Utility trait that marks objects carrying an ActiveSpan.Continuation. -- cgit v1.2.3