From 483159862293a065be7cf3743d1aa759fbf31fc0 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 17 Jul 2017 13:13:41 +0200 Subject: working on ID generation and SpanContext encoding/decoding --- .../scala/kamon/trace/IdentifierGenerator.scala | 57 ------- .../main/scala/kamon/trace/IdentityProvider.scala | 60 +++++++ .../main/scala/kamon/trace/SpanContextCodec.scala | 181 +++++++++++---------- kamon-core/src/main/scala/kamon/trace/Tracer.scala | 69 ++++---- .../src/main/scala/kamon/util/BaggageOnMDC.scala | 72 -------- 5 files changed, 183 insertions(+), 256 deletions(-) delete mode 100644 kamon-core/src/main/scala/kamon/trace/IdentifierGenerator.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala delete mode 100644 kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala (limited to 'kamon-core/src/main') diff --git a/kamon-core/src/main/scala/kamon/trace/IdentifierGenerator.scala b/kamon-core/src/main/scala/kamon/trace/IdentifierGenerator.scala deleted file mode 100644 index ea23227a..00000000 --- a/kamon-core/src/main/scala/kamon/trace/IdentifierGenerator.scala +++ /dev/null @@ -1,57 +0,0 @@ -package kamon.trace - -import java.nio.ByteBuffer -import java.util.concurrent.ThreadLocalRandom - -import kamon.util.HexCodec - -import scala.util.Try - -trait IdentityProvider { - def traceIdentifierGenerator(): IdentityProvider.Generator - def spanIdentifierGenerator(): IdentityProvider.Generator -} - -object IdentityProvider { - case class Identifier(string: String, bytes: Array[Byte]) - - val NoIdentifier = Identifier("", new Array[Byte](0)) - - - trait Generator { - def generate(): Identifier - def from(string: String): Identifier - def from(bytes: Array[Byte]): Identifier - } - - - class Default extends IdentityProvider { - private val generator = new Generator { - override def generate(): Identifier = { - val data = ByteBuffer.wrap(new Array[Byte](8)) - val random = ThreadLocalRandom.current().nextLong() - data.putLong(random) - - Identifier(HexCodec.toLowerHex(random), data.array()) - } - - override def from(string: String): Identifier = Try { - val identifierLong = HexCodec.lowerHexToUnsignedLong(string) - val data = ByteBuffer.allocate(8) - data.putLong(identifierLong) - - Identifier(string, data.array()) - } getOrElse(IdentityProvider.NoIdentifier) - - override def from(bytes: Array[Byte]): Identifier = Try { - val buffer = ByteBuffer.wrap(bytes) - val identifierLong = buffer.getLong - - Identifier(HexCodec.toLowerHex(identifierLong), bytes) - } getOrElse(IdentityProvider.NoIdentifier) - } - - override def traceIdentifierGenerator(): Generator = generator - override def spanIdentifierGenerator(): Generator = generator - } -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala b/kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala new file mode 100644 index 00000000..25e8f3c0 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala @@ -0,0 +1,60 @@ +package kamon.trace + +import java.nio.ByteBuffer +import java.util.concurrent.ThreadLocalRandom + +import kamon.util.HexCodec + +import scala.util.Try + +trait IdentityProvider { + def traceIdentifierGenerator(): IdentityProvider.Generator + def spanIdentifierGenerator(): IdentityProvider.Generator +} + +object IdentityProvider { + case class Identifier(string: String, bytes: Array[Byte]) + + val NoIdentifier = Identifier("", new Array[Byte](0)) + + trait Generator { + def generate(): Identifier + def from(string: String): Identifier + def from(bytes: Array[Byte]): Identifier + } + + + class Default extends IdentityProvider { + private val generator = new Generator { + override def generate(): Identifier = { + val data = ByteBuffer.wrap(new Array[Byte](8)) + val random = ThreadLocalRandom.current().nextLong() + data.putLong(random) + + Identifier(HexCodec.toLowerHex(random), data.array()) + } + + override def from(string: String): Identifier = Try { + val identifierLong = HexCodec.lowerHexToUnsignedLong(string) + val data = ByteBuffer.allocate(8) + data.putLong(identifierLong) + + Identifier(string, data.array()) + } getOrElse(IdentityProvider.NoIdentifier) + + override def from(bytes: Array[Byte]): Identifier = Try { + val buffer = ByteBuffer.wrap(bytes) + val identifierLong = buffer.getLong + + Identifier(HexCodec.toLowerHex(identifierLong), bytes) + } getOrElse(IdentityProvider.NoIdentifier) + } + + override def traceIdentifierGenerator(): Generator = generator + override def spanIdentifierGenerator(): Generator = generator + } + + object Default { + def apply(): Default = new Default() + } +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala index 23eb40db..11d6de2c 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala @@ -13,152 +13,134 @@ * ========================================================================================= */ - package kamon.trace +import java.lang.StringBuilder import java.net.{URLDecoder, URLEncoder} import java.nio.ByteBuffer -import java.util.concurrent.ThreadLocalRandom - import kamon.trace.SpanContext.{Baggage, SamplingDecision, Source} - -import scala.collection.JavaConverters._ -import kamon.util.HexCodec - +import scala.collection.mutable trait SpanContextCodec[T] { - def inject(spanContext: SpanContext, carrier: T): Unit + def inject(spanContext: SpanContext, carrier: T): T + def inject(spanContext: SpanContext): T def extract(carrier: T): Option[SpanContext] } -trait TextMap { - def get(key: String): Option[String] - def put(key: String, value: String): Unit - def values: Iterator[(String, String)] -} - - object SpanContextCodec { - trait Format[C] + sealed trait Format[C] object Format { case object TextMap extends Format[TextMap] case object HttpHeaders extends Format[TextMap] case object Binary extends Format[ByteBuffer] } -// val ExtendedB3: SpanContextCodec[TextMap] = new TextMapSpanCodec( -// traceIDKey = "X-B3-TraceId", -// parentIDKey = "X-B3-ParentSpanId", -// spanIDKey = "X-B3-SpanId", -// sampledKey = "X-B3-Sampled", -// baggageKey = "X-Kamon-Baggage-", -// baggageValueEncoder = urlEncode, -// baggageValueDecoder = urlDecode -// ) - - private def urlEncode(s: String): String = URLEncoder.encode(s, "UTF-8") - private def urlDecode(s: String): String = URLDecoder.decode(s, "UTF-8") - - private class ExtendedB3(identityProvider: IdentityProvider) extends SpanContextCodec[TextMap] { + class ExtendedB3(identityProvider: IdentityProvider) extends SpanContextCodec[TextMap] { import ExtendedB3.Headers + override def inject(spanContext: SpanContext, carrier: TextMap): TextMap = { + if(spanContext != SpanContext.EmptySpanContext) { + carrier.put(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string)) + carrier.put(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string)) + carrier.put(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string)) + carrier.put(Headers.Sampled, encodeSamplingDecision(spanContext.samplingDecision)) + carrier.put(Headers.Baggage, encodeBaggage(spanContext.baggage)) - override def inject(spanContext: SpanContext, carrier: TextMap): Unit = { - carrier.put(Headers.TraceIdentifier, baggageValueEncoder(spanContext.traceID.string)) - carrier.put(parentIDKey, baggageValueEncoder(spanContext.parentID.string)) - carrier.put(spanIDKey, baggageValueEncoder(spanContext.spanID.string)) - - spanContext.baggage.getAll().foreach { - case (key, value) => carrier.put(baggageKey + key, baggageValueEncoder(value)) + spanContext.baggage.get(Headers.Flags).foreach { flags => + carrier.put(Headers.Flags, flags) + } } + + carrier + } + + override def inject(spanContext: SpanContext): TextMap = { + val mutableTextMap = TextMap.Default() + inject(spanContext, mutableTextMap) + mutableTextMap } override def extract(carrier: TextMap): Option[SpanContext] = { val traceID = carrier.get(Headers.TraceIdentifier) - .map(identityProvider.traceIdentifierGenerator().from) + .map(id => identityProvider.traceIdentifierGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) val spanID = carrier.get(Headers.SpanIdentifier) - .map(identityProvider.spanIdentifierGenerator().from) + .map(id => identityProvider.spanIdentifierGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) { val parentID = carrier.get(Headers.ParentSpanIdentifier) - .map(identityProvider.spanIdentifierGenerator().from) + .map(id => identityProvider.spanIdentifierGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) - val samplingDecision = carrier.get(Headers.Flags).orElse(carrier.get(Headers.Sampled)) match { + val baggage = decodeBaggage(carrier.get(Headers.Baggage)) + val flags = carrier.get(Headers.Flags) + + flags.foreach { flags => + baggage.add(Headers.Flags, flags) + } + + val samplingDecision = flags.orElse(carrier.get(Headers.Sampled)) match { case Some(sampled) if sampled == "1" => SamplingDecision.Sample case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample case _ => SamplingDecision.Unknown } - - - - Some(SpanContext(traceID, spanID, parentID, samplingDecision, ???, Source.Remote)) + Some(SpanContext(traceID, spanID, parentID, samplingDecision, baggage, Source.Remote)) } else None - - val minimalSpanContext = - for { - traceID <- carrier.get(traceIDKey).map(identityProvider.traceIdentifierGenerator().from) - spanID <- carrier.get(spanIDKey).map(identityProvider.spanIdentifierGenerator().from) - } yield { - - - } - - - -// var traceID: String = null -// var parentID: String = null -// var spanID: String = null -// var sampled: String = null -// var baggage: Map[String, String] = Map.empty -// -// carrier.iterator().asScala.foreach { entry => -// if(entry.getKey.equals(traceIDKey)) -// traceID = baggageValueDecoder(entry.getValue) -// else if(entry.getKey.equals(parentIDKey)) -// parentID = baggageValueDecoder(entry.getValue) -// else if(entry.getKey.equals(spanIDKey)) -// spanID = baggageValueDecoder(entry.getValue) -// else if(entry.getKey.equals(sampledKey)) -// sampled = entry.getValue -// else if(entry.getKey.startsWith(baggagePrefix)) -// baggage = baggage + (entry.getKey.substring(baggagePrefix.length) -> baggageValueDecoder(entry.getValue)) -// } -// -// if(traceID != null && spanID != null) { -// val actualParent = if(parentID == null) 0L else decodeLong(parentID) -// val isSampled = if(sampled == null) sampler.decide(ThreadLocalRandom.current().nextLong()) else sampled.equals("1") -// -// new SpanContext(decodeLong(traceID), decodeLong(spanID), actualParent, isSampled, baggage) -// } else null -// -// None } private def encodeBaggage(baggage: Baggage): String = { if(baggage.getAll().nonEmpty) { - + val encodedBaggage = new StringBuilder() baggage.getAll().foreach { - case (key, value) => + case (key, value) if key != Headers.Flags => + if(encodedBaggage.length() > 0) + encodedBaggage.append(';') + + encodedBaggage + .append(urlEncode(key)) + .append('=') + .append(urlEncode(value)) } + + encodedBaggage.toString() } else "" } - private def decodeLong(input: String): Long = - HexCodec.lowerHexToUnsignedLong(input) + private def decodeBaggage(encodedBaggage: Option[String]): Baggage = { + val baggage = Baggage() + encodedBaggage.foreach { baggageString => + baggageString.split(";").foreach { group => + val pair = group.split("=") + if(pair.length >= 2 && pair(0).nonEmpty) { + baggage.add(urlDecode(pair(0)), urlDecode(pair(1))) + } + } + } + + baggage + } + + private def encodeSamplingDecision(samplingDecision: SamplingDecision): String = samplingDecision match { + case SamplingDecision.Sample => "1" + case SamplingDecision.DoNotSample => "0" + case SamplingDecision.Unknown => "" + } - private def encodeLong(input: Long): String = - HexCodec.toLowerHex(input) + private def urlEncode(s: String): String = URLEncoder.encode(s, "UTF-8") + private def urlDecode(s: String): String = URLDecoder.decode(s, "UTF-8") } object ExtendedB3 { + + def apply(identityProvider: IdentityProvider): ExtendedB3 = + new ExtendedB3(identityProvider) + object Headers { val TraceIdentifier = "X-B3-TraceId" val ParentSpanIdentifier = "X-B3-ParentSpanId" @@ -169,3 +151,22 @@ object SpanContextCodec { } } } + +trait TextMap { + def get(key: String): Option[String] + def put(key: String, value: String): Unit + def values: Iterator[(String, String)] +} + +object TextMap { + class Default extends TextMap { + private val storage = mutable.Map.empty[String, String] + override def get(key: String): Option[String] = storage.get(key) + override def put(key: String, value: String): Unit = storage.put(key, value) + override def values: Iterator[(String, String)] = storage.toIterator + } + + object Default { + def apply(): Default = new Default() + } +} diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala index 1aec8d7c..08643c63 100644 --- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala +++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala @@ -27,22 +27,17 @@ import kamon.util.Clock import org.slf4j.LoggerFactory -trait Tracer { - def buildSpan(operationName: String): SpanBuilder +trait ActiveSpanSource { def activeSpan(): ActiveSpan def makeActive(span: Span): ActiveSpan +} - def extract[C](format: SpanContextCodec.Format[C], carrier: C): Option[SpanContext] - def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C], carrier: C): Unit - - - - // - // Configuration Utilities - // +trait Tracer extends ActiveSpanSource{ + def buildSpan(operationName: String): SpanBuilder - def setTextMapSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit - def setHttpHeaderSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit + def extract[C](format: SpanContextCodec.Format[C], carrier: C): Option[SpanContext] + def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C], carrier: C): C + def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C]): C } object Tracer { @@ -57,33 +52,40 @@ object Tracer { private[Tracer] val tracerMetrics = new TracerMetrics(metrics) @volatile private[Tracer] var joinRemoteSpansWithSameID: Boolean = false @volatile private[Tracer] var configuredSampler: Sampler = Sampler.never - @volatile private[Tracer] var idGenerator: IdentifierGenerator = IdentifierGenerator.RandomLong() - @volatile private[Tracer] var textMapSpanContextCodec: SpanContextCodec[TextMap] = SpanContextCodec.TextMap - @volatile private[Tracer] var httpHeaderSpanContextCodec: SpanContextCodec[TextMap] = SpanContextCodec.ZipkinB3 + @volatile private[Tracer] var identityProvider: IdentityProvider = IdentityProvider.Default() + @volatile private[Tracer] var textMapSpanContextCodec: SpanContextCodec[TextMap] = SpanContextCodec.ExtendedB3(identityProvider) + @volatile private[Tracer] var httpHeaderSpanContextCodec: SpanContextCodec[TextMap] = SpanContextCodec.ExtendedB3(identityProvider) reconfigure(initialConfig) - def buildSpan(operationName: String): SpanBuilder = + override def buildSpan(operationName: String): SpanBuilder = new SpanBuilder(operationName, this, reporterRegistry) - def extract[C](format: SpanContextCodec.Format[C], carrier: C): Option[SpanContext] = format match { + override def extract[C](format: SpanContextCodec.Format[C], carrier: C): Option[SpanContext] = format match { case SpanContextCodec.Format.HttpHeaders => httpHeaderSpanContextCodec.extract(carrier.asInstanceOf[TextMap]) case SpanContextCodec.Format.TextMap => textMapSpanContextCodec.extract(carrier.asInstanceOf[TextMap]) case SpanContextCodec.Format.Binary => None case _ => None } - def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C], carrier: C): Unit = format match { + override def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C], carrier: C): C = format match { case SpanContextCodec.Format.HttpHeaders => httpHeaderSpanContextCodec.inject(spanContext, carrier.asInstanceOf[TextMap]) case SpanContextCodec.Format.TextMap => textMapSpanContextCodec.inject(spanContext, carrier.asInstanceOf[TextMap]) - case SpanContextCodec.Format.Binary => - case _ => + case SpanContextCodec.Format.Binary => carrier + case _ => carrier } - def activeSpan(): ActiveSpan = + override def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C]): C = format match { + case SpanContextCodec.Format.HttpHeaders => httpHeaderSpanContextCodec.inject(spanContext) + case SpanContextCodec.Format.TextMap => textMapSpanContextCodec.inject(spanContext) + case SpanContextCodec.Format.Binary => ByteBuffer.allocate(0) // TODO: Implement binary encoding. + case _ => sys.error("can't do") + } + + override def activeSpan(): ActiveSpan = activeSpanStorage.get() - def makeActive(span: Span): ActiveSpan = { + override def makeActive(span: Span): ActiveSpan = { val currentlyActiveSpan = activeSpanStorage.get() val newActiveSpan = ActiveSpan.Default(span, currentlyActiveSpan, activeSpanStorage) activeSpanStorage.set(newActiveSpan) @@ -93,13 +95,6 @@ object Tracer { def sampler: Sampler = configuredSampler - def setTextMapSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit = - this.textMapSpanContextCodec = codec - - def setHttpHeaderSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit = - this.httpHeaderSpanContextCodec = codec - - private[kamon] def reconfigure(config: Config): Unit = synchronized { val traceConfig = config.getConfig("kamon.trace") @@ -110,10 +105,6 @@ object Tracer { case other => sys.error(s"Unexpected sampler name $other.") } } - - private final class TracerMetrics(metricLookup: MetricLookup) { - val createdSpans = metricLookup.counter("tracer.spans-created") - } } final class SpanBuilder(operationName: String, tracer: Tracer.Default, reporterRegistry: ReporterRegistryImpl) { @@ -170,13 +161,13 @@ object Tracer { if(parent.source == Source.Remote && tracer.joinRemoteSpansWithSameID) parent.copy(samplingDecision = samplingDecision) else - parent.createChild(tracer.idGenerator.generateSpanID(), samplingDecision) + parent.createChild(tracer.identityProvider.spanIdentifierGenerator().generate(), samplingDecision) private def newSpanContext(samplingDecision: SamplingDecision): SpanContext = SpanContext( - traceID = tracer.idGenerator.generateTraceID(), - spanID = tracer.idGenerator.generateSpanID(), - parentID = tracer.idGenerator.generateEmptyID(), + traceID = tracer.identityProvider.traceIdentifierGenerator().generate(), + spanID = tracer.identityProvider.spanIdentifierGenerator().generate(), + parentID = IdentityProvider.NoIdentifier, samplingDecision = samplingDecision, baggage = SpanContext.Baggage(), source = Source.Local @@ -185,4 +176,8 @@ object Tracer { def startActive(): ActiveSpan = tracer.makeActive(start()) } + + private final class TracerMetrics(metricLookup: MetricLookup) { + val createdSpans = metricLookup.counter("tracer.spans-created") + } } diff --git a/kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala b/kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala deleted file mode 100644 index 83027cc5..00000000 --- a/kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2015 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ -package kamon.util - -import java.util.function.Supplier - -import kamon.trace.{SpanContext => KamonSpanContext} -import kamon.Kamon -import org.slf4j.MDC - -import scala.collection.JavaConverters._ -// -//object BaggageOnMDC { -// -// /** -// * Copy all baggage keys into SLF4J's MDC, evaluates the provided piece of code and removes the baggage keys -// * afterwards, only when there is a currently active span. Optionally adds the Trace ID as well. -// * -// */ -// def withBaggageOnMDC[T](includeTraceID: Boolean, code: => T): T = { -// val activeSpan = Kamon.activeSpan() -// if(activeSpan == null) -// code -// else { -// val baggageItems = activeSpan.context().baggageItems().asScala -// baggageItems.foreach(entry => MDC.put(entry.getKey, entry.getValue)) -// if(includeTraceID) -// addTraceIDToMDC(activeSpan.context()) -// -// val evaluatedCode = code -// -// baggageItems.foreach(entry => MDC.remove(entry.getKey)) -// if(includeTraceID) -// removeTraceIDFromMDC() -// -// evaluatedCode -// -// } -// } -// -// def withBaggageOnMDC[T](code: Supplier[T]): T = -// withBaggageOnMDC(true, code.get()) -// -// def withBaggageOnMDC[T](includeTraceID: Boolean, code: Supplier[T]): T = -// withBaggageOnMDC(includeTraceID, code.get()) -// -// def withBaggageOnMDC[T](code: => T): T = -// withBaggageOnMDC(true, code) -// -// private val TraceIDKey = "trace_id" -// -// private def addTraceIDToMDC(context: io.opentracing.SpanContext): Unit = context match { -// case ctx: KamonSpanContext => MDC.put(TraceIDKey, HexCodec.toLowerHex(ctx.traceID)) -// case _ => -// } -// -// private def removeTraceIDFromMDC(): Unit = -// MDC.remove(TraceIDKey) -//} -- cgit v1.2.3