aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/trace
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2017-05-18 16:21:44 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2017-05-18 16:21:44 +0200
commit77f2666650726352a9e15dcf6019064d91393b2e (patch)
treec39f7e2a18ac6bb1fcd1e2cc73dd3c165919515e /kamon-core/src/main/scala/kamon/trace
parent5dee54a0794b282e9b5729a3d4b85478c12a68d1 (diff)
downloadKamon-77f2666650726352a9e15dcf6019064d91393b2e.tar.gz
Kamon-77f2666650726352a9e15dcf6019064d91393b2e.tar.bz2
Kamon-77f2666650726352a9e15dcf6019064d91393b2e.zip
some more wip
Diffstat (limited to 'kamon-core/src/main/scala/kamon/trace')
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Span.scala183
-rw-r--r--kamon-core/src/main/scala/kamon/trace/SpanContext.scala21
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Tracer.scala109
3 files changed, 311 insertions, 2 deletions
diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala
new file mode 100644
index 00000000..87115e19
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/Span.scala
@@ -0,0 +1,183 @@
+package kamon
+package trace
+
+import kamon.metric.RecorderRegistry
+import kamon.metric.instrument.DynamicRange
+
+import scala.collection.JavaConverters._
+import kamon.util.{Clock, MeasurementUnit}
+
+object Span {
+ val MetricCategory = "span"
+ val LatencyMetricName = "elapsed-time"
+ val ErrorMetricName = "error"
+ val MetricTagPrefix = "metric."
+ val BooleanTagTrueValue = "1"
+ val BooleanTagFalseValue = "0"
+
+ case class LogEntry(timestamp: Long, fields: Map[String, _])
+
+ case class CompletedSpan(
+ context: SpanContext,
+ operationName: String,
+ startTimestampMicros: Long,
+ endTimestampMicros: Long,
+ tags: Map[String, String],
+ logs: Seq[LogEntry]
+ )
+}
+
+
+class Span(spanContext: SpanContext, initialOperationName: String, startTimestampMicros: Long,
+ recorderRegistry: RecorderRegistry, reporterRegistry: ReporterRegistryImpl) extends io.opentracing.Span {
+
+ private var isOpen: Boolean = true
+ private val isSampled: Boolean = true // TODO: User a proper sampler
+ private var operationName: String = initialOperationName
+ private var endTimestampMicros: Long = 0
+
+ private var logs = List.empty[Span.LogEntry]
+ private var tags = Map.empty[String, String]
+ private var metricTags = Map.empty[String, String]
+
+
+ override def log(fields: java.util.Map[String, _]): Span =
+ log(fields.asScala.asInstanceOf[Map[String, _]])
+
+ def log(fields: Map[String, _]): Span = synchronized {
+ if (isSampled && isOpen) {
+ logs = Span.LogEntry(Clock.microTimestamp(), fields) :: logs
+ }
+ this
+ }
+
+ override def log(timestampMicroseconds: Long, fields: java.util.Map[String, _]): Span =
+ log(timestampMicroseconds, fields.asScala.asInstanceOf[Map[String, _]])
+
+ def log(timestampMicroseconds: Long, fields: Map[String, _]): Span = synchronized {
+ if(isSampled && isOpen) {
+ logs = Span.LogEntry(timestampMicroseconds, fields) :: logs
+ }
+ this
+ }
+
+ override def log(event: String): Span = synchronized {
+ if(isSampled && isOpen) {
+ logs = Span.LogEntry(Clock.microTimestamp(), Map("event" -> event)) :: logs
+ }
+ this
+ }
+
+ override def log(timestampMicroseconds: Long, event: String): Span = synchronized {
+ if(isSampled && isOpen) {
+ logs = Span.LogEntry(timestampMicroseconds, Map("event" -> event)) :: logs
+ }
+ this
+ }
+
+ override def log(eventName: String, payload: scala.Any): Span = synchronized {
+ if(isSampled && isOpen) {
+ logs = Span.LogEntry(Clock.microTimestamp(), Map(eventName -> payload)) :: logs
+ }
+ this
+ }
+
+ override def log(timestampMicroseconds: Long, eventName: String, payload: scala.Any): Span = synchronized {
+ if(isSampled && isOpen) {
+ logs = Span.LogEntry(timestampMicroseconds, Map(eventName -> payload)) :: logs
+ }
+ this
+ }
+
+ override def getBaggageItem(key: String): String =
+ spanContext.getBaggage(key)
+
+ override def context(): SpanContext =
+ spanContext
+
+ override def setTag(key: String, value: String): Span = synchronized {
+ if (isOpen) {
+ extractMetricTag(key, value)
+ if(isSampled)
+ tags = tags ++ Map(key -> value)
+ }
+ this
+ }
+
+ override def setTag(key: String, value: Boolean): Span = {
+ if (isOpen) {
+ val tagValue = if(value) Span.BooleanTagTrueValue else Span.BooleanTagFalseValue
+ extractMetricTag(key, tagValue)
+ if(isSampled)
+ tags = tags + (key -> tagValue)
+ }
+ this
+ }
+
+ override def setTag(key: String, value: Number): Span = {
+ if (isOpen) {
+ val tagValue = String.valueOf(value)
+ extractMetricTag(key, tagValue)
+ if(isSampled)
+ tags = tags + (key -> tagValue)
+ }
+ this
+ }
+
+ def setMetricTag(key: String, value: String): Span = synchronized {
+ if (isOpen) {
+ metricTags = metricTags ++ Map(key -> value)
+ }
+ this
+ }
+
+ override def setBaggageItem(key: String, value: String): Span = synchronized {
+ if (isOpen) {
+ spanContext.addBaggageItem(key, value)
+ }
+ this
+ }
+
+ override def setOperationName(operationName: String): Span = {
+ if(isOpen) {
+ this.operationName = operationName
+ }
+ this
+ }
+
+ private def extractMetricTag(tag: String, value: String): Unit = {
+ if(tag.startsWith(Span.MetricTagPrefix)) {
+ metricTags = metricTags ++ Map(tag.substring(Span.MetricTagPrefix.length) -> value)
+ }
+ }
+
+ override def finish(): Unit =
+ finish(Clock.microTimestamp())
+
+ override def finish(finishMicros: Long): Unit =
+ if(isOpen) {
+ isOpen = false
+ endTimestampMicros = finishMicros
+ recordSpanMetrics()
+ reporterRegistry.reportSpan(completedSpan)
+ }
+
+ private def completedSpan: Span.CompletedSpan =
+ Span.CompletedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, tags, logs)
+
+ private def recordSpanMetrics(): Unit = {
+ val elapsedTime = endTimestampMicros - startTimestampMicros
+ val recorder = recorderRegistry.getRecorder(operationName, Span.MetricCategory, metricTags)
+
+ recorder
+ .histogram(Span.LatencyMetricName, MeasurementUnit.time.microseconds, DynamicRange.Default)
+ .record(elapsedTime)
+
+ tags.get("error").foreach { errorTag =>
+ if(errorTag != null && errorTag.equals(Span.BooleanTagTrueValue)) {
+ recorder.counter(Span.ErrorMetricName).increment()
+ }
+ }
+
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/SpanContext.scala b/kamon-core/src/main/scala/kamon/trace/SpanContext.scala
new file mode 100644
index 00000000..7f5962e0
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/SpanContext.scala
@@ -0,0 +1,21 @@
+package kamon.trace
+import java.lang
+import java.util.Map
+import scala.collection.JavaConverters._
+
+class SpanContext(val traceID: Long, val spanID: Long, val parentID: Long) extends io.opentracing.SpanContext {
+ private var baggage = scala.collection.immutable.Map.empty[String, String]
+
+ private[kamon] def addBaggageItem(key: String, value: String): Unit = {
+ baggage = baggage + (key -> value)
+ }
+
+ private[kamon] def getBaggage(key: String): String =
+ baggage.get(key).getOrElse(null)
+
+ private[kamon] def baggageMap: scala.collection.immutable.Map[String, String] =
+ baggage
+
+ override def baggageItems(): lang.Iterable[Map.Entry[String, String]] =
+ baggage.asJava.entrySet()
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
index 802d95ec..84aafe68 100644
--- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
@@ -1,5 +1,110 @@
package kamon.trace
-trait Tracer extends io.opentracing.Tracer {
- def sampler: Sampler
+import java.util.concurrent.atomic.AtomicLong
+
+import io.opentracing.propagation.Format
+import io.opentracing.util.ThreadLocalActiveSpanSource
+import kamon.ReporterRegistryImpl
+import kamon.metric.RecorderRegistry
+import kamon.util.Clock
+
+class Tracer(metrics: RecorderRegistry, reporterRegistry: ReporterRegistryImpl) extends io.opentracing.Tracer {
+ private val traceCounter = new AtomicLong()
+ private val spanCounter = new AtomicLong()
+ private val activeSpanSource = new ThreadLocalActiveSpanSource()
+
+
+ override def buildSpan(operationName: String): io.opentracing.Tracer.SpanBuilder =
+ new SpanBuilder(operationName, spanCounter.incrementAndGet())
+
+ override def extract[C](format: Format[C], carrier: C): io.opentracing.SpanContext =
+ sys.error("Extracting not implemented yet.")
+
+ override def inject[C](spanContext: io.opentracing.SpanContext, format: Format[C], carrier: C): Unit =
+ sys.error("Injecting not implemented yet.")
+
+ override def activeSpan(): io.opentracing.ActiveSpan =
+ activeSpanSource.activeSpan()
+
+ override def makeActive(span: io.opentracing.Span): io.opentracing.ActiveSpan =
+ activeSpanSource.makeActive(span)
+
+
+ private[kamon] def newTraceID: Long =
+ traceCounter.incrementAndGet()
+
+ private class SpanBuilder(operationName: String, spanID: Long) extends io.opentracing.Tracer.SpanBuilder {
+ private var traceID = 0L
+ private var startTimestamp = 0L
+ private var parentID = 0L
+ private var initialTags = Map.empty[String, String]
+
+ override def start(): io.opentracing.Span =
+ startManual()
+
+ override def asChildOf(parent: io.opentracing.SpanContext): io.opentracing.Tracer.SpanBuilder = {
+ parent match {
+ case kamonSpanContext: kamon.trace.SpanContext =>
+ traceID = kamonSpanContext.traceID
+ parentID = kamonSpanContext.spanID
+ case _ => sys.error("Can't extract the parent ID from a non-kamon SpanContext")
+ }
+ this
+ }
+
+ override def asChildOf(parent: io.opentracing.BaseSpan[_]): io.opentracing.Tracer.SpanBuilder = {
+ parent.context() match {
+ case kamonSpanContext: kamon.trace.SpanContext =>
+ traceID = kamonSpanContext.traceID
+ parentID = kamonSpanContext.spanID
+ case _ => sys.error("Can't extract the parent ID from a non-kamon SpanContext")
+ }
+ this
+ }
+
+ override def addReference(referenceType: String, referencedContext: io.opentracing.SpanContext): io.opentracing.Tracer.SpanBuilder = {
+ if(referenceType != null && referenceType.equals(io.opentracing.References.CHILD_OF)) {
+ referencedContext match {
+ case kamonSpanContext: kamon.trace.SpanContext =>
+ traceID = kamonSpanContext.traceID
+ parentID = kamonSpanContext.spanID
+ case _ => sys.error("Can't extract the parent ID from a non-kamon SpanContext")
+ }
+ }
+ this
+ }
+
+ override def withTag(key: String, value: String): io.opentracing.Tracer.SpanBuilder = {
+ initialTags = initialTags + (key -> value)
+ this
+ }
+
+ override def withTag(key: String, value: Boolean): io.opentracing.Tracer.SpanBuilder = {
+ initialTags = initialTags + (key -> value.toString)
+ this
+ }
+
+ override def withTag(key: String, value: Number): io.opentracing.Tracer.SpanBuilder = {
+ initialTags = initialTags + (key -> value.toString)
+ this
+ }
+
+ override def startManual(): Span = {
+ if(traceID == 0L) traceID = Tracer.this.newTraceID
+ val startTimestampMicros = if(startTimestamp != 0L) startTimestamp else Clock.microTimestamp()
+ new Span(new SpanContext(traceID, spanID, parentID), operationName, startTimestampMicros, metrics, reporterRegistry)
+ }
+
+ override def withStartTimestamp(microseconds: Long): io.opentracing.Tracer.SpanBuilder = {
+ startTimestamp = microseconds
+ this
+ }
+
+ override def startActive(): io.opentracing.ActiveSpan = {
+ Tracer.this.makeActive(startManual())
+ }
+
+ override def ignoreActiveSpan(): io.opentracing.Tracer.SpanBuilder = ???
+ }
+
}