aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kamon-core/src/main/resources/reference.conf32
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala55
-rw-r--r--kamon-core/src/main/scala/kamon/ReporterRegistry.scala18
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricLookup.scala59
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala86
-rw-r--r--kamon-core/src/main/scala/kamon/metric/Registry.scala222
-rw-r--r--kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala13
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Sampler.scala6
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Span.scala74
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Tracer.scala42
10 files changed, 294 insertions, 313 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index f4d180ca..fd1c88c3 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -1,10 +1,14 @@
kamon {
environment {
- application = ""
- host = ""
- instance = ""
+ host = "auto"
+ instance = "auto"
+ application = "kamon-application"
}
+ # FQCN of the reporter instances that should be loaded when calling `Kamon.reporters.loadFromConfig()`.
+ # Example: `reporters = ["kamon.statsd.StatsD", "kamon.zipkin.Zipkin"]`.
+ reporters = []
+
metric {
tick-interval = 60 seconds
@@ -21,10 +25,6 @@ kamon {
}
- # FQCN of the reporter instances that should be loaded when calling `Kamon.reporters.loadFromConfig()`.
- # Example: `reporters = ["kamon.statsd.StatsD", "kamon.zipkin.Zipkin"]`.
- reporters = []
-
# Thread pool size used by the metrics refresh scheduler. This pool is only used to periodically sampling
# min-max-counter values.
refresh-scheduler-pool-size = 2
@@ -74,4 +74,22 @@ kamon {
}
}
}
+
+ trace {
+ # Configures a sample that decides which traces should be reported to the trace backends. The possible values are:
+ # - always: report all traces.
+ # - never: don't report any trace.
+ # - random: use the random tracer.
+ #
+ sampler = "random"
+
+ # The random sampler uses the "chance" setting and a random number to take a decision, if the random number is
+ # on the upper (chance * 100) percent of the number spectrum the trace will be sampled. E.g. a chance of 0.01 will
+ # hint that 1% of all traces should be reported.
+ sampler-random {
+
+ # Chance of a span being sampled. Must be a value between 0 and 1.
+ chance = 0.01
+ }
+ }
} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index 60956ee0..468308f2 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -1,52 +1,65 @@
package kamon
+import java.time.Duration
+import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.atomic.AtomicReference
import com.typesafe.config.{Config, ConfigFactory}
-import kamon.metric.instrument.{DynamicRange, Histogram}
-import kamon.metric.{MetricLookup, Registry}
+import kamon.metric.instrument._
+import kamon.metric.{MetricLookup, MetricRegistry}
import kamon.trace.Tracer
-import kamon.util.MeasurementUnit
+import kamon.util.{HexCodec, MeasurementUnit}
object Kamon extends MetricLookup {
- private val _initialConfig = ConfigFactory.load()
- //private val _recorderRegistry = new RecorderRegistryImpl(_initialConfig)
- private val _reporterRegistry = new ReporterRegistryImpl(???, _initialConfig)
- private val _tracer = new Tracer(???, _reporterRegistry)
- private val _environment = new AtomicReference[Environment](environmentFromConfig(ConfigFactory.load()))
+ private val initialConfig = ConfigFactory.load()
+ private val incarnation = HexCodec.toLowerHex(ThreadLocalRandom.current().nextLong())
- def tracer: io.opentracing.Tracer =
- _tracer
+ private val metricRegistry = new MetricRegistry(initialConfig)
+ private val reporterRegistry = new ReporterRegistryImpl(metricRegistry, initialConfig)
+ private val trazer = new Tracer(Kamon, reporterRegistry)
+ private val env = new AtomicReference[Environment](environmentFromConfig(ConfigFactory.load()))
-// def metrics: RecorderRegistry =
-// _recorderRegistry
+ def tracer: io.opentracing.Tracer =
+ trazer
def reporters: ReporterRegistry =
- _reporterRegistry
+ reporterRegistry
def environment: Environment =
- _environment.get()
+ env.get()
def reconfigure(config: Config): Unit = synchronized {
- // _recorderRegistry.reconfigure(config)
- _reporterRegistry.reconfigure(config)
- _environment.set(environmentFromConfig(config))
+ metricRegistry.reconfigure(config)
+ reporterRegistry.reconfigure(config)
+ env.set(environmentFromConfig(config))
}
- private val metricRegistry = new Registry(_initialConfig)
- override def histogram(name: String, unit: MeasurementUnit, tags: Map[String, String], dynamicRange: Option[DynamicRange]): Histogram =
+ override def histogram(name: String, unit: MeasurementUnit, tags: Map[String, String], dynamicRange:
+ Option[DynamicRange]): Histogram =
metricRegistry.histogram(name, unit, tags, dynamicRange)
- case class Environment(config: Config, application: String, host: String, instance: String)
+ override def counter(name: String, unit: MeasurementUnit, tags: Map[String, String]): Counter =
+ metricRegistry.counter(name, unit, tags)
+
+ override def gauge(name: String, unit: MeasurementUnit, tags: Map[String, String]): Gauge =
+ metricRegistry.gauge(name, unit, tags)
+
+ override def minMaxCounter(name: String, unit: MeasurementUnit, tags: Map[String, String], sampleInterval: Option[Duration],
+ dynamicRange: Option[DynamicRange]): MinMaxCounter =
+ metricRegistry.minMaxCounter(name, unit, tags, dynamicRange, sampleInterval)
+
+
+ case class Environment(config: Config, application: String, host: String, instance: String, incarnation: String)
private def environmentFromConfig(config: Config): Environment = {
val environmentConfig = config.getConfig("kamon.environment")
+
val application = environmentConfig.getString("application")
val host = environmentConfig.getString("host")
val instance = environmentConfig.getString("instance")
- Environment(config, application, host, instance)
+ Environment(config, application, host, instance, incarnation)
}
}
diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
index 11312512..3b59d8b7 100644
--- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
+++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
@@ -5,9 +5,9 @@ import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import java.util.concurrent._
import com.typesafe.config.Config
+import com.typesafe.scalalogging.Logger
import kamon.metric._
import kamon.trace.Span
-import org.slf4j.LoggerFactory
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
import scala.util.Try
@@ -29,22 +29,22 @@ trait Registration {
}
trait MetricsReporter {
- def start(config: Config): Unit
- def reconfigure(config: Config): Unit
+ def start(): Unit
def stop(): Unit
- def reportTickSnapshot(snapshot: TickSnapshot)
+ def reconfigure(config: Config): Unit
+ def reportTickSnapshot(snapshot: TickSnapshot): Unit
}
trait SpansReporter {
- def start(config: Config): Unit
- def reconfigure(config: Config): Unit
+ def start(): Unit
def stop(): Unit
+ def reconfigure(config: Config): Unit
def reportSpan(span: Span.CompletedSpan): Unit
}
-class ReporterRegistryImpl(metrics: RegistrySnapshotGenerator, initialConfig: Config) extends ReporterRegistry {
+class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry {
private val registryExecutionContext = Executors.newSingleThreadScheduledExecutor(threadFactory("kamon-reporter-registry"))
private val metricsTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
private val metricReporters = new ConcurrentLinkedQueue[ReporterEntry]()
@@ -141,8 +141,8 @@ class ReporterRegistryImpl(metrics: RegistrySnapshotGenerator, initialConfig: Co
executionContext: ExecutionContextExecutorService
)
- private class MetricTicker(snapshotGenerator: RegistrySnapshotGenerator, reporterEntries: java.util.Queue[ReporterEntry]) extends Runnable {
- val logger = LoggerFactory.getLogger(classOf[MetricTicker])
+ private class MetricTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: java.util.Queue[ReporterEntry]) extends Runnable {
+ val logger = Logger(classOf[MetricTicker])
var lastTick = Instant.now()
def run(): Unit = try {
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricLookup.scala b/kamon-core/src/main/scala/kamon/metric/MetricLookup.scala
new file mode 100644
index 00000000..db33b83c
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/MetricLookup.scala
@@ -0,0 +1,59 @@
+package kamon
+package metric
+
+import java.time.Duration
+
+import kamon.metric.instrument._
+import kamon.util.MeasurementUnit
+
+trait MetricLookup {
+
+ def histogram(name: String): Histogram =
+ histogram(name, MeasurementUnit.none, Map.empty[String, String], None)
+
+ def histogram(name: String, unit: MeasurementUnit): Histogram =
+ histogram(name, unit, Map.empty[String, String], None)
+
+ def histogram(name: String, unit: MeasurementUnit, tags: Map[String, String]): Histogram =
+ histogram(name, unit, tags, None)
+
+ def histogram(name: String, unit: MeasurementUnit, tags: Map[String, String], dynamicRange: DynamicRange): Histogram =
+ histogram(name, unit, tags, Some(dynamicRange))
+
+ def counter(name: String): Counter =
+ counter(name, MeasurementUnit.none, Map.empty[String, String])
+
+ def counter(name: String, unit: MeasurementUnit): Counter =
+ counter(name, unit, Map.empty[String, String])
+
+ def gauge(name: String): Gauge =
+ gauge(name, MeasurementUnit.none, Map.empty[String, String])
+
+ def gauge(name: String, unit: MeasurementUnit): Gauge =
+ gauge(name, unit, Map.empty[String, String])
+
+ def minMaxCounter(name: String): MinMaxCounter =
+ minMaxCounter(name, MeasurementUnit.none, Map.empty[String, String], None, None)
+
+ def minMaxCounter(name: String, unit: MeasurementUnit): MinMaxCounter =
+ minMaxCounter(name, unit, Map.empty[String, String], None, None)
+
+ def minMaxCounter(name: String, unit: MeasurementUnit, tags: Map[String, String]): MinMaxCounter =
+ minMaxCounter(name, unit, tags, None, None)
+
+ def minMaxCounter(name: String, unit: MeasurementUnit, tags: Map[String, String], sampleInterval: Duration): MinMaxCounter =
+ minMaxCounter(name, unit, tags, Option(sampleInterval), None)
+
+ def minMaxCounter(name: String, unit: MeasurementUnit, tags: Map[String, String], sampleInterval: Duration,
+ dynamicRange: DynamicRange): MinMaxCounter =
+ minMaxCounter(name, unit, tags, Option(sampleInterval), Option(dynamicRange))
+
+ def histogram(name: String, unit: MeasurementUnit, tags: Map[String, String], dynamicRange: Option[DynamicRange]): Histogram
+
+ def counter(name: String, unit: MeasurementUnit, tags: Map[String, String]): Counter
+
+ def gauge(name: String, unit: MeasurementUnit, tags: Map[String, String]): Gauge
+
+ def minMaxCounter(name: String, unit: MeasurementUnit, tags: Map[String, String], sampleInterval: Option[Duration],
+ dynamicRange: Option[DynamicRange]): MinMaxCounter
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala b/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala
new file mode 100644
index 00000000..c6513f1a
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala
@@ -0,0 +1,86 @@
+package kamon
+package metric
+
+import java.time.Duration
+import java.util.concurrent.atomic.AtomicReference
+
+import com.typesafe.config.Config
+import com.typesafe.scalalogging.Logger
+import kamon.metric.instrument._
+import kamon.util.MeasurementUnit
+
+import scala.collection.concurrent.TrieMap
+
+
+class MetricRegistry(initialConfig: Config) extends MetricsSnapshotGenerator {
+ private val logger = Logger(classOf[MetricRegistry])
+ private val metrics = TrieMap.empty[String, MetricEntry]
+ private val instrumentFactory = new AtomicReference[InstrumentFactory]()
+ reconfigure(initialConfig)
+
+ def reconfigure(config: Config): Unit = synchronized {
+ instrumentFactory.set(InstrumentFactory.fromConfig(config))
+ }
+
+ def histogram(name: String, unit: MeasurementUnit, tags: Map[String, String], dynamicRange: Option[DynamicRange]): Histogram =
+ lookupInstrument(name, unit, tags, InstrumentType.Histogram, instrumentFactory.get().buildHistogram(dynamicRange))
+
+ def counter(name: String, unit: MeasurementUnit, tags: Map[String, String]): Counter =
+ lookupInstrument(name, unit, tags, InstrumentType.Counter, instrumentFactory.get().buildCounter)
+
+ def gauge(name: String, unit: MeasurementUnit, tags: Map[String, String]): Gauge =
+ lookupInstrument(name, unit, tags, InstrumentType.Gauge, instrumentFactory.get().buildGauge)
+
+ def minMaxCounter(name: String, unit: MeasurementUnit, tags: Map[String, String], dynamicRange: Option[DynamicRange], sampleInterval: Option[Duration]): MinMaxCounter =
+ lookupInstrument(name, unit, tags, InstrumentType.MinMaxCounter, instrumentFactory.get().buildMinMaxCounter(dynamicRange, sampleInterval))
+
+
+ override def snapshot(): MetricsSnapshot = synchronized {
+ var histograms = Seq.empty[DistributionSnapshot]
+ var mmCounters = Seq.empty[DistributionSnapshot]
+ var counters = Seq.empty[SingleValueSnapshot]
+ var gauges = Seq.empty[SingleValueSnapshot]
+
+ for {
+ metricEntry <- metrics.values
+ instrument <- metricEntry.instruments.values
+ } {
+ metricEntry.instrumentType match {
+ case InstrumentType.Histogram => histograms = histograms :+ instrument.asInstanceOf[SnapshotableHistogram].snapshot()
+ case InstrumentType.MinMaxCounter => mmCounters = mmCounters :+ instrument.asInstanceOf[SnapshotableMinMaxCounter].snapshot()
+ case InstrumentType.Gauge => gauges = gauges :+ instrument.asInstanceOf[SnapshotableGauge].snapshot()
+ case InstrumentType.Counter => counters = counters :+ instrument.asInstanceOf[SnapshotableCounter].snapshot()
+ }
+ }
+
+ MetricsSnapshot(histograms, mmCounters, gauges, counters)
+ }
+
+ private def lookupInstrument[T](name: String, measurementUnit: MeasurementUnit, tags: Map[String, String],
+ instrumentType: InstrumentType, builder: (String, Map[String, String], MeasurementUnit) => T): T = {
+
+ val entry = metrics.atomicGetOrElseUpdate(name, MetricEntry(instrumentType, measurementUnit, TrieMap.empty))
+ if(entry.instrumentType != instrumentType)
+ sys.error(s"Tried to use metric [$name] as a [${instrumentType.name}] but it is already defined as [${entry.instrumentType.name}] ")
+
+ if(entry.unit != measurementUnit)
+ logger.warn("Ignoring attempt to use measurement unit [{}] on metric [name={}, tags={}], the metric uses [{}]",
+ measurementUnit.magnitude.name, name, tags.prettyPrint(), entry.unit.magnitude.name)
+
+ entry.instruments.getOrElseUpdate(tags, builder(name, tags, measurementUnit)).asInstanceOf[T]
+ }
+
+ private case class InstrumentType(name: String)
+ private object InstrumentType {
+ val Histogram = InstrumentType("Histogram")
+ val MinMaxCounter = InstrumentType("MinMaxCounter")
+ val Counter = InstrumentType("Counter")
+ val Gauge = InstrumentType("Gauge")
+ }
+
+ private case class MetricEntry(instrumentType: InstrumentType, unit: MeasurementUnit, instruments: TrieMap[Map[String, String], Any])
+}
+
+trait MetricsSnapshotGenerator {
+ def snapshot(): MetricsSnapshot
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/Registry.scala b/kamon-core/src/main/scala/kamon/metric/Registry.scala
deleted file mode 100644
index 3f549802..00000000
--- a/kamon-core/src/main/scala/kamon/metric/Registry.scala
+++ /dev/null
@@ -1,222 +0,0 @@
-package kamon
-package metric
-
-import java.time.Duration
-import java.util.concurrent.ScheduledThreadPoolExecutor
-import java.util.concurrent.atomic.AtomicReference
-
-import com.typesafe.config.{Config, ConfigFactory}
-import com.typesafe.scalalogging.Logger
-import kamon.metric.instrument._
-import kamon.util.MeasurementUnit
-
-import scala.collection.concurrent.TrieMap
-/*
-
-
-Kamon.metrics.histogram("http.latency").withMeasurementUnit(Time.Microseconds)
-
-
-Histogram.create("http.latency", Time.Milliseconds)
-
-
-
-val histogram = Histogram.builder("http.latency")
- .tag("method", "get")
- .build()
-
-
-val actorMetrics = MetricGroup("method" -> "get")
-
-
-val actorMetrics = MetricGroup.builder()
- .tag("method", "get")
- .build()
-
-actorMetrics.histogram(
-
-Options for a Histogram:
- - MeasurementUnit
- - Dynamic Range
-
-HistogramConfig.forLatency().inMicroseconds()
-
-Kamon.metrics.histogram("http.latency").withoutTags()
-Kamon.metrics.histogram("http.latency").withTag("method", "get")
-
-
-
-
-Kamon.metrics.histogram("http.latency", Tag.of("color", "blue"), Tag.of("color", "blue"))
-
-Kamon.histogram(named("http.latency").withTag("path", path))
-Kamon.counter(named("http.latency").withTag("path", path))
-
-
-
-
-
-
-
-
-val group = Kamon.metrics.group(tags = Map("path" -> "/my-system/user/test-actor"))
-val processingTime = group.histogram("processing-time")
-
-
-
- def histogram(name: String): Histogram =
- histogram(name, MeasurementUnit.none)
-
- def histogram(name: String, unit: MeasurementUnit): Histogram =
- histogram(name, unit, Map.empty)
-
- def histogram(name: String, unit: MeasurementUnit, tags: Map[String, String]): Histogram =
- histogram(name, unit, tags, DynamicRange.Default)
-
-
-
- */
-
-trait MetricLookup {
-
- def histogram(name: String): Histogram =
- histogram(name, MeasurementUnit.none)
-
- def histogram(name: String, unit: MeasurementUnit): Histogram =
- histogram(name, unit, Map.empty)
-
- def histogram(name: String, unit: MeasurementUnit, tags: Map[String, String]): Histogram =
- histogram(name, unit, tags, None)
-
- def histogram(name: String, unit: MeasurementUnit, tags: Map[String, String], dynamicRange: DynamicRange): Histogram =
- histogram(name, unit, tags, Some(dynamicRange))
-
- def histogram(name: String, unit: MeasurementUnit, tags: Map[String, String], dynamicRange: Option[DynamicRange]): Histogram
-
-}
-
-class Registry(initialConfig: Config) extends RegistrySnapshotGenerator {
- private val logger = Logger(classOf[Registry])
- private val metrics = TrieMap.empty[String, MetricEntry]
- private val instrumentFactory = new AtomicReference[InstrumentFactory]()
- reconfigure(initialConfig)
-
- def reconfigure(config: Config): Unit = synchronized {
- instrumentFactory.set(InstrumentFactory.fromConfig(config))
- }
-
- def histogram(name: String, unit: MeasurementUnit, tags: Map[String, String], dynamicRange: Option[DynamicRange]): Histogram =
- lookupInstrument(name, unit, tags, InstrumentType.Histogram, instrumentFactory.get().buildHistogram(dynamicRange))
-
- def counter(name: String, unit: MeasurementUnit, tags: Map[String, String]): Counter =
- lookupInstrument(name, unit, tags, InstrumentType.Counter, instrumentFactory.get().buildCounter)
-
- def gauge(name: String, unit: MeasurementUnit, tags: Map[String, String]): Gauge =
- lookupInstrument(name, unit, tags, InstrumentType.Gauge, instrumentFactory.get().buildGauge)
-
- def minMaxCounter(name: String, unit: MeasurementUnit, tags: Map[String, String], dynamicRange: Option[DynamicRange], sampleInterval: Option[Duration]): MinMaxCounter =
- lookupInstrument(name, unit, tags, InstrumentType.MinMaxCounter, instrumentFactory.get().buildMinMaxCounter(dynamicRange, sampleInterval))
-
-
- override def snapshot(): RegistrySnapshot = synchronized {
- var histograms = Seq.empty[DistributionSnapshot]
- var mmCounters = Seq.empty[DistributionSnapshot]
- var counters = Seq.empty[SingleValueSnapshot]
- var gauges = Seq.empty[SingleValueSnapshot]
-
- for {
- metricEntry <- metrics.values
- instrument <- metricEntry.instruments.values
- } {
- metricEntry.instrumentType match {
- case InstrumentType.Histogram => histograms = histograms :+ instrument.asInstanceOf[SnapshotableHistogram].snapshot()
- case InstrumentType.MinMaxCounter => mmCounters = mmCounters :+ instrument.asInstanceOf[SnapshotableMinMaxCounter].snapshot()
- case InstrumentType.Gauge => gauges = gauges :+ instrument.asInstanceOf[SnapshotableGauge].snapshot()
- case InstrumentType.Counter => counters = counters :+ instrument.asInstanceOf[SnapshotableCounter].snapshot()
- }
- }
-
- RegistrySnapshot(histograms, mmCounters, gauges, counters)
- }
-
- private def lookupInstrument[T](name: String, measurementUnit: MeasurementUnit, tags: Map[String, String],
- instrumentType: InstrumentType, builder: (String, Map[String, String], MeasurementUnit) => T): T = {
-
- val entry = metrics.atomicGetOrElseUpdate(name, MetricEntry(instrumentType, measurementUnit, TrieMap.empty))
- if(entry.instrumentType != instrumentType)
- sys.error(s"Tried to use metric [$name] as a [${instrumentType.name}] but it is already defined as [${entry.instrumentType.name}] ")
-
- if(entry.unit != measurementUnit)
- logger.warn("Ignoring attempt to use measurement unit [{}] on metric [name={}, tags={}], the metric uses [{}]",
- measurementUnit.magnitude.name, name, tags.prettyPrint(), entry.unit.magnitude.name)
-
- entry.instruments.getOrElseUpdate(tags, builder(name, tags, measurementUnit)).asInstanceOf[T]
- }
-
- private case class InstrumentType(name: String)
- private object InstrumentType {
- val Histogram = InstrumentType("Histogram")
- val MinMaxCounter = InstrumentType("MinMaxCounter")
- val Counter = InstrumentType("Counter")
- val Gauge = InstrumentType("Gauge")
- }
-
- private case class MetricEntry(instrumentType: InstrumentType, unit: MeasurementUnit, instruments: TrieMap[Map[String, String], Any])
-}
-
-
-
-//
-//
-//trait RecorderRegistry {
-// def shouldTrack(entity: Entity): Boolean
-// def getRecorder(entity: Entity): EntityRecorder
-// def removeRecorder(entity: Entity): Boolean
-//}
-//
-//class RecorderRegistryImpl(initialConfig: Config) extends RecorderRegistry {
-// private val scheduler = new ScheduledThreadPoolExecutor(1, numberedThreadFactory("kamon.metric.refresh-scheduler"))
-// private val instrumentFactory = new AtomicReference[InstrumentFactory]()
-// private val entityFilter = new AtomicReference[Filter]()
-// private val entities = TrieMap.empty[Entity, EntityRecorder with EntitySnapshotProducer]
-//
-// reconfigure(initialConfig)
-//
-//
-// override def shouldTrack(entity: Entity): Boolean =
-// entityFilter.get().accept(entity)
-//
-// override def getRecorder(entity: Entity): EntityRecorder =
-// entities.atomicGetOrElseUpdate(entity, new DefaultEntityRecorder(entity, instrumentFactory.get(), scheduler))
-//
-// override def removeRecorder(entity: Entity): Boolean =
-// entities.remove(entity).nonEmpty
-//
-// private[kamon] def reconfigure(config: Config): Unit = synchronized {
-// instrumentFactory.set(InstrumentFactory.fromConfig(config))
-// entityFilter.set(Filter.fromConfig(config))
-//
-// val refreshSchedulerPoolSize = config.getInt("kamon.metric.refresh-scheduler-pool-size")
-// scheduler.setCorePoolSize(refreshSchedulerPoolSize)
-// }
-//
-// //private[kamon] def diagnosticData
-//}
-//
-//case class RecorderRegistryDiagnostic(entities: Seq[Entity])
-//
-
-
-object Test extends App {
- val registry = new Registry(ConfigFactory.load())
-
- println(registry.histogram("test-1", MeasurementUnit.none, Map.empty, Some(DynamicRange.Default)).dynamicRange)
- println(registry.histogram("test-2", MeasurementUnit.none, Map.empty, Option(DynamicRange.Fine)).dynamicRange)
-
- println(Kamon.histogram("my-test"))
-}
-
-
-
-
-
diff --git a/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala b/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala
index fe027c91..b7cc349e 100644
--- a/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala
+++ b/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala
@@ -6,12 +6,13 @@ import kamon.metric.instrument.{DistributionSnapshot, SingleValueSnapshot}
case class Interval(from: Instant, to: Instant)
-case class RegistrySnapshot(histograms: Seq[DistributionSnapshot], minMaxCounters: Seq[DistributionSnapshot],
- gauges: Seq[SingleValueSnapshot], counters: Seq[SingleValueSnapshot])
+case class MetricsSnapshot(
+ histograms: Seq[DistributionSnapshot],
+ minMaxCounters: Seq[DistributionSnapshot],
+ gauges: Seq[SingleValueSnapshot],
+ counters: Seq[SingleValueSnapshot]
+)
-case class TickSnapshot(interval: Interval, metrics: RegistrySnapshot)
+case class TickSnapshot(interval: Interval, metrics: MetricsSnapshot)
-trait RegistrySnapshotGenerator {
- def snapshot(): RegistrySnapshot
-}
diff --git a/kamon-core/src/main/scala/kamon/trace/Sampler.scala b/kamon-core/src/main/scala/kamon/trace/Sampler.scala
index 491cf358..3cb55e51 100644
--- a/kamon-core/src/main/scala/kamon/trace/Sampler.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Sampler.scala
@@ -20,6 +20,9 @@ object Sampler {
class Constant(decision: Boolean) extends Sampler {
override def decide(spanID: Long): Boolean = decision
+
+ override def toString: String =
+ s"Sampler.Constant(decision = $decision)"
}
class Random(chance: Double) extends Sampler {
@@ -28,5 +31,8 @@ object Sampler {
override def decide(spanID: Long): Boolean =
spanID >= lowerBoundary && spanID <= upperBoundary
+
+ override def toString: String =
+ s"Sampler.Random(chance = $chance)"
}
}
diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala
index e64d8118..635f9545 100644
--- a/kamon-core/src/main/scala/kamon/trace/Span.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Span.scala
@@ -1,32 +1,13 @@
package kamon
package trace
-import scala.collection.JavaConverters._
-import kamon.util.Clock
-
-object Span {
- val MetricCategory = "span"
- val LatencyMetricName = "elapsed-time"
- val ErrorMetricName = "error"
- val MetricTagPrefix = "metric."
- val BooleanTagTrueValue = "1"
- val BooleanTagFalseValue = "0"
-
- case class LogEntry(timestamp: Long, fields: Map[String, _])
-
- case class CompletedSpan(
- context: SpanContext,
- operationName: String,
- startTimestampMicros: Long,
- endTimestampMicros: Long,
- tags: Map[String, String],
- logs: Seq[LogEntry]
- )
-}
+import kamon.metric.MetricLookup
+import scala.collection.JavaConverters._
+import kamon.util.{Clock, MeasurementUnit}
class Span(spanContext: SpanContext, initialOperationName: String, initialTags: Map[String, String], startTimestampMicros: Long,
- recorderRegistry: Any, reporterRegistry: ReporterRegistryImpl) extends io.opentracing.Span {
+ metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl) extends io.opentracing.Span {
private var isOpen: Boolean = true
private val sampled: Boolean = spanContext.sampled
@@ -35,7 +16,7 @@ class Span(spanContext: SpanContext, initialOperationName: String, initialTags:
private var tags = initialTags
private var logs = List.empty[Span.LogEntry]
- private var metricTags = Map.empty[String, String]
+ private var additionalMetricTags = Map.empty[String, String]
override def log(fields: java.util.Map[String, _]): Span =
@@ -117,7 +98,7 @@ class Span(spanContext: SpanContext, initialOperationName: String, initialTags:
def setMetricTag(key: String, value: String): Span = synchronized {
if (isOpen)
- metricTags = metricTags ++ Map(key -> value)
+ additionalMetricTags = additionalMetricTags ++ Map(key -> value)
this
}
@@ -135,7 +116,7 @@ class Span(spanContext: SpanContext, initialOperationName: String, initialTags:
private def extractMetricTag(tag: String, value: String): Unit =
if(tag.startsWith(Span.MetricTagPrefix))
- metricTags = metricTags ++ Map(tag.substring(Span.MetricTagPrefix.length) -> value)
+ additionalMetricTags = additionalMetricTags ++ Map(tag.substring(Span.MetricTagPrefix.length) -> value)
override def finish(): Unit =
finish(Clock.microTimestamp())
@@ -153,17 +134,32 @@ class Span(spanContext: SpanContext, initialOperationName: String, initialTags:
private def recordSpanMetrics(): Unit = {
val elapsedTime = endTimestampMicros - startTimestampMicros
-// val entity = Entity(operationName, Span.MetricCategory, metricTags)
-// val recorder = recorderRegistry.getRecorder(entity)
-
-// recorder
-// .histogram(Span.LatencyMetricName, MeasurementUnit.time.microseconds, DynamicRange.Default)
-// .record(elapsedTime)
-//
-// tags.get("error").foreach { errorTag =>
-// if(errorTag != null && errorTag.equals(Span.BooleanTagTrueValue)) {
-// recorder.counter(Span.ErrorMetricName).increment()
-// }
-// }
+ val metricTags = Map("operation" -> operationName) ++ additionalMetricTags
+
+ val latencyHistogram = metrics.histogram("span.processing-time", MeasurementUnit.time.microseconds, metricTags)
+ latencyHistogram.record(elapsedTime)
+
+ tags.get("error").foreach { errorTag =>
+ if(errorTag != null && errorTag.equals(Span.BooleanTagTrueValue)) {
+ metrics.counter("span.errors", MeasurementUnit.none, metricTags).increment()
+ }
+ }
}
-} \ No newline at end of file
+}
+
+object Span {
+ val MetricTagPrefix = "metric."
+ val BooleanTagTrueValue = "1"
+ val BooleanTagFalseValue = "0"
+
+ case class LogEntry(timestamp: Long, fields: Map[String, _])
+
+ case class CompletedSpan(
+ context: SpanContext,
+ operationName: String,
+ startTimestampMicros: Long,
+ endTimestampMicros: Long,
+ tags: Map[String, String],
+ logs: Seq[LogEntry]
+ )
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
index ed42b810..22e19ebc 100644
--- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
@@ -2,19 +2,24 @@ package kamon.trace
import java.util.concurrent.ThreadLocalRandom
+import com.typesafe.config.Config
import com.typesafe.scalalogging.Logger
-import io.opentracing.propagation.{TextMap, Format}
+import io.opentracing.propagation.{Format, TextMap}
import io.opentracing.propagation.Format.Builtin.{BINARY, HTTP_HEADERS, TEXT_MAP}
import io.opentracing.util.ThreadLocalActiveSpanSource
import kamon.ReporterRegistryImpl
+import kamon.metric.MetricLookup
import kamon.util.Clock
-class Tracer(metrics: Any, reporterRegistry: ReporterRegistryImpl) extends io.opentracing.Tracer {
+
+
+
+class Tracer(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl) extends io.opentracing.Tracer {
private val logger = Logger(classOf[Tracer])
- ///private val metricsRecorder = new TracerMetricsRecorder(metrics.getRecorder(Entity("tracer", "tracer", Map.empty)))
+ private val tracerMetrics = new TracerMetrics(metrics)
private val activeSpanSource = new ThreadLocalActiveSpanSource()
- @volatile private var sampler: Sampler = Sampler.never
+ @volatile private var configuredSampler: Sampler = Sampler.never
@volatile private var textMapSpanContextCodec = SpanContextCodec.TextMap
@volatile private var httpHeaderSpanContextCodec = SpanContextCodec.ZipkinB3
@@ -22,8 +27,8 @@ class Tracer(metrics: Any, reporterRegistry: ReporterRegistryImpl) extends io.op
new SpanBuilder(operationName)
override def extract[C](format: Format[C], carrier: C): io.opentracing.SpanContext = format match {
- case HTTP_HEADERS => httpHeaderSpanContextCodec.extract(carrier.asInstanceOf[TextMap], sampler)
- case TEXT_MAP => textMapSpanContextCodec.extract(carrier.asInstanceOf[TextMap], sampler)
+ case HTTP_HEADERS => httpHeaderSpanContextCodec.extract(carrier.asInstanceOf[TextMap], configuredSampler)
+ case TEXT_MAP => textMapSpanContextCodec.extract(carrier.asInstanceOf[TextMap], configuredSampler)
case BINARY => null // TODO: Implement Binary Encoding
case _ => null
}
@@ -35,6 +40,9 @@ class Tracer(metrics: Any, reporterRegistry: ReporterRegistryImpl) extends io.op
case _ =>
}
+ def sampler: Sampler =
+ configuredSampler
+
override def activeSpan(): io.opentracing.ActiveSpan =
activeSpanSource.activeSpan()
@@ -114,15 +122,31 @@ class Tracer(metrics: Any, reporterRegistry: ReporterRegistryImpl) extends io.op
new SpanContext(parentContext.traceID, createID(), parentContext.spanID, parentContext.sampled, initialTags)
else {
val traceID = createID()
- new SpanContext(traceID, traceID, 0L, sampler.decide(traceID), initialTags)
+ new SpanContext(traceID, traceID, 0L, configuredSampler.decide(traceID), initialTags)
}
- //metricsRecorder.createdSpans.increment()
- new Span(spanContext, operationName, initialTags, startTimestampMicros, ???, reporterRegistry)
+ tracerMetrics.createdSpans.increment()
+ new Span(spanContext, operationName, initialTags, startTimestampMicros, metrics, reporterRegistry)
}
private def createID(): Long =
ThreadLocalRandom.current().nextLong()
}
+
+ private[kamon] def reconfigure(config: Config): Unit = synchronized {
+ val traceConfig = config.getConfig("kamon.trace")
+
+ configuredSampler = traceConfig.getString("sampler") match {
+ case "always" => Sampler.always
+ case "never" => Sampler.never
+ case "random" => Sampler.random(traceConfig.getDouble("sampler-random.chance"))
+ case other => sys.error(s"Unexpected sampler name $other.")
+ }
+ }
+
+ private final class TracerMetrics(metricLookup: MetricLookup) {
+ val createdSpans = metricLookup.counter("tracer.spans-created")
+ }
+
}