From 34010efc7b273e50d805a277646f14aa96aaa8b2 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Fri, 14 Jul 2017 14:12:47 +0200 Subject: wip --- kamon-core/src/main/scala/kamon/trace/Tracer.scala | 217 ++++++++++++--------- 1 file changed, 121 insertions(+), 96 deletions(-) (limited to 'kamon-core/src/main/scala/kamon/trace/Tracer.scala') diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala index 19067f5e..1aec8d7c 100644 --- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala +++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala @@ -16,148 +16,173 @@ package kamon.trace -import java.util.concurrent.ThreadLocalRandom +import java.nio.ByteBuffer import com.typesafe.config.Config -import io.opentracing.propagation.{Format, TextMap} -import io.opentracing.propagation.Format.Builtin.{BINARY, HTTP_HEADERS, TEXT_MAP} -import io.opentracing.util.ThreadLocalActiveSpanSource import kamon.ReporterRegistryImpl import kamon.metric.MetricLookup +import kamon.trace.SpanContext.{SamplingDecision, Source} +import kamon.trace.Tracer.SpanBuilder import kamon.util.Clock import org.slf4j.LoggerFactory -class Tracer(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config) - extends ThreadLocalActiveSpanSource with io.opentracing.Tracer { +trait Tracer { + def buildSpan(operationName: String): SpanBuilder + def activeSpan(): ActiveSpan + def makeActive(span: Span): ActiveSpan - private val logger = LoggerFactory.getLogger(classOf[Tracer]) - private val tracerMetrics = new TracerMetrics(metrics) + def extract[C](format: SpanContextCodec.Format[C], carrier: C): Option[SpanContext] + def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C], carrier: C): Unit - @volatile private var configuredSampler: Sampler = Sampler.never - @volatile private var textMapSpanContextCodec = SpanContextCodec.TextMap - @volatile private var httpHeaderSpanContextCodec = SpanContextCodec.ZipkinB3 - reconfigure(initialConfig) - override def buildSpan(operationName: String): io.opentracing.Tracer.SpanBuilder = - new SpanBuilder(operationName) + // + // Configuration Utilities + // - override def extract[C](format: Format[C], carrier: C): io.opentracing.SpanContext = format match { - case HTTP_HEADERS => httpHeaderSpanContextCodec.extract(carrier.asInstanceOf[TextMap], configuredSampler) - case TEXT_MAP => textMapSpanContextCodec.extract(carrier.asInstanceOf[TextMap], configuredSampler) - case BINARY => null // TODO: Implement Binary Encoding - case _ => null - } + def setTextMapSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit + def setHttpHeaderSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit +} - override def inject[C](spanContext: io.opentracing.SpanContext, format: Format[C], carrier: C): Unit = format match { - case HTTP_HEADERS => httpHeaderSpanContextCodec.inject(spanContext.asInstanceOf[SpanContext], carrier.asInstanceOf[TextMap]) - case TEXT_MAP => textMapSpanContextCodec.inject(spanContext.asInstanceOf[SpanContext], carrier.asInstanceOf[TextMap]) - case BINARY => - case _ => - } +object Tracer { - def sampler: Sampler = - configuredSampler + final class Default(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config) extends Tracer { + private val logger = LoggerFactory.getLogger(classOf[Tracer]) + private val emptySpan = Span.Empty(this) + private val activeSpanStorage: ThreadLocal[ActiveSpan] = new ThreadLocal[ActiveSpan] { + override def initialValue(): ActiveSpan = ActiveSpan.Default(emptySpan, null, activeSpanStorage) + } - def setTextMapSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit = - this.textMapSpanContextCodec = codec + private[Tracer] val tracerMetrics = new TracerMetrics(metrics) + @volatile private[Tracer] var joinRemoteSpansWithSameID: Boolean = false + @volatile private[Tracer] var configuredSampler: Sampler = Sampler.never + @volatile private[Tracer] var idGenerator: IdentifierGenerator = IdentifierGenerator.RandomLong() + @volatile private[Tracer] var textMapSpanContextCodec: SpanContextCodec[TextMap] = SpanContextCodec.TextMap + @volatile private[Tracer] var httpHeaderSpanContextCodec: SpanContextCodec[TextMap] = SpanContextCodec.ZipkinB3 - def setHttpHeaderSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit = - this.httpHeaderSpanContextCodec = codec + reconfigure(initialConfig) - private class SpanBuilder(operationName: String) extends io.opentracing.Tracer.SpanBuilder { - private var parentContext: SpanContext = _ - private var startTimestamp = 0L - private var initialTags = Map.empty[String, String] - private var useActiveSpanAsParent = true + def buildSpan(operationName: String): SpanBuilder = + new SpanBuilder(operationName, this, reporterRegistry) + + def extract[C](format: SpanContextCodec.Format[C], carrier: C): Option[SpanContext] = format match { + case SpanContextCodec.Format.HttpHeaders => httpHeaderSpanContextCodec.extract(carrier.asInstanceOf[TextMap]) + case SpanContextCodec.Format.TextMap => textMapSpanContextCodec.extract(carrier.asInstanceOf[TextMap]) + case SpanContextCodec.Format.Binary => None + case _ => None + } - override def asChildOf(parent: io.opentracing.SpanContext): io.opentracing.Tracer.SpanBuilder = parent match { - case spanContext: kamon.trace.SpanContext => - this.parentContext = spanContext - this - case null => this - case _ => logger.error("Can't extract the parent ID from a non-Kamon SpanContext"); this + def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C], carrier: C): Unit = format match { + case SpanContextCodec.Format.HttpHeaders => httpHeaderSpanContextCodec.inject(spanContext, carrier.asInstanceOf[TextMap]) + case SpanContextCodec.Format.TextMap => textMapSpanContextCodec.inject(spanContext, carrier.asInstanceOf[TextMap]) + case SpanContextCodec.Format.Binary => + case _ => } - override def asChildOf(parent: io.opentracing.BaseSpan[_]): io.opentracing.Tracer.SpanBuilder = - asChildOf(parent.context()) + def activeSpan(): ActiveSpan = + activeSpanStorage.get() - override def addReference(referenceType: String, referencedContext: io.opentracing.SpanContext): io.opentracing.Tracer.SpanBuilder = { - if(referenceType != null && referenceType.equals(io.opentracing.References.CHILD_OF)) { - asChildOf(referencedContext) - } else this + def makeActive(span: Span): ActiveSpan = { + val currentlyActiveSpan = activeSpanStorage.get() + val newActiveSpan = ActiveSpan.Default(span, currentlyActiveSpan, activeSpanStorage) + activeSpanStorage.set(newActiveSpan) + newActiveSpan } - override def withTag(key: String, value: String): io.opentracing.Tracer.SpanBuilder = { - this.initialTags = this.initialTags + (key -> value) - this + def sampler: Sampler = + configuredSampler + + def setTextMapSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit = + this.textMapSpanContextCodec = codec + + def setHttpHeaderSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit = + this.httpHeaderSpanContextCodec = codec + + + private[kamon] def reconfigure(config: Config): Unit = synchronized { + val traceConfig = config.getConfig("kamon.trace") + + configuredSampler = traceConfig.getString("sampler") match { + case "always" => Sampler.always + case "never" => Sampler.never + case "random" => Sampler.random(traceConfig.getDouble("sampler-random.chance")) + case other => sys.error(s"Unexpected sampler name $other.") + } + } + + private final class TracerMetrics(metricLookup: MetricLookup) { + val createdSpans = metricLookup.counter("tracer.spans-created") } + } + + final class SpanBuilder(operationName: String, tracer: Tracer.Default, reporterRegistry: ReporterRegistryImpl) { + private var parentContext: SpanContext = _ + private var startTimestamp = 0L + private var initialTags = Map.empty[String, String] + private var useActiveSpanAsParent = true - override def withTag(key: String, value: Boolean): io.opentracing.Tracer.SpanBuilder = { - this.initialTags = this.initialTags + (key -> value.toString) + def asChildOf(parentContext: SpanContext): SpanBuilder = { + this.parentContext = parentContext this } - override def withTag(key: String, value: Number): io.opentracing.Tracer.SpanBuilder = { - this.initialTags = this.initialTags + (key -> value.toString) + def asChildOf(parentSpan: Span): SpanBuilder = + asChildOf(parentSpan.context()) + + def withSpanTag(key: String, value: String): SpanBuilder = { + this.initialTags = this.initialTags + (key -> value) this } - override def withStartTimestamp(microseconds: Long): io.opentracing.Tracer.SpanBuilder = { + def withStartTimestamp(microseconds: Long): SpanBuilder = { this.startTimestamp = microseconds this } - override def ignoreActiveSpan(): io.opentracing.Tracer.SpanBuilder = { + def ignoreActiveSpan(): SpanBuilder = { this.useActiveSpanAsParent = false this } - override def start(): io.opentracing.Span = - startManual() - - override def startActive(): io.opentracing.ActiveSpan = - makeActive(startManual()) - - override def startManual(): Span = { + def start(): Span = { val startTimestampMicros = if(startTimestamp != 0L) startTimestamp else Clock.microTimestamp() - if(parentContext == null && useActiveSpanAsParent) { - val possibleParent = activeSpan() - if(possibleParent != null) - parentContext = possibleParent.context().asInstanceOf[SpanContext] - } + val parentSpanContext: Option[SpanContext] = Option(parentContext) + .orElse(if(useActiveSpanAsParent) Some(tracer.activeSpan().context()) else None) + .filter(spanContext => spanContext != SpanContext.EmptySpanContext) - val spanContext = - if(parentContext != null) - new SpanContext(parentContext.traceID, createID(), parentContext.spanID, parentContext.sampled, parentContext.baggageMap) - else { - val traceID = createID() - new SpanContext(traceID, traceID, 0L, configuredSampler.decide(traceID), Map.empty) - } + val samplingDecision: SamplingDecision = parentSpanContext + .map(_.samplingDecision) + .filter(_ != SamplingDecision.Unknown) + .getOrElse(tracer.sampler.decide(operationName, initialTags)) - tracerMetrics.createdSpans.increment() - new Span(spanContext, operationName, initialTags, startTimestampMicros, reporterRegistry) - } - - private def createID(): Long = - ThreadLocalRandom.current().nextLong() - } - - - private[kamon] def reconfigure(config: Config): Unit = synchronized { - val traceConfig = config.getConfig("kamon.trace") + val spanContext = parentSpanContext match { + case Some(parent) => joinParentContext(parent, samplingDecision) + case None => newSpanContext(samplingDecision) + } - configuredSampler = traceConfig.getString("sampler") match { - case "always" => Sampler.always - case "never" => Sampler.never - case "random" => Sampler.random(traceConfig.getDouble("sampler-random.chance")) - case other => sys.error(s"Unexpected sampler name $other.") + tracer.tracerMetrics.createdSpans.increment() + Span.Real(spanContext, operationName, initialTags, startTimestampMicros, reporterRegistry, tracer) } - } - private final class TracerMetrics(metricLookup: MetricLookup) { - val createdSpans = metricLookup.counter("tracer.spans-created") + private def joinParentContext(parent: SpanContext, samplingDecision: SamplingDecision): SpanContext = + if(parent.source == Source.Remote && tracer.joinRemoteSpansWithSameID) + parent.copy(samplingDecision = samplingDecision) + else + parent.createChild(tracer.idGenerator.generateSpanID(), samplingDecision) + + private def newSpanContext(samplingDecision: SamplingDecision): SpanContext = + SpanContext( + traceID = tracer.idGenerator.generateTraceID(), + spanID = tracer.idGenerator.generateSpanID(), + parentID = tracer.idGenerator.generateEmptyID(), + samplingDecision = samplingDecision, + baggage = SpanContext.Baggage(), + source = Source.Local + ) + + def startActive(): ActiveSpan = + tracer.makeActive(start()) } } -- cgit v1.2.3