From 22379d3f318b2cd3a4c995ff1c45bda33d935a46 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Tue, 23 May 2017 19:06:32 +0200 Subject: get some basic sampling going --- .../src/main/scala/kamon/trace/Sampler.scala | 27 +++++ kamon-core/src/main/scala/kamon/trace/Span.scala | 53 ++++----- .../src/main/scala/kamon/trace/SpanContext.scala | 11 +- .../main/scala/kamon/trace/SpanContextCodec.scala | 87 ++++++++++++++ kamon-core/src/main/scala/kamon/trace/Tracer.scala | 132 ++++++++++++--------- .../src/main/scala/kamon/util/HexCodec.scala | 85 +++++++++++++ 6 files changed, 303 insertions(+), 92 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala create mode 100644 kamon-core/src/main/scala/kamon/util/HexCodec.scala (limited to 'kamon-core/src') diff --git a/kamon-core/src/main/scala/kamon/trace/Sampler.scala b/kamon-core/src/main/scala/kamon/trace/Sampler.scala index a26e61be..491cf358 100644 --- a/kamon-core/src/main/scala/kamon/trace/Sampler.scala +++ b/kamon-core/src/main/scala/kamon/trace/Sampler.scala @@ -1,5 +1,32 @@ package kamon.trace trait Sampler { + def decide(spanID: Long): Boolean +} + +object Sampler { + val always = new Constant(true) + val never = new Constant(false) + + def random(chance: Double): Sampler = { + assert(chance >= 0D && chance <= 1.0D, "Change should be >= 0 and <= 1.0") + + chance match { + case 0D => never + case 1.0D => always + case anyOther => new Random(anyOther) + } + } + + class Constant(decision: Boolean) extends Sampler { + override def decide(spanID: Long): Boolean = decision + } + + class Random(chance: Double) extends Sampler { + val upperBoundary = Long.MaxValue * chance + val lowerBoundary = -upperBoundary + override def decide(spanID: Long): Boolean = + spanID >= lowerBoundary && spanID <= upperBoundary + } } diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala index 804627dc..904c0a22 100644 --- a/kamon-core/src/main/scala/kamon/trace/Span.scala +++ b/kamon-core/src/main/scala/kamon/trace/Span.scala @@ -28,16 +28,16 @@ object Span { } -class Span(spanContext: SpanContext, initialOperationName: String, startTimestampMicros: Long, +class Span(spanContext: SpanContext, initialOperationName: String, initialTags: Map[String, String], startTimestampMicros: Long, recorderRegistry: RecorderRegistry, reporterRegistry: ReporterRegistryImpl) extends io.opentracing.Span { private var isOpen: Boolean = true - private val isSampled: Boolean = true // TODO: User a proper sampler + private val sampled: Boolean = spanContext.sampled private var operationName: String = initialOperationName private var endTimestampMicros: Long = 0 + private var tags = initialTags private var logs = List.empty[Span.LogEntry] - private var tags = Map.empty[String, String] private var metricTags = Map.empty[String, String] @@ -45,47 +45,41 @@ class Span(spanContext: SpanContext, initialOperationName: String, startTimestam log(fields.asScala.asInstanceOf[Map[String, _]]) def log(fields: Map[String, _]): Span = synchronized { - if (isSampled && isOpen) { + if (sampled && isOpen) logs = Span.LogEntry(Clock.microTimestamp(), fields) :: logs - } this } - override def log(timestampMicroseconds: Long, fields: java.util.Map[String, _]): Span = - log(timestampMicroseconds, fields.asScala.asInstanceOf[Map[String, _]]) - def log(timestampMicroseconds: Long, fields: Map[String, _]): Span = synchronized { - if(isSampled && isOpen) { + if(sampled && isOpen) logs = Span.LogEntry(timestampMicroseconds, fields) :: logs - } this } + override def log(timestampMicroseconds: Long, fields: java.util.Map[String, _]): Span = + log(timestampMicroseconds, fields.asScala.asInstanceOf[Map[String, _]]) + override def log(event: String): Span = synchronized { - if(isSampled && isOpen) { + if(sampled && isOpen) logs = Span.LogEntry(Clock.microTimestamp(), Map("event" -> event)) :: logs - } this } override def log(timestampMicroseconds: Long, event: String): Span = synchronized { - if(isSampled && isOpen) { + if(sampled && isOpen) logs = Span.LogEntry(timestampMicroseconds, Map("event" -> event)) :: logs - } this } override def log(eventName: String, payload: scala.Any): Span = synchronized { - if(isSampled && isOpen) { + if(sampled && isOpen) logs = Span.LogEntry(Clock.microTimestamp(), Map(eventName -> payload)) :: logs - } this } override def log(timestampMicroseconds: Long, eventName: String, payload: scala.Any): Span = synchronized { - if(isSampled && isOpen) { + if(sampled && isOpen) logs = Span.LogEntry(timestampMicroseconds, Map(eventName -> payload)) :: logs - } this } @@ -98,7 +92,7 @@ class Span(spanContext: SpanContext, initialOperationName: String, startTimestam override def setTag(key: String, value: String): Span = synchronized { if (isOpen) { extractMetricTag(key, value) - if(isSampled) + if(sampled) tags = tags ++ Map(key -> value) } this @@ -108,7 +102,7 @@ class Span(spanContext: SpanContext, initialOperationName: String, startTimestam if (isOpen) { val tagValue = if(value) Span.BooleanTagTrueValue else Span.BooleanTagFalseValue extractMetricTag(key, tagValue) - if(isSampled) + if(sampled) tags = tags + (key -> tagValue) } this @@ -118,38 +112,33 @@ class Span(spanContext: SpanContext, initialOperationName: String, startTimestam if (isOpen) { val tagValue = String.valueOf(value) extractMetricTag(key, tagValue) - if(isSampled) + if(sampled) tags = tags + (key -> tagValue) } this } def setMetricTag(key: String, value: String): Span = synchronized { - if (isOpen) { + if (isOpen) metricTags = metricTags ++ Map(key -> value) - } this } override def setBaggageItem(key: String, value: String): Span = synchronized { - if (isOpen) { + if (isOpen) spanContext.addBaggageItem(key, value) - } this } - override def setOperationName(operationName: String): Span = { - if(isOpen) { + override def setOperationName(operationName: String): Span = synchronized { + if(isOpen) this.operationName = operationName - } this } - private def extractMetricTag(tag: String, value: String): Unit = { - if(tag.startsWith(Span.MetricTagPrefix)) { + private def extractMetricTag(tag: String, value: String): Unit = + if(tag.startsWith(Span.MetricTagPrefix)) metricTags = metricTags ++ Map(tag.substring(Span.MetricTagPrefix.length) -> value) - } - } override def finish(): Unit = finish(Clock.microTimestamp()) diff --git a/kamon-core/src/main/scala/kamon/trace/SpanContext.scala b/kamon-core/src/main/scala/kamon/trace/SpanContext.scala index a3afb36d..7fddf3f4 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanContext.scala @@ -1,11 +1,12 @@ package kamon.trace import java.lang -import java.util.Map +import java.util.{Map => JavaMap} + import scala.collection.JavaConverters._ -class SpanContext(val traceID: Long, val spanID: Long, val parentID: Long) extends io.opentracing.SpanContext { - private var baggage = scala.collection.immutable.Map.empty[String, String] +class SpanContext(val traceID: Long, val spanID: Long, val parentID: Long, val sampled: Boolean, + private var baggage: Map[String, String]) extends io.opentracing.SpanContext { private[kamon] def addBaggageItem(key: String, value: String): Unit = synchronized { baggage = baggage + (key -> value) @@ -15,10 +16,10 @@ class SpanContext(val traceID: Long, val spanID: Long, val parentID: Long) exten baggage.get(key).getOrElse(null) } - private[kamon] def baggageMap: scala.collection.immutable.Map[String, String] = + private[kamon] def baggageMap: Map[String, String] = baggage - override def baggageItems(): lang.Iterable[Map.Entry[String, String]] = synchronized { + override def baggageItems(): lang.Iterable[JavaMap.Entry[String, String]] = synchronized { baggage.asJava.entrySet() } } diff --git a/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala new file mode 100644 index 00000000..6e687d66 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala @@ -0,0 +1,87 @@ +package kamon.trace + +import java.net.{URLDecoder, URLEncoder} +import java.util.concurrent.ThreadLocalRandom + +import io.opentracing.propagation.TextMap +import kamon.util.HexCodec + +trait SpanContextCodec[T] { + def inject(spanContext: SpanContext, carrier: T): Unit + def extract(carrier: T, sampler: Sampler): SpanContext +} + +object SpanContextCodec { + + val TextMap: SpanContextCodec[TextMap] = new TextMapSpanCodec( + traceIDKey = "TRACE_ID", + parentIDKey = "PARENT_ID", + spanIDKey = "SPAN_ID", + sampledKey = "SAMPLED", + baggagePrefix = "BAGGAGE_", + baggageValueEncoder = identity, + baggageValueDecoder = identity + ) + + val ZipkinB3: SpanContextCodec[TextMap] = new TextMapSpanCodec( + traceIDKey = "X─B3─TraceId", + parentIDKey = "X─B3─ParentSpanId", + spanIDKey = "X─B3─SpanId", + sampledKey = "X─B3─Sampled", + baggagePrefix = "X─B3─Baggage-", + baggageValueEncoder = urlEncode, + baggageValueDecoder = urlDecode + ) + + private def urlEncode(s: String): String = URLEncoder.encode(s, "UTF-8") + private def urlDecode(s: String): String = URLDecoder.decode(s, "UTF-8") + + private class TextMapSpanCodec(traceIDKey: String, parentIDKey: String, spanIDKey: String, sampledKey: String, baggagePrefix: String, + baggageValueEncoder: String => String, baggageValueDecoder: String => String) extends SpanContextCodec[TextMap] { + + override def inject(spanContext: SpanContext, carrier: TextMap): Unit = { + carrier.put(traceIDKey, encodeLong(spanContext.traceID)) + carrier.put(parentIDKey, encodeLong(spanContext.parentID)) + carrier.put(spanIDKey, encodeLong(spanContext.spanID)) + + spanContext.baggageItems().forEach { entry => + carrier.put(baggagePrefix + entry.getKey, baggageValueEncoder(entry.getValue)) + } + } + + override def extract(carrier: TextMap, sampler: Sampler): SpanContext = { + var traceID: String = null + var parentID: String = null + var spanID: String = null + var sampled: String = null + var baggage: Map[String, String] = Map.empty + + carrier.forEach { entry => + if(entry.getKey.equals(traceIDKey)) + traceID = baggageValueDecoder(entry.getValue) + else if(entry.getKey.equals(parentIDKey)) + parentID = baggageValueDecoder(entry.getValue) + else if(entry.getKey.equals(spanIDKey)) + spanID = baggageValueDecoder(entry.getValue) + else if(entry.getKey.equals(sampledKey)) + sampled = entry.getValue + else if(entry.getKey.startsWith(baggagePrefix)) + baggage = baggage + (entry.getKey.substring(baggagePrefix.length) -> baggageValueDecoder(entry.getValue)) + } + + if(traceID != null && spanID != null) { + val actualParent = if(parentID == null) 0L else decodeLong(parentID) + val isSampled = if(sampled == null) sampler.decide(ThreadLocalRandom.current().nextLong()) else sampled.equals("1") + + new SpanContext(decodeLong(traceID), decodeLong(spanID), actualParent, isSampled, baggage) + } else null + } + + private def decodeLong(input: String): Long = + HexCodec.lowerHexToUnsignedLong(input) + + private def encodeLong(input: Long): String = + HexCodec.toLowerHex(input) + + } +} diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala index 84aafe68..6bb5a252 100644 --- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala +++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala @@ -1,27 +1,40 @@ package kamon.trace -import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.ThreadLocalRandom -import io.opentracing.propagation.Format +import com.typesafe.scalalogging.Logger +import io.opentracing.propagation.{TextMap, Format} +import io.opentracing.propagation.Format.Builtin.{BINARY, HTTP_HEADERS, TEXT_MAP} import io.opentracing.util.ThreadLocalActiveSpanSource import kamon.ReporterRegistryImpl -import kamon.metric.RecorderRegistry +import kamon.metric.{Entity, EntityRecorder, RecorderRegistry} import kamon.util.Clock class Tracer(metrics: RecorderRegistry, reporterRegistry: ReporterRegistryImpl) extends io.opentracing.Tracer { - private val traceCounter = new AtomicLong() - private val spanCounter = new AtomicLong() + private val logger = Logger(classOf[Tracer]) + private val metricsRecorder = new TracerMetricsRecorder(metrics.getRecorder(Entity("tracer", "tracer", Map.empty))) private val activeSpanSource = new ThreadLocalActiveSpanSource() + @volatile private var sampler: Sampler = Sampler.never + @volatile private var textMapSpanContextCodec = SpanContextCodec.TextMap + @volatile private var httpHeaderSpanContextCodec = SpanContextCodec.ZipkinB3 override def buildSpan(operationName: String): io.opentracing.Tracer.SpanBuilder = - new SpanBuilder(operationName, spanCounter.incrementAndGet()) + new SpanBuilder(operationName) - override def extract[C](format: Format[C], carrier: C): io.opentracing.SpanContext = - sys.error("Extracting not implemented yet.") + override def extract[C](format: Format[C], carrier: C): io.opentracing.SpanContext = format match { + case HTTP_HEADERS => httpHeaderSpanContextCodec.extract(carrier.asInstanceOf[TextMap], sampler) + case TEXT_MAP => textMapSpanContextCodec.extract(carrier.asInstanceOf[TextMap], sampler) + case BINARY => null // TODO: Implement Binary Encoding + case _ => null + } - override def inject[C](spanContext: io.opentracing.SpanContext, format: Format[C], carrier: C): Unit = - sys.error("Injecting not implemented yet.") + override def inject[C](spanContext: io.opentracing.SpanContext, format: Format[C], carrier: C): Unit = format match { + case HTTP_HEADERS => httpHeaderSpanContextCodec.inject(spanContext.asInstanceOf[SpanContext], carrier.asInstanceOf[TextMap]) + case TEXT_MAP => textMapSpanContextCodec.inject(spanContext.asInstanceOf[SpanContext], carrier.asInstanceOf[TextMap]) + case BINARY => + case _ => + } override def activeSpan(): io.opentracing.ActiveSpan = activeSpanSource.activeSpan() @@ -29,82 +42,91 @@ class Tracer(metrics: RecorderRegistry, reporterRegistry: ReporterRegistryImpl) override def makeActive(span: io.opentracing.Span): io.opentracing.ActiveSpan = activeSpanSource.makeActive(span) + def setTextMapSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit = + this.textMapSpanContextCodec = codec - private[kamon] def newTraceID: Long = - traceCounter.incrementAndGet() + def setHttpHeaderSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit = + this.httpHeaderSpanContextCodec = codec - private class SpanBuilder(operationName: String, spanID: Long) extends io.opentracing.Tracer.SpanBuilder { - private var traceID = 0L + private class SpanBuilder(operationName: String) extends io.opentracing.Tracer.SpanBuilder { + private var parentContext: SpanContext = _ private var startTimestamp = 0L - private var parentID = 0L private var initialTags = Map.empty[String, String] + private var useActiveSpanAsParent = true - override def start(): io.opentracing.Span = - startManual() - - override def asChildOf(parent: io.opentracing.SpanContext): io.opentracing.Tracer.SpanBuilder = { - parent match { - case kamonSpanContext: kamon.trace.SpanContext => - traceID = kamonSpanContext.traceID - parentID = kamonSpanContext.spanID - case _ => sys.error("Can't extract the parent ID from a non-kamon SpanContext") - } - this + override def asChildOf(parent: io.opentracing.SpanContext): io.opentracing.Tracer.SpanBuilder = parent match { + case spanContext: kamon.trace.SpanContext => + this.parentContext = spanContext + this + case _ => logger.error("Can't extract the parent ID from a non-Kamon SpanContext"); this } - override def asChildOf(parent: io.opentracing.BaseSpan[_]): io.opentracing.Tracer.SpanBuilder = { - parent.context() match { - case kamonSpanContext: kamon.trace.SpanContext => - traceID = kamonSpanContext.traceID - parentID = kamonSpanContext.spanID - case _ => sys.error("Can't extract the parent ID from a non-kamon SpanContext") - } - this - } + override def asChildOf(parent: io.opentracing.BaseSpan[_]): io.opentracing.Tracer.SpanBuilder = + asChildOf(parent.context()) override def addReference(referenceType: String, referencedContext: io.opentracing.SpanContext): io.opentracing.Tracer.SpanBuilder = { if(referenceType != null && referenceType.equals(io.opentracing.References.CHILD_OF)) { - referencedContext match { - case kamonSpanContext: kamon.trace.SpanContext => - traceID = kamonSpanContext.traceID - parentID = kamonSpanContext.spanID - case _ => sys.error("Can't extract the parent ID from a non-kamon SpanContext") - } - } - this + asChildOf(referencedContext) + } else this } override def withTag(key: String, value: String): io.opentracing.Tracer.SpanBuilder = { - initialTags = initialTags + (key -> value) + this.initialTags = this.initialTags + (key -> value) this } override def withTag(key: String, value: Boolean): io.opentracing.Tracer.SpanBuilder = { - initialTags = initialTags + (key -> value.toString) + this.initialTags = this.initialTags + (key -> value.toString) this } override def withTag(key: String, value: Number): io.opentracing.Tracer.SpanBuilder = { - initialTags = initialTags + (key -> value.toString) + this.initialTags = this.initialTags + (key -> value.toString) this } - override def startManual(): Span = { - if(traceID == 0L) traceID = Tracer.this.newTraceID - val startTimestampMicros = if(startTimestamp != 0L) startTimestamp else Clock.microTimestamp() - new Span(new SpanContext(traceID, spanID, parentID), operationName, startTimestampMicros, metrics, reporterRegistry) + override def withStartTimestamp(microseconds: Long): io.opentracing.Tracer.SpanBuilder = { + this.startTimestamp = microseconds + this } - override def withStartTimestamp(microseconds: Long): io.opentracing.Tracer.SpanBuilder = { - startTimestamp = microseconds + override def ignoreActiveSpan(): io.opentracing.Tracer.SpanBuilder = { + this.useActiveSpanAsParent = false this } - override def startActive(): io.opentracing.ActiveSpan = { - Tracer.this.makeActive(startManual()) + override def start(): io.opentracing.Span = + startManual() + + override def startActive(): io.opentracing.ActiveSpan = + makeActive(startManual()) + + override def startManual(): Span = { + val startTimestampMicros = if(startTimestamp != 0L) startTimestamp else Clock.microTimestamp() + + if(parentContext == null && useActiveSpanAsParent) { + val possibleParent = activeSpan() + if(possibleParent != null) + parentContext = possibleParent.context().asInstanceOf[SpanContext] + } + + val spanContext = + if(parentContext != null) + new SpanContext(parentContext.traceID, createID(), parentContext.spanID, parentContext.sampled, initialTags) + else { + val traceID = createID() + new SpanContext(traceID, traceID, 0L, sampler.decide(traceID), initialTags) + } + + metricsRecorder.createdSpans.increment() + new Span(spanContext, operationName, initialTags, startTimestampMicros, metrics, reporterRegistry) } - override def ignoreActiveSpan(): io.opentracing.Tracer.SpanBuilder = ??? + private def createID(): Long = + ThreadLocalRandom.current().nextLong() } + private class TracerMetricsRecorder(recorder: EntityRecorder) { + val createdSpans = recorder.counter("created-spans") + } } diff --git a/kamon-core/src/main/scala/kamon/util/HexCodec.scala b/kamon-core/src/main/scala/kamon/util/HexCodec.scala new file mode 100644 index 00000000..6e9dd2df --- /dev/null +++ b/kamon-core/src/main/scala/kamon/util/HexCodec.scala @@ -0,0 +1,85 @@ +package kamon.util + +// Extracted from https://github.com/openzipkin/brave/blob/master/brave/src/main/java/brave/internal/HexCodec.java +object HexCodec { + /** + * Parses a 1 to 32 character lower-hex string with no prefix into an unsigned long, tossing any + * bits higher than 64. + */ + def lowerHexToUnsignedLong(lowerHex: String): Long = { + val length = lowerHex.length + if (length < 1 || length > 32) throw isntLowerHexLong(lowerHex) + // trim off any high bits + val beginIndex = if (length > 16) length - 16 + else 0 + lowerHexToUnsignedLong(lowerHex, beginIndex) + } + + private def isntLowerHexLong(lowerHex: String) = + throw new NumberFormatException(lowerHex + " should be a 1 to 32 character lower-hex string with no prefix") + + /** + * Parses a 16 character lower-hex string with no prefix into an unsigned long, starting at the + * spe index. + */ + private def lowerHexToUnsignedLong(lowerHex: String, index: Int): Long = { + var i = index + var result = 0L + val endIndex = Math.min(index + 16, lowerHex.length) + while ( { + index < endIndex + }) { + val c = lowerHex.charAt(index) + result <<= 4 + if (c >= '0' && c <= '9') result |= c - '0' + else if (c >= 'a' && c <= 'f') result |= c - 'a' + 10 + else throw isntLowerHexLong(lowerHex) + + { + i = i + 1; index - 1 + } + } + result + } + + /** + * Returns 16 or 32 character hex string depending on if {@code high} is zero. + */ + private def toLowerHex(high: Long, low: Long): String = { + val result = new Array[Char](if (high != 0) 32 else 16) + var pos = 0 + if (high != 0) { + writeHexLong(result, pos, high) + pos += 16 + } + writeHexLong(result, pos, low) + new String(result) + } + + /** + * Inspired by {@code okio.Buffer.writeLong} + */ + def toLowerHex(v: Long): String = { + val data = new Array[Char](16) + writeHexLong(data, 0, v) + new String(data) + } + + private def writeHexLong(data: Array[Char], pos: Int, v: Long): Unit = { + writeHexByte(data, pos + 0, ((v >>> 56L) & 0xff).toByte) + writeHexByte(data, pos + 2, ((v >>> 48L) & 0xff).toByte) + writeHexByte(data, pos + 4, ((v >>> 40L) & 0xff).toByte) + writeHexByte(data, pos + 6, ((v >>> 32L) & 0xff).toByte) + writeHexByte(data, pos + 8, ((v >>> 24L) & 0xff).toByte) + writeHexByte(data, pos + 10, ((v >>> 16L) & 0xff).toByte) + writeHexByte(data, pos + 12, ((v >>> 8L) & 0xff).toByte) + writeHexByte(data, pos + 14, (v & 0xff).toByte) + } + + private val HEX_DIGITS = Array('0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f') + + private def writeHexByte(data: Array[Char], pos: Int, b: Byte): Unit = { + data(pos + 0) = HEX_DIGITS((b >> 4) & 0xf) + data(pos + 1) = HEX_DIGITS(b & 0xf) + } +} \ No newline at end of file -- cgit v1.2.3