aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/Reporters.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-core/src/main/scala/kamon/Reporters.scala')
-rw-r--r--kamon-core/src/main/scala/kamon/Reporters.scala160
1 files changed, 160 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/kamon/Reporters.scala b/kamon-core/src/main/scala/kamon/Reporters.scala
new file mode 100644
index 00000000..686ababf
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/Reporters.scala
@@ -0,0 +1,160 @@
+package kamon
+
+import java.time.Instant
+import java.util.concurrent.{ConcurrentLinkedQueue, Executors, ThreadFactory, TimeUnit}
+
+import com.typesafe.config.Config
+import kamon.metric._
+import org.slf4j.LoggerFactory
+
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.control.NonFatal
+
+trait Reporters {
+ def loadFromConfig(): Unit
+ def stop(): Unit
+
+ def addReporter(subscriber: MetricsReporter): Cancellable
+ def addReporter(subscriber: MetricsReporter, name: String): Cancellable
+
+}
+
+
+class DummyReporter(name: String) extends MetricsReporter {
+ override def reconfigure(config: Config): Unit = {
+ println("NAME: " + name + "===> Reconfiguring Dummy")
+ }
+
+ override def start(config: Config): Unit = {
+
+ println("NAME: " + name + "===> Starting DUMMY")
+ }
+
+ override def stop(): Unit = {
+ println("NAME: " + name + "===> Stopping Dummy")
+ }
+
+ override def processTick(snapshot: TickSnapshot): Unit = {
+ println("NAME: " + name + s"===> [${Thread.currentThread().getName()}] Processing a tick in dummy." + snapshot)
+ println(s"From: ${snapshot.interval.from}, to: ${snapshot.interval.to}")
+ snapshot.entities.foreach { e =>
+ println(e.counters.map(c => s"Counter [${c.name}] => " + c.value).mkString(", "))
+ }
+ }
+}
+
+class ReportersRegistry(metrics: RecorderRegistryImpl) extends Reporters {
+ private val scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory("kamon-scheduler"))
+ private val metricReporters = new ConcurrentLinkedQueue[ReporterEntry]()
+ private val mReporters = TrieMap.empty[String, MetricsReporter]
+
+
+
+ metricReporters.add(ReporterEntry(new DummyReporter("statsd"), createExecutionContext("statsd-reporter")))
+ startMetricsTicker()
+
+
+
+
+ override def loadFromConfig(): Unit = ???
+ override def stop(): Unit = ???
+
+
+ override def addReporter(subscriber: MetricsReporter): Cancellable = ???
+
+ override def addReporter(subscriber: MetricsReporter, name: String): Cancellable = {
+ ???
+ }
+
+
+
+ private def createExecutionContext(name: String): ExecutionContext = {
+ val threadFactory = new ThreadFactory {
+ val defaultFactory = Executors.defaultThreadFactory()
+ override def newThread(r: Runnable): Thread = {
+ val thread = defaultFactory.newThread(r)
+ thread.setName(name)
+ thread
+ }
+ }
+
+ ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(threadFactory))
+ }
+
+ /**
+ * Creates a thread factory that assigns the specified name to all created Threads.
+ */
+ private def threadFactory(name: String): ThreadFactory =
+ new ThreadFactory {
+ val defaultFactory = Executors.defaultThreadFactory()
+
+ override def newThread(r: Runnable): Thread = {
+ val thread = defaultFactory.newThread(r)
+ thread.setName(name)
+ thread
+ }
+ }
+
+
+ def reconfigure(config: Config): Unit = {}
+
+
+
+ private case class ReporterEntry(reporter: MetricsReporter, executionContext: ExecutionContext)
+
+
+
+ def startMetricsTicker(): Unit = {
+ scheduler.scheduleAtFixedRate(new MetricTicker(metrics, metricReporters), 2, 2, TimeUnit.SECONDS)
+ }
+
+
+ private class MetricTicker(metricsImpl: RecorderRegistryImpl, reporterEntries: java.util.Queue[ReporterEntry]) extends Runnable {
+ val logger = LoggerFactory.getLogger(classOf[MetricTicker])
+ var lastTick = Instant.now()
+
+ def run(): Unit = try {
+ val currentTick = Instant.now()
+ val tickSnapshot = TickSnapshot(
+ interval = Interval(lastTick, currentTick),
+ entities = metricsImpl.snapshot()
+ )
+
+ reporterEntries.forEach { entry =>
+ Future(entry.reporter.processTick(tickSnapshot))(executor = entry.executionContext)
+ }
+
+ lastTick = currentTick
+
+ } catch {
+ case NonFatal(t) => logger.error("Error while running a tick", t)
+ }
+ }
+}
+
+
+
+trait Cancellable {
+ def cancel(): Unit
+}
+
+trait MetricsReporter {
+ def reconfigure(config: Config): Unit
+
+ def start(config: Config): Unit
+ def stop(): Unit
+
+ def processTick(snapshot: TickSnapshot)
+}
+
+
+
+object TestingAllExample extends App {
+ val recorder = Kamon.metrics.getRecorder(Entity("topo", "human-being", Map.empty))
+ while(true) {
+ recorder.counter("test-other").increment()
+ Thread.sleep(100)
+ }
+
+} \ No newline at end of file