diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/trace')
7 files changed, 535 insertions, 353 deletions
diff --git a/kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala b/kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala new file mode 100644 index 00000000..937200f5 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala @@ -0,0 +1,106 @@ +package kamon.trace + +import java.nio.ByteBuffer +import java.util.concurrent.ThreadLocalRandom + +import kamon.util.HexCodec + +import scala.util.Try + +trait IdentityProvider { + def traceIdGenerator(): IdentityProvider.Generator + def spanIdGenerator(): IdentityProvider.Generator +} + +object IdentityProvider { + case class Identifier(string: String, bytes: Array[Byte]) { + + override def equals(obj: Any): Boolean = { + if(obj != null && obj.isInstanceOf[Identifier]) + obj.asInstanceOf[Identifier].string == string + else false + } + } + + val NoIdentifier = Identifier("", new Array[Byte](0)) + + trait Generator { + def generate(): Identifier + def from(string: String): Identifier + def from(bytes: Array[Byte]): Identifier + } + + + class Default extends IdentityProvider { + protected val longGenerator = new Generator { + override def generate(): Identifier = { + val data = ByteBuffer.wrap(new Array[Byte](8)) + val random = ThreadLocalRandom.current().nextLong() + data.putLong(random) + + Identifier(HexCodec.toLowerHex(random), data.array()) + } + + override def from(string: String): Identifier = Try { + val identifierLong = HexCodec.lowerHexToUnsignedLong(string) + val data = ByteBuffer.allocate(8) + data.putLong(identifierLong) + + Identifier(string, data.array()) + } getOrElse(IdentityProvider.NoIdentifier) + + override def from(bytes: Array[Byte]): Identifier = Try { + val buffer = ByteBuffer.wrap(bytes) + val identifierLong = buffer.getLong + + Identifier(HexCodec.toLowerHex(identifierLong), bytes) + } getOrElse(IdentityProvider.NoIdentifier) + } + + override def traceIdGenerator(): Generator = longGenerator + override def spanIdGenerator(): Generator = longGenerator + } + + object Default { + def apply(): Default = new Default() + } + + + class DoubleSizeTraceID extends Default { + private val doubleLongGenerator = new Generator { + override def generate(): Identifier = { + val data = ByteBuffer.wrap(new Array[Byte](16)) + val highLong = ThreadLocalRandom.current().nextLong() + val lowLong = ThreadLocalRandom.current().nextLong() + data.putLong(highLong) + data.putLong(lowLong) + + Identifier(HexCodec.toLowerHex(highLong) + HexCodec.toLowerHex(lowLong), data.array()) + } + + override def from(string: String): Identifier = Try { + val highPart = HexCodec.lowerHexToUnsignedLong(string.substring(0, 16)) + val lowPart = HexCodec.lowerHexToUnsignedLong(string.substring(16, 32)) + val data = ByteBuffer.allocate(16) + data.putLong(highPart) + data.putLong(lowPart) + + Identifier(string, data.array()) + } getOrElse(IdentityProvider.NoIdentifier) + + override def from(bytes: Array[Byte]): Identifier = Try { + val buffer = ByteBuffer.wrap(bytes) + val highLong = buffer.getLong + val lowLong = buffer.getLong + + Identifier(HexCodec.toLowerHex(highLong) + HexCodec.toLowerHex(lowLong), bytes) + } getOrElse(IdentityProvider.NoIdentifier) + } + + override def traceIdGenerator(): Generator = doubleLongGenerator + } + + object DoubleSizeTraceID { + def apply(): DoubleSizeTraceID = new DoubleSizeTraceID() + } +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/Sampler.scala b/kamon-core/src/main/scala/kamon/trace/Sampler.scala index 0347a151..3f366175 100644 --- a/kamon-core/src/main/scala/kamon/trace/Sampler.scala +++ b/kamon-core/src/main/scala/kamon/trace/Sampler.scala @@ -15,39 +15,44 @@ package kamon.trace +import java.util.concurrent.ThreadLocalRandom +import kamon.trace.SpanContext.SamplingDecision + trait Sampler { - def decide(spanID: Long): Boolean + def decide(operationName: String, builderTags: Map[String, Span.TagValue]): SamplingDecision } object Sampler { - val always = new Constant(true) - val never = new Constant(false) + val Always = new Constant(SamplingDecision.Sample) + val Never = new Constant(SamplingDecision.DoNotSample) - def random(chance: Double): Sampler = { - assert(chance >= 0D && chance <= 1.0D, "Change should be >= 0 and <= 1.0") + def random(probability: Double): Sampler = { + assert(probability >= 0D && probability <= 1.0D, "The probability should be >= 0 and <= 1.0") - chance match { - case 0D => never - case 1.0D => always + probability match { + case 0D => Never + case 1.0D => Always case anyOther => new Random(anyOther) } } - class Constant(decision: Boolean) extends Sampler { - override def decide(spanID: Long): Boolean = decision + class Constant(decision: SamplingDecision) extends Sampler { + override def decide(operationName: String, builderTags: Map[String, Span.TagValue]): SamplingDecision = decision override def toString: String = s"Sampler.Constant(decision = $decision)" } - class Random(chance: Double) extends Sampler { - val upperBoundary = Long.MaxValue * chance + class Random(probability: Double) extends Sampler { + val upperBoundary = Long.MaxValue * probability val lowerBoundary = -upperBoundary - override def decide(spanID: Long): Boolean = - spanID >= lowerBoundary && spanID <= upperBoundary + override def decide(operationName: String, builderTags: Map[String, Span.TagValue]): SamplingDecision = { + val random = ThreadLocalRandom.current().nextLong() + if(random >= lowerBoundary && random <= upperBoundary) SamplingDecision.Sample else SamplingDecision.DoNotSample + } override def toString: String = - s"Sampler.Random(chance = $chance)" + s"Sampler.Random(probability = $probability)" } } diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala index 464559e3..a4424a45 100644 --- a/kamon-core/src/main/scala/kamon/trace/Span.scala +++ b/kamon-core/src/main/scala/kamon/trace/Span.scala @@ -16,176 +16,220 @@ package kamon package trace - -import scala.collection.JavaConverters._ +import kamon.ReporterRegistry.SpanSink +import kamon.context.Key +import kamon.trace.SpanContext.SamplingDecision import kamon.util.{Clock, MeasurementUnit} -class Span(spanContext: SpanContext, initialOperationName: String, initialTags: Map[String, String], startTimestampMicros: Long, - reporterRegistry: ReporterRegistryImpl) extends io.opentracing.Span { - private var isOpen: Boolean = true - private val sampled: Boolean = spanContext.sampled - private var operationName: String = initialOperationName - private var endTimestampMicros: Long = 0 +trait Span { - private var tags = initialTags - private var logs = List.empty[Span.LogEntry] - private var additionalMetricTags = Map.empty[String, String] + def isEmpty(): Boolean + def isLocal(): Boolean + def nonEmpty(): Boolean = !isEmpty() + def isRemote(): Boolean = !isLocal() - override def log(fields: java.util.Map[String, _]): Span = - log(fields.asScala.asInstanceOf[Map[String, _]]) + def context(): SpanContext - def log(fields: Map[String, _]): Span = synchronized { - if (sampled && isOpen) - logs = Span.LogEntry(Clock.microTimestamp(), fields) :: logs - this - } + def annotate(annotation: Span.Annotation): Span - def log(timestampMicroseconds: Long, fields: Map[String, _]): Span = synchronized { - if(sampled && isOpen) - logs = Span.LogEntry(timestampMicroseconds, fields) :: logs - this - } + def addSpanTag(key: String, value: String): Span - override def log(timestampMicroseconds: Long, fields: java.util.Map[String, _]): Span = - log(timestampMicroseconds, fields.asScala.asInstanceOf[Map[String, _]]) + def addSpanTag(key: String, value: Long): Span - override def log(event: String): Span = synchronized { - if(sampled && isOpen) - logs = Span.LogEntry(Clock.microTimestamp(), Map("event" -> event)) :: logs - this - } + def addSpanTag(key: String, value: Boolean): Span - override def log(timestampMicroseconds: Long, event: String): Span = synchronized { - if(sampled && isOpen) - logs = Span.LogEntry(timestampMicroseconds, Map("event" -> event)) :: logs - this - } + def addMetricTag(key: String, value: String): Span - override def log(eventName: String, payload: scala.Any): Span = synchronized { - if(sampled && isOpen) - logs = Span.LogEntry(Clock.microTimestamp(), Map(eventName -> payload)) :: logs - this - } + def setOperationName(name: String): Span + + def disableMetricsCollection(): Span + + def finish(finishTimestampMicros: Long): Unit + + def finish(): Unit = + finish(Clock.microTimestamp()) + + def annotate(name: String): Span = + annotate(Span.Annotation(Clock.microTimestamp(), name, Map.empty)) - override def log(timestampMicroseconds: Long, eventName: String, payload: scala.Any): Span = synchronized { - if(sampled && isOpen) - logs = Span.LogEntry(timestampMicroseconds, Map(eventName -> payload)) :: logs - this + def annotate(name: String, fields: Map[String, String]): Span = + annotate(Span.Annotation(Clock.microTimestamp(), name, fields)) + + def annotate(timestampMicroseconds: Long, name: String, fields: Map[String, String]): Span = + annotate(Span.Annotation(timestampMicroseconds, name, fields)) + +} + +object Span { + + val ContextKey = Key.broadcast[Span]("span", Span.Empty) + + object Empty extends Span { + override val context: SpanContext = SpanContext.EmptySpanContext + override def isEmpty(): Boolean = true + override def isLocal(): Boolean = true + override def annotate(annotation: Annotation): Span = this + override def addSpanTag(key: String, value: String): Span = this + override def addSpanTag(key: String, value: Long): Span = this + override def addSpanTag(key: String, value: Boolean): Span = this + override def addMetricTag(key: String, value: String): Span = this + override def setOperationName(name: String): Span = this + override def disableMetricsCollection(): Span = this + override def finish(finishTimestampMicros: Long): Unit = {} } - override def getBaggageItem(key: String): String = - spanContext.getBaggage(key) + /** + * + * @param spanContext + * @param initialOperationName + * @param initialSpanTags + * @param startTimestampMicros + * @param spanSink + */ + final class Local(spanContext: SpanContext, initialOperationName: String, initialSpanTags: Map[String, Span.TagValue], + initialMetricTags: Map[String, String], startTimestampMicros: Long, spanSink: SpanSink) extends Span { + + private var collectMetrics: Boolean = true + private var open: Boolean = true + private val sampled: Boolean = spanContext.samplingDecision == SamplingDecision.Sample + private var operationName: String = initialOperationName + + private var spanTags: Map[String, Span.TagValue] = initialSpanTags + private var customMetricTags = initialMetricTags + private var annotations = List.empty[Span.Annotation] + + override def isEmpty(): Boolean = false + override def isLocal(): Boolean = true + + def annotate(annotation: Annotation): Span = synchronized { + if(sampled && open) + annotations = annotation :: annotations + this + } - override def context(): SpanContext = - spanContext + override def addSpanTag(key: String, value: String): Span = synchronized { + if(sampled && open) + spanTags = spanTags + (key -> TagValue.String(value)) + this + } - override def setTag(key: String, value: String): Span = synchronized { - if (isOpen) { - extractMetricTag(key, value) - if(sampled) - tags = tags ++ Map(key -> value) + override def addSpanTag(key: String, value: Long): Span = synchronized { + if(sampled && open) + spanTags = spanTags + (key -> TagValue.Number(value)) + this } - this - } - override def setTag(key: String, value: Boolean): Span = { - if (isOpen) { - val tagValue = if(value) Span.BooleanTagTrueValue else Span.BooleanTagFalseValue - extractMetricTag(key, tagValue) - if(sampled) - tags = tags + (key -> tagValue) + override def addSpanTag(key: String, value: Boolean): Span = synchronized { + if(sampled && open) { + val tagValue = if (value) TagValue.True else TagValue.False + spanTags = spanTags + (key -> tagValue) + } + this } - this - } - override def setTag(key: String, value: Number): Span = { - if (isOpen) { - val tagValue = String.valueOf(value) - extractMetricTag(key, tagValue) - if(sampled) - tags = tags + (key -> tagValue) + override def addMetricTag(key: String, value: String): Span = synchronized { + if(sampled && open && collectMetrics) + customMetricTags = customMetricTags + (key -> value) + this } - this - } - def setMetricTag(key: String, value: String): Span = synchronized { - if (isOpen) - additionalMetricTags = additionalMetricTags ++ Map(key -> value) - this - } + override def disableMetricsCollection(): Span = synchronized { + collectMetrics = false + this + } - override def setBaggageItem(key: String, value: String): Span = synchronized { - if (isOpen) - spanContext.addBaggageItem(key, value) - this - } + override def context(): SpanContext = + spanContext - override def setOperationName(operationName: String): Span = synchronized { - if(isOpen) - this.operationName = operationName - this - } + override def setOperationName(operationName: String): Span = synchronized { + if(open) + this.operationName = operationName + this + } - private def extractMetricTag(tag: String, value: String): Unit = - if(tag.startsWith(Span.MetricTagPrefix)) - additionalMetricTags = additionalMetricTags ++ Map(tag.substring(Span.MetricTagPrefix.length) -> value) + override def finish(finishMicros: Long): Unit = synchronized { + if (open) { + open = false - override def finish(): Unit = - finish(Clock.microTimestamp()) + if(collectMetrics) + recordSpanMetrics(finishMicros) + + if(sampled) + spanSink.reportSpan(toFinishedSpan(finishMicros)) + } + } + + private def toFinishedSpan(endTimestampMicros: Long): Span.FinishedSpan = + Span.FinishedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, spanTags, annotations) + + private def recordSpanMetrics(endTimestampMicros: Long): Unit = { + val elapsedTime = endTimestampMicros - startTimestampMicros + val metricTags = Map("operation" -> operationName) ++ customMetricTags - override def finish(finishMicros: Long): Unit = synchronized { - if (isOpen) { - isOpen = false - endTimestampMicros = finishMicros - recordSpanMetrics() + val isError = spanTags.get("error").exists { + errorTag => errorTag != null && errorTag.equals(Span.TagValue.True) + } - if(sampled) - reporterRegistry.reportSpan(completedSpan) + val refinedMetricTags = if(isError) + metricTags + ("error" -> "true") + else + metricTags + + val latencyHistogram = Span.Metrics.SpanProcessingTimeMetric.refine(refinedMetricTags) + latencyHistogram.record(elapsedTime) } } - private def completedSpan: Span.CompletedSpan = - Span.CompletedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, tags, logs) + object Local { + def apply(spanContext: SpanContext, initialOperationName: String, initialSpanTags: Map[String, Span.TagValue], + initialMetricTags: Map[String, String], startTimestampMicros: Long, reporterRegistry: ReporterRegistryImpl): Local = + new Local(spanContext, initialOperationName, initialSpanTags, initialMetricTags, startTimestampMicros, reporterRegistry) + } - private def recordSpanMetrics(): Unit = { - val elapsedTime = endTimestampMicros - startTimestampMicros - val metricTags = Map("operation" -> operationName) ++ additionalMetricTags - val isError = tags.get("error").exists { - errorTag => errorTag != null && errorTag.equals(Span.BooleanTagTrueValue) - } + final class Remote(val context: SpanContext) extends Span { + override def isEmpty(): Boolean = false + override def isLocal(): Boolean = false + override def annotate(annotation: Annotation): Span = this + override def addSpanTag(key: String, value: String): Span = this + override def addSpanTag(key: String, value: Long): Span = this + override def addSpanTag(key: String, value: Boolean): Span = this + override def addMetricTag(key: String, value: String): Span = this + override def setOperationName(name: String): Span = this + override def disableMetricsCollection(): Span = this + override def finish(finishTimestampMicros: Long): Unit = {} + } - val refinedTags = if(isError) { - metricTags + ("error" -> Span.BooleanTagTrueValue) - } else { - metricTags - } + object Remote { + def apply(spanContext: SpanContext): Remote = + new Remote(spanContext) + } + + sealed trait TagValue + object TagValue { + sealed trait Boolean extends TagValue + case object True extends Boolean + case object False extends Boolean - val latencyHistogram = Span.Metrics.SpanProcessingTimeMetric.refine(refinedTags) - latencyHistogram.record(elapsedTime) + case class String(string: java.lang.String) extends TagValue + case class Number(number: Long) extends TagValue } -} -object Span { object Metrics { val SpanProcessingTimeMetric = Kamon.histogram("span.processing-time", MeasurementUnit.time.microseconds) val SpanErrorCount = Kamon.counter("span.error-count") } - val MetricTagPrefix = "metric." - val BooleanTagTrueValue = "1" - val BooleanTagFalseValue = "0" - - case class LogEntry(timestamp: Long, fields: Map[String, _]) + case class Annotation(timestampMicros: Long, name: String, fields: Map[String, String]) - case class CompletedSpan( + case class FinishedSpan( context: SpanContext, operationName: String, startTimestampMicros: Long, endTimestampMicros: Long, - tags: Map[String, String], - logs: Seq[LogEntry] + tags: Map[String, Span.TagValue], + annotations: Seq[Annotation] ) }
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala new file mode 100644 index 00000000..e04ceb03 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala @@ -0,0 +1,99 @@ +/* ========================================================================================= + * Copyright © 2013-2017 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import java.net.{URLDecoder, URLEncoder} + +import kamon.Kamon +import kamon.context.{Codec, Context, TextMap} +import kamon.trace.SpanContext.SamplingDecision + + +object SpanCodec { + + class B3 extends Codec.ForEntry[TextMap] { + import B3.Headers + + override def encode(context: Context): TextMap = { + val span = context.get(Span.ContextKey) + val carrier = TextMap.Default() + + if(span.nonEmpty()) { + val spanContext = span.context + carrier.put(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string)) + carrier.put(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string)) + carrier.put(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string)) + + encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision => + carrier.put(Headers.Sampled, samplingDecision) + } + } + + carrier + } + + override def decode(carrier: TextMap, context: Context): Context = { + val identityProvider = Kamon.tracer.identityProvider + val traceID = carrier.get(Headers.TraceIdentifier) + .map(id => identityProvider.traceIdGenerator().from(urlDecode(id))) + .getOrElse(IdentityProvider.NoIdentifier) + + val spanID = carrier.get(Headers.SpanIdentifier) + .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) + .getOrElse(IdentityProvider.NoIdentifier) + + if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) { + val parentID = carrier.get(Headers.ParentSpanIdentifier) + .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) + .getOrElse(IdentityProvider.NoIdentifier) + + val flags = carrier.get(Headers.Flags) + + val samplingDecision = flags.orElse(carrier.get(Headers.Sampled)) match { + case Some(sampled) if sampled == "1" => SamplingDecision.Sample + case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample + case _ => SamplingDecision.Unknown + } + + context.withKey(Span.ContextKey, Span.Remote(SpanContext(traceID, spanID, parentID, samplingDecision))) + + } else context + } + + private def encodeSamplingDecision(samplingDecision: SamplingDecision): Option[String] = samplingDecision match { + case SamplingDecision.Sample => Some("1") + case SamplingDecision.DoNotSample => Some("0") + case SamplingDecision.Unknown => None + } + + private def urlEncode(s: String): String = URLEncoder.encode(s, "UTF-8") + private def urlDecode(s: String): String = URLDecoder.decode(s, "UTF-8") + } + + object B3 { + + def apply(): B3 = + new B3() + + object Headers { + val TraceIdentifier = "X-B3-TraceId" + val ParentSpanIdentifier = "X-B3-ParentSpanId" + val SpanIdentifier = "X-B3-SpanId" + val Sampled = "X-B3-Sampled" + val Flags = "X-B3-Flags" + } + } +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/SpanContext.scala b/kamon-core/src/main/scala/kamon/trace/SpanContext.scala index b37e208b..4d013881 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanContext.scala @@ -15,26 +15,51 @@ package kamon.trace -import java.lang -import java.util.{Map => JavaMap} +import kamon.trace.IdentityProvider.Identifier +import kamon.trace.SpanContext.SamplingDecision -import scala.collection.JavaConverters._ +/** + * + * @param traceID + * @param spanID + * @param parentID + * @param samplingDecision + */ +case class SpanContext(traceID: Identifier, spanID: Identifier, parentID: Identifier, samplingDecision: SamplingDecision) { -class SpanContext(val traceID: Long, val spanID: Long, val parentID: Long, val sampled: Boolean, - private var baggage: Map[String, String]) extends io.opentracing.SpanContext { + def createChild(childSpanID: Identifier, samplingDecision: SamplingDecision): SpanContext = + this.copy(parentID = this.spanID, spanID = childSpanID) +} - private[kamon] def addBaggageItem(key: String, value: String): Unit = synchronized { - baggage = baggage + (key -> value) - } +object SpanContext { - private[kamon] def getBaggage(key: String): String = synchronized { - baggage.get(key).getOrElse(null) - } + val EmptySpanContext = SpanContext( + traceID = IdentityProvider.NoIdentifier, + spanID = IdentityProvider.NoIdentifier, + parentID = IdentityProvider.NoIdentifier, + samplingDecision = SamplingDecision.DoNotSample + ) + + + sealed trait SamplingDecision - private[kamon] def baggageMap: Map[String, String] = - baggage + object SamplingDecision { + + /** + * The Trace is sampled, all child Spans should be sampled as well. + */ + case object Sample extends SamplingDecision + + /** + * The Trace is not sampled, none of the child Spans should be sampled. + */ + case object DoNotSample extends SamplingDecision + + /** + * The sampling decision has not been taken yet, the Tracer is free to decide when creating a Span. + */ + case object Unknown extends SamplingDecision - override def baggageItems(): lang.Iterable[JavaMap.Entry[String, String]] = synchronized { - baggage.asJava.entrySet() } -} + +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala deleted file mode 100644 index 8e3a446b..00000000 --- a/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala +++ /dev/null @@ -1,105 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2017 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - - -package kamon.trace - -import java.net.{URLDecoder, URLEncoder} -import java.util.concurrent.ThreadLocalRandom - -import scala.collection.JavaConverters._ -import io.opentracing.propagation.TextMap -import kamon.util.HexCodec - - -trait SpanContextCodec[T] { - def inject(spanContext: SpanContext, carrier: T): Unit - def extract(carrier: T, sampler: Sampler): SpanContext -} - -object SpanContextCodec { - - val TextMap: SpanContextCodec[TextMap] = new TextMapSpanCodec( - traceIDKey = "TRACE_ID", - parentIDKey = "PARENT_ID", - spanIDKey = "SPAN_ID", - sampledKey = "SAMPLED", - baggagePrefix = "BAGGAGE_", - baggageValueEncoder = identity, - baggageValueDecoder = identity - ) - - val ZipkinB3: SpanContextCodec[TextMap] = new TextMapSpanCodec( - traceIDKey = "X-B3-TraceId", - parentIDKey = "X-B3-ParentSpanId", - spanIDKey = "X-B3-SpanId", - sampledKey = "X-B3-Sampled", - baggagePrefix = "X-B3-Baggage-", - baggageValueEncoder = urlEncode, - baggageValueDecoder = urlDecode - ) - - private def urlEncode(s: String): String = URLEncoder.encode(s, "UTF-8") - private def urlDecode(s: String): String = URLDecoder.decode(s, "UTF-8") - - private class TextMapSpanCodec(traceIDKey: String, parentIDKey: String, spanIDKey: String, sampledKey: String, baggagePrefix: String, - baggageValueEncoder: String => String, baggageValueDecoder: String => String) extends SpanContextCodec[TextMap] { - - override def inject(spanContext: SpanContext, carrier: TextMap): Unit = { - carrier.put(traceIDKey, encodeLong(spanContext.traceID)) - carrier.put(parentIDKey, encodeLong(spanContext.parentID)) - carrier.put(spanIDKey, encodeLong(spanContext.spanID)) - - spanContext.baggageItems().iterator().asScala.foreach { entry => - carrier.put(baggagePrefix + entry.getKey, baggageValueEncoder(entry.getValue)) - } - } - - override def extract(carrier: TextMap, sampler: Sampler): SpanContext = { - var traceID: String = null - var parentID: String = null - var spanID: String = null - var sampled: String = null - var baggage: Map[String, String] = Map.empty - - carrier.iterator().asScala.foreach { entry => - if(entry.getKey.equals(traceIDKey)) - traceID = baggageValueDecoder(entry.getValue) - else if(entry.getKey.equals(parentIDKey)) - parentID = baggageValueDecoder(entry.getValue) - else if(entry.getKey.equals(spanIDKey)) - spanID = baggageValueDecoder(entry.getValue) - else if(entry.getKey.equals(sampledKey)) - sampled = entry.getValue - else if(entry.getKey.startsWith(baggagePrefix)) - baggage = baggage + (entry.getKey.substring(baggagePrefix.length) -> baggageValueDecoder(entry.getValue)) - } - - if(traceID != null && spanID != null) { - val actualParent = if(parentID == null) 0L else decodeLong(parentID) - val isSampled = if(sampled == null) sampler.decide(ThreadLocalRandom.current().nextLong()) else sampled.equals("1") - - new SpanContext(decodeLong(traceID), decodeLong(spanID), actualParent, isSampled, baggage) - } else null - } - - private def decodeLong(input: String): Long = - HexCodec.lowerHexToUnsignedLong(input) - - private def encodeLong(input: Long): String = - HexCodec.toLowerHex(input) - - } -} diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala index 19067f5e..7d8830ca 100644 --- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala +++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala @@ -13,148 +13,156 @@ * ========================================================================================= */ - package kamon.trace -import java.util.concurrent.ThreadLocalRandom - import com.typesafe.config.Config -import io.opentracing.propagation.{Format, TextMap} -import io.opentracing.propagation.Format.Builtin.{BINARY, HTTP_HEADERS, TEXT_MAP} -import io.opentracing.util.ThreadLocalActiveSpanSource -import kamon.ReporterRegistryImpl +import kamon.{Kamon, ReporterRegistryImpl} import kamon.metric.MetricLookup -import kamon.util.Clock +import kamon.trace.Span.TagValue +import kamon.trace.SpanContext.SamplingDecision +import kamon.trace.Tracer.SpanBuilder +import kamon.util.{Clock, DynamicAccess} import org.slf4j.LoggerFactory +import scala.collection.immutable +import scala.util.Try -class Tracer(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config) - extends ThreadLocalActiveSpanSource with io.opentracing.Tracer { +trait Tracer { + def buildSpan(operationName: String): SpanBuilder + def identityProvider: IdentityProvider +} - private val logger = LoggerFactory.getLogger(classOf[Tracer]) - private val tracerMetrics = new TracerMetrics(metrics) +object Tracer { - @volatile private var configuredSampler: Sampler = Sampler.never - @volatile private var textMapSpanContextCodec = SpanContextCodec.TextMap - @volatile private var httpHeaderSpanContextCodec = SpanContextCodec.ZipkinB3 + final class Default(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config) extends Tracer { + private val logger = LoggerFactory.getLogger(classOf[Tracer]) - reconfigure(initialConfig) + private[Tracer] val tracerMetrics = new TracerMetrics(metrics) + @volatile private[Tracer] var joinRemoteParentsWithSameSpanID: Boolean = true + @volatile private[Tracer] var configuredSampler: Sampler = Sampler.Never + @volatile private[Tracer] var _identityProvider: IdentityProvider = IdentityProvider.Default() - override def buildSpan(operationName: String): io.opentracing.Tracer.SpanBuilder = - new SpanBuilder(operationName) + reconfigure(initialConfig) - override def extract[C](format: Format[C], carrier: C): io.opentracing.SpanContext = format match { - case HTTP_HEADERS => httpHeaderSpanContextCodec.extract(carrier.asInstanceOf[TextMap], configuredSampler) - case TEXT_MAP => textMapSpanContextCodec.extract(carrier.asInstanceOf[TextMap], configuredSampler) - case BINARY => null // TODO: Implement Binary Encoding - case _ => null - } + override def buildSpan(operationName: String): SpanBuilder = + new SpanBuilder(operationName, this, reporterRegistry) - override def inject[C](spanContext: io.opentracing.SpanContext, format: Format[C], carrier: C): Unit = format match { - case HTTP_HEADERS => httpHeaderSpanContextCodec.inject(spanContext.asInstanceOf[SpanContext], carrier.asInstanceOf[TextMap]) - case TEXT_MAP => textMapSpanContextCodec.inject(spanContext.asInstanceOf[SpanContext], carrier.asInstanceOf[TextMap]) - case BINARY => - case _ => - } + override def identityProvider: IdentityProvider = + this._identityProvider + + def sampler: Sampler = + configuredSampler + + private[kamon] def reconfigure(config: Config): Unit = synchronized { + Try { + val dynamic = new DynamicAccess(getClass.getClassLoader) + val traceConfig = config.getConfig("kamon.trace") - def sampler: Sampler = - configuredSampler + val newSampler = traceConfig.getString("sampler") match { + case "always" => Sampler.Always + case "never" => Sampler.Never + case "random" => Sampler.random(traceConfig.getDouble("random-sampler.probability")) + case other => sys.error(s"Unexpected sampler name $other.") + } + + val newJoinRemoteParentsWithSameSpanID = traceConfig.getBoolean("join-remote-parents-with-same-span-id") - def setTextMapSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit = - this.textMapSpanContextCodec = codec + val newIdentityProvider = dynamic.createInstanceFor[IdentityProvider]( + traceConfig.getString("identity-provider"), immutable.Seq.empty[(Class[_], AnyRef)] + ).get - def setHttpHeaderSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit = - this.httpHeaderSpanContextCodec = codec + configuredSampler = newSampler + joinRemoteParentsWithSameSpanID = newJoinRemoteParentsWithSameSpanID + _identityProvider = newIdentityProvider - private class SpanBuilder(operationName: String) extends io.opentracing.Tracer.SpanBuilder { - private var parentContext: SpanContext = _ + }.failed.foreach { + ex => logger.error("Unable to reconfigure Kamon Tracer", ex) + } + } + } + + object Default { + def apply(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config): Default = + new Default(metrics, reporterRegistry, initialConfig) + } + + final class SpanBuilder(operationName: String, tracer: Tracer.Default, reporterRegistry: ReporterRegistryImpl) { + private var parentSpan: Span = _ private var startTimestamp = 0L - private var initialTags = Map.empty[String, String] + private var initialSpanTags = Map.empty[String, Span.TagValue] + private var initialMetricTags = Map.empty[String, String] private var useActiveSpanAsParent = true - override def asChildOf(parent: io.opentracing.SpanContext): io.opentracing.Tracer.SpanBuilder = parent match { - case spanContext: kamon.trace.SpanContext => - this.parentContext = spanContext - this - case null => this - case _ => logger.error("Can't extract the parent ID from a non-Kamon SpanContext"); this + def asChildOf(parent: Span): SpanBuilder = { + if(parent != Span.Empty) this.parentSpan = parent + this } - override def asChildOf(parent: io.opentracing.BaseSpan[_]): io.opentracing.Tracer.SpanBuilder = - asChildOf(parent.context()) - - override def addReference(referenceType: String, referencedContext: io.opentracing.SpanContext): io.opentracing.Tracer.SpanBuilder = { - if(referenceType != null && referenceType.equals(io.opentracing.References.CHILD_OF)) { - asChildOf(referencedContext) - } else this + def withMetricTag(key: String, value: String): SpanBuilder = { + this.initialMetricTags = this.initialMetricTags + (key -> value) + this } - override def withTag(key: String, value: String): io.opentracing.Tracer.SpanBuilder = { - this.initialTags = this.initialTags + (key -> value) + def withSpanTag(key: String, value: String): SpanBuilder = { + this.initialSpanTags = this.initialSpanTags + (key -> TagValue.String(value)) this } - override def withTag(key: String, value: Boolean): io.opentracing.Tracer.SpanBuilder = { - this.initialTags = this.initialTags + (key -> value.toString) + def withSpanTag(key: String, value: Long): SpanBuilder = { + this.initialSpanTags = this.initialSpanTags + (key -> TagValue.Number(value)) this } - override def withTag(key: String, value: Number): io.opentracing.Tracer.SpanBuilder = { - this.initialTags = this.initialTags + (key -> value.toString) + def withSpanTag(key: String, value: Boolean): SpanBuilder = { + val tagValue = if (value) TagValue.True else TagValue.False + this.initialSpanTags = this.initialSpanTags + (key -> tagValue) this } - override def withStartTimestamp(microseconds: Long): io.opentracing.Tracer.SpanBuilder = { + def withStartTimestamp(microseconds: Long): SpanBuilder = { this.startTimestamp = microseconds this } - override def ignoreActiveSpan(): io.opentracing.Tracer.SpanBuilder = { + def ignoreActiveSpan(): SpanBuilder = { this.useActiveSpanAsParent = false this } - override def start(): io.opentracing.Span = - startManual() + def start(): Span = { + val startTimestampMicros = if(startTimestamp != 0L) startTimestamp else Clock.microTimestamp() - override def startActive(): io.opentracing.ActiveSpan = - makeActive(startManual()) + val parentSpan: Option[Span] = Option(this.parentSpan) + .orElse(if(useActiveSpanAsParent) Some(Kamon.currentContext().get(Span.ContextKey)) else None) + .filter(span => span != Span.Empty) - override def startManual(): Span = { - val startTimestampMicros = if(startTimestamp != 0L) startTimestamp else Clock.microTimestamp() + val samplingDecision: SamplingDecision = parentSpan + .map(_.context.samplingDecision) + .filter(_ != SamplingDecision.Unknown) + .getOrElse(tracer.sampler.decide(operationName, initialSpanTags)) - if(parentContext == null && useActiveSpanAsParent) { - val possibleParent = activeSpan() - if(possibleParent != null) - parentContext = possibleParent.context().asInstanceOf[SpanContext] + val spanContext = parentSpan match { + case Some(parent) => joinParentContext(parent, samplingDecision) + case None => newSpanContext(samplingDecision) } - val spanContext = - if(parentContext != null) - new SpanContext(parentContext.traceID, createID(), parentContext.spanID, parentContext.sampled, parentContext.baggageMap) - else { - val traceID = createID() - new SpanContext(traceID, traceID, 0L, configuredSampler.decide(traceID), Map.empty) - } - - tracerMetrics.createdSpans.increment() - new Span(spanContext, operationName, initialTags, startTimestampMicros, reporterRegistry) + tracer.tracerMetrics.createdSpans.increment() + Span.Local(spanContext, operationName, initialSpanTags, initialMetricTags, startTimestampMicros, reporterRegistry) } - private def createID(): Long = - ThreadLocalRandom.current().nextLong() - } - - - private[kamon] def reconfigure(config: Config): Unit = synchronized { - val traceConfig = config.getConfig("kamon.trace") - - configuredSampler = traceConfig.getString("sampler") match { - case "always" => Sampler.always - case "never" => Sampler.never - case "random" => Sampler.random(traceConfig.getDouble("sampler-random.chance")) - case other => sys.error(s"Unexpected sampler name $other.") - } + private def joinParentContext(parent: Span, samplingDecision: SamplingDecision): SpanContext = + if(parent.isRemote() && tracer.joinRemoteParentsWithSameSpanID) + parent.context().copy(samplingDecision = samplingDecision) + else + parent.context().createChild(tracer._identityProvider.spanIdGenerator().generate(), samplingDecision) + + private def newSpanContext(samplingDecision: SamplingDecision): SpanContext = + SpanContext( + traceID = tracer._identityProvider.traceIdGenerator().generate(), + spanID = tracer._identityProvider.spanIdGenerator().generate(), + parentID = IdentityProvider.NoIdentifier, + samplingDecision = samplingDecision + ) } private final class TracerMetrics(metricLookup: MetricLookup) { |