aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/trace/Span.scala
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2017-07-14 14:12:47 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2017-07-14 14:12:47 +0200
commit34010efc7b273e50d805a277646f14aa96aaa8b2 (patch)
tree8f7a6f00eac4e0a4cb60c9093b3c5d06ed982662 /kamon-core/src/main/scala/kamon/trace/Span.scala
parent52c4503b6aea2309feeb550b7db2e5fa627dedc8 (diff)
downloadKamon-34010efc7b273e50d805a277646f14aa96aaa8b2.tar.gz
Kamon-34010efc7b273e50d805a277646f14aa96aaa8b2.tar.bz2
Kamon-34010efc7b273e50d805a277646f14aa96aaa8b2.zip
wip
Diffstat (limited to 'kamon-core/src/main/scala/kamon/trace/Span.scala')
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Span.scala247
1 files changed, 135 insertions, 112 deletions
diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala
index 8149be74..a113b9bc 100644
--- a/kamon-core/src/main/scala/kamon/trace/Span.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Span.scala
@@ -16,155 +16,178 @@
package kamon
package trace
+import kamon.trace.SpanContext.SamplingDecision
import scala.collection.JavaConverters._
import kamon.util.{Clock, MeasurementUnit}
-class Span(spanContext: SpanContext, initialOperationName: String, initialTags: Map[String, String], startTimestampMicros: Long,
- reporterRegistry: ReporterRegistryImpl) extends io.opentracing.Span {
+trait Span extends BaseSpan {
- private var isOpen: Boolean = true
- private val sampled: Boolean = spanContext.sampled
- private var operationName: String = initialOperationName
- private var endTimestampMicros: Long = 0
+ def annotate(name: String): Span =
+ annotate(Span.Annotation(Clock.microTimestamp(), name, Map.empty))
- private var tags = initialTags
- private var logs = List.empty[Span.LogEntry]
- private var additionalMetricTags = Map.empty[String, String]
+ def annotate(name: String, fields: Map[String, String]): Span =
+ annotate(Span.Annotation(Clock.microTimestamp(), name, fields))
+ def annotate(timestampMicroseconds: Long, name: String, fields: Map[String, String]): Span =
+ annotate(Span.Annotation(timestampMicroseconds, name, fields))
- override def log(fields: java.util.Map[String, _]): Span =
- log(fields.asScala.asInstanceOf[Map[String, _]])
- def log(fields: Map[String, _]): Span = synchronized {
- if (sampled && isOpen)
- logs = Span.LogEntry(Clock.microTimestamp(), fields) :: logs
- this
- }
+}
- def log(timestampMicroseconds: Long, fields: Map[String, _]): Span = synchronized {
- if(sampled && isOpen)
- logs = Span.LogEntry(timestampMicroseconds, fields) :: logs
- this
- }
+trait BaseSpan {
- override def log(timestampMicroseconds: Long, fields: java.util.Map[String, _]): Span =
- log(timestampMicroseconds, fields.asScala.asInstanceOf[Map[String, _]])
+ def context(): SpanContext
- override def log(event: String): Span = synchronized {
- if(sampled && isOpen)
- logs = Span.LogEntry(Clock.microTimestamp(), Map("event" -> event)) :: logs
- this
- }
+ def capture(): Continuation
- override def log(timestampMicroseconds: Long, event: String): Span = synchronized {
- if(sampled && isOpen)
- logs = Span.LogEntry(timestampMicroseconds, Map("event" -> event)) :: logs
- this
- }
+ def annotate(annotation: Span.Annotation): Span
- override def log(eventName: String, payload: scala.Any): Span = synchronized {
- if(sampled && isOpen)
- logs = Span.LogEntry(Clock.microTimestamp(), Map(eventName -> payload)) :: logs
- this
- }
+ def addSpanTag(key: String, value: String): Span
+
+ def addMetricTag(key: String, value: String): Span
+
+ def addBaggage(key: String, value: String): Span
+
+ def getBaggage(key: String): Option[String]
+
+ def setOperationName(name: String): Span
- override def log(timestampMicroseconds: Long, eventName: String, payload: scala.Any): Span = synchronized {
- if(sampled && isOpen)
- logs = Span.LogEntry(timestampMicroseconds, Map(eventName -> payload)) :: logs
- this
+ def disableMetricsCollection(): Span
+
+ def finish(): Unit
+
+ def finish(finishTimestampMicros: Long): Unit
+
+}
+
+object Span {
+
+ final class Empty(tracer: Tracer) extends Span {
+ override val context: SpanContext = SpanContext.EmptySpanContext
+ override def capture(): Continuation = Continuation.Default(this, tracer)
+
+ override def annotate(annotation: Annotation): Span = this
+ override def addSpanTag(key: String, value: String): Span = this
+ override def addMetricTag(key: String, value: String): Span = this
+ override def addBaggage(key: String, value: String): Span = this
+ override def getBaggage(key: String): Option[String] = None
+ override def setOperationName(name: String): Span = this
+ override def disableMetricsCollection(): Span = this
+ override def finish(): Unit = {}
+ override def finish(finishTimestampMicros: Long): Unit = {}
}
- override def getBaggageItem(key: String): String =
- spanContext.getBaggage(key)
+ object Empty {
+ def apply(tracer: Tracer): Empty = new Empty(tracer)
+ }
- override def context(): SpanContext =
- spanContext
+ /**
+ *
+ * @param spanContext
+ * @param initialOperationName
+ * @param initialTags
+ * @param startTimestampMicros
+ * @param reporterRegistry
+ */
+ final class Real(spanContext: SpanContext, initialOperationName: String, initialTags: Map[String, String],
+ startTimestampMicros: Long, reporterRegistry: ReporterRegistryImpl, tracer: Tracer) extends Span {
+
+ private var collectMetrics: Boolean = true
+ private var isOpen: Boolean = true
+ private val sampled: Boolean = spanContext.samplingDecision == SamplingDecision.Sample
+ private var operationName: String = initialOperationName
+ private var endTimestampMicros: Long = 0
+
+ private var spanTags = initialTags
+ private var customMetricTags = Map.empty[String, String]
+ private var annotations = List.empty[Span.Annotation]
+
+ def annotate(annotation: Annotation): Span = synchronized {
+ if(sampled && isOpen)
+ annotations = annotation :: annotations
+ this
+ }
- override def setTag(key: String, value: String): Span = synchronized {
- if (isOpen) {
- extractMetricTag(key, value)
- if(sampled)
- tags = tags ++ Map(key -> value)
+ override def addSpanTag(key: String, value: String): Span = synchronized {
+ if(sampled && isOpen)
+ spanTags = spanTags + (key -> value)
+ this
}
- this
- }
- override def setTag(key: String, value: Boolean): Span = {
- if (isOpen) {
- val tagValue = if(value) Span.BooleanTagTrueValue else Span.BooleanTagFalseValue
- extractMetricTag(key, tagValue)
- if(sampled)
- tags = tags + (key -> tagValue)
+ override def addMetricTag(key: String, value: String): Span = synchronized {
+ if(sampled && isOpen && collectMetrics)
+ customMetricTags = customMetricTags + (key -> value)
+ this
}
- this
- }
- override def setTag(key: String, value: Number): Span = {
- if (isOpen) {
- val tagValue = String.valueOf(value)
- extractMetricTag(key, tagValue)
- if(sampled)
- tags = tags + (key -> tagValue)
+ override def addBaggage(key: String, value: String): Span = {
+ spanContext.baggage.add(key, value)
+ this
}
- this
- }
- def setMetricTag(key: String, value: String): Span = synchronized {
- if (isOpen)
- additionalMetricTags = additionalMetricTags ++ Map(key -> value)
- this
- }
+ override def getBaggage(key: String): Option[String] =
+ spanContext.baggage.get(key)
- override def setBaggageItem(key: String, value: String): Span = synchronized {
- if (isOpen)
- spanContext.addBaggageItem(key, value)
- this
- }
+ override def disableMetricsCollection(): Span = synchronized {
+ collectMetrics = false
+ this
+ }
- override def setOperationName(operationName: String): Span = synchronized {
- if(isOpen)
- this.operationName = operationName
- this
- }
+ override def context(): SpanContext =
+ spanContext
+
+ override def setOperationName(operationName: String): Span = synchronized {
+ if(isOpen)
+ this.operationName = operationName
+ this
+ }
- private def extractMetricTag(tag: String, value: String): Unit =
- if(tag.startsWith(Span.MetricTagPrefix))
- additionalMetricTags = additionalMetricTags ++ Map(tag.substring(Span.MetricTagPrefix.length) -> value)
+ override def finish(): Unit =
+ finish(Clock.microTimestamp())
- override def finish(): Unit =
- finish(Clock.microTimestamp())
+ override def finish(finishMicros: Long): Unit = synchronized {
+ if (isOpen) {
+ isOpen = false
+ endTimestampMicros = finishMicros
- override def finish(finishMicros: Long): Unit = synchronized {
- if (isOpen) {
- isOpen = false
- endTimestampMicros = finishMicros
- recordSpanMetrics()
+ if(collectMetrics)
+ recordSpanMetrics()
- if(sampled)
- reporterRegistry.reportSpan(completedSpan)
+ if(sampled)
+ reporterRegistry.reportSpan(completedSpan)
+ }
}
- }
- private def completedSpan: Span.CompletedSpan =
- Span.CompletedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, tags, logs)
+ override def capture(): Continuation =
+ Continuation.Default(this, tracer)
+
+ private def completedSpan: Span.FinishedSpan =
+ Span.FinishedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, spanTags, annotations)
- private def recordSpanMetrics(): Unit = {
- val elapsedTime = endTimestampMicros - startTimestampMicros
- val metricTags = Map("operation" -> operationName) ++ additionalMetricTags
+ private def recordSpanMetrics(): Unit = {
+ val elapsedTime = endTimestampMicros - startTimestampMicros
+ val metricTags = Map("operation" -> operationName) ++ customMetricTags
- val latencyHistogram = Span.Metrics.SpanProcessingTimeMetric.refine(metricTags)
- latencyHistogram.record(elapsedTime)
+ val latencyHistogram = Span.Metrics.SpanProcessingTimeMetric.refine(metricTags)
+ latencyHistogram.record(elapsedTime)
- tags.get("error").foreach { errorTag =>
- if(errorTag != null && errorTag.equals(Span.BooleanTagTrueValue)) {
- Span.Metrics.SpanErrorCount.refine(metricTags).increment()
+ spanTags.get("error").foreach { errorTag =>
+ if(errorTag != null && errorTag.equals(Span.BooleanTagTrueValue)) {
+ Span.Metrics.SpanErrorCount.refine(metricTags).increment()
+ }
}
}
}
-}
-object Span {
+ object Real {
+ def apply(spanContext: SpanContext, initialOperationName: String, initialTags: Map[String, String],
+ startTimestampMicros: Long, reporterRegistry: ReporterRegistryImpl, tracer: Tracer): Real =
+ new Real(spanContext, initialOperationName, initialTags, startTimestampMicros, reporterRegistry, tracer)
+ }
+
+
+
object Metrics {
val SpanProcessingTimeMetric = Kamon.histogram("span.processing-time", MeasurementUnit.time.microseconds)
val SpanErrorCount = Kamon.counter("span.error-count")
@@ -174,14 +197,14 @@ object Span {
val BooleanTagTrueValue = "1"
val BooleanTagFalseValue = "0"
- case class LogEntry(timestamp: Long, fields: Map[String, _])
+ case class Annotation(timestamp: Long, name: String, fields: Map[String, String])
- case class CompletedSpan(
+ case class FinishedSpan(
context: SpanContext,
operationName: String,
startTimestampMicros: Long,
endTimestampMicros: Long,
tags: Map[String, String],
- logs: Seq[LogEntry]
+ annotations: Seq[Annotation]
)
} \ No newline at end of file