diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2017-05-18 16:21:44 +0200 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2017-05-18 16:21:44 +0200 |
commit | 77f2666650726352a9e15dcf6019064d91393b2e (patch) | |
tree | c39f7e2a18ac6bb1fcd1e2cc73dd3c165919515e /kamon-core/src/main/scala/kamon/trace | |
parent | 5dee54a0794b282e9b5729a3d4b85478c12a68d1 (diff) | |
download | Kamon-77f2666650726352a9e15dcf6019064d91393b2e.tar.gz Kamon-77f2666650726352a9e15dcf6019064d91393b2e.tar.bz2 Kamon-77f2666650726352a9e15dcf6019064d91393b2e.zip |
some more wip
Diffstat (limited to 'kamon-core/src/main/scala/kamon/trace')
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/Span.scala | 183 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/SpanContext.scala | 21 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/Tracer.scala | 109 |
3 files changed, 311 insertions, 2 deletions
diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala new file mode 100644 index 00000000..87115e19 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/Span.scala @@ -0,0 +1,183 @@ +package kamon +package trace + +import kamon.metric.RecorderRegistry +import kamon.metric.instrument.DynamicRange + +import scala.collection.JavaConverters._ +import kamon.util.{Clock, MeasurementUnit} + +object Span { + val MetricCategory = "span" + val LatencyMetricName = "elapsed-time" + val ErrorMetricName = "error" + val MetricTagPrefix = "metric." + val BooleanTagTrueValue = "1" + val BooleanTagFalseValue = "0" + + case class LogEntry(timestamp: Long, fields: Map[String, _]) + + case class CompletedSpan( + context: SpanContext, + operationName: String, + startTimestampMicros: Long, + endTimestampMicros: Long, + tags: Map[String, String], + logs: Seq[LogEntry] + ) +} + + +class Span(spanContext: SpanContext, initialOperationName: String, startTimestampMicros: Long, + recorderRegistry: RecorderRegistry, reporterRegistry: ReporterRegistryImpl) extends io.opentracing.Span { + + private var isOpen: Boolean = true + private val isSampled: Boolean = true // TODO: User a proper sampler + private var operationName: String = initialOperationName + private var endTimestampMicros: Long = 0 + + private var logs = List.empty[Span.LogEntry] + private var tags = Map.empty[String, String] + private var metricTags = Map.empty[String, String] + + + override def log(fields: java.util.Map[String, _]): Span = + log(fields.asScala.asInstanceOf[Map[String, _]]) + + def log(fields: Map[String, _]): Span = synchronized { + if (isSampled && isOpen) { + logs = Span.LogEntry(Clock.microTimestamp(), fields) :: logs + } + this + } + + override def log(timestampMicroseconds: Long, fields: java.util.Map[String, _]): Span = + log(timestampMicroseconds, fields.asScala.asInstanceOf[Map[String, _]]) + + def log(timestampMicroseconds: Long, fields: Map[String, _]): Span = synchronized { + if(isSampled && isOpen) { + logs = Span.LogEntry(timestampMicroseconds, fields) :: logs + } + this + } + + override def log(event: String): Span = synchronized { + if(isSampled && isOpen) { + logs = Span.LogEntry(Clock.microTimestamp(), Map("event" -> event)) :: logs + } + this + } + + override def log(timestampMicroseconds: Long, event: String): Span = synchronized { + if(isSampled && isOpen) { + logs = Span.LogEntry(timestampMicroseconds, Map("event" -> event)) :: logs + } + this + } + + override def log(eventName: String, payload: scala.Any): Span = synchronized { + if(isSampled && isOpen) { + logs = Span.LogEntry(Clock.microTimestamp(), Map(eventName -> payload)) :: logs + } + this + } + + override def log(timestampMicroseconds: Long, eventName: String, payload: scala.Any): Span = synchronized { + if(isSampled && isOpen) { + logs = Span.LogEntry(timestampMicroseconds, Map(eventName -> payload)) :: logs + } + this + } + + override def getBaggageItem(key: String): String = + spanContext.getBaggage(key) + + override def context(): SpanContext = + spanContext + + override def setTag(key: String, value: String): Span = synchronized { + if (isOpen) { + extractMetricTag(key, value) + if(isSampled) + tags = tags ++ Map(key -> value) + } + this + } + + override def setTag(key: String, value: Boolean): Span = { + if (isOpen) { + val tagValue = if(value) Span.BooleanTagTrueValue else Span.BooleanTagFalseValue + extractMetricTag(key, tagValue) + if(isSampled) + tags = tags + (key -> tagValue) + } + this + } + + override def setTag(key: String, value: Number): Span = { + if (isOpen) { + val tagValue = String.valueOf(value) + extractMetricTag(key, tagValue) + if(isSampled) + tags = tags + (key -> tagValue) + } + this + } + + def setMetricTag(key: String, value: String): Span = synchronized { + if (isOpen) { + metricTags = metricTags ++ Map(key -> value) + } + this + } + + override def setBaggageItem(key: String, value: String): Span = synchronized { + if (isOpen) { + spanContext.addBaggageItem(key, value) + } + this + } + + override def setOperationName(operationName: String): Span = { + if(isOpen) { + this.operationName = operationName + } + this + } + + private def extractMetricTag(tag: String, value: String): Unit = { + if(tag.startsWith(Span.MetricTagPrefix)) { + metricTags = metricTags ++ Map(tag.substring(Span.MetricTagPrefix.length) -> value) + } + } + + override def finish(): Unit = + finish(Clock.microTimestamp()) + + override def finish(finishMicros: Long): Unit = + if(isOpen) { + isOpen = false + endTimestampMicros = finishMicros + recordSpanMetrics() + reporterRegistry.reportSpan(completedSpan) + } + + private def completedSpan: Span.CompletedSpan = + Span.CompletedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, tags, logs) + + private def recordSpanMetrics(): Unit = { + val elapsedTime = endTimestampMicros - startTimestampMicros + val recorder = recorderRegistry.getRecorder(operationName, Span.MetricCategory, metricTags) + + recorder + .histogram(Span.LatencyMetricName, MeasurementUnit.time.microseconds, DynamicRange.Default) + .record(elapsedTime) + + tags.get("error").foreach { errorTag => + if(errorTag != null && errorTag.equals(Span.BooleanTagTrueValue)) { + recorder.counter(Span.ErrorMetricName).increment() + } + } + + } +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/SpanContext.scala b/kamon-core/src/main/scala/kamon/trace/SpanContext.scala new file mode 100644 index 00000000..7f5962e0 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/SpanContext.scala @@ -0,0 +1,21 @@ +package kamon.trace +import java.lang +import java.util.Map +import scala.collection.JavaConverters._ + +class SpanContext(val traceID: Long, val spanID: Long, val parentID: Long) extends io.opentracing.SpanContext { + private var baggage = scala.collection.immutable.Map.empty[String, String] + + private[kamon] def addBaggageItem(key: String, value: String): Unit = { + baggage = baggage + (key -> value) + } + + private[kamon] def getBaggage(key: String): String = + baggage.get(key).getOrElse(null) + + private[kamon] def baggageMap: scala.collection.immutable.Map[String, String] = + baggage + + override def baggageItems(): lang.Iterable[Map.Entry[String, String]] = + baggage.asJava.entrySet() +} diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala index 802d95ec..84aafe68 100644 --- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala +++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala @@ -1,5 +1,110 @@ package kamon.trace -trait Tracer extends io.opentracing.Tracer { - def sampler: Sampler +import java.util.concurrent.atomic.AtomicLong + +import io.opentracing.propagation.Format +import io.opentracing.util.ThreadLocalActiveSpanSource +import kamon.ReporterRegistryImpl +import kamon.metric.RecorderRegistry +import kamon.util.Clock + +class Tracer(metrics: RecorderRegistry, reporterRegistry: ReporterRegistryImpl) extends io.opentracing.Tracer { + private val traceCounter = new AtomicLong() + private val spanCounter = new AtomicLong() + private val activeSpanSource = new ThreadLocalActiveSpanSource() + + + override def buildSpan(operationName: String): io.opentracing.Tracer.SpanBuilder = + new SpanBuilder(operationName, spanCounter.incrementAndGet()) + + override def extract[C](format: Format[C], carrier: C): io.opentracing.SpanContext = + sys.error("Extracting not implemented yet.") + + override def inject[C](spanContext: io.opentracing.SpanContext, format: Format[C], carrier: C): Unit = + sys.error("Injecting not implemented yet.") + + override def activeSpan(): io.opentracing.ActiveSpan = + activeSpanSource.activeSpan() + + override def makeActive(span: io.opentracing.Span): io.opentracing.ActiveSpan = + activeSpanSource.makeActive(span) + + + private[kamon] def newTraceID: Long = + traceCounter.incrementAndGet() + + private class SpanBuilder(operationName: String, spanID: Long) extends io.opentracing.Tracer.SpanBuilder { + private var traceID = 0L + private var startTimestamp = 0L + private var parentID = 0L + private var initialTags = Map.empty[String, String] + + override def start(): io.opentracing.Span = + startManual() + + override def asChildOf(parent: io.opentracing.SpanContext): io.opentracing.Tracer.SpanBuilder = { + parent match { + case kamonSpanContext: kamon.trace.SpanContext => + traceID = kamonSpanContext.traceID + parentID = kamonSpanContext.spanID + case _ => sys.error("Can't extract the parent ID from a non-kamon SpanContext") + } + this + } + + override def asChildOf(parent: io.opentracing.BaseSpan[_]): io.opentracing.Tracer.SpanBuilder = { + parent.context() match { + case kamonSpanContext: kamon.trace.SpanContext => + traceID = kamonSpanContext.traceID + parentID = kamonSpanContext.spanID + case _ => sys.error("Can't extract the parent ID from a non-kamon SpanContext") + } + this + } + + override def addReference(referenceType: String, referencedContext: io.opentracing.SpanContext): io.opentracing.Tracer.SpanBuilder = { + if(referenceType != null && referenceType.equals(io.opentracing.References.CHILD_OF)) { + referencedContext match { + case kamonSpanContext: kamon.trace.SpanContext => + traceID = kamonSpanContext.traceID + parentID = kamonSpanContext.spanID + case _ => sys.error("Can't extract the parent ID from a non-kamon SpanContext") + } + } + this + } + + override def withTag(key: String, value: String): io.opentracing.Tracer.SpanBuilder = { + initialTags = initialTags + (key -> value) + this + } + + override def withTag(key: String, value: Boolean): io.opentracing.Tracer.SpanBuilder = { + initialTags = initialTags + (key -> value.toString) + this + } + + override def withTag(key: String, value: Number): io.opentracing.Tracer.SpanBuilder = { + initialTags = initialTags + (key -> value.toString) + this + } + + override def startManual(): Span = { + if(traceID == 0L) traceID = Tracer.this.newTraceID + val startTimestampMicros = if(startTimestamp != 0L) startTimestamp else Clock.microTimestamp() + new Span(new SpanContext(traceID, spanID, parentID), operationName, startTimestampMicros, metrics, reporterRegistry) + } + + override def withStartTimestamp(microseconds: Long): io.opentracing.Tracer.SpanBuilder = { + startTimestamp = microseconds + this + } + + override def startActive(): io.opentracing.ActiveSpan = { + Tracer.this.makeActive(startManual()) + } + + override def ignoreActiveSpan(): io.opentracing.Tracer.SpanBuilder = ??? + } + } |