diff options
Diffstat (limited to 'kamon-core/src/main')
-rw-r--r-- | kamon-core/src/main/scala/kamon/Environment.scala | 3 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/Kamon.scala | 13 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/Reporters.scala | 160 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/Subscriptions.scala | 12 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala | 48 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala | 14 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/metric/MetricsSubscriber.scala | 12 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala (renamed from kamon-core/src/main/scala/kamon/metric/Metrics.scala) | 18 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala | 12 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/metric/instrument/HistogramExtension.scala | 5 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala | 2 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/util/Scheduler.scala | 5 |
12 files changed, 249 insertions, 55 deletions
diff --git a/kamon-core/src/main/scala/kamon/Environment.scala b/kamon-core/src/main/scala/kamon/Environment.scala index 3184184a..5f728cbd 100644 --- a/kamon-core/src/main/scala/kamon/Environment.scala +++ b/kamon-core/src/main/scala/kamon/Environment.scala @@ -1,5 +1,7 @@ package kamon +import java.util.concurrent.ScheduledExecutorService + import com.typesafe.config.Config trait Environment { @@ -7,4 +9,5 @@ trait Environment { def host: String def application: String def config: Config + def scheduler: ScheduledExecutorService } diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 75ad81d5..72573a8a 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -1,8 +1,8 @@ package kamon -import com.typesafe.config.Config +import com.typesafe.config.{Config, ConfigFactory} import kamon.metric.instrument.Histogram -import kamon.metric.{Entity, EntityRecorder, Metrics} +import kamon.metric.{Entity, EntityRecorder, RecorderRegistry, RecorderRegistryImpl} import kamon.trace.Tracer /** @@ -13,10 +13,10 @@ import kamon.trace.Tracer * */ trait Kamon { - def metrics: Metrics + def metrics: RecorderRegistry def tracer: Tracer - def subscriptions: Subscriptions + def subscriptions: Reporters def util: Util def environment: Environment @@ -28,7 +28,10 @@ trait Kamon { } object Kamon { - def getHistogram: Histogram = ??? + val metricsModule = new RecorderRegistryImpl(ConfigFactory.load()) + val reports = new ReportersRegistry(metricsModule) + + def metrics: RecorderRegistry = metricsModule } diff --git a/kamon-core/src/main/scala/kamon/Reporters.scala b/kamon-core/src/main/scala/kamon/Reporters.scala new file mode 100644 index 00000000..686ababf --- /dev/null +++ b/kamon-core/src/main/scala/kamon/Reporters.scala @@ -0,0 +1,160 @@ +package kamon + +import java.time.Instant +import java.util.concurrent.{ConcurrentLinkedQueue, Executors, ThreadFactory, TimeUnit} + +import com.typesafe.config.Config +import kamon.metric._ +import org.slf4j.LoggerFactory + +import scala.collection.concurrent.TrieMap +import scala.concurrent.{ExecutionContext, Future} +import scala.util.control.NonFatal + +trait Reporters { + def loadFromConfig(): Unit + def stop(): Unit + + def addReporter(subscriber: MetricsReporter): Cancellable + def addReporter(subscriber: MetricsReporter, name: String): Cancellable + +} + + +class DummyReporter(name: String) extends MetricsReporter { + override def reconfigure(config: Config): Unit = { + println("NAME: " + name + "===> Reconfiguring Dummy") + } + + override def start(config: Config): Unit = { + + println("NAME: " + name + "===> Starting DUMMY") + } + + override def stop(): Unit = { + println("NAME: " + name + "===> Stopping Dummy") + } + + override def processTick(snapshot: TickSnapshot): Unit = { + println("NAME: " + name + s"===> [${Thread.currentThread().getName()}] Processing a tick in dummy." + snapshot) + println(s"From: ${snapshot.interval.from}, to: ${snapshot.interval.to}") + snapshot.entities.foreach { e => + println(e.counters.map(c => s"Counter [${c.name}] => " + c.value).mkString(", ")) + } + } +} + +class ReportersRegistry(metrics: RecorderRegistryImpl) extends Reporters { + private val scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory("kamon-scheduler")) + private val metricReporters = new ConcurrentLinkedQueue[ReporterEntry]() + private val mReporters = TrieMap.empty[String, MetricsReporter] + + + + metricReporters.add(ReporterEntry(new DummyReporter("statsd"), createExecutionContext("statsd-reporter"))) + startMetricsTicker() + + + + + override def loadFromConfig(): Unit = ??? + override def stop(): Unit = ??? + + + override def addReporter(subscriber: MetricsReporter): Cancellable = ??? + + override def addReporter(subscriber: MetricsReporter, name: String): Cancellable = { + ??? + } + + + + private def createExecutionContext(name: String): ExecutionContext = { + val threadFactory = new ThreadFactory { + val defaultFactory = Executors.defaultThreadFactory() + override def newThread(r: Runnable): Thread = { + val thread = defaultFactory.newThread(r) + thread.setName(name) + thread + } + } + + ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(threadFactory)) + } + + /** + * Creates a thread factory that assigns the specified name to all created Threads. + */ + private def threadFactory(name: String): ThreadFactory = + new ThreadFactory { + val defaultFactory = Executors.defaultThreadFactory() + + override def newThread(r: Runnable): Thread = { + val thread = defaultFactory.newThread(r) + thread.setName(name) + thread + } + } + + + def reconfigure(config: Config): Unit = {} + + + + private case class ReporterEntry(reporter: MetricsReporter, executionContext: ExecutionContext) + + + + def startMetricsTicker(): Unit = { + scheduler.scheduleAtFixedRate(new MetricTicker(metrics, metricReporters), 2, 2, TimeUnit.SECONDS) + } + + + private class MetricTicker(metricsImpl: RecorderRegistryImpl, reporterEntries: java.util.Queue[ReporterEntry]) extends Runnable { + val logger = LoggerFactory.getLogger(classOf[MetricTicker]) + var lastTick = Instant.now() + + def run(): Unit = try { + val currentTick = Instant.now() + val tickSnapshot = TickSnapshot( + interval = Interval(lastTick, currentTick), + entities = metricsImpl.snapshot() + ) + + reporterEntries.forEach { entry => + Future(entry.reporter.processTick(tickSnapshot))(executor = entry.executionContext) + } + + lastTick = currentTick + + } catch { + case NonFatal(t) => logger.error("Error while running a tick", t) + } + } +} + + + +trait Cancellable { + def cancel(): Unit +} + +trait MetricsReporter { + def reconfigure(config: Config): Unit + + def start(config: Config): Unit + def stop(): Unit + + def processTick(snapshot: TickSnapshot) +} + + + +object TestingAllExample extends App { + val recorder = Kamon.metrics.getRecorder(Entity("topo", "human-being", Map.empty)) + while(true) { + recorder.counter("test-other").increment() + Thread.sleep(100) + } + +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/Subscriptions.scala b/kamon-core/src/main/scala/kamon/Subscriptions.scala deleted file mode 100644 index ff5dda4c..00000000 --- a/kamon-core/src/main/scala/kamon/Subscriptions.scala +++ /dev/null @@ -1,12 +0,0 @@ -package kamon - -import kamon.metric.MetricsSubscriber - -trait Subscriptions { - def loadFromConfig() - def subscribeToMetrics(subscriber: MetricsSubscriber): Subscription -} - -trait Subscription { - def cancel(): Unit -} diff --git a/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala index a94881d2..8ce37082 100644 --- a/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala +++ b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala @@ -5,6 +5,8 @@ import java.time.Duration import kamon.metric.instrument._ import kamon.util.MeasurementUnit +import scala.collection.concurrent.TrieMap + trait EntityRecorder { def histogram(name: String): Histogram def histogram(name: String, measurementUnit: MeasurementUnit, dynamicRange: DynamicRange): Histogram @@ -13,11 +15,55 @@ trait EntityRecorder { def minMaxCounter(name: String, measurementUnit: MeasurementUnit, dynamicRange: DynamicRange, sampleFrequency: Duration): MinMaxCounter def gauge(name: String): Gauge + def gauge(name: String, measurementUnit: MeasurementUnit): Gauge def counter(name: String): Counter + def counter(name: String, measurementUnit: MeasurementUnit): Counter +} + +trait EntitySnapshotProducer { + def snapshot(): EntitySnapshot } -class EntityRecorderImpl { + +class DefaultEntityRecorder(entity: Entity, instrumentFactory: InstrumentFactory) extends EntityRecorder with EntitySnapshotProducer { + private val histograms = TrieMap.empty[String, Histogram with DistributionSnapshotInstrument] + private val minMaxCounters = TrieMap.empty[String, MinMaxCounter with DistributionSnapshotInstrument] + private val counters = TrieMap.empty[String, Counter with SingleValueSnapshotInstrument] + private val gauges = TrieMap.empty[String, Gauge with SingleValueSnapshotInstrument] + + def histogram(name: String): Histogram = + histograms.atomicGetOrElseUpdate(name, instrumentFactory.buildHistogram(entity, name)) + + def histogram(name: String, measurementUnit: MeasurementUnit, dynamicRange: DynamicRange): Histogram = + histograms.atomicGetOrElseUpdate(name, instrumentFactory.buildHistogram(entity, name, dynamicRange, measurementUnit)) + + def minMaxCounter(name: String): MinMaxCounter = + minMaxCounters.atomicGetOrElseUpdate(name, instrumentFactory.buildMinMaxCounter(entity, name)) + + def minMaxCounter(name: String, measurementUnit: MeasurementUnit, dynamicRange: DynamicRange, sampleFrequency: Duration): MinMaxCounter = + minMaxCounters.atomicGetOrElseUpdate(name, instrumentFactory.buildMinMaxCounter(entity, name, dynamicRange, sampleFrequency, measurementUnit)) + + def gauge(name: String): Gauge = + gauges.atomicGetOrElseUpdate(name, instrumentFactory.buildGauge(entity, name)) + + def gauge(name: String, measurementUnit: MeasurementUnit): Gauge = + gauges.atomicGetOrElseUpdate(name, instrumentFactory.buildGauge(entity, name, measurementUnit)) + + def counter(name: String): Counter = + counters.atomicGetOrElseUpdate(name, instrumentFactory.buildCounter(entity, name)) + + def counter(name: String, measurementUnit: MeasurementUnit): Counter = + counters.atomicGetOrElseUpdate(name, instrumentFactory.buildCounter(entity, name, measurementUnit)) + + def snapshot(): EntitySnapshot = + new EntitySnapshot( + entity, + histograms = histograms.values.map(_.snapshot()).toSeq, + minMaxCounters = minMaxCounters.values.map(_.snapshot()).toSeq, + gauges = gauges.values.map(_.snapshot()).toSeq, + counters = counters.values.map(_.snapshot()).toSeq + ) }
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala b/kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala index e51e80cc..a7db93eb 100644 --- a/kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala +++ b/kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala @@ -2,10 +2,10 @@ package kamon.metric import kamon.metric.instrument.{DistributionSnapshot, SingleValueSnapshot} -trait EntitySnapshot { - def entity: Entity - def histograms: Seq[DistributionSnapshot] - def minMaxCounters: Seq[DistributionSnapshot] - def gauges: Seq[SingleValueSnapshot] - def counters: Seq[SingleValueSnapshot] -}
\ No newline at end of file +class EntitySnapshot( + val entity: Entity, + val histograms: Seq[DistributionSnapshot], + val minMaxCounters: Seq[DistributionSnapshot], + val gauges: Seq[SingleValueSnapshot], + val counters: Seq[SingleValueSnapshot] +)
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsSubscriber.scala b/kamon-core/src/main/scala/kamon/metric/MetricsSubscriber.scala deleted file mode 100644 index dbdfde9d..00000000 --- a/kamon-core/src/main/scala/kamon/metric/MetricsSubscriber.scala +++ /dev/null @@ -1,12 +0,0 @@ -package kamon.metric - -import com.typesafe.config.Config - -trait MetricsSubscriber { - def reconfigure(config: Config): Unit - - def start(config: Config): Unit - def shutdown(): Unit - - def processTick(snapshot: String) -} diff --git a/kamon-core/src/main/scala/kamon/metric/Metrics.scala b/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala index f312c5b7..99974032 100644 --- a/kamon-core/src/main/scala/kamon/metric/Metrics.scala +++ b/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala @@ -1,10 +1,13 @@ package kamon package metric +import com.typesafe.config.Config +import kamon.metric.instrument.InstrumentFactory + import scala.collection.concurrent.TrieMap -trait Metrics { +trait RecorderRegistry { def getRecorder(entity: Entity): EntityRecorder def getRecorder(name: String, category: String, tags: Map[String, String]): EntityRecorder @@ -12,11 +15,12 @@ trait Metrics { def removeRecorder(name: String, category: String, tags: Map[String, String]): Boolean } -class MetricsImpl extends Metrics{ - private val entities = TrieMap.empty[Entity, EntityRecorder] +class RecorderRegistryImpl(config: Config) extends RecorderRegistry { + private val instrumentFactory = InstrumentFactory(config.getConfig("kamon.metric.instrument-factory")) + private val entities = TrieMap.empty[Entity, EntityRecorder with EntitySnapshotProducer] override def getRecorder(entity: Entity): EntityRecorder = { - ??? + entities.atomicGetOrElseUpdate(entity, new DefaultEntityRecorder(entity, instrumentFactory)) } override def getRecorder(name: String, category: String, tags: Map[String, String]): EntityRecorder = ??? @@ -24,6 +28,10 @@ class MetricsImpl extends Metrics{ override def removeRecorder(entity: Entity): Boolean = ??? override def removeRecorder(name: String, category: String, tags: Map[String, String]): Boolean = ??? + + private[kamon] def snapshot(): Seq[EntitySnapshot] = { + entities.values.map(_.snapshot()).toSeq + } } @@ -32,3 +40,5 @@ class MetricsImpl extends Metrics{ + + diff --git a/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala b/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala index 4248180c..f4578965 100644 --- a/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala +++ b/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala @@ -2,16 +2,8 @@ package kamon.metric import java.time.Instant - -trait TickSnapshot { - def interval: Interval - def entities: Seq[EntitySnapshot] -} - -trait Interval { - def from: Instant - def to: Instant -} +case class TickSnapshot(interval: Interval, entities: Seq[EntitySnapshot]) +case class Interval(from: Instant, to: Instant) diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/HistogramExtension.scala b/kamon-core/src/main/scala/kamon/metric/instrument/HistogramExtension.scala index ebb82040..dc3cad08 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/HistogramExtension.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/HistogramExtension.scala @@ -6,8 +6,7 @@ import java.util.concurrent.atomic.AtomicLongArray import kamon.metric.instrument.DynamicRange /** - * This class exposes package-private members of the [[AtomicHistogram]] class that are required to properly generate - * snapshots of our HdrHistogram implementation. + * Exposes package-private members of [[org.HdrHistogram.AtomicHistogram]]. */ abstract class AtomicHistogramExtension(dr: DynamicRange) extends AtomicHistogram(dr.lowestDiscernibleValue, dr.highestTrackableValue, dr.significantValueDigits) { @@ -22,7 +21,7 @@ abstract class AtomicHistogramExtension(dr: DynamicRange) } /** - * Exposes the package-private members of [[ZigZagEncoding]]. + * Exposes the package-private members of [[org.HdrHistogram.ZigZagEncoding]]. */ object ZigZag { def putLong(buffer: ByteBuffer, value: Long): Unit = diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala index fb6dfe27..4f0502f0 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala @@ -27,7 +27,7 @@ private[metric] class InstrumentFactory private ( } def buildMinMaxCounter(entity: Entity, name: String, dynamicRange: DynamicRange = defaultMMCounterDynamicRange, - sampleInterval: Duration = defaultMMCounterSampleRate, measurementUnit: MeasurementUnit = MeasurementUnit.none): MinMaxCounter = { + sampleInterval: Duration = defaultMMCounterSampleRate, measurementUnit: MeasurementUnit = MeasurementUnit.none): MinMaxCounter with DistributionSnapshotInstrument = { val underlyingHistogram = buildHistogram(entity, name, dynamicRange, measurementUnit) new PaddedMinMaxCounter( diff --git a/kamon-core/src/main/scala/kamon/util/Scheduler.scala b/kamon-core/src/main/scala/kamon/util/Scheduler.scala new file mode 100644 index 00000000..0bc86f9a --- /dev/null +++ b/kamon-core/src/main/scala/kamon/util/Scheduler.scala @@ -0,0 +1,5 @@ +package kamon.util + +trait Scheduler { + +} |