From 77f2666650726352a9e15dcf6019064d91393b2e Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Thu, 18 May 2017 16:21:44 +0200 Subject: some more wip --- kamon-core/src/main/scala/kamon/Kamon.scala | 9 +- .../src/main/scala/kamon/ReporterRegistry.scala | 105 +++++------- kamon-core/src/main/scala/kamon/Util.scala | 6 +- .../main/scala/kamon/metric/RecorderRegistry.scala | 6 +- .../metric/instrument/InstrumentFactory.scala | 2 +- kamon-core/src/main/scala/kamon/trace/Span.scala | 183 +++++++++++++++++++++ .../src/main/scala/kamon/trace/SpanContext.scala | 21 +++ kamon-core/src/main/scala/kamon/trace/Tracer.scala | 109 +++++++++++- kamon-core/src/main/scala/kamon/util/Clock.scala | 19 ++- 9 files changed, 372 insertions(+), 88 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/trace/Span.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/SpanContext.scala (limited to 'kamon-core/src') diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 316a9a24..317920f4 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -1,8 +1,7 @@ package kamon import com.typesafe.config.{Config, ConfigFactory} -import kamon.metric.instrument.Histogram -import kamon.metric.{Entity, EntityRecorder, RecorderRegistry, RecorderRegistryImpl} +import kamon.metric.{RecorderRegistry, RecorderRegistryImpl} import kamon.trace.Tracer /** @@ -15,7 +14,9 @@ import kamon.trace.Tracer object Kamon { private val recorderRegistry = new RecorderRegistryImpl(ConfigFactory.load()) private val reporterRegistry = new ReporterRegistryImpl(recorderRegistry, ConfigFactory.load()) + private val kamonTracer = new Tracer(recorderRegistry, reporterRegistry) + def tracer: io.opentracing.Tracer = kamonTracer def metrics: RecorderRegistry = recorderRegistry def reporters: ReporterRegistry = reporterRegistry @@ -24,13 +25,9 @@ object Kamon { reporterRegistry.reconfigure(config) } - - def tracer: Tracer = ??? def environment: Environment = ??? def diagnose: Diagnostic = ??? def util: Util = ??? - - } diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala index 09c980f6..b42c5abe 100644 --- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala +++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala @@ -6,6 +6,7 @@ import java.util.concurrent._ import com.typesafe.config.Config import kamon.metric._ +import kamon.trace.Span import org.slf4j.LoggerFactory import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future} @@ -14,15 +15,40 @@ import scala.util.control.NonFatal trait ReporterRegistry { def loadFromConfig(): Unit + def add(reporter: MetricsReporter): Registration def add(reporter: MetricsReporter, name: String): Registration + def add(reporter: SpansReporter): Registration + def stopAll(): Future[Unit] } + +trait Registration { + def cancel(): Boolean +} + +trait MetricsReporter { + def start(config: Config): Unit + def reconfigure(config: Config): Unit + def stop(): Unit + + def reportTickSnapshot(snapshot: TickSnapshot) +} + +trait SpansReporter { + def start(config: Config): Unit + def reconfigure(config: Config): Unit + def stop(): Unit + + def reportSpan(span: Span.CompletedSpan): Unit +} + class ReporterRegistryImpl(metrics: RecorderRegistryImpl, initialConfig: Config) extends ReporterRegistry { private val registryExecutionContext = Executors.newSingleThreadScheduledExecutor(threadFactory("kamon-reporter-registry")) private val metricsTickerSchedule = new AtomicReference[ScheduledFuture[_]]() private val metricReporters = new ConcurrentLinkedQueue[ReporterEntry]() + private val spanReporters = new ConcurrentLinkedQueue[SpansReporter]() private val reporterCounter = new AtomicLong(0L) reconfigure(initialConfig) @@ -55,6 +81,14 @@ class ReporterRegistryImpl(metrics: RecorderRegistryImpl, initialConfig: Config) } } + override def add(reporter: SpansReporter): Registration = { + spanReporters.add(reporter) + + new Registration { + override def cancel(): Boolean = true + } + } + override def stopAll(): Future[Unit] = { implicit val stopReporterExeContext = ExecutionContext.fromExecutor(registryExecutionContext) val reporterStopFutures = Vector.newBuilder[Future[Unit]] @@ -87,6 +121,11 @@ class ReporterRegistryImpl(metrics: RecorderRegistryImpl, initialConfig: Config) } } + + private[kamon] def reportSpan(span: Span.CompletedSpan): Unit = { + spanReporters.forEach(_.reportSpan(span)) + } + private def stopReporter(entry: ReporterEntry): Future[Unit] = { entry.isActive = false @@ -131,7 +170,7 @@ class ReporterRegistryImpl(metrics: RecorderRegistryImpl, initialConfig: Config) reporterEntries.forEach { entry => Future { if(entry.isActive) - entry.reporter.processTick(tickSnapshot) + entry.reporter.reportTickSnapshot(tickSnapshot) }(executor = entry.executionContext) } @@ -142,68 +181,4 @@ class ReporterRegistryImpl(metrics: RecorderRegistryImpl, initialConfig: Config) case NonFatal(t) => logger.error("Error while running a tick", t) } } -} - - - -trait Registration { - def cancel(): Boolean -} - -trait MetricsReporter { - def reconfigure(config: Config): Unit - - def start(config: Config): Unit - def stop(): Unit - - def processTick(snapshot: TickSnapshot) -} - - - -object TestingAllExample extends App { - val recorder = Kamon.metrics.getRecorder(Entity("topo", "human-being", Map.empty)) - - val registration = Kamon.reporters.add(new DummyReporter("test")) - - var x = 0 - while(true) { - recorder.counter("test-other").increment() - Thread.sleep(100) - x += 1 - - if(x == 50) { - registration.cancel() - } - - if(x == 100) { - println("Stopping all reporters") - Kamon.reporters.stopAll() - } - } - -} - - -class DummyReporter(name: String) extends MetricsReporter { - override def reconfigure(config: Config): Unit = { - println("NAME: " + name + "===> Reconfiguring Dummy") - } - - override def start(config: Config): Unit = { - - println("NAME: " + name + "===> Starting DUMMY") - } - - override def stop(): Unit = { - println("NAME: " + name + "===> Stopping Dummy") - } - - override def processTick(snapshot: TickSnapshot): Unit = { - println("NAME: " + name + s"===> [${Thread.currentThread().getName()}] Processing a tick in dummy." + snapshot) - println(s"From: ${snapshot.interval.from}, to: ${snapshot.interval.to}") - snapshot.entities.foreach { e => - println(e.counters.map(c => s"Counter [${c.name}] => " + c.value).mkString(", ")) - } - } } \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/Util.scala b/kamon-core/src/main/scala/kamon/Util.scala index 51282afc..04ce7a04 100644 --- a/kamon-core/src/main/scala/kamon/Util.scala +++ b/kamon-core/src/main/scala/kamon/Util.scala @@ -1,16 +1,12 @@ package kamon -import kamon.util.{Clock, EntityFilter} +import kamon.util.EntityFilter /** * Useful classes for Kamon and submodules. * */ trait Util { - /** - * @return The Clock instance used by Kamon for timestamps and latency measurements. - */ - def clock: Clock /** * @return Currently configured entity filters. diff --git a/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala b/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala index 8b84ab6a..a4d2f4cd 100644 --- a/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala +++ b/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala @@ -23,11 +23,11 @@ class RecorderRegistryImpl(initialConfig: Config) extends RecorderRegistry { reconfigure(initialConfig) - override def getRecorder(entity: Entity): EntityRecorder = { + override def getRecorder(entity: Entity): EntityRecorder = entities.atomicGetOrElseUpdate(entity, new DefaultEntityRecorder(entity, instrumentFactory.get())) - } - override def getRecorder(name: String, category: String, tags: Map[String, String]): EntityRecorder = ??? + override def getRecorder(name: String, category: String, tags: Map[String, String]): EntityRecorder = + getRecorder(Entity(name, category, tags)) override def removeRecorder(entity: Entity): Boolean = ??? diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala index 4f0502f0..e8d4d569 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala @@ -9,7 +9,7 @@ import kamon.metric.instrument.InstrumentFactory.CustomInstrumentSettings import kamon.util.MeasurementUnit -private[metric] class InstrumentFactory private ( +private[kamon] class InstrumentFactory private ( defaultHistogramDynamicRange: DynamicRange, defaultMMCounterDynamicRange: DynamicRange, defaultMMCounterSampleRate: Duration, diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala new file mode 100644 index 00000000..87115e19 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/Span.scala @@ -0,0 +1,183 @@ +package kamon +package trace + +import kamon.metric.RecorderRegistry +import kamon.metric.instrument.DynamicRange + +import scala.collection.JavaConverters._ +import kamon.util.{Clock, MeasurementUnit} + +object Span { + val MetricCategory = "span" + val LatencyMetricName = "elapsed-time" + val ErrorMetricName = "error" + val MetricTagPrefix = "metric." + val BooleanTagTrueValue = "1" + val BooleanTagFalseValue = "0" + + case class LogEntry(timestamp: Long, fields: Map[String, _]) + + case class CompletedSpan( + context: SpanContext, + operationName: String, + startTimestampMicros: Long, + endTimestampMicros: Long, + tags: Map[String, String], + logs: Seq[LogEntry] + ) +} + + +class Span(spanContext: SpanContext, initialOperationName: String, startTimestampMicros: Long, + recorderRegistry: RecorderRegistry, reporterRegistry: ReporterRegistryImpl) extends io.opentracing.Span { + + private var isOpen: Boolean = true + private val isSampled: Boolean = true // TODO: User a proper sampler + private var operationName: String = initialOperationName + private var endTimestampMicros: Long = 0 + + private var logs = List.empty[Span.LogEntry] + private var tags = Map.empty[String, String] + private var metricTags = Map.empty[String, String] + + + override def log(fields: java.util.Map[String, _]): Span = + log(fields.asScala.asInstanceOf[Map[String, _]]) + + def log(fields: Map[String, _]): Span = synchronized { + if (isSampled && isOpen) { + logs = Span.LogEntry(Clock.microTimestamp(), fields) :: logs + } + this + } + + override def log(timestampMicroseconds: Long, fields: java.util.Map[String, _]): Span = + log(timestampMicroseconds, fields.asScala.asInstanceOf[Map[String, _]]) + + def log(timestampMicroseconds: Long, fields: Map[String, _]): Span = synchronized { + if(isSampled && isOpen) { + logs = Span.LogEntry(timestampMicroseconds, fields) :: logs + } + this + } + + override def log(event: String): Span = synchronized { + if(isSampled && isOpen) { + logs = Span.LogEntry(Clock.microTimestamp(), Map("event" -> event)) :: logs + } + this + } + + override def log(timestampMicroseconds: Long, event: String): Span = synchronized { + if(isSampled && isOpen) { + logs = Span.LogEntry(timestampMicroseconds, Map("event" -> event)) :: logs + } + this + } + + override def log(eventName: String, payload: scala.Any): Span = synchronized { + if(isSampled && isOpen) { + logs = Span.LogEntry(Clock.microTimestamp(), Map(eventName -> payload)) :: logs + } + this + } + + override def log(timestampMicroseconds: Long, eventName: String, payload: scala.Any): Span = synchronized { + if(isSampled && isOpen) { + logs = Span.LogEntry(timestampMicroseconds, Map(eventName -> payload)) :: logs + } + this + } + + override def getBaggageItem(key: String): String = + spanContext.getBaggage(key) + + override def context(): SpanContext = + spanContext + + override def setTag(key: String, value: String): Span = synchronized { + if (isOpen) { + extractMetricTag(key, value) + if(isSampled) + tags = tags ++ Map(key -> value) + } + this + } + + override def setTag(key: String, value: Boolean): Span = { + if (isOpen) { + val tagValue = if(value) Span.BooleanTagTrueValue else Span.BooleanTagFalseValue + extractMetricTag(key, tagValue) + if(isSampled) + tags = tags + (key -> tagValue) + } + this + } + + override def setTag(key: String, value: Number): Span = { + if (isOpen) { + val tagValue = String.valueOf(value) + extractMetricTag(key, tagValue) + if(isSampled) + tags = tags + (key -> tagValue) + } + this + } + + def setMetricTag(key: String, value: String): Span = synchronized { + if (isOpen) { + metricTags = metricTags ++ Map(key -> value) + } + this + } + + override def setBaggageItem(key: String, value: String): Span = synchronized { + if (isOpen) { + spanContext.addBaggageItem(key, value) + } + this + } + + override def setOperationName(operationName: String): Span = { + if(isOpen) { + this.operationName = operationName + } + this + } + + private def extractMetricTag(tag: String, value: String): Unit = { + if(tag.startsWith(Span.MetricTagPrefix)) { + metricTags = metricTags ++ Map(tag.substring(Span.MetricTagPrefix.length) -> value) + } + } + + override def finish(): Unit = + finish(Clock.microTimestamp()) + + override def finish(finishMicros: Long): Unit = + if(isOpen) { + isOpen = false + endTimestampMicros = finishMicros + recordSpanMetrics() + reporterRegistry.reportSpan(completedSpan) + } + + private def completedSpan: Span.CompletedSpan = + Span.CompletedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, tags, logs) + + private def recordSpanMetrics(): Unit = { + val elapsedTime = endTimestampMicros - startTimestampMicros + val recorder = recorderRegistry.getRecorder(operationName, Span.MetricCategory, metricTags) + + recorder + .histogram(Span.LatencyMetricName, MeasurementUnit.time.microseconds, DynamicRange.Default) + .record(elapsedTime) + + tags.get("error").foreach { errorTag => + if(errorTag != null && errorTag.equals(Span.BooleanTagTrueValue)) { + recorder.counter(Span.ErrorMetricName).increment() + } + } + + } +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/SpanContext.scala b/kamon-core/src/main/scala/kamon/trace/SpanContext.scala new file mode 100644 index 00000000..7f5962e0 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/SpanContext.scala @@ -0,0 +1,21 @@ +package kamon.trace +import java.lang +import java.util.Map +import scala.collection.JavaConverters._ + +class SpanContext(val traceID: Long, val spanID: Long, val parentID: Long) extends io.opentracing.SpanContext { + private var baggage = scala.collection.immutable.Map.empty[String, String] + + private[kamon] def addBaggageItem(key: String, value: String): Unit = { + baggage = baggage + (key -> value) + } + + private[kamon] def getBaggage(key: String): String = + baggage.get(key).getOrElse(null) + + private[kamon] def baggageMap: scala.collection.immutable.Map[String, String] = + baggage + + override def baggageItems(): lang.Iterable[Map.Entry[String, String]] = + baggage.asJava.entrySet() +} diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala index 802d95ec..84aafe68 100644 --- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala +++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala @@ -1,5 +1,110 @@ package kamon.trace -trait Tracer extends io.opentracing.Tracer { - def sampler: Sampler +import java.util.concurrent.atomic.AtomicLong + +import io.opentracing.propagation.Format +import io.opentracing.util.ThreadLocalActiveSpanSource +import kamon.ReporterRegistryImpl +import kamon.metric.RecorderRegistry +import kamon.util.Clock + +class Tracer(metrics: RecorderRegistry, reporterRegistry: ReporterRegistryImpl) extends io.opentracing.Tracer { + private val traceCounter = new AtomicLong() + private val spanCounter = new AtomicLong() + private val activeSpanSource = new ThreadLocalActiveSpanSource() + + + override def buildSpan(operationName: String): io.opentracing.Tracer.SpanBuilder = + new SpanBuilder(operationName, spanCounter.incrementAndGet()) + + override def extract[C](format: Format[C], carrier: C): io.opentracing.SpanContext = + sys.error("Extracting not implemented yet.") + + override def inject[C](spanContext: io.opentracing.SpanContext, format: Format[C], carrier: C): Unit = + sys.error("Injecting not implemented yet.") + + override def activeSpan(): io.opentracing.ActiveSpan = + activeSpanSource.activeSpan() + + override def makeActive(span: io.opentracing.Span): io.opentracing.ActiveSpan = + activeSpanSource.makeActive(span) + + + private[kamon] def newTraceID: Long = + traceCounter.incrementAndGet() + + private class SpanBuilder(operationName: String, spanID: Long) extends io.opentracing.Tracer.SpanBuilder { + private var traceID = 0L + private var startTimestamp = 0L + private var parentID = 0L + private var initialTags = Map.empty[String, String] + + override def start(): io.opentracing.Span = + startManual() + + override def asChildOf(parent: io.opentracing.SpanContext): io.opentracing.Tracer.SpanBuilder = { + parent match { + case kamonSpanContext: kamon.trace.SpanContext => + traceID = kamonSpanContext.traceID + parentID = kamonSpanContext.spanID + case _ => sys.error("Can't extract the parent ID from a non-kamon SpanContext") + } + this + } + + override def asChildOf(parent: io.opentracing.BaseSpan[_]): io.opentracing.Tracer.SpanBuilder = { + parent.context() match { + case kamonSpanContext: kamon.trace.SpanContext => + traceID = kamonSpanContext.traceID + parentID = kamonSpanContext.spanID + case _ => sys.error("Can't extract the parent ID from a non-kamon SpanContext") + } + this + } + + override def addReference(referenceType: String, referencedContext: io.opentracing.SpanContext): io.opentracing.Tracer.SpanBuilder = { + if(referenceType != null && referenceType.equals(io.opentracing.References.CHILD_OF)) { + referencedContext match { + case kamonSpanContext: kamon.trace.SpanContext => + traceID = kamonSpanContext.traceID + parentID = kamonSpanContext.spanID + case _ => sys.error("Can't extract the parent ID from a non-kamon SpanContext") + } + } + this + } + + override def withTag(key: String, value: String): io.opentracing.Tracer.SpanBuilder = { + initialTags = initialTags + (key -> value) + this + } + + override def withTag(key: String, value: Boolean): io.opentracing.Tracer.SpanBuilder = { + initialTags = initialTags + (key -> value.toString) + this + } + + override def withTag(key: String, value: Number): io.opentracing.Tracer.SpanBuilder = { + initialTags = initialTags + (key -> value.toString) + this + } + + override def startManual(): Span = { + if(traceID == 0L) traceID = Tracer.this.newTraceID + val startTimestampMicros = if(startTimestamp != 0L) startTimestamp else Clock.microTimestamp() + new Span(new SpanContext(traceID, spanID, parentID), operationName, startTimestampMicros, metrics, reporterRegistry) + } + + override def withStartTimestamp(microseconds: Long): io.opentracing.Tracer.SpanBuilder = { + startTimestamp = microseconds + this + } + + override def startActive(): io.opentracing.ActiveSpan = { + Tracer.this.makeActive(startManual()) + } + + override def ignoreActiveSpan(): io.opentracing.Tracer.SpanBuilder = ??? + } + } diff --git a/kamon-core/src/main/scala/kamon/util/Clock.scala b/kamon-core/src/main/scala/kamon/util/Clock.scala index 55bb529a..b0b47a2b 100644 --- a/kamon-core/src/main/scala/kamon/util/Clock.scala +++ b/kamon-core/src/main/scala/kamon/util/Clock.scala @@ -1,9 +1,16 @@ package kamon.util -trait Clock { - def nanoTimestamp(): Long - def microTimestamp(): Long - def milliTimestamp(): Long +object Clock { + private val startTimeMillis = System.currentTimeMillis() + private val startNanoTime = System.nanoTime() + private val startMicroTime = startTimeMillis * 1000L - def relativeNanoTimestamp(): Long -} + def microTimestamp(): Long = + startMicroTime + ((System.nanoTime() - startNanoTime) / 1000L) + + def milliTimestamp(): Long = + System.currentTimeMillis() + + def relativeNanoTimestamp(): Long = + System.nanoTime() +} \ No newline at end of file -- cgit v1.2.3