diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2017-08-15 00:06:26 +0200 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2017-08-15 00:06:26 +0200 |
commit | 6721d325d018756296213ac8f9129bc304a21afb (patch) | |
tree | e08a5ce92802f521be228beae0ddb4ef258d0066 /kamon-core/src/main/scala/kamon/trace/Span.scala | |
parent | ac3b72e27765ceec4cc3958b0fa7eaba0299f017 (diff) | |
parent | a949c875684d78818224cd2ca7aaf79aa7878724 (diff) | |
download | Kamon-6721d325d018756296213ac8f9129bc304a21afb.tar.gz Kamon-6721d325d018756296213ac8f9129bc304a21afb.tar.bz2 Kamon-6721d325d018756296213ac8f9129bc304a21afb.zip |
Merge remote-tracking branch 'ivantopo/wip/moving-ot-support-to-a-separeate-project' into kamon-1.0-develop
Diffstat (limited to 'kamon-core/src/main/scala/kamon/trace/Span.scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/Span.scala | 290 |
1 files changed, 167 insertions, 123 deletions
diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala index 464559e3..a4424a45 100644 --- a/kamon-core/src/main/scala/kamon/trace/Span.scala +++ b/kamon-core/src/main/scala/kamon/trace/Span.scala @@ -16,176 +16,220 @@ package kamon package trace - -import scala.collection.JavaConverters._ +import kamon.ReporterRegistry.SpanSink +import kamon.context.Key +import kamon.trace.SpanContext.SamplingDecision import kamon.util.{Clock, MeasurementUnit} -class Span(spanContext: SpanContext, initialOperationName: String, initialTags: Map[String, String], startTimestampMicros: Long, - reporterRegistry: ReporterRegistryImpl) extends io.opentracing.Span { - private var isOpen: Boolean = true - private val sampled: Boolean = spanContext.sampled - private var operationName: String = initialOperationName - private var endTimestampMicros: Long = 0 +trait Span { - private var tags = initialTags - private var logs = List.empty[Span.LogEntry] - private var additionalMetricTags = Map.empty[String, String] + def isEmpty(): Boolean + def isLocal(): Boolean + def nonEmpty(): Boolean = !isEmpty() + def isRemote(): Boolean = !isLocal() - override def log(fields: java.util.Map[String, _]): Span = - log(fields.asScala.asInstanceOf[Map[String, _]]) + def context(): SpanContext - def log(fields: Map[String, _]): Span = synchronized { - if (sampled && isOpen) - logs = Span.LogEntry(Clock.microTimestamp(), fields) :: logs - this - } + def annotate(annotation: Span.Annotation): Span - def log(timestampMicroseconds: Long, fields: Map[String, _]): Span = synchronized { - if(sampled && isOpen) - logs = Span.LogEntry(timestampMicroseconds, fields) :: logs - this - } + def addSpanTag(key: String, value: String): Span - override def log(timestampMicroseconds: Long, fields: java.util.Map[String, _]): Span = - log(timestampMicroseconds, fields.asScala.asInstanceOf[Map[String, _]]) + def addSpanTag(key: String, value: Long): Span - override def log(event: String): Span = synchronized { - if(sampled && isOpen) - logs = Span.LogEntry(Clock.microTimestamp(), Map("event" -> event)) :: logs - this - } + def addSpanTag(key: String, value: Boolean): Span - override def log(timestampMicroseconds: Long, event: String): Span = synchronized { - if(sampled && isOpen) - logs = Span.LogEntry(timestampMicroseconds, Map("event" -> event)) :: logs - this - } + def addMetricTag(key: String, value: String): Span - override def log(eventName: String, payload: scala.Any): Span = synchronized { - if(sampled && isOpen) - logs = Span.LogEntry(Clock.microTimestamp(), Map(eventName -> payload)) :: logs - this - } + def setOperationName(name: String): Span + + def disableMetricsCollection(): Span + + def finish(finishTimestampMicros: Long): Unit + + def finish(): Unit = + finish(Clock.microTimestamp()) + + def annotate(name: String): Span = + annotate(Span.Annotation(Clock.microTimestamp(), name, Map.empty)) - override def log(timestampMicroseconds: Long, eventName: String, payload: scala.Any): Span = synchronized { - if(sampled && isOpen) - logs = Span.LogEntry(timestampMicroseconds, Map(eventName -> payload)) :: logs - this + def annotate(name: String, fields: Map[String, String]): Span = + annotate(Span.Annotation(Clock.microTimestamp(), name, fields)) + + def annotate(timestampMicroseconds: Long, name: String, fields: Map[String, String]): Span = + annotate(Span.Annotation(timestampMicroseconds, name, fields)) + +} + +object Span { + + val ContextKey = Key.broadcast[Span]("span", Span.Empty) + + object Empty extends Span { + override val context: SpanContext = SpanContext.EmptySpanContext + override def isEmpty(): Boolean = true + override def isLocal(): Boolean = true + override def annotate(annotation: Annotation): Span = this + override def addSpanTag(key: String, value: String): Span = this + override def addSpanTag(key: String, value: Long): Span = this + override def addSpanTag(key: String, value: Boolean): Span = this + override def addMetricTag(key: String, value: String): Span = this + override def setOperationName(name: String): Span = this + override def disableMetricsCollection(): Span = this + override def finish(finishTimestampMicros: Long): Unit = {} } - override def getBaggageItem(key: String): String = - spanContext.getBaggage(key) + /** + * + * @param spanContext + * @param initialOperationName + * @param initialSpanTags + * @param startTimestampMicros + * @param spanSink + */ + final class Local(spanContext: SpanContext, initialOperationName: String, initialSpanTags: Map[String, Span.TagValue], + initialMetricTags: Map[String, String], startTimestampMicros: Long, spanSink: SpanSink) extends Span { + + private var collectMetrics: Boolean = true + private var open: Boolean = true + private val sampled: Boolean = spanContext.samplingDecision == SamplingDecision.Sample + private var operationName: String = initialOperationName + + private var spanTags: Map[String, Span.TagValue] = initialSpanTags + private var customMetricTags = initialMetricTags + private var annotations = List.empty[Span.Annotation] + + override def isEmpty(): Boolean = false + override def isLocal(): Boolean = true + + def annotate(annotation: Annotation): Span = synchronized { + if(sampled && open) + annotations = annotation :: annotations + this + } - override def context(): SpanContext = - spanContext + override def addSpanTag(key: String, value: String): Span = synchronized { + if(sampled && open) + spanTags = spanTags + (key -> TagValue.String(value)) + this + } - override def setTag(key: String, value: String): Span = synchronized { - if (isOpen) { - extractMetricTag(key, value) - if(sampled) - tags = tags ++ Map(key -> value) + override def addSpanTag(key: String, value: Long): Span = synchronized { + if(sampled && open) + spanTags = spanTags + (key -> TagValue.Number(value)) + this } - this - } - override def setTag(key: String, value: Boolean): Span = { - if (isOpen) { - val tagValue = if(value) Span.BooleanTagTrueValue else Span.BooleanTagFalseValue - extractMetricTag(key, tagValue) - if(sampled) - tags = tags + (key -> tagValue) + override def addSpanTag(key: String, value: Boolean): Span = synchronized { + if(sampled && open) { + val tagValue = if (value) TagValue.True else TagValue.False + spanTags = spanTags + (key -> tagValue) + } + this } - this - } - override def setTag(key: String, value: Number): Span = { - if (isOpen) { - val tagValue = String.valueOf(value) - extractMetricTag(key, tagValue) - if(sampled) - tags = tags + (key -> tagValue) + override def addMetricTag(key: String, value: String): Span = synchronized { + if(sampled && open && collectMetrics) + customMetricTags = customMetricTags + (key -> value) + this } - this - } - def setMetricTag(key: String, value: String): Span = synchronized { - if (isOpen) - additionalMetricTags = additionalMetricTags ++ Map(key -> value) - this - } + override def disableMetricsCollection(): Span = synchronized { + collectMetrics = false + this + } - override def setBaggageItem(key: String, value: String): Span = synchronized { - if (isOpen) - spanContext.addBaggageItem(key, value) - this - } + override def context(): SpanContext = + spanContext - override def setOperationName(operationName: String): Span = synchronized { - if(isOpen) - this.operationName = operationName - this - } + override def setOperationName(operationName: String): Span = synchronized { + if(open) + this.operationName = operationName + this + } - private def extractMetricTag(tag: String, value: String): Unit = - if(tag.startsWith(Span.MetricTagPrefix)) - additionalMetricTags = additionalMetricTags ++ Map(tag.substring(Span.MetricTagPrefix.length) -> value) + override def finish(finishMicros: Long): Unit = synchronized { + if (open) { + open = false - override def finish(): Unit = - finish(Clock.microTimestamp()) + if(collectMetrics) + recordSpanMetrics(finishMicros) + + if(sampled) + spanSink.reportSpan(toFinishedSpan(finishMicros)) + } + } + + private def toFinishedSpan(endTimestampMicros: Long): Span.FinishedSpan = + Span.FinishedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, spanTags, annotations) + + private def recordSpanMetrics(endTimestampMicros: Long): Unit = { + val elapsedTime = endTimestampMicros - startTimestampMicros + val metricTags = Map("operation" -> operationName) ++ customMetricTags - override def finish(finishMicros: Long): Unit = synchronized { - if (isOpen) { - isOpen = false - endTimestampMicros = finishMicros - recordSpanMetrics() + val isError = spanTags.get("error").exists { + errorTag => errorTag != null && errorTag.equals(Span.TagValue.True) + } - if(sampled) - reporterRegistry.reportSpan(completedSpan) + val refinedMetricTags = if(isError) + metricTags + ("error" -> "true") + else + metricTags + + val latencyHistogram = Span.Metrics.SpanProcessingTimeMetric.refine(refinedMetricTags) + latencyHistogram.record(elapsedTime) } } - private def completedSpan: Span.CompletedSpan = - Span.CompletedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, tags, logs) + object Local { + def apply(spanContext: SpanContext, initialOperationName: String, initialSpanTags: Map[String, Span.TagValue], + initialMetricTags: Map[String, String], startTimestampMicros: Long, reporterRegistry: ReporterRegistryImpl): Local = + new Local(spanContext, initialOperationName, initialSpanTags, initialMetricTags, startTimestampMicros, reporterRegistry) + } - private def recordSpanMetrics(): Unit = { - val elapsedTime = endTimestampMicros - startTimestampMicros - val metricTags = Map("operation" -> operationName) ++ additionalMetricTags - val isError = tags.get("error").exists { - errorTag => errorTag != null && errorTag.equals(Span.BooleanTagTrueValue) - } + final class Remote(val context: SpanContext) extends Span { + override def isEmpty(): Boolean = false + override def isLocal(): Boolean = false + override def annotate(annotation: Annotation): Span = this + override def addSpanTag(key: String, value: String): Span = this + override def addSpanTag(key: String, value: Long): Span = this + override def addSpanTag(key: String, value: Boolean): Span = this + override def addMetricTag(key: String, value: String): Span = this + override def setOperationName(name: String): Span = this + override def disableMetricsCollection(): Span = this + override def finish(finishTimestampMicros: Long): Unit = {} + } - val refinedTags = if(isError) { - metricTags + ("error" -> Span.BooleanTagTrueValue) - } else { - metricTags - } + object Remote { + def apply(spanContext: SpanContext): Remote = + new Remote(spanContext) + } + + sealed trait TagValue + object TagValue { + sealed trait Boolean extends TagValue + case object True extends Boolean + case object False extends Boolean - val latencyHistogram = Span.Metrics.SpanProcessingTimeMetric.refine(refinedTags) - latencyHistogram.record(elapsedTime) + case class String(string: java.lang.String) extends TagValue + case class Number(number: Long) extends TagValue } -} -object Span { object Metrics { val SpanProcessingTimeMetric = Kamon.histogram("span.processing-time", MeasurementUnit.time.microseconds) val SpanErrorCount = Kamon.counter("span.error-count") } - val MetricTagPrefix = "metric." - val BooleanTagTrueValue = "1" - val BooleanTagFalseValue = "0" - - case class LogEntry(timestamp: Long, fields: Map[String, _]) + case class Annotation(timestampMicros: Long, name: String, fields: Map[String, String]) - case class CompletedSpan( + case class FinishedSpan( context: SpanContext, operationName: String, startTimestampMicros: Long, endTimestampMicros: Long, - tags: Map[String, String], - logs: Seq[LogEntry] + tags: Map[String, Span.TagValue], + annotations: Seq[Annotation] ) }
\ No newline at end of file |