aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2017-05-02 13:18:26 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2017-05-02 13:18:26 +0200
commit4247aa319ac6e17b7ef7a76d61bac32c872575e3 (patch)
tree9f16b2c31c272cee658ab9f0b9906e3f4633951e /kamon-core/src/main
parentf24c1a7a4b96dcfb2609c6f512f34dd6d54de439 (diff)
downloadKamon-4247aa319ac6e17b7ef7a76d61bac32c872575e3.tar.gz
Kamon-4247aa319ac6e17b7ef7a76d61bac32c872575e3.tar.bz2
Kamon-4247aa319ac6e17b7ef7a76d61bac32c872575e3.zip
wip: playing with akka-less implementation of subscriptions
Diffstat (limited to 'kamon-core/src/main')
-rw-r--r--kamon-core/src/main/scala/kamon/Environment.scala3
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala13
-rw-r--r--kamon-core/src/main/scala/kamon/Reporters.scala160
-rw-r--r--kamon-core/src/main/scala/kamon/Subscriptions.scala12
-rw-r--r--kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala48
-rw-r--r--kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala14
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricsSubscriber.scala12
-rw-r--r--kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala (renamed from kamon-core/src/main/scala/kamon/metric/Metrics.scala)18
-rw-r--r--kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala12
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/HistogramExtension.scala5
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala2
-rw-r--r--kamon-core/src/main/scala/kamon/util/Scheduler.scala5
12 files changed, 249 insertions, 55 deletions
diff --git a/kamon-core/src/main/scala/kamon/Environment.scala b/kamon-core/src/main/scala/kamon/Environment.scala
index 3184184a..5f728cbd 100644
--- a/kamon-core/src/main/scala/kamon/Environment.scala
+++ b/kamon-core/src/main/scala/kamon/Environment.scala
@@ -1,5 +1,7 @@
package kamon
+import java.util.concurrent.ScheduledExecutorService
+
import com.typesafe.config.Config
trait Environment {
@@ -7,4 +9,5 @@ trait Environment {
def host: String
def application: String
def config: Config
+ def scheduler: ScheduledExecutorService
}
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index 75ad81d5..72573a8a 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -1,8 +1,8 @@
package kamon
-import com.typesafe.config.Config
+import com.typesafe.config.{Config, ConfigFactory}
import kamon.metric.instrument.Histogram
-import kamon.metric.{Entity, EntityRecorder, Metrics}
+import kamon.metric.{Entity, EntityRecorder, RecorderRegistry, RecorderRegistryImpl}
import kamon.trace.Tracer
/**
@@ -13,10 +13,10 @@ import kamon.trace.Tracer
*
*/
trait Kamon {
- def metrics: Metrics
+ def metrics: RecorderRegistry
def tracer: Tracer
- def subscriptions: Subscriptions
+ def subscriptions: Reporters
def util: Util
def environment: Environment
@@ -28,7 +28,10 @@ trait Kamon {
}
object Kamon {
- def getHistogram: Histogram = ???
+ val metricsModule = new RecorderRegistryImpl(ConfigFactory.load())
+ val reports = new ReportersRegistry(metricsModule)
+
+ def metrics: RecorderRegistry = metricsModule
}
diff --git a/kamon-core/src/main/scala/kamon/Reporters.scala b/kamon-core/src/main/scala/kamon/Reporters.scala
new file mode 100644
index 00000000..686ababf
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/Reporters.scala
@@ -0,0 +1,160 @@
+package kamon
+
+import java.time.Instant
+import java.util.concurrent.{ConcurrentLinkedQueue, Executors, ThreadFactory, TimeUnit}
+
+import com.typesafe.config.Config
+import kamon.metric._
+import org.slf4j.LoggerFactory
+
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.control.NonFatal
+
+trait Reporters {
+ def loadFromConfig(): Unit
+ def stop(): Unit
+
+ def addReporter(subscriber: MetricsReporter): Cancellable
+ def addReporter(subscriber: MetricsReporter, name: String): Cancellable
+
+}
+
+
+class DummyReporter(name: String) extends MetricsReporter {
+ override def reconfigure(config: Config): Unit = {
+ println("NAME: " + name + "===> Reconfiguring Dummy")
+ }
+
+ override def start(config: Config): Unit = {
+
+ println("NAME: " + name + "===> Starting DUMMY")
+ }
+
+ override def stop(): Unit = {
+ println("NAME: " + name + "===> Stopping Dummy")
+ }
+
+ override def processTick(snapshot: TickSnapshot): Unit = {
+ println("NAME: " + name + s"===> [${Thread.currentThread().getName()}] Processing a tick in dummy." + snapshot)
+ println(s"From: ${snapshot.interval.from}, to: ${snapshot.interval.to}")
+ snapshot.entities.foreach { e =>
+ println(e.counters.map(c => s"Counter [${c.name}] => " + c.value).mkString(", "))
+ }
+ }
+}
+
+class ReportersRegistry(metrics: RecorderRegistryImpl) extends Reporters {
+ private val scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory("kamon-scheduler"))
+ private val metricReporters = new ConcurrentLinkedQueue[ReporterEntry]()
+ private val mReporters = TrieMap.empty[String, MetricsReporter]
+
+
+
+ metricReporters.add(ReporterEntry(new DummyReporter("statsd"), createExecutionContext("statsd-reporter")))
+ startMetricsTicker()
+
+
+
+
+ override def loadFromConfig(): Unit = ???
+ override def stop(): Unit = ???
+
+
+ override def addReporter(subscriber: MetricsReporter): Cancellable = ???
+
+ override def addReporter(subscriber: MetricsReporter, name: String): Cancellable = {
+ ???
+ }
+
+
+
+ private def createExecutionContext(name: String): ExecutionContext = {
+ val threadFactory = new ThreadFactory {
+ val defaultFactory = Executors.defaultThreadFactory()
+ override def newThread(r: Runnable): Thread = {
+ val thread = defaultFactory.newThread(r)
+ thread.setName(name)
+ thread
+ }
+ }
+
+ ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(threadFactory))
+ }
+
+ /**
+ * Creates a thread factory that assigns the specified name to all created Threads.
+ */
+ private def threadFactory(name: String): ThreadFactory =
+ new ThreadFactory {
+ val defaultFactory = Executors.defaultThreadFactory()
+
+ override def newThread(r: Runnable): Thread = {
+ val thread = defaultFactory.newThread(r)
+ thread.setName(name)
+ thread
+ }
+ }
+
+
+ def reconfigure(config: Config): Unit = {}
+
+
+
+ private case class ReporterEntry(reporter: MetricsReporter, executionContext: ExecutionContext)
+
+
+
+ def startMetricsTicker(): Unit = {
+ scheduler.scheduleAtFixedRate(new MetricTicker(metrics, metricReporters), 2, 2, TimeUnit.SECONDS)
+ }
+
+
+ private class MetricTicker(metricsImpl: RecorderRegistryImpl, reporterEntries: java.util.Queue[ReporterEntry]) extends Runnable {
+ val logger = LoggerFactory.getLogger(classOf[MetricTicker])
+ var lastTick = Instant.now()
+
+ def run(): Unit = try {
+ val currentTick = Instant.now()
+ val tickSnapshot = TickSnapshot(
+ interval = Interval(lastTick, currentTick),
+ entities = metricsImpl.snapshot()
+ )
+
+ reporterEntries.forEach { entry =>
+ Future(entry.reporter.processTick(tickSnapshot))(executor = entry.executionContext)
+ }
+
+ lastTick = currentTick
+
+ } catch {
+ case NonFatal(t) => logger.error("Error while running a tick", t)
+ }
+ }
+}
+
+
+
+trait Cancellable {
+ def cancel(): Unit
+}
+
+trait MetricsReporter {
+ def reconfigure(config: Config): Unit
+
+ def start(config: Config): Unit
+ def stop(): Unit
+
+ def processTick(snapshot: TickSnapshot)
+}
+
+
+
+object TestingAllExample extends App {
+ val recorder = Kamon.metrics.getRecorder(Entity("topo", "human-being", Map.empty))
+ while(true) {
+ recorder.counter("test-other").increment()
+ Thread.sleep(100)
+ }
+
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/Subscriptions.scala b/kamon-core/src/main/scala/kamon/Subscriptions.scala
deleted file mode 100644
index ff5dda4c..00000000
--- a/kamon-core/src/main/scala/kamon/Subscriptions.scala
+++ /dev/null
@@ -1,12 +0,0 @@
-package kamon
-
-import kamon.metric.MetricsSubscriber
-
-trait Subscriptions {
- def loadFromConfig()
- def subscribeToMetrics(subscriber: MetricsSubscriber): Subscription
-}
-
-trait Subscription {
- def cancel(): Unit
-}
diff --git a/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala
index a94881d2..8ce37082 100644
--- a/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala
+++ b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala
@@ -5,6 +5,8 @@ import java.time.Duration
import kamon.metric.instrument._
import kamon.util.MeasurementUnit
+import scala.collection.concurrent.TrieMap
+
trait EntityRecorder {
def histogram(name: String): Histogram
def histogram(name: String, measurementUnit: MeasurementUnit, dynamicRange: DynamicRange): Histogram
@@ -13,11 +15,55 @@ trait EntityRecorder {
def minMaxCounter(name: String, measurementUnit: MeasurementUnit, dynamicRange: DynamicRange, sampleFrequency: Duration): MinMaxCounter
def gauge(name: String): Gauge
+ def gauge(name: String, measurementUnit: MeasurementUnit): Gauge
def counter(name: String): Counter
+ def counter(name: String, measurementUnit: MeasurementUnit): Counter
+}
+
+trait EntitySnapshotProducer {
+ def snapshot(): EntitySnapshot
}
-class EntityRecorderImpl {
+
+class DefaultEntityRecorder(entity: Entity, instrumentFactory: InstrumentFactory) extends EntityRecorder with EntitySnapshotProducer {
+ private val histograms = TrieMap.empty[String, Histogram with DistributionSnapshotInstrument]
+ private val minMaxCounters = TrieMap.empty[String, MinMaxCounter with DistributionSnapshotInstrument]
+ private val counters = TrieMap.empty[String, Counter with SingleValueSnapshotInstrument]
+ private val gauges = TrieMap.empty[String, Gauge with SingleValueSnapshotInstrument]
+
+ def histogram(name: String): Histogram =
+ histograms.atomicGetOrElseUpdate(name, instrumentFactory.buildHistogram(entity, name))
+
+ def histogram(name: String, measurementUnit: MeasurementUnit, dynamicRange: DynamicRange): Histogram =
+ histograms.atomicGetOrElseUpdate(name, instrumentFactory.buildHistogram(entity, name, dynamicRange, measurementUnit))
+
+ def minMaxCounter(name: String): MinMaxCounter =
+ minMaxCounters.atomicGetOrElseUpdate(name, instrumentFactory.buildMinMaxCounter(entity, name))
+
+ def minMaxCounter(name: String, measurementUnit: MeasurementUnit, dynamicRange: DynamicRange, sampleFrequency: Duration): MinMaxCounter =
+ minMaxCounters.atomicGetOrElseUpdate(name, instrumentFactory.buildMinMaxCounter(entity, name, dynamicRange, sampleFrequency, measurementUnit))
+
+ def gauge(name: String): Gauge =
+ gauges.atomicGetOrElseUpdate(name, instrumentFactory.buildGauge(entity, name))
+
+ def gauge(name: String, measurementUnit: MeasurementUnit): Gauge =
+ gauges.atomicGetOrElseUpdate(name, instrumentFactory.buildGauge(entity, name, measurementUnit))
+
+ def counter(name: String): Counter =
+ counters.atomicGetOrElseUpdate(name, instrumentFactory.buildCounter(entity, name))
+
+ def counter(name: String, measurementUnit: MeasurementUnit): Counter =
+ counters.atomicGetOrElseUpdate(name, instrumentFactory.buildCounter(entity, name, measurementUnit))
+
+ def snapshot(): EntitySnapshot =
+ new EntitySnapshot(
+ entity,
+ histograms = histograms.values.map(_.snapshot()).toSeq,
+ minMaxCounters = minMaxCounters.values.map(_.snapshot()).toSeq,
+ gauges = gauges.values.map(_.snapshot()).toSeq,
+ counters = counters.values.map(_.snapshot()).toSeq
+ )
} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala b/kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala
index e51e80cc..a7db93eb 100644
--- a/kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala
+++ b/kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala
@@ -2,10 +2,10 @@ package kamon.metric
import kamon.metric.instrument.{DistributionSnapshot, SingleValueSnapshot}
-trait EntitySnapshot {
- def entity: Entity
- def histograms: Seq[DistributionSnapshot]
- def minMaxCounters: Seq[DistributionSnapshot]
- def gauges: Seq[SingleValueSnapshot]
- def counters: Seq[SingleValueSnapshot]
-} \ No newline at end of file
+class EntitySnapshot(
+ val entity: Entity,
+ val histograms: Seq[DistributionSnapshot],
+ val minMaxCounters: Seq[DistributionSnapshot],
+ val gauges: Seq[SingleValueSnapshot],
+ val counters: Seq[SingleValueSnapshot]
+) \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsSubscriber.scala b/kamon-core/src/main/scala/kamon/metric/MetricsSubscriber.scala
deleted file mode 100644
index dbdfde9d..00000000
--- a/kamon-core/src/main/scala/kamon/metric/MetricsSubscriber.scala
+++ /dev/null
@@ -1,12 +0,0 @@
-package kamon.metric
-
-import com.typesafe.config.Config
-
-trait MetricsSubscriber {
- def reconfigure(config: Config): Unit
-
- def start(config: Config): Unit
- def shutdown(): Unit
-
- def processTick(snapshot: String)
-}
diff --git a/kamon-core/src/main/scala/kamon/metric/Metrics.scala b/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala
index f312c5b7..99974032 100644
--- a/kamon-core/src/main/scala/kamon/metric/Metrics.scala
+++ b/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala
@@ -1,10 +1,13 @@
package kamon
package metric
+import com.typesafe.config.Config
+import kamon.metric.instrument.InstrumentFactory
+
import scala.collection.concurrent.TrieMap
-trait Metrics {
+trait RecorderRegistry {
def getRecorder(entity: Entity): EntityRecorder
def getRecorder(name: String, category: String, tags: Map[String, String]): EntityRecorder
@@ -12,11 +15,12 @@ trait Metrics {
def removeRecorder(name: String, category: String, tags: Map[String, String]): Boolean
}
-class MetricsImpl extends Metrics{
- private val entities = TrieMap.empty[Entity, EntityRecorder]
+class RecorderRegistryImpl(config: Config) extends RecorderRegistry {
+ private val instrumentFactory = InstrumentFactory(config.getConfig("kamon.metric.instrument-factory"))
+ private val entities = TrieMap.empty[Entity, EntityRecorder with EntitySnapshotProducer]
override def getRecorder(entity: Entity): EntityRecorder = {
- ???
+ entities.atomicGetOrElseUpdate(entity, new DefaultEntityRecorder(entity, instrumentFactory))
}
override def getRecorder(name: String, category: String, tags: Map[String, String]): EntityRecorder = ???
@@ -24,6 +28,10 @@ class MetricsImpl extends Metrics{
override def removeRecorder(entity: Entity): Boolean = ???
override def removeRecorder(name: String, category: String, tags: Map[String, String]): Boolean = ???
+
+ private[kamon] def snapshot(): Seq[EntitySnapshot] = {
+ entities.values.map(_.snapshot()).toSeq
+ }
}
@@ -32,3 +40,5 @@ class MetricsImpl extends Metrics{
+
+
diff --git a/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala b/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala
index 4248180c..f4578965 100644
--- a/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala
+++ b/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala
@@ -2,16 +2,8 @@ package kamon.metric
import java.time.Instant
-
-trait TickSnapshot {
- def interval: Interval
- def entities: Seq[EntitySnapshot]
-}
-
-trait Interval {
- def from: Instant
- def to: Instant
-}
+case class TickSnapshot(interval: Interval, entities: Seq[EntitySnapshot])
+case class Interval(from: Instant, to: Instant)
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/HistogramExtension.scala b/kamon-core/src/main/scala/kamon/metric/instrument/HistogramExtension.scala
index ebb82040..dc3cad08 100644
--- a/kamon-core/src/main/scala/kamon/metric/instrument/HistogramExtension.scala
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/HistogramExtension.scala
@@ -6,8 +6,7 @@ import java.util.concurrent.atomic.AtomicLongArray
import kamon.metric.instrument.DynamicRange
/**
- * This class exposes package-private members of the [[AtomicHistogram]] class that are required to properly generate
- * snapshots of our HdrHistogram implementation.
+ * Exposes package-private members of [[org.HdrHistogram.AtomicHistogram]].
*/
abstract class AtomicHistogramExtension(dr: DynamicRange)
extends AtomicHistogram(dr.lowestDiscernibleValue, dr.highestTrackableValue, dr.significantValueDigits) {
@@ -22,7 +21,7 @@ abstract class AtomicHistogramExtension(dr: DynamicRange)
}
/**
- * Exposes the package-private members of [[ZigZagEncoding]].
+ * Exposes the package-private members of [[org.HdrHistogram.ZigZagEncoding]].
*/
object ZigZag {
def putLong(buffer: ByteBuffer, value: Long): Unit =
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala
index fb6dfe27..4f0502f0 100644
--- a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala
@@ -27,7 +27,7 @@ private[metric] class InstrumentFactory private (
}
def buildMinMaxCounter(entity: Entity, name: String, dynamicRange: DynamicRange = defaultMMCounterDynamicRange,
- sampleInterval: Duration = defaultMMCounterSampleRate, measurementUnit: MeasurementUnit = MeasurementUnit.none): MinMaxCounter = {
+ sampleInterval: Duration = defaultMMCounterSampleRate, measurementUnit: MeasurementUnit = MeasurementUnit.none): MinMaxCounter with DistributionSnapshotInstrument = {
val underlyingHistogram = buildHistogram(entity, name, dynamicRange, measurementUnit)
new PaddedMinMaxCounter(
diff --git a/kamon-core/src/main/scala/kamon/util/Scheduler.scala b/kamon-core/src/main/scala/kamon/util/Scheduler.scala
new file mode 100644
index 00000000..0bc86f9a
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/util/Scheduler.scala
@@ -0,0 +1,5 @@
+package kamon.util
+
+trait Scheduler {
+
+}