aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/trace
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2017-08-21 09:23:07 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2017-08-21 10:37:08 +0200
commita152a3098b564ed43766a857b32b7c7d7445f9ce (patch)
tree7651f61e598f316ee9dca415c5a5c67ce530bad5 /kamon-core/src/main/scala/kamon/trace
parent3cb974e5dfd381b9b28ffef9977047cf35242121 (diff)
downloadKamon-a152a3098b564ed43766a857b32b7c7d7445f9ce.tar.gz
Kamon-a152a3098b564ed43766a857b32b7c7d7445f9ce.tar.bz2
Kamon-a152a3098b564ed43766a857b32b7c7d7445f9ce.zip
binary encoding of context and entries
Diffstat (limited to 'kamon-core/src/main/scala/kamon/trace')
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Span.scala4
-rw-r--r--kamon-core/src/main/scala/kamon/trace/SpanCodec.scala71
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Tracer.scala15
3 files changed, 79 insertions, 11 deletions
diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala
index ea28142e..3158aa73 100644
--- a/kamon-core/src/main/scala/kamon/trace/Span.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Span.scala
@@ -195,9 +195,9 @@ object Span {
object Local {
def apply(spanContext: SpanContext, parent: Option[Span], initialOperationName: String, initialSpanTags: Map[String, Span.TagValue],
- initialMetricTags: Map[String, String], startTimestampMicros: Long, reporterRegistry: ReporterRegistryImpl,
+ initialMetricTags: Map[String, String], startTimestampMicros: Long, spanSink: SpanSink,
scopeSpanMetrics: Boolean): Local =
- new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags, startTimestampMicros, reporterRegistry, scopeSpanMetrics)
+ new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags, startTimestampMicros, spanSink, scopeSpanMetrics)
}
diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala
index 96317696..ae78ee67 100644
--- a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala
+++ b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala
@@ -16,15 +16,17 @@
package kamon.trace
import java.net.{URLDecoder, URLEncoder}
+import java.nio.ByteBuffer
import kamon.Kamon
-import kamon.context.{Codec, Context, TextMap}
+import kamon.context.{Codecs, Context, TextMap}
+import kamon.context.generated.binary.span.{Span => ColferSpan}
import kamon.trace.SpanContext.SamplingDecision
object SpanCodec {
- class B3 extends Codec.ForEntry[TextMap] {
+ class B3 extends Codecs.ForEntry[TextMap] {
import B3.Headers
override def encode(context: Context): TextMap = {
@@ -96,4 +98,69 @@ object SpanCodec {
val Flags = "X-B3-Flags"
}
}
+
+
+ class Colfer extends Codecs.ForEntry[ByteBuffer] {
+ val emptyBuffer = ByteBuffer.allocate(0)
+
+ override def encode(context: Context): ByteBuffer = {
+ val span = context.get(Span.ContextKey)
+ if(span.nonEmpty()) {
+ val marshalBuffer = Colfer.codecBuffer.get()
+ val colferSpan = new ColferSpan()
+ val spanContext = span.context()
+
+ colferSpan.setTraceID(spanContext.traceID.bytes)
+ colferSpan.setSpanID(spanContext.spanID.bytes)
+ colferSpan.setParentID(spanContext.parentID.bytes)
+ colferSpan.setSamplingDecision(samplingDecisionToByte(spanContext.samplingDecision))
+
+ val marshalledSize = colferSpan.marshal(marshalBuffer, 0)
+ val buffer = ByteBuffer.allocate(marshalledSize)
+ buffer.put(marshalBuffer, 0, marshalledSize)
+ buffer
+
+ } else emptyBuffer
+ }
+
+ override def decode(carrier: ByteBuffer, context: Context): Context = {
+ carrier.clear()
+
+ if(carrier.capacity() == 0)
+ context
+ else {
+ val identityProvider = Kamon.tracer.identityProvider
+ val colferSpan = new ColferSpan()
+ colferSpan.unmarshal(carrier.array(), 0)
+
+ val spanContext = SpanContext(
+ traceID = identityProvider.traceIdGenerator().from(colferSpan.traceID),
+ spanID = identityProvider.traceIdGenerator().from(colferSpan.spanID),
+ parentID = identityProvider.traceIdGenerator().from(colferSpan.parentID),
+ samplingDecision = byteToSamplingDecision(colferSpan.samplingDecision)
+ )
+
+ context.withKey(Span.ContextKey, Span.Remote(spanContext))
+ }
+ }
+
+
+ private def samplingDecisionToByte(samplingDecision: SamplingDecision): Byte = samplingDecision match {
+ case SamplingDecision.Sample => 1
+ case SamplingDecision.DoNotSample => 2
+ case SamplingDecision.Unknown => 3
+ }
+
+ private def byteToSamplingDecision(byte: Byte): SamplingDecision = byte match {
+ case 1 => SamplingDecision.Sample
+ case 2 => SamplingDecision.DoNotSample
+ case _ => SamplingDecision.Unknown
+ }
+ }
+
+ object Colfer {
+ private val codecBuffer = new ThreadLocal[Array[Byte]] {
+ override def initialValue(): Array[Byte] = Array.ofDim[Byte](256)
+ }
+ }
} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
index f2f1918c..5f61f3aa 100644
--- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
@@ -16,7 +16,8 @@
package kamon.trace
import com.typesafe.config.Config
-import kamon.{Kamon, ReporterRegistryImpl}
+import kamon.ReporterRegistry.SpanSink
+import kamon.Kamon
import kamon.metric.MetricLookup
import kamon.trace.Span.TagValue
import kamon.trace.SpanContext.SamplingDecision
@@ -34,7 +35,7 @@ trait Tracer {
object Tracer {
- final class Default(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config) extends Tracer {
+ final class Default(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config) extends Tracer {
private val logger = LoggerFactory.getLogger(classOf[Tracer])
private[Tracer] val tracerMetrics = new TracerMetrics(metrics)
@@ -46,7 +47,7 @@ object Tracer {
reconfigure(initialConfig)
override def buildSpan(operationName: String): SpanBuilder =
- new SpanBuilder(operationName, this, reporterRegistry)
+ new SpanBuilder(operationName, this, spanSink)
override def identityProvider: IdentityProvider =
this._identityProvider
@@ -84,11 +85,11 @@ object Tracer {
}
object Default {
- def apply(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config): Default =
- new Default(metrics, reporterRegistry, initialConfig)
+ def apply(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config): Default =
+ new Default(metrics, spanSink, initialConfig)
}
- final class SpanBuilder(operationName: String, tracer: Tracer.Default, reporterRegistry: ReporterRegistryImpl) {
+ final class SpanBuilder(operationName: String, tracer: Tracer.Default, spanSink: SpanSink) {
private var parentSpan: Span = _
private var startTimestamp = 0L
private var initialSpanTags = Map.empty[String, Span.TagValue]
@@ -157,7 +158,7 @@ object Tracer {
}
tracer.tracerMetrics.createdSpans.increment()
- Span.Local(spanContext, nonRemoteParent, operationName, initialSpanTags, initialMetricTags, startTimestampMicros, reporterRegistry, tracer.scopeSpanMetrics)
+ Span.Local(spanContext, nonRemoteParent, operationName, initialSpanTags, initialMetricTags, startTimestampMicros, spanSink, tracer.scopeSpanMetrics)
}
private def joinParentContext(parent: Span, samplingDecision: SamplingDecision): SpanContext =