diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2017-08-21 09:23:07 +0200 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2017-08-21 10:37:08 +0200 |
commit | a152a3098b564ed43766a857b32b7c7d7445f9ce (patch) | |
tree | 7651f61e598f316ee9dca415c5a5c67ce530bad5 /kamon-core/src/main/scala/kamon/trace | |
parent | 3cb974e5dfd381b9b28ffef9977047cf35242121 (diff) | |
download | Kamon-a152a3098b564ed43766a857b32b7c7d7445f9ce.tar.gz Kamon-a152a3098b564ed43766a857b32b7c7d7445f9ce.tar.bz2 Kamon-a152a3098b564ed43766a857b32b7c7d7445f9ce.zip |
binary encoding of context and entries
Diffstat (limited to 'kamon-core/src/main/scala/kamon/trace')
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/Span.scala | 4 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/SpanCodec.scala | 71 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/Tracer.scala | 15 |
3 files changed, 79 insertions, 11 deletions
diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala index ea28142e..3158aa73 100644 --- a/kamon-core/src/main/scala/kamon/trace/Span.scala +++ b/kamon-core/src/main/scala/kamon/trace/Span.scala @@ -195,9 +195,9 @@ object Span { object Local { def apply(spanContext: SpanContext, parent: Option[Span], initialOperationName: String, initialSpanTags: Map[String, Span.TagValue], - initialMetricTags: Map[String, String], startTimestampMicros: Long, reporterRegistry: ReporterRegistryImpl, + initialMetricTags: Map[String, String], startTimestampMicros: Long, spanSink: SpanSink, scopeSpanMetrics: Boolean): Local = - new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags, startTimestampMicros, reporterRegistry, scopeSpanMetrics) + new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags, startTimestampMicros, spanSink, scopeSpanMetrics) } diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala index 96317696..ae78ee67 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala @@ -16,15 +16,17 @@ package kamon.trace import java.net.{URLDecoder, URLEncoder} +import java.nio.ByteBuffer import kamon.Kamon -import kamon.context.{Codec, Context, TextMap} +import kamon.context.{Codecs, Context, TextMap} +import kamon.context.generated.binary.span.{Span => ColferSpan} import kamon.trace.SpanContext.SamplingDecision object SpanCodec { - class B3 extends Codec.ForEntry[TextMap] { + class B3 extends Codecs.ForEntry[TextMap] { import B3.Headers override def encode(context: Context): TextMap = { @@ -96,4 +98,69 @@ object SpanCodec { val Flags = "X-B3-Flags" } } + + + class Colfer extends Codecs.ForEntry[ByteBuffer] { + val emptyBuffer = ByteBuffer.allocate(0) + + override def encode(context: Context): ByteBuffer = { + val span = context.get(Span.ContextKey) + if(span.nonEmpty()) { + val marshalBuffer = Colfer.codecBuffer.get() + val colferSpan = new ColferSpan() + val spanContext = span.context() + + colferSpan.setTraceID(spanContext.traceID.bytes) + colferSpan.setSpanID(spanContext.spanID.bytes) + colferSpan.setParentID(spanContext.parentID.bytes) + colferSpan.setSamplingDecision(samplingDecisionToByte(spanContext.samplingDecision)) + + val marshalledSize = colferSpan.marshal(marshalBuffer, 0) + val buffer = ByteBuffer.allocate(marshalledSize) + buffer.put(marshalBuffer, 0, marshalledSize) + buffer + + } else emptyBuffer + } + + override def decode(carrier: ByteBuffer, context: Context): Context = { + carrier.clear() + + if(carrier.capacity() == 0) + context + else { + val identityProvider = Kamon.tracer.identityProvider + val colferSpan = new ColferSpan() + colferSpan.unmarshal(carrier.array(), 0) + + val spanContext = SpanContext( + traceID = identityProvider.traceIdGenerator().from(colferSpan.traceID), + spanID = identityProvider.traceIdGenerator().from(colferSpan.spanID), + parentID = identityProvider.traceIdGenerator().from(colferSpan.parentID), + samplingDecision = byteToSamplingDecision(colferSpan.samplingDecision) + ) + + context.withKey(Span.ContextKey, Span.Remote(spanContext)) + } + } + + + private def samplingDecisionToByte(samplingDecision: SamplingDecision): Byte = samplingDecision match { + case SamplingDecision.Sample => 1 + case SamplingDecision.DoNotSample => 2 + case SamplingDecision.Unknown => 3 + } + + private def byteToSamplingDecision(byte: Byte): SamplingDecision = byte match { + case 1 => SamplingDecision.Sample + case 2 => SamplingDecision.DoNotSample + case _ => SamplingDecision.Unknown + } + } + + object Colfer { + private val codecBuffer = new ThreadLocal[Array[Byte]] { + override def initialValue(): Array[Byte] = Array.ofDim[Byte](256) + } + } }
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala index f2f1918c..5f61f3aa 100644 --- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala +++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala @@ -16,7 +16,8 @@ package kamon.trace import com.typesafe.config.Config -import kamon.{Kamon, ReporterRegistryImpl} +import kamon.ReporterRegistry.SpanSink +import kamon.Kamon import kamon.metric.MetricLookup import kamon.trace.Span.TagValue import kamon.trace.SpanContext.SamplingDecision @@ -34,7 +35,7 @@ trait Tracer { object Tracer { - final class Default(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config) extends Tracer { + final class Default(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config) extends Tracer { private val logger = LoggerFactory.getLogger(classOf[Tracer]) private[Tracer] val tracerMetrics = new TracerMetrics(metrics) @@ -46,7 +47,7 @@ object Tracer { reconfigure(initialConfig) override def buildSpan(operationName: String): SpanBuilder = - new SpanBuilder(operationName, this, reporterRegistry) + new SpanBuilder(operationName, this, spanSink) override def identityProvider: IdentityProvider = this._identityProvider @@ -84,11 +85,11 @@ object Tracer { } object Default { - def apply(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config): Default = - new Default(metrics, reporterRegistry, initialConfig) + def apply(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config): Default = + new Default(metrics, spanSink, initialConfig) } - final class SpanBuilder(operationName: String, tracer: Tracer.Default, reporterRegistry: ReporterRegistryImpl) { + final class SpanBuilder(operationName: String, tracer: Tracer.Default, spanSink: SpanSink) { private var parentSpan: Span = _ private var startTimestamp = 0L private var initialSpanTags = Map.empty[String, Span.TagValue] @@ -157,7 +158,7 @@ object Tracer { } tracer.tracerMetrics.createdSpans.increment() - Span.Local(spanContext, nonRemoteParent, operationName, initialSpanTags, initialMetricTags, startTimestampMicros, reporterRegistry, tracer.scopeSpanMetrics) + Span.Local(spanContext, nonRemoteParent, operationName, initialSpanTags, initialMetricTags, startTimestampMicros, spanSink, tracer.scopeSpanMetrics) } private def joinParentContext(parent: Span, samplingDecision: SamplingDecision): SpanContext = |