diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2017-08-14 23:28:52 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-08-14 23:28:52 +0200 |
commit | a949c875684d78818224cd2ca7aaf79aa7878724 (patch) | |
tree | ce84ab802ba3c543b4b107e32b7cac4dea610fc4 /kamon-core/src/main/scala/kamon/trace | |
parent | 18b9fc25d556fef50c5033f8880fab2594783caa (diff) | |
parent | 3144a04ea42a4e333b7a608597c07c0458c9f147 (diff) | |
download | Kamon-a949c875684d78818224cd2ca7aaf79aa7878724.tar.gz Kamon-a949c875684d78818224cd2ca7aaf79aa7878724.tar.bz2 Kamon-a949c875684d78818224cd2ca7aaf79aa7878724.zip |
Merge pull request #1 from ivantopo/wip/context-management-reloaded
remove context management from the Tracer
Diffstat (limited to 'kamon-core/src/main/scala/kamon/trace')
7 files changed, 181 insertions, 422 deletions
diff --git a/kamon-core/src/main/scala/kamon/trace/ActiveSpanStorage.scala b/kamon-core/src/main/scala/kamon/trace/ActiveSpanStorage.scala deleted file mode 100644 index 85e94ef2..00000000 --- a/kamon-core/src/main/scala/kamon/trace/ActiveSpanStorage.scala +++ /dev/null @@ -1,74 +0,0 @@ -package kamon.trace - -/** - * A means of storing and retrieving the currently active Span. The application code execution is always considered to - * contribute to the completion of the operation represented by the currently active Span. - * - * The activation of a Span is of temporary nature and users of this API must ensure that all Scopes created via calls - * to `activate(span)` are timely closed; failing to do so might lead to unexpected behavior. Typically, the same block - * of code designating a Span as currently active will close the created Scope after finishing execution. - * - */ -trait ActiveSpanStorage { - - /** - * @return the currently active Span. - */ - def activeSpan(): Span - - /** - * Sets - * @param span the Span to be set as currently active. - * @return a [[Scope]] that will finish the designation of the given Span as active once it's closed. - */ - def activate(span: Span): Scope - -} - -/** - * Encapsulates the state (if any) required to handle the removal of a Span from it's currently active designation. - * - * Typically a Scope will enclose the previously active Span and return the previously active Span when closed, - * although no assumptions are made. - * - */ -trait Scope extends AutoCloseable { - - /** - * Removes the currently active Span from the ActiveSpanStorage. - * - */ - def close(): Unit -} - -object ActiveSpanStorage { - - /** - * A ActiveSpanStorage that uses a [[java.lang.ThreadLocal]] as the underlying storage. - * - */ - final class ThreadLocal extends ActiveSpanStorage { - private val emptySpan = Span.Empty(this) - private val storage: java.lang.ThreadLocal[Span] = new java.lang.ThreadLocal[Span] { - override def initialValue(): Span = emptySpan - } - - override def activeSpan(): Span = - storage.get() - - override def activate(span: Span): Scope = { - val previouslyActiveSpan = storage.get() - storage.set(span) - - new Scope { - override def close(): Unit = { - storage.set(previouslyActiveSpan) - } - } - } - } - - object ThreadLocal { - def apply(): ThreadLocal = new ThreadLocal() - } -}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala b/kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala index 3f44629e..937200f5 100644 --- a/kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala +++ b/kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala @@ -8,8 +8,8 @@ import kamon.util.HexCodec import scala.util.Try trait IdentityProvider { - def traceIdentifierGenerator(): IdentityProvider.Generator - def spanIdentifierGenerator(): IdentityProvider.Generator + def traceIdGenerator(): IdentityProvider.Generator + def spanIdGenerator(): IdentityProvider.Generator } object IdentityProvider { @@ -57,8 +57,8 @@ object IdentityProvider { } getOrElse(IdentityProvider.NoIdentifier) } - override def traceIdentifierGenerator(): Generator = longGenerator - override def spanIdentifierGenerator(): Generator = longGenerator + override def traceIdGenerator(): Generator = longGenerator + override def spanIdGenerator(): Generator = longGenerator } object Default { @@ -97,7 +97,7 @@ object IdentityProvider { } getOrElse(IdentityProvider.NoIdentifier) } - override def traceIdentifierGenerator(): Generator = doubleLongGenerator + override def traceIdGenerator(): Generator = doubleLongGenerator } object DoubleSizeTraceID { diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala index 84cc5625..161042d5 100644 --- a/kamon-core/src/main/scala/kamon/trace/Span.scala +++ b/kamon-core/src/main/scala/kamon/trace/Span.scala @@ -17,15 +17,18 @@ package kamon package trace import kamon.ReporterRegistry.SpanSink +import kamon.context.Key import kamon.trace.SpanContext.SamplingDecision - import kamon.util.{Clock, MeasurementUnit} -/** - * Minimum set of capabilities that should be provided by a Span, all additional sugar is provided by extensions - * in the Span trait bellow. - */ -trait BaseSpan { + +trait Span { + + def isEmpty(): Boolean + def isLocal(): Boolean + + def nonEmpty(): Boolean = !isEmpty() + def isRemote(): Boolean = !isLocal() def context(): SpanContext @@ -39,21 +42,11 @@ trait BaseSpan { def addMetricTag(key: String, value: String): Span - def addBaggage(key: String, value: String): Span - - def getBaggage(key: String): Option[String] - def setOperationName(name: String): Span def disableMetricsCollection(): Span def finish(finishTimestampMicros: Long): Unit -} - -/** - * - */ -trait Span extends BaseSpan { def finish(): Unit = finish(Clock.microTimestamp()) @@ -71,25 +64,22 @@ trait Span extends BaseSpan { object Span { - final class Empty(activeSpanSource: ActiveSpanStorage) extends Span { - override val context: SpanContext = SpanContext.EmptySpanContext + val ContextKey = Key.broadcast[Span]("span", Span.Empty) + object Empty extends Span { + override val context: SpanContext = SpanContext.EmptySpanContext + override def isEmpty(): Boolean = true + override def isLocal(): Boolean = true override def annotate(annotation: Annotation): Span = this override def addSpanTag(key: String, value: String): Span = this override def addSpanTag(key: String, value: Long): Span = this override def addSpanTag(key: String, value: Boolean): Span = this override def addMetricTag(key: String, value: String): Span = this - override def addBaggage(key: String, value: String): Span = this - override def getBaggage(key: String): Option[String] = None override def setOperationName(name: String): Span = this override def disableMetricsCollection(): Span = this override def finish(finishTimestampMicros: Long): Unit = {} } - object Empty { - def apply(activeSpanSource: ActiveSpanStorage): Empty = new Empty(activeSpanSource) - } - /** * * @param spanContext @@ -98,8 +88,8 @@ object Span { * @param startTimestampMicros * @param spanSink */ - final class Real(spanContext: SpanContext, initialOperationName: String, initialSpanTags: Map[String, Span.TagValue], - initialMetricTags: Map[String, String], startTimestampMicros: Long, spanSink: SpanSink, activeSpanSource: ActiveSpanStorage) extends Span { + final class Local(spanContext: SpanContext, initialOperationName: String, initialSpanTags: Map[String, Span.TagValue], + initialMetricTags: Map[String, String], startTimestampMicros: Long, spanSink: SpanSink) extends Span { private var collectMetrics: Boolean = true private var open: Boolean = true @@ -110,6 +100,9 @@ object Span { private var customMetricTags = initialMetricTags private var annotations = List.empty[Span.Annotation] + override def isEmpty(): Boolean = false + override def isLocal(): Boolean = true + def annotate(annotation: Annotation): Span = synchronized { if(sampled && open) annotations = annotation :: annotations @@ -142,14 +135,6 @@ object Span { this } - override def addBaggage(key: String, value: String): Span = { - spanContext.baggage.add(key, value) - this - } - - override def getBaggage(key: String): Option[String] = - spanContext.baggage.get(key) - override def disableMetricsCollection(): Span = synchronized { collectMetrics = false this @@ -194,10 +179,29 @@ object Span { } } - object Real { + object Local { def apply(spanContext: SpanContext, initialOperationName: String, initialSpanTags: Map[String, Span.TagValue], - initialMetricTags: Map[String, String], startTimestampMicros: Long, reporterRegistry: ReporterRegistryImpl, tracer: Tracer): Real = - new Real(spanContext, initialOperationName, initialSpanTags, initialMetricTags, startTimestampMicros, reporterRegistry, tracer) + initialMetricTags: Map[String, String], startTimestampMicros: Long, reporterRegistry: ReporterRegistryImpl): Local = + new Local(spanContext, initialOperationName, initialSpanTags, initialMetricTags, startTimestampMicros, reporterRegistry) + } + + + final class Remote(val context: SpanContext) extends Span { + override def isEmpty(): Boolean = false + override def isLocal(): Boolean = false + override def annotate(annotation: Annotation): Span = this + override def addSpanTag(key: String, value: String): Span = this + override def addSpanTag(key: String, value: Long): Span = this + override def addSpanTag(key: String, value: Boolean): Span = this + override def addMetricTag(key: String, value: String): Span = this + override def setOperationName(name: String): Span = this + override def disableMetricsCollection(): Span = this + override def finish(finishTimestampMicros: Long): Unit = {} + } + + object Remote { + def apply(spanContext: SpanContext): Remote = + new Remote(spanContext) } sealed trait TagValue diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala new file mode 100644 index 00000000..e04ceb03 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala @@ -0,0 +1,99 @@ +/* ========================================================================================= + * Copyright © 2013-2017 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import java.net.{URLDecoder, URLEncoder} + +import kamon.Kamon +import kamon.context.{Codec, Context, TextMap} +import kamon.trace.SpanContext.SamplingDecision + + +object SpanCodec { + + class B3 extends Codec.ForEntry[TextMap] { + import B3.Headers + + override def encode(context: Context): TextMap = { + val span = context.get(Span.ContextKey) + val carrier = TextMap.Default() + + if(span.nonEmpty()) { + val spanContext = span.context + carrier.put(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string)) + carrier.put(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string)) + carrier.put(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string)) + + encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision => + carrier.put(Headers.Sampled, samplingDecision) + } + } + + carrier + } + + override def decode(carrier: TextMap, context: Context): Context = { + val identityProvider = Kamon.tracer.identityProvider + val traceID = carrier.get(Headers.TraceIdentifier) + .map(id => identityProvider.traceIdGenerator().from(urlDecode(id))) + .getOrElse(IdentityProvider.NoIdentifier) + + val spanID = carrier.get(Headers.SpanIdentifier) + .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) + .getOrElse(IdentityProvider.NoIdentifier) + + if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) { + val parentID = carrier.get(Headers.ParentSpanIdentifier) + .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) + .getOrElse(IdentityProvider.NoIdentifier) + + val flags = carrier.get(Headers.Flags) + + val samplingDecision = flags.orElse(carrier.get(Headers.Sampled)) match { + case Some(sampled) if sampled == "1" => SamplingDecision.Sample + case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample + case _ => SamplingDecision.Unknown + } + + context.withKey(Span.ContextKey, Span.Remote(SpanContext(traceID, spanID, parentID, samplingDecision))) + + } else context + } + + private def encodeSamplingDecision(samplingDecision: SamplingDecision): Option[String] = samplingDecision match { + case SamplingDecision.Sample => Some("1") + case SamplingDecision.DoNotSample => Some("0") + case SamplingDecision.Unknown => None + } + + private def urlEncode(s: String): String = URLEncoder.encode(s, "UTF-8") + private def urlDecode(s: String): String = URLDecoder.decode(s, "UTF-8") + } + + object B3 { + + def apply(): B3 = + new B3() + + object Headers { + val TraceIdentifier = "X-B3-TraceId" + val ParentSpanIdentifier = "X-B3-ParentSpanId" + val SpanIdentifier = "X-B3-SpanId" + val Sampled = "X-B3-Sampled" + val Flags = "X-B3-Flags" + } + } +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/SpanContext.scala b/kamon-core/src/main/scala/kamon/trace/SpanContext.scala index ae92f46d..4d013881 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanContext.scala @@ -16,7 +16,7 @@ package kamon.trace import kamon.trace.IdentityProvider.Identifier -import kamon.trace.SpanContext.{Baggage, SamplingDecision, Source} +import kamon.trace.SpanContext.SamplingDecision /** * @@ -24,9 +24,8 @@ import kamon.trace.SpanContext.{Baggage, SamplingDecision, Source} * @param spanID * @param parentID * @param samplingDecision - * @param baggage */ -case class SpanContext(traceID: Identifier, spanID: Identifier, parentID: Identifier, samplingDecision: SamplingDecision, baggage: Baggage, source: Source) { +case class SpanContext(traceID: Identifier, spanID: Identifier, parentID: Identifier, samplingDecision: SamplingDecision) { def createChild(childSpanID: Identifier, samplingDecision: SamplingDecision): SpanContext = this.copy(parentID = this.spanID, spanID = childSpanID) @@ -34,73 +33,33 @@ case class SpanContext(traceID: Identifier, spanID: Identifier, parentID: Identi object SpanContext { - sealed trait Source - object Source { - case object Local extends Source - case object Remote extends Source - } - val EmptySpanContext = SpanContext( - traceID = IdentityProvider.NoIdentifier, - spanID = IdentityProvider.NoIdentifier, - parentID = IdentityProvider.NoIdentifier, - samplingDecision = SamplingDecision.DoNotSample, - baggage = Baggage.EmptyBaggage, - source = Source.Local + traceID = IdentityProvider.NoIdentifier, + spanID = IdentityProvider.NoIdentifier, + parentID = IdentityProvider.NoIdentifier, + samplingDecision = SamplingDecision.DoNotSample ) sealed trait SamplingDecision + object SamplingDecision { /** - * The Trace is sampled, all child Spans should be sampled as well. + * The Trace is sampled, all child Spans should be sampled as well. */ case object Sample extends SamplingDecision /** - * The Trace is not sampled, none of the child Spans should be sampled. + * The Trace is not sampled, none of the child Spans should be sampled. */ case object DoNotSample extends SamplingDecision /** - * The sampling decision has not been taken yet, the Tracer is free to decide when creating a Span. + * The sampling decision has not been taken yet, the Tracer is free to decide when creating a Span. */ case object Unknown extends SamplingDecision - } - /** - * - */ - - sealed trait Baggage { - def add(key: String, value:String): Unit - def get(key: String): Option[String] - def getAll(): Map[String, String] } - object Baggage { - def apply(): Baggage = new DefaultBaggage() - - case object EmptyBaggage extends Baggage { - override def add(key: String, value: String): Unit = {} - override def get(key: String): Option[String] = None - override def getAll: Map[String, String] = Map.empty - } - - - final class DefaultBaggage extends Baggage { - private var baggage: Map[String, String] = Map.empty - - def add(key: String, value: String): Unit = synchronized { - baggage = baggage + (key -> value) - } - - def get(key: String): Option[String] = - baggage.get(key) - - def getAll: Map[String, String] = - baggage - } - } -} +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala deleted file mode 100644 index 43b5e8e4..00000000 --- a/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala +++ /dev/null @@ -1,174 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2017 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.trace - -import java.lang.StringBuilder -import java.net.{URLDecoder, URLEncoder} -import java.nio.ByteBuffer -import kamon.trace.SpanContext.{Baggage, SamplingDecision, Source} -import scala.collection.mutable - -trait SpanContextCodec[T] { - def inject(spanContext: SpanContext, carrier: T): T - def inject(spanContext: SpanContext): T - def extract(carrier: T): Option[SpanContext] -} - -object SpanContextCodec { - - sealed trait Format[C] - object Format { - case object TextMap extends Format[TextMap] - case object HttpHeaders extends Format[TextMap] - case object Binary extends Format[ByteBuffer] - } - - class ExtendedB3(identityProvider: IdentityProvider) extends SpanContextCodec[TextMap] { - import ExtendedB3.Headers - - override def inject(spanContext: SpanContext, carrier: TextMap): TextMap = { - if(spanContext != SpanContext.EmptySpanContext) { - carrier.put(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string)) - carrier.put(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string)) - carrier.put(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string)) - carrier.put(Headers.Baggage, encodeBaggage(spanContext.baggage)) - - encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision => - carrier.put(Headers.Sampled, samplingDecision) - } - - spanContext.baggage.get(Headers.Flags).foreach { flags => - carrier.put(Headers.Flags, flags) - } - } - - carrier - } - - override def inject(spanContext: SpanContext): TextMap = - inject(spanContext, TextMap.Default()) - - override def extract(carrier: TextMap): Option[SpanContext] = { - val traceID = carrier.get(Headers.TraceIdentifier) - .map(id => identityProvider.traceIdentifierGenerator().from(urlDecode(id))) - .getOrElse(IdentityProvider.NoIdentifier) - - val spanID = carrier.get(Headers.SpanIdentifier) - .map(id => identityProvider.spanIdentifierGenerator().from(urlDecode(id))) - .getOrElse(IdentityProvider.NoIdentifier) - - if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) { - val parentID = carrier.get(Headers.ParentSpanIdentifier) - .map(id => identityProvider.spanIdentifierGenerator().from(urlDecode(id))) - .getOrElse(IdentityProvider.NoIdentifier) - - val baggage = decodeBaggage(carrier.get(Headers.Baggage)) - val flags = carrier.get(Headers.Flags) - - flags.foreach { flags => - baggage.add(Headers.Flags, flags) - } - - val samplingDecision = flags.orElse(carrier.get(Headers.Sampled)) match { - case Some(sampled) if sampled == "1" => SamplingDecision.Sample - case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample - case _ => SamplingDecision.Unknown - } - - Some(SpanContext(traceID, spanID, parentID, samplingDecision, baggage, Source.Remote)) - - } else None - } - - private def encodeBaggage(baggage: Baggage): String = { - if(baggage.getAll().nonEmpty) { - val encodedBaggage = new StringBuilder() - baggage.getAll().foreach { - case (key, value) => - if(key != Headers.Flags) { - if (encodedBaggage.length() > 0) - encodedBaggage.append(';') - - encodedBaggage - .append(urlEncode(key)) - .append('=') - .append(urlEncode(value)) - } - } - - encodedBaggage.toString() - } else "" - } - - private def decodeBaggage(encodedBaggage: Option[String]): Baggage = { - val baggage = Baggage() - encodedBaggage.foreach { baggageString => - baggageString.split(";").foreach { group => - val pair = group.split("=") - if(pair.length >= 2 && pair(0).nonEmpty) { - baggage.add(urlDecode(pair(0)), urlDecode(pair(1))) - } - } - } - - baggage - } - - private def encodeSamplingDecision(samplingDecision: SamplingDecision): Option[String] = samplingDecision match { - case SamplingDecision.Sample => Some("1") - case SamplingDecision.DoNotSample => Some("0") - case SamplingDecision.Unknown => None - } - - private def urlEncode(s: String): String = URLEncoder.encode(s, "UTF-8") - private def urlDecode(s: String): String = URLDecoder.decode(s, "UTF-8") - - } - - object ExtendedB3 { - - def apply(identityProvider: IdentityProvider): ExtendedB3 = - new ExtendedB3(identityProvider) - - object Headers { - val TraceIdentifier = "X-B3-TraceId" - val ParentSpanIdentifier = "X-B3-ParentSpanId" - val SpanIdentifier = "X-B3-SpanId" - val Sampled = "X-B3-Sampled" - val Flags = "X-B3-Flags" - val Baggage = "X-B3-Extra-Baggage" - } - } -} - -trait TextMap { - def get(key: String): Option[String] - def put(key: String, value: String): Unit - def values: Iterator[(String, String)] -} - -object TextMap { - class Default extends TextMap { - private val storage = mutable.Map.empty[String, String] - override def get(key: String): Option[String] = storage.get(key) - override def put(key: String, value: String): Unit = storage.put(key, value) - override def values: Iterator[(String, String)] = storage.toIterator - } - - object Default { - def apply(): Default = new Default() - } -} diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala index bfdd561d..7d8830ca 100644 --- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala +++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala @@ -15,13 +15,11 @@ package kamon.trace -import java.nio.ByteBuffer - import com.typesafe.config.Config -import kamon.ReporterRegistryImpl +import kamon.{Kamon, ReporterRegistryImpl} import kamon.metric.MetricLookup import kamon.trace.Span.TagValue -import kamon.trace.SpanContext.{SamplingDecision, Source} +import kamon.trace.SpanContext.SamplingDecision import kamon.trace.Tracer.SpanBuilder import kamon.util.{Clock, DynamicAccess} import org.slf4j.LoggerFactory @@ -29,55 +27,28 @@ import org.slf4j.LoggerFactory import scala.collection.immutable import scala.util.Try -trait Tracer extends ActiveSpanStorage { +trait Tracer { def buildSpan(operationName: String): SpanBuilder - - def extract[C](format: SpanContextCodec.Format[C], carrier: C): Option[SpanContext] - def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C], carrier: C): C - def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C]): C + def identityProvider: IdentityProvider } object Tracer { final class Default(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config) extends Tracer { private val logger = LoggerFactory.getLogger(classOf[Tracer]) - private val activeSpanSource = ActiveSpanStorage.ThreadLocal() private[Tracer] val tracerMetrics = new TracerMetrics(metrics) @volatile private[Tracer] var joinRemoteParentsWithSameSpanID: Boolean = true @volatile private[Tracer] var configuredSampler: Sampler = Sampler.Never - @volatile private[Tracer] var identityProvider: IdentityProvider = IdentityProvider.Default() - @volatile private[Tracer] var textMapSpanContextCodec: SpanContextCodec[TextMap] = SpanContextCodec.ExtendedB3(identityProvider) - @volatile private[Tracer] var httpHeaderSpanContextCodec: SpanContextCodec[TextMap] = SpanContextCodec.ExtendedB3(identityProvider) + @volatile private[Tracer] var _identityProvider: IdentityProvider = IdentityProvider.Default() reconfigure(initialConfig) override def buildSpan(operationName: String): SpanBuilder = new SpanBuilder(operationName, this, reporterRegistry) - override def extract[C](format: SpanContextCodec.Format[C], carrier: C): Option[SpanContext] = format match { - case SpanContextCodec.Format.HttpHeaders => httpHeaderSpanContextCodec.extract(carrier.asInstanceOf[TextMap]) - case SpanContextCodec.Format.TextMap => textMapSpanContextCodec.extract(carrier.asInstanceOf[TextMap]) - case SpanContextCodec.Format.Binary => None - } - - override def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C], carrier: C): C = format match { - case SpanContextCodec.Format.HttpHeaders => httpHeaderSpanContextCodec.inject(spanContext, carrier.asInstanceOf[TextMap]) - case SpanContextCodec.Format.TextMap => textMapSpanContextCodec.inject(spanContext, carrier.asInstanceOf[TextMap]) - case SpanContextCodec.Format.Binary => carrier - } - - override def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C]): C = format match { - case SpanContextCodec.Format.HttpHeaders => httpHeaderSpanContextCodec.inject(spanContext) - case SpanContextCodec.Format.TextMap => textMapSpanContextCodec.inject(spanContext) - case SpanContextCodec.Format.Binary => ByteBuffer.allocate(0) // TODO: Implement binary encoding. - } - - override def activeSpan(): Span = - activeSpanSource.activeSpan() - - override def activate(span: Span): Scope = - activeSpanSource.activate(span) + override def identityProvider: IdentityProvider = + this._identityProvider def sampler: Sampler = configuredSampler @@ -100,25 +71,9 @@ object Tracer { traceConfig.getString("identity-provider"), immutable.Seq.empty[(Class[_], AnyRef)] ).get - val spanContextCodecs = traceConfig.getConfig("span-context-codec") - val newTextMapSpanContextCodec = dynamic.createInstanceFor[SpanContextCodec[TextMap]]( - spanContextCodecs.getString("text-map"), immutable.Seq((classOf[IdentityProvider], newIdentityProvider)) - ).get - - val newHttpHeadersSpanContextCodec = dynamic.createInstanceFor[SpanContextCodec[TextMap]]( - spanContextCodecs.getString("http-headers"), immutable.Seq((classOf[IdentityProvider], newIdentityProvider)) - ).get - -// val newBinarySpanContextCodec = dynamic.createInstanceFor[SpanContextCodec[TextMap]]( -// spanContextCodecs.getString("binary"), immutable.Seq((classOf[IdentityProvider], newIdentityProvider)) -// ).get // TODO: Make it happen! - - configuredSampler = newSampler joinRemoteParentsWithSameSpanID = newJoinRemoteParentsWithSameSpanID - identityProvider = newIdentityProvider - textMapSpanContextCodec = newTextMapSpanContextCodec - httpHeaderSpanContextCodec = newHttpHeadersSpanContextCodec + _identityProvider = newIdentityProvider }.failed.foreach { ex => logger.error("Unable to reconfigure Kamon Tracer", ex) @@ -132,25 +87,17 @@ object Tracer { } final class SpanBuilder(operationName: String, tracer: Tracer.Default, reporterRegistry: ReporterRegistryImpl) { - private var parentContext: SpanContext = _ + private var parentSpan: Span = _ private var startTimestamp = 0L private var initialSpanTags = Map.empty[String, Span.TagValue] private var initialMetricTags = Map.empty[String, String] private var useActiveSpanAsParent = true - def asChildOf(parentContext: SpanContext): SpanBuilder = { - this.parentContext = parentContext + def asChildOf(parent: Span): SpanBuilder = { + if(parent != Span.Empty) this.parentSpan = parent this } - def asChildOf(parentContext: Option[SpanContext]): SpanBuilder = { - parentContext.foreach(asChildOf) - this - } - - def asChildOf(parentSpan: Span): SpanBuilder = - asChildOf(parentSpan.context()) - def withMetricTag(key: String, value: String): SpanBuilder = { this.initialMetricTags = this.initialMetricTags + (key -> value) this @@ -185,38 +132,36 @@ object Tracer { def start(): Span = { val startTimestampMicros = if(startTimestamp != 0L) startTimestamp else Clock.microTimestamp() - val parentSpanContext: Option[SpanContext] = Option(parentContext) - .orElse(if(useActiveSpanAsParent) Some(tracer.activeSpan().context()) else None) - .filter(spanContext => spanContext != SpanContext.EmptySpanContext) + val parentSpan: Option[Span] = Option(this.parentSpan) + .orElse(if(useActiveSpanAsParent) Some(Kamon.currentContext().get(Span.ContextKey)) else None) + .filter(span => span != Span.Empty) - val samplingDecision: SamplingDecision = parentSpanContext - .map(_.samplingDecision) + val samplingDecision: SamplingDecision = parentSpan + .map(_.context.samplingDecision) .filter(_ != SamplingDecision.Unknown) .getOrElse(tracer.sampler.decide(operationName, initialSpanTags)) - val spanContext = parentSpanContext match { + val spanContext = parentSpan match { case Some(parent) => joinParentContext(parent, samplingDecision) case None => newSpanContext(samplingDecision) } tracer.tracerMetrics.createdSpans.increment() - Span.Real(spanContext, operationName, initialSpanTags, initialMetricTags, startTimestampMicros, reporterRegistry, tracer) + Span.Local(spanContext, operationName, initialSpanTags, initialMetricTags, startTimestampMicros, reporterRegistry) } - private def joinParentContext(parent: SpanContext, samplingDecision: SamplingDecision): SpanContext = - if(parent.source == Source.Remote && tracer.joinRemoteParentsWithSameSpanID) - parent.copy(samplingDecision = samplingDecision) + private def joinParentContext(parent: Span, samplingDecision: SamplingDecision): SpanContext = + if(parent.isRemote() && tracer.joinRemoteParentsWithSameSpanID) + parent.context().copy(samplingDecision = samplingDecision) else - parent.createChild(tracer.identityProvider.spanIdentifierGenerator().generate(), samplingDecision) + parent.context().createChild(tracer._identityProvider.spanIdGenerator().generate(), samplingDecision) private def newSpanContext(samplingDecision: SamplingDecision): SpanContext = SpanContext( - traceID = tracer.identityProvider.traceIdentifierGenerator().generate(), - spanID = tracer.identityProvider.spanIdentifierGenerator().generate(), + traceID = tracer._identityProvider.traceIdGenerator().generate(), + spanID = tracer._identityProvider.spanIdGenerator().generate(), parentID = IdentityProvider.NoIdentifier, - samplingDecision = samplingDecision, - baggage = SpanContext.Baggage(), - source = Source.Local + samplingDecision = samplingDecision ) } |