aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/trace/Span.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-core/src/main/scala/kamon/trace/Span.scala')
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Span.scala290
1 files changed, 167 insertions, 123 deletions
diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala
index 464559e3..a4424a45 100644
--- a/kamon-core/src/main/scala/kamon/trace/Span.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Span.scala
@@ -16,176 +16,220 @@
package kamon
package trace
-
-import scala.collection.JavaConverters._
+import kamon.ReporterRegistry.SpanSink
+import kamon.context.Key
+import kamon.trace.SpanContext.SamplingDecision
import kamon.util.{Clock, MeasurementUnit}
-class Span(spanContext: SpanContext, initialOperationName: String, initialTags: Map[String, String], startTimestampMicros: Long,
- reporterRegistry: ReporterRegistryImpl) extends io.opentracing.Span {
- private var isOpen: Boolean = true
- private val sampled: Boolean = spanContext.sampled
- private var operationName: String = initialOperationName
- private var endTimestampMicros: Long = 0
+trait Span {
- private var tags = initialTags
- private var logs = List.empty[Span.LogEntry]
- private var additionalMetricTags = Map.empty[String, String]
+ def isEmpty(): Boolean
+ def isLocal(): Boolean
+ def nonEmpty(): Boolean = !isEmpty()
+ def isRemote(): Boolean = !isLocal()
- override def log(fields: java.util.Map[String, _]): Span =
- log(fields.asScala.asInstanceOf[Map[String, _]])
+ def context(): SpanContext
- def log(fields: Map[String, _]): Span = synchronized {
- if (sampled && isOpen)
- logs = Span.LogEntry(Clock.microTimestamp(), fields) :: logs
- this
- }
+ def annotate(annotation: Span.Annotation): Span
- def log(timestampMicroseconds: Long, fields: Map[String, _]): Span = synchronized {
- if(sampled && isOpen)
- logs = Span.LogEntry(timestampMicroseconds, fields) :: logs
- this
- }
+ def addSpanTag(key: String, value: String): Span
- override def log(timestampMicroseconds: Long, fields: java.util.Map[String, _]): Span =
- log(timestampMicroseconds, fields.asScala.asInstanceOf[Map[String, _]])
+ def addSpanTag(key: String, value: Long): Span
- override def log(event: String): Span = synchronized {
- if(sampled && isOpen)
- logs = Span.LogEntry(Clock.microTimestamp(), Map("event" -> event)) :: logs
- this
- }
+ def addSpanTag(key: String, value: Boolean): Span
- override def log(timestampMicroseconds: Long, event: String): Span = synchronized {
- if(sampled && isOpen)
- logs = Span.LogEntry(timestampMicroseconds, Map("event" -> event)) :: logs
- this
- }
+ def addMetricTag(key: String, value: String): Span
- override def log(eventName: String, payload: scala.Any): Span = synchronized {
- if(sampled && isOpen)
- logs = Span.LogEntry(Clock.microTimestamp(), Map(eventName -> payload)) :: logs
- this
- }
+ def setOperationName(name: String): Span
+
+ def disableMetricsCollection(): Span
+
+ def finish(finishTimestampMicros: Long): Unit
+
+ def finish(): Unit =
+ finish(Clock.microTimestamp())
+
+ def annotate(name: String): Span =
+ annotate(Span.Annotation(Clock.microTimestamp(), name, Map.empty))
- override def log(timestampMicroseconds: Long, eventName: String, payload: scala.Any): Span = synchronized {
- if(sampled && isOpen)
- logs = Span.LogEntry(timestampMicroseconds, Map(eventName -> payload)) :: logs
- this
+ def annotate(name: String, fields: Map[String, String]): Span =
+ annotate(Span.Annotation(Clock.microTimestamp(), name, fields))
+
+ def annotate(timestampMicroseconds: Long, name: String, fields: Map[String, String]): Span =
+ annotate(Span.Annotation(timestampMicroseconds, name, fields))
+
+}
+
+object Span {
+
+ val ContextKey = Key.broadcast[Span]("span", Span.Empty)
+
+ object Empty extends Span {
+ override val context: SpanContext = SpanContext.EmptySpanContext
+ override def isEmpty(): Boolean = true
+ override def isLocal(): Boolean = true
+ override def annotate(annotation: Annotation): Span = this
+ override def addSpanTag(key: String, value: String): Span = this
+ override def addSpanTag(key: String, value: Long): Span = this
+ override def addSpanTag(key: String, value: Boolean): Span = this
+ override def addMetricTag(key: String, value: String): Span = this
+ override def setOperationName(name: String): Span = this
+ override def disableMetricsCollection(): Span = this
+ override def finish(finishTimestampMicros: Long): Unit = {}
}
- override def getBaggageItem(key: String): String =
- spanContext.getBaggage(key)
+ /**
+ *
+ * @param spanContext
+ * @param initialOperationName
+ * @param initialSpanTags
+ * @param startTimestampMicros
+ * @param spanSink
+ */
+ final class Local(spanContext: SpanContext, initialOperationName: String, initialSpanTags: Map[String, Span.TagValue],
+ initialMetricTags: Map[String, String], startTimestampMicros: Long, spanSink: SpanSink) extends Span {
+
+ private var collectMetrics: Boolean = true
+ private var open: Boolean = true
+ private val sampled: Boolean = spanContext.samplingDecision == SamplingDecision.Sample
+ private var operationName: String = initialOperationName
+
+ private var spanTags: Map[String, Span.TagValue] = initialSpanTags
+ private var customMetricTags = initialMetricTags
+ private var annotations = List.empty[Span.Annotation]
+
+ override def isEmpty(): Boolean = false
+ override def isLocal(): Boolean = true
+
+ def annotate(annotation: Annotation): Span = synchronized {
+ if(sampled && open)
+ annotations = annotation :: annotations
+ this
+ }
- override def context(): SpanContext =
- spanContext
+ override def addSpanTag(key: String, value: String): Span = synchronized {
+ if(sampled && open)
+ spanTags = spanTags + (key -> TagValue.String(value))
+ this
+ }
- override def setTag(key: String, value: String): Span = synchronized {
- if (isOpen) {
- extractMetricTag(key, value)
- if(sampled)
- tags = tags ++ Map(key -> value)
+ override def addSpanTag(key: String, value: Long): Span = synchronized {
+ if(sampled && open)
+ spanTags = spanTags + (key -> TagValue.Number(value))
+ this
}
- this
- }
- override def setTag(key: String, value: Boolean): Span = {
- if (isOpen) {
- val tagValue = if(value) Span.BooleanTagTrueValue else Span.BooleanTagFalseValue
- extractMetricTag(key, tagValue)
- if(sampled)
- tags = tags + (key -> tagValue)
+ override def addSpanTag(key: String, value: Boolean): Span = synchronized {
+ if(sampled && open) {
+ val tagValue = if (value) TagValue.True else TagValue.False
+ spanTags = spanTags + (key -> tagValue)
+ }
+ this
}
- this
- }
- override def setTag(key: String, value: Number): Span = {
- if (isOpen) {
- val tagValue = String.valueOf(value)
- extractMetricTag(key, tagValue)
- if(sampled)
- tags = tags + (key -> tagValue)
+ override def addMetricTag(key: String, value: String): Span = synchronized {
+ if(sampled && open && collectMetrics)
+ customMetricTags = customMetricTags + (key -> value)
+ this
}
- this
- }
- def setMetricTag(key: String, value: String): Span = synchronized {
- if (isOpen)
- additionalMetricTags = additionalMetricTags ++ Map(key -> value)
- this
- }
+ override def disableMetricsCollection(): Span = synchronized {
+ collectMetrics = false
+ this
+ }
- override def setBaggageItem(key: String, value: String): Span = synchronized {
- if (isOpen)
- spanContext.addBaggageItem(key, value)
- this
- }
+ override def context(): SpanContext =
+ spanContext
- override def setOperationName(operationName: String): Span = synchronized {
- if(isOpen)
- this.operationName = operationName
- this
- }
+ override def setOperationName(operationName: String): Span = synchronized {
+ if(open)
+ this.operationName = operationName
+ this
+ }
- private def extractMetricTag(tag: String, value: String): Unit =
- if(tag.startsWith(Span.MetricTagPrefix))
- additionalMetricTags = additionalMetricTags ++ Map(tag.substring(Span.MetricTagPrefix.length) -> value)
+ override def finish(finishMicros: Long): Unit = synchronized {
+ if (open) {
+ open = false
- override def finish(): Unit =
- finish(Clock.microTimestamp())
+ if(collectMetrics)
+ recordSpanMetrics(finishMicros)
+
+ if(sampled)
+ spanSink.reportSpan(toFinishedSpan(finishMicros))
+ }
+ }
+
+ private def toFinishedSpan(endTimestampMicros: Long): Span.FinishedSpan =
+ Span.FinishedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, spanTags, annotations)
+
+ private def recordSpanMetrics(endTimestampMicros: Long): Unit = {
+ val elapsedTime = endTimestampMicros - startTimestampMicros
+ val metricTags = Map("operation" -> operationName) ++ customMetricTags
- override def finish(finishMicros: Long): Unit = synchronized {
- if (isOpen) {
- isOpen = false
- endTimestampMicros = finishMicros
- recordSpanMetrics()
+ val isError = spanTags.get("error").exists {
+ errorTag => errorTag != null && errorTag.equals(Span.TagValue.True)
+ }
- if(sampled)
- reporterRegistry.reportSpan(completedSpan)
+ val refinedMetricTags = if(isError)
+ metricTags + ("error" -> "true")
+ else
+ metricTags
+
+ val latencyHistogram = Span.Metrics.SpanProcessingTimeMetric.refine(refinedMetricTags)
+ latencyHistogram.record(elapsedTime)
}
}
- private def completedSpan: Span.CompletedSpan =
- Span.CompletedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, tags, logs)
+ object Local {
+ def apply(spanContext: SpanContext, initialOperationName: String, initialSpanTags: Map[String, Span.TagValue],
+ initialMetricTags: Map[String, String], startTimestampMicros: Long, reporterRegistry: ReporterRegistryImpl): Local =
+ new Local(spanContext, initialOperationName, initialSpanTags, initialMetricTags, startTimestampMicros, reporterRegistry)
+ }
- private def recordSpanMetrics(): Unit = {
- val elapsedTime = endTimestampMicros - startTimestampMicros
- val metricTags = Map("operation" -> operationName) ++ additionalMetricTags
- val isError = tags.get("error").exists {
- errorTag => errorTag != null && errorTag.equals(Span.BooleanTagTrueValue)
- }
+ final class Remote(val context: SpanContext) extends Span {
+ override def isEmpty(): Boolean = false
+ override def isLocal(): Boolean = false
+ override def annotate(annotation: Annotation): Span = this
+ override def addSpanTag(key: String, value: String): Span = this
+ override def addSpanTag(key: String, value: Long): Span = this
+ override def addSpanTag(key: String, value: Boolean): Span = this
+ override def addMetricTag(key: String, value: String): Span = this
+ override def setOperationName(name: String): Span = this
+ override def disableMetricsCollection(): Span = this
+ override def finish(finishTimestampMicros: Long): Unit = {}
+ }
- val refinedTags = if(isError) {
- metricTags + ("error" -> Span.BooleanTagTrueValue)
- } else {
- metricTags
- }
+ object Remote {
+ def apply(spanContext: SpanContext): Remote =
+ new Remote(spanContext)
+ }
+
+ sealed trait TagValue
+ object TagValue {
+ sealed trait Boolean extends TagValue
+ case object True extends Boolean
+ case object False extends Boolean
- val latencyHistogram = Span.Metrics.SpanProcessingTimeMetric.refine(refinedTags)
- latencyHistogram.record(elapsedTime)
+ case class String(string: java.lang.String) extends TagValue
+ case class Number(number: Long) extends TagValue
}
-}
-object Span {
object Metrics {
val SpanProcessingTimeMetric = Kamon.histogram("span.processing-time", MeasurementUnit.time.microseconds)
val SpanErrorCount = Kamon.counter("span.error-count")
}
- val MetricTagPrefix = "metric."
- val BooleanTagTrueValue = "1"
- val BooleanTagFalseValue = "0"
-
- case class LogEntry(timestamp: Long, fields: Map[String, _])
+ case class Annotation(timestampMicros: Long, name: String, fields: Map[String, String])
- case class CompletedSpan(
+ case class FinishedSpan(
context: SpanContext,
operationName: String,
startTimestampMicros: Long,
endTimestampMicros: Long,
- tags: Map[String, String],
- logs: Seq[LogEntry]
+ tags: Map[String, Span.TagValue],
+ annotations: Seq[Annotation]
)
} \ No newline at end of file