diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2017-05-18 16:21:44 +0200 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2017-05-18 16:21:44 +0200 |
commit | 77f2666650726352a9e15dcf6019064d91393b2e (patch) | |
tree | c39f7e2a18ac6bb1fcd1e2cc73dd3c165919515e /kamon-core/src/main/scala/kamon/trace/Span.scala | |
parent | 5dee54a0794b282e9b5729a3d4b85478c12a68d1 (diff) | |
download | Kamon-77f2666650726352a9e15dcf6019064d91393b2e.tar.gz Kamon-77f2666650726352a9e15dcf6019064d91393b2e.tar.bz2 Kamon-77f2666650726352a9e15dcf6019064d91393b2e.zip |
some more wip
Diffstat (limited to 'kamon-core/src/main/scala/kamon/trace/Span.scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/Span.scala | 183 |
1 files changed, 183 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala new file mode 100644 index 00000000..87115e19 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/Span.scala @@ -0,0 +1,183 @@ +package kamon +package trace + +import kamon.metric.RecorderRegistry +import kamon.metric.instrument.DynamicRange + +import scala.collection.JavaConverters._ +import kamon.util.{Clock, MeasurementUnit} + +object Span { + val MetricCategory = "span" + val LatencyMetricName = "elapsed-time" + val ErrorMetricName = "error" + val MetricTagPrefix = "metric." + val BooleanTagTrueValue = "1" + val BooleanTagFalseValue = "0" + + case class LogEntry(timestamp: Long, fields: Map[String, _]) + + case class CompletedSpan( + context: SpanContext, + operationName: String, + startTimestampMicros: Long, + endTimestampMicros: Long, + tags: Map[String, String], + logs: Seq[LogEntry] + ) +} + + +class Span(spanContext: SpanContext, initialOperationName: String, startTimestampMicros: Long, + recorderRegistry: RecorderRegistry, reporterRegistry: ReporterRegistryImpl) extends io.opentracing.Span { + + private var isOpen: Boolean = true + private val isSampled: Boolean = true // TODO: User a proper sampler + private var operationName: String = initialOperationName + private var endTimestampMicros: Long = 0 + + private var logs = List.empty[Span.LogEntry] + private var tags = Map.empty[String, String] + private var metricTags = Map.empty[String, String] + + + override def log(fields: java.util.Map[String, _]): Span = + log(fields.asScala.asInstanceOf[Map[String, _]]) + + def log(fields: Map[String, _]): Span = synchronized { + if (isSampled && isOpen) { + logs = Span.LogEntry(Clock.microTimestamp(), fields) :: logs + } + this + } + + override def log(timestampMicroseconds: Long, fields: java.util.Map[String, _]): Span = + log(timestampMicroseconds, fields.asScala.asInstanceOf[Map[String, _]]) + + def log(timestampMicroseconds: Long, fields: Map[String, _]): Span = synchronized { + if(isSampled && isOpen) { + logs = Span.LogEntry(timestampMicroseconds, fields) :: logs + } + this + } + + override def log(event: String): Span = synchronized { + if(isSampled && isOpen) { + logs = Span.LogEntry(Clock.microTimestamp(), Map("event" -> event)) :: logs + } + this + } + + override def log(timestampMicroseconds: Long, event: String): Span = synchronized { + if(isSampled && isOpen) { + logs = Span.LogEntry(timestampMicroseconds, Map("event" -> event)) :: logs + } + this + } + + override def log(eventName: String, payload: scala.Any): Span = synchronized { + if(isSampled && isOpen) { + logs = Span.LogEntry(Clock.microTimestamp(), Map(eventName -> payload)) :: logs + } + this + } + + override def log(timestampMicroseconds: Long, eventName: String, payload: scala.Any): Span = synchronized { + if(isSampled && isOpen) { + logs = Span.LogEntry(timestampMicroseconds, Map(eventName -> payload)) :: logs + } + this + } + + override def getBaggageItem(key: String): String = + spanContext.getBaggage(key) + + override def context(): SpanContext = + spanContext + + override def setTag(key: String, value: String): Span = synchronized { + if (isOpen) { + extractMetricTag(key, value) + if(isSampled) + tags = tags ++ Map(key -> value) + } + this + } + + override def setTag(key: String, value: Boolean): Span = { + if (isOpen) { + val tagValue = if(value) Span.BooleanTagTrueValue else Span.BooleanTagFalseValue + extractMetricTag(key, tagValue) + if(isSampled) + tags = tags + (key -> tagValue) + } + this + } + + override def setTag(key: String, value: Number): Span = { + if (isOpen) { + val tagValue = String.valueOf(value) + extractMetricTag(key, tagValue) + if(isSampled) + tags = tags + (key -> tagValue) + } + this + } + + def setMetricTag(key: String, value: String): Span = synchronized { + if (isOpen) { + metricTags = metricTags ++ Map(key -> value) + } + this + } + + override def setBaggageItem(key: String, value: String): Span = synchronized { + if (isOpen) { + spanContext.addBaggageItem(key, value) + } + this + } + + override def setOperationName(operationName: String): Span = { + if(isOpen) { + this.operationName = operationName + } + this + } + + private def extractMetricTag(tag: String, value: String): Unit = { + if(tag.startsWith(Span.MetricTagPrefix)) { + metricTags = metricTags ++ Map(tag.substring(Span.MetricTagPrefix.length) -> value) + } + } + + override def finish(): Unit = + finish(Clock.microTimestamp()) + + override def finish(finishMicros: Long): Unit = + if(isOpen) { + isOpen = false + endTimestampMicros = finishMicros + recordSpanMetrics() + reporterRegistry.reportSpan(completedSpan) + } + + private def completedSpan: Span.CompletedSpan = + Span.CompletedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, tags, logs) + + private def recordSpanMetrics(): Unit = { + val elapsedTime = endTimestampMicros - startTimestampMicros + val recorder = recorderRegistry.getRecorder(operationName, Span.MetricCategory, metricTags) + + recorder + .histogram(Span.LatencyMetricName, MeasurementUnit.time.microseconds, DynamicRange.Default) + .record(elapsedTime) + + tags.get("error").foreach { errorTag => + if(errorTag != null && errorTag.equals(Span.BooleanTagTrueValue)) { + recorder.counter(Span.ErrorMetricName).increment() + } + } + + } +}
\ No newline at end of file |