diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/Reporters.scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/Reporters.scala | 160 |
1 files changed, 0 insertions, 160 deletions
diff --git a/kamon-core/src/main/scala/kamon/Reporters.scala b/kamon-core/src/main/scala/kamon/Reporters.scala deleted file mode 100644 index 686ababf..00000000 --- a/kamon-core/src/main/scala/kamon/Reporters.scala +++ /dev/null @@ -1,160 +0,0 @@ -package kamon - -import java.time.Instant -import java.util.concurrent.{ConcurrentLinkedQueue, Executors, ThreadFactory, TimeUnit} - -import com.typesafe.config.Config -import kamon.metric._ -import org.slf4j.LoggerFactory - -import scala.collection.concurrent.TrieMap -import scala.concurrent.{ExecutionContext, Future} -import scala.util.control.NonFatal - -trait Reporters { - def loadFromConfig(): Unit - def stop(): Unit - - def addReporter(subscriber: MetricsReporter): Cancellable - def addReporter(subscriber: MetricsReporter, name: String): Cancellable - -} - - -class DummyReporter(name: String) extends MetricsReporter { - override def reconfigure(config: Config): Unit = { - println("NAME: " + name + "===> Reconfiguring Dummy") - } - - override def start(config: Config): Unit = { - - println("NAME: " + name + "===> Starting DUMMY") - } - - override def stop(): Unit = { - println("NAME: " + name + "===> Stopping Dummy") - } - - override def processTick(snapshot: TickSnapshot): Unit = { - println("NAME: " + name + s"===> [${Thread.currentThread().getName()}] Processing a tick in dummy." + snapshot) - println(s"From: ${snapshot.interval.from}, to: ${snapshot.interval.to}") - snapshot.entities.foreach { e => - println(e.counters.map(c => s"Counter [${c.name}] => " + c.value).mkString(", ")) - } - } -} - -class ReportersRegistry(metrics: RecorderRegistryImpl) extends Reporters { - private val scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory("kamon-scheduler")) - private val metricReporters = new ConcurrentLinkedQueue[ReporterEntry]() - private val mReporters = TrieMap.empty[String, MetricsReporter] - - - - metricReporters.add(ReporterEntry(new DummyReporter("statsd"), createExecutionContext("statsd-reporter"))) - startMetricsTicker() - - - - - override def loadFromConfig(): Unit = ??? - override def stop(): Unit = ??? - - - override def addReporter(subscriber: MetricsReporter): Cancellable = ??? - - override def addReporter(subscriber: MetricsReporter, name: String): Cancellable = { - ??? - } - - - - private def createExecutionContext(name: String): ExecutionContext = { - val threadFactory = new ThreadFactory { - val defaultFactory = Executors.defaultThreadFactory() - override def newThread(r: Runnable): Thread = { - val thread = defaultFactory.newThread(r) - thread.setName(name) - thread - } - } - - ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(threadFactory)) - } - - /** - * Creates a thread factory that assigns the specified name to all created Threads. - */ - private def threadFactory(name: String): ThreadFactory = - new ThreadFactory { - val defaultFactory = Executors.defaultThreadFactory() - - override def newThread(r: Runnable): Thread = { - val thread = defaultFactory.newThread(r) - thread.setName(name) - thread - } - } - - - def reconfigure(config: Config): Unit = {} - - - - private case class ReporterEntry(reporter: MetricsReporter, executionContext: ExecutionContext) - - - - def startMetricsTicker(): Unit = { - scheduler.scheduleAtFixedRate(new MetricTicker(metrics, metricReporters), 2, 2, TimeUnit.SECONDS) - } - - - private class MetricTicker(metricsImpl: RecorderRegistryImpl, reporterEntries: java.util.Queue[ReporterEntry]) extends Runnable { - val logger = LoggerFactory.getLogger(classOf[MetricTicker]) - var lastTick = Instant.now() - - def run(): Unit = try { - val currentTick = Instant.now() - val tickSnapshot = TickSnapshot( - interval = Interval(lastTick, currentTick), - entities = metricsImpl.snapshot() - ) - - reporterEntries.forEach { entry => - Future(entry.reporter.processTick(tickSnapshot))(executor = entry.executionContext) - } - - lastTick = currentTick - - } catch { - case NonFatal(t) => logger.error("Error while running a tick", t) - } - } -} - - - -trait Cancellable { - def cancel(): Unit -} - -trait MetricsReporter { - def reconfigure(config: Config): Unit - - def start(config: Config): Unit - def stop(): Unit - - def processTick(snapshot: TickSnapshot) -} - - - -object TestingAllExample extends App { - val recorder = Kamon.metrics.getRecorder(Entity("topo", "human-being", Map.empty)) - while(true) { - recorder.counter("test-other").increment() - Thread.sleep(100) - } - -}
\ No newline at end of file |