diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2017-07-14 14:12:47 +0200 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2017-07-14 14:12:47 +0200 |
commit | 34010efc7b273e50d805a277646f14aa96aaa8b2 (patch) | |
tree | 8f7a6f00eac4e0a4cb60c9093b3c5d06ed982662 /kamon-core/src/main/scala/kamon/trace/Span.scala | |
parent | 52c4503b6aea2309feeb550b7db2e5fa627dedc8 (diff) | |
download | Kamon-34010efc7b273e50d805a277646f14aa96aaa8b2.tar.gz Kamon-34010efc7b273e50d805a277646f14aa96aaa8b2.tar.bz2 Kamon-34010efc7b273e50d805a277646f14aa96aaa8b2.zip |
wip
Diffstat (limited to 'kamon-core/src/main/scala/kamon/trace/Span.scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/Span.scala | 247 |
1 files changed, 135 insertions, 112 deletions
diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala index 8149be74..a113b9bc 100644 --- a/kamon-core/src/main/scala/kamon/trace/Span.scala +++ b/kamon-core/src/main/scala/kamon/trace/Span.scala @@ -16,155 +16,178 @@ package kamon package trace +import kamon.trace.SpanContext.SamplingDecision import scala.collection.JavaConverters._ import kamon.util.{Clock, MeasurementUnit} -class Span(spanContext: SpanContext, initialOperationName: String, initialTags: Map[String, String], startTimestampMicros: Long, - reporterRegistry: ReporterRegistryImpl) extends io.opentracing.Span { +trait Span extends BaseSpan { - private var isOpen: Boolean = true - private val sampled: Boolean = spanContext.sampled - private var operationName: String = initialOperationName - private var endTimestampMicros: Long = 0 + def annotate(name: String): Span = + annotate(Span.Annotation(Clock.microTimestamp(), name, Map.empty)) - private var tags = initialTags - private var logs = List.empty[Span.LogEntry] - private var additionalMetricTags = Map.empty[String, String] + def annotate(name: String, fields: Map[String, String]): Span = + annotate(Span.Annotation(Clock.microTimestamp(), name, fields)) + def annotate(timestampMicroseconds: Long, name: String, fields: Map[String, String]): Span = + annotate(Span.Annotation(timestampMicroseconds, name, fields)) - override def log(fields: java.util.Map[String, _]): Span = - log(fields.asScala.asInstanceOf[Map[String, _]]) - def log(fields: Map[String, _]): Span = synchronized { - if (sampled && isOpen) - logs = Span.LogEntry(Clock.microTimestamp(), fields) :: logs - this - } +} - def log(timestampMicroseconds: Long, fields: Map[String, _]): Span = synchronized { - if(sampled && isOpen) - logs = Span.LogEntry(timestampMicroseconds, fields) :: logs - this - } +trait BaseSpan { - override def log(timestampMicroseconds: Long, fields: java.util.Map[String, _]): Span = - log(timestampMicroseconds, fields.asScala.asInstanceOf[Map[String, _]]) + def context(): SpanContext - override def log(event: String): Span = synchronized { - if(sampled && isOpen) - logs = Span.LogEntry(Clock.microTimestamp(), Map("event" -> event)) :: logs - this - } + def capture(): Continuation - override def log(timestampMicroseconds: Long, event: String): Span = synchronized { - if(sampled && isOpen) - logs = Span.LogEntry(timestampMicroseconds, Map("event" -> event)) :: logs - this - } + def annotate(annotation: Span.Annotation): Span - override def log(eventName: String, payload: scala.Any): Span = synchronized { - if(sampled && isOpen) - logs = Span.LogEntry(Clock.microTimestamp(), Map(eventName -> payload)) :: logs - this - } + def addSpanTag(key: String, value: String): Span + + def addMetricTag(key: String, value: String): Span + + def addBaggage(key: String, value: String): Span + + def getBaggage(key: String): Option[String] + + def setOperationName(name: String): Span - override def log(timestampMicroseconds: Long, eventName: String, payload: scala.Any): Span = synchronized { - if(sampled && isOpen) - logs = Span.LogEntry(timestampMicroseconds, Map(eventName -> payload)) :: logs - this + def disableMetricsCollection(): Span + + def finish(): Unit + + def finish(finishTimestampMicros: Long): Unit + +} + +object Span { + + final class Empty(tracer: Tracer) extends Span { + override val context: SpanContext = SpanContext.EmptySpanContext + override def capture(): Continuation = Continuation.Default(this, tracer) + + override def annotate(annotation: Annotation): Span = this + override def addSpanTag(key: String, value: String): Span = this + override def addMetricTag(key: String, value: String): Span = this + override def addBaggage(key: String, value: String): Span = this + override def getBaggage(key: String): Option[String] = None + override def setOperationName(name: String): Span = this + override def disableMetricsCollection(): Span = this + override def finish(): Unit = {} + override def finish(finishTimestampMicros: Long): Unit = {} } - override def getBaggageItem(key: String): String = - spanContext.getBaggage(key) + object Empty { + def apply(tracer: Tracer): Empty = new Empty(tracer) + } - override def context(): SpanContext = - spanContext + /** + * + * @param spanContext + * @param initialOperationName + * @param initialTags + * @param startTimestampMicros + * @param reporterRegistry + */ + final class Real(spanContext: SpanContext, initialOperationName: String, initialTags: Map[String, String], + startTimestampMicros: Long, reporterRegistry: ReporterRegistryImpl, tracer: Tracer) extends Span { + + private var collectMetrics: Boolean = true + private var isOpen: Boolean = true + private val sampled: Boolean = spanContext.samplingDecision == SamplingDecision.Sample + private var operationName: String = initialOperationName + private var endTimestampMicros: Long = 0 + + private var spanTags = initialTags + private var customMetricTags = Map.empty[String, String] + private var annotations = List.empty[Span.Annotation] + + def annotate(annotation: Annotation): Span = synchronized { + if(sampled && isOpen) + annotations = annotation :: annotations + this + } - override def setTag(key: String, value: String): Span = synchronized { - if (isOpen) { - extractMetricTag(key, value) - if(sampled) - tags = tags ++ Map(key -> value) + override def addSpanTag(key: String, value: String): Span = synchronized { + if(sampled && isOpen) + spanTags = spanTags + (key -> value) + this } - this - } - override def setTag(key: String, value: Boolean): Span = { - if (isOpen) { - val tagValue = if(value) Span.BooleanTagTrueValue else Span.BooleanTagFalseValue - extractMetricTag(key, tagValue) - if(sampled) - tags = tags + (key -> tagValue) + override def addMetricTag(key: String, value: String): Span = synchronized { + if(sampled && isOpen && collectMetrics) + customMetricTags = customMetricTags + (key -> value) + this } - this - } - override def setTag(key: String, value: Number): Span = { - if (isOpen) { - val tagValue = String.valueOf(value) - extractMetricTag(key, tagValue) - if(sampled) - tags = tags + (key -> tagValue) + override def addBaggage(key: String, value: String): Span = { + spanContext.baggage.add(key, value) + this } - this - } - def setMetricTag(key: String, value: String): Span = synchronized { - if (isOpen) - additionalMetricTags = additionalMetricTags ++ Map(key -> value) - this - } + override def getBaggage(key: String): Option[String] = + spanContext.baggage.get(key) - override def setBaggageItem(key: String, value: String): Span = synchronized { - if (isOpen) - spanContext.addBaggageItem(key, value) - this - } + override def disableMetricsCollection(): Span = synchronized { + collectMetrics = false + this + } - override def setOperationName(operationName: String): Span = synchronized { - if(isOpen) - this.operationName = operationName - this - } + override def context(): SpanContext = + spanContext + + override def setOperationName(operationName: String): Span = synchronized { + if(isOpen) + this.operationName = operationName + this + } - private def extractMetricTag(tag: String, value: String): Unit = - if(tag.startsWith(Span.MetricTagPrefix)) - additionalMetricTags = additionalMetricTags ++ Map(tag.substring(Span.MetricTagPrefix.length) -> value) + override def finish(): Unit = + finish(Clock.microTimestamp()) - override def finish(): Unit = - finish(Clock.microTimestamp()) + override def finish(finishMicros: Long): Unit = synchronized { + if (isOpen) { + isOpen = false + endTimestampMicros = finishMicros - override def finish(finishMicros: Long): Unit = synchronized { - if (isOpen) { - isOpen = false - endTimestampMicros = finishMicros - recordSpanMetrics() + if(collectMetrics) + recordSpanMetrics() - if(sampled) - reporterRegistry.reportSpan(completedSpan) + if(sampled) + reporterRegistry.reportSpan(completedSpan) + } } - } - private def completedSpan: Span.CompletedSpan = - Span.CompletedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, tags, logs) + override def capture(): Continuation = + Continuation.Default(this, tracer) + + private def completedSpan: Span.FinishedSpan = + Span.FinishedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, spanTags, annotations) - private def recordSpanMetrics(): Unit = { - val elapsedTime = endTimestampMicros - startTimestampMicros - val metricTags = Map("operation" -> operationName) ++ additionalMetricTags + private def recordSpanMetrics(): Unit = { + val elapsedTime = endTimestampMicros - startTimestampMicros + val metricTags = Map("operation" -> operationName) ++ customMetricTags - val latencyHistogram = Span.Metrics.SpanProcessingTimeMetric.refine(metricTags) - latencyHistogram.record(elapsedTime) + val latencyHistogram = Span.Metrics.SpanProcessingTimeMetric.refine(metricTags) + latencyHistogram.record(elapsedTime) - tags.get("error").foreach { errorTag => - if(errorTag != null && errorTag.equals(Span.BooleanTagTrueValue)) { - Span.Metrics.SpanErrorCount.refine(metricTags).increment() + spanTags.get("error").foreach { errorTag => + if(errorTag != null && errorTag.equals(Span.BooleanTagTrueValue)) { + Span.Metrics.SpanErrorCount.refine(metricTags).increment() + } } } } -} -object Span { + object Real { + def apply(spanContext: SpanContext, initialOperationName: String, initialTags: Map[String, String], + startTimestampMicros: Long, reporterRegistry: ReporterRegistryImpl, tracer: Tracer): Real = + new Real(spanContext, initialOperationName, initialTags, startTimestampMicros, reporterRegistry, tracer) + } + + + object Metrics { val SpanProcessingTimeMetric = Kamon.histogram("span.processing-time", MeasurementUnit.time.microseconds) val SpanErrorCount = Kamon.counter("span.error-count") @@ -174,14 +197,14 @@ object Span { val BooleanTagTrueValue = "1" val BooleanTagFalseValue = "0" - case class LogEntry(timestamp: Long, fields: Map[String, _]) + case class Annotation(timestamp: Long, name: String, fields: Map[String, String]) - case class CompletedSpan( + case class FinishedSpan( context: SpanContext, operationName: String, startTimestampMicros: Long, endTimestampMicros: Long, tags: Map[String, String], - logs: Seq[LogEntry] + annotations: Seq[Annotation] ) }
\ No newline at end of file |