From 5dee54a0794b282e9b5729a3d4b85478c12a68d1 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Wed, 3 May 2017 12:11:47 +0200 Subject: handle reporters shutdown and reconfigures --- kamon-core/src/main/resources/reference.conf | 2 +- kamon-core/src/main/scala/kamon/Kamon.scala | 27 ++- .../src/main/scala/kamon/ReporterRegistry.scala | 209 +++++++++++++++++++++ kamon-core/src/main/scala/kamon/Reporters.scala | 160 ---------------- .../main/scala/kamon/metric/RecorderRegistry.scala | 14 +- 5 files changed, 234 insertions(+), 178 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/ReporterRegistry.scala delete mode 100644 kamon-core/src/main/scala/kamon/Reporters.scala (limited to 'kamon-core/src/main') diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index 9de2247b..61072507 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -1,6 +1,6 @@ kamon { metric { - + tick-interval = 1 second instrument-factory { diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 72573a8a..316a9a24 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -12,26 +12,25 @@ import kamon.trace.Tracer * * */ -trait Kamon { - def metrics: RecorderRegistry - def tracer: Tracer - - def subscriptions: Reporters - def util: Util +object Kamon { + private val recorderRegistry = new RecorderRegistryImpl(ConfigFactory.load()) + private val reporterRegistry = new ReporterRegistryImpl(recorderRegistry, ConfigFactory.load()) - def environment: Environment - def diagnose: Diagnostic + def metrics: RecorderRegistry = recorderRegistry + def reporters: ReporterRegistry = reporterRegistry - def reconfigure(config: Config): Unit + def reconfigure(config: Config): Unit = synchronized { + recorderRegistry.reconfigure(config) + reporterRegistry.reconfigure(config) + } -} + def tracer: Tracer = ??? + def environment: Environment = ??? + def diagnose: Diagnostic = ??? + def util: Util = ??? -object Kamon { - val metricsModule = new RecorderRegistryImpl(ConfigFactory.load()) - val reports = new ReportersRegistry(metricsModule) - def metrics: RecorderRegistry = metricsModule } diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala new file mode 100644 index 00000000..09c980f6 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala @@ -0,0 +1,209 @@ +package kamon + +import java.time.Instant +import java.util.concurrent.atomic.{AtomicLong, AtomicReference} +import java.util.concurrent._ + +import com.typesafe.config.Config +import kamon.metric._ +import org.slf4j.LoggerFactory + +import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future} +import scala.util.Try +import scala.util.control.NonFatal + +trait ReporterRegistry { + def loadFromConfig(): Unit + def add(reporter: MetricsReporter): Registration + def add(reporter: MetricsReporter, name: String): Registration + def stopAll(): Future[Unit] +} + +class ReporterRegistryImpl(metrics: RecorderRegistryImpl, initialConfig: Config) extends ReporterRegistry { + private val registryExecutionContext = Executors.newSingleThreadScheduledExecutor(threadFactory("kamon-reporter-registry")) + private val metricsTickerSchedule = new AtomicReference[ScheduledFuture[_]]() + private val metricReporters = new ConcurrentLinkedQueue[ReporterEntry]() + private val reporterCounter = new AtomicLong(0L) + + reconfigure(initialConfig) + + override def loadFromConfig(): Unit = ??? + + override def add(reporter: MetricsReporter): Registration = + add(reporter, reporter.getClass.getName()) + + override def add(reporter: MetricsReporter, name: String): Registration = { + val executor = Executors.newSingleThreadExecutor(threadFactory(name)) + val reporterEntry = ReporterEntry( + id = reporterCounter.getAndIncrement(), + reporter = reporter, + executionContext = ExecutionContext.fromExecutorService(executor) + ) + + metricReporters.add(reporterEntry) + + new Registration { + val reporterID = reporterEntry.id + override def cancel(): Boolean = { + metricReporters.removeIf(entry => { + if(entry.id == reporterID) { + stopReporter(entry) + true + } else false + }) + } + } + } + + override def stopAll(): Future[Unit] = { + implicit val stopReporterExeContext = ExecutionContext.fromExecutor(registryExecutionContext) + val reporterStopFutures = Vector.newBuilder[Future[Unit]] + while(!metricReporters.isEmpty) { + val entry = metricReporters.poll() + if(entry != null) { + reporterStopFutures += stopReporter(entry) + } + } + + Future.sequence(reporterStopFutures.result()).transform(_ => Try((): Unit)) + } + + private[kamon] def reconfigure(config: Config): Unit = synchronized { + val tickInterval = config.getDuration("kamon.metric.tick-interval") + val currentTicker = metricsTickerSchedule.get() + if(currentTicker != null) { + currentTicker.cancel(true) + } + + // Reconfigure all registered reporters + metricReporters.forEach(entry => + Future(entry.reporter.reconfigure(config))(entry.executionContext) + ) + + metricsTickerSchedule.set { + registryExecutionContext.scheduleAtFixedRate( + new MetricTicker(metrics, metricReporters), tickInterval.toMillis, tickInterval.toMillis, TimeUnit.MILLISECONDS + ) + } + } + + private def stopReporter(entry: ReporterEntry): Future[Unit] = { + entry.isActive = false + + Future(entry.reporter.stop())(entry.executionContext).andThen { + case _ => entry.executionContext.shutdown() + }(ExecutionContext.fromExecutor(registryExecutionContext)) + } + + /** + * Creates a thread factory that assigns the specified name to all created Threads. + */ + private def threadFactory(name: String): ThreadFactory = + new ThreadFactory { + val defaultFactory = Executors.defaultThreadFactory() + + override def newThread(r: Runnable): Thread = { + val thread = defaultFactory.newThread(r) + thread.setName(name) + thread + } + } + + + private case class ReporterEntry( + @volatile var isActive: Boolean = true, + id: Long, + reporter: MetricsReporter, + executionContext: ExecutionContextExecutorService + ) + + private class MetricTicker(metricsImpl: RecorderRegistryImpl, reporterEntries: java.util.Queue[ReporterEntry]) extends Runnable { + val logger = LoggerFactory.getLogger(classOf[MetricTicker]) + var lastTick = Instant.now() + + def run(): Unit = try { + val currentTick = Instant.now() + val tickSnapshot = TickSnapshot( + interval = Interval(lastTick, currentTick), + entities = metricsImpl.snapshot() + ) + + reporterEntries.forEach { entry => + Future { + if(entry.isActive) + entry.reporter.processTick(tickSnapshot) + + }(executor = entry.executionContext) + } + + lastTick = currentTick + + } catch { + case NonFatal(t) => logger.error("Error while running a tick", t) + } + } +} + + + +trait Registration { + def cancel(): Boolean +} + +trait MetricsReporter { + def reconfigure(config: Config): Unit + + def start(config: Config): Unit + def stop(): Unit + + def processTick(snapshot: TickSnapshot) +} + + + +object TestingAllExample extends App { + val recorder = Kamon.metrics.getRecorder(Entity("topo", "human-being", Map.empty)) + + val registration = Kamon.reporters.add(new DummyReporter("test")) + + var x = 0 + while(true) { + recorder.counter("test-other").increment() + Thread.sleep(100) + x += 1 + + if(x == 50) { + registration.cancel() + } + + if(x == 100) { + println("Stopping all reporters") + Kamon.reporters.stopAll() + } + } + +} + + +class DummyReporter(name: String) extends MetricsReporter { + override def reconfigure(config: Config): Unit = { + println("NAME: " + name + "===> Reconfiguring Dummy") + } + + override def start(config: Config): Unit = { + + println("NAME: " + name + "===> Starting DUMMY") + } + + override def stop(): Unit = { + println("NAME: " + name + "===> Stopping Dummy") + } + + override def processTick(snapshot: TickSnapshot): Unit = { + println("NAME: " + name + s"===> [${Thread.currentThread().getName()}] Processing a tick in dummy." + snapshot) + println(s"From: ${snapshot.interval.from}, to: ${snapshot.interval.to}") + snapshot.entities.foreach { e => + println(e.counters.map(c => s"Counter [${c.name}] => " + c.value).mkString(", ")) + } + } +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/Reporters.scala b/kamon-core/src/main/scala/kamon/Reporters.scala deleted file mode 100644 index 686ababf..00000000 --- a/kamon-core/src/main/scala/kamon/Reporters.scala +++ /dev/null @@ -1,160 +0,0 @@ -package kamon - -import java.time.Instant -import java.util.concurrent.{ConcurrentLinkedQueue, Executors, ThreadFactory, TimeUnit} - -import com.typesafe.config.Config -import kamon.metric._ -import org.slf4j.LoggerFactory - -import scala.collection.concurrent.TrieMap -import scala.concurrent.{ExecutionContext, Future} -import scala.util.control.NonFatal - -trait Reporters { - def loadFromConfig(): Unit - def stop(): Unit - - def addReporter(subscriber: MetricsReporter): Cancellable - def addReporter(subscriber: MetricsReporter, name: String): Cancellable - -} - - -class DummyReporter(name: String) extends MetricsReporter { - override def reconfigure(config: Config): Unit = { - println("NAME: " + name + "===> Reconfiguring Dummy") - } - - override def start(config: Config): Unit = { - - println("NAME: " + name + "===> Starting DUMMY") - } - - override def stop(): Unit = { - println("NAME: " + name + "===> Stopping Dummy") - } - - override def processTick(snapshot: TickSnapshot): Unit = { - println("NAME: " + name + s"===> [${Thread.currentThread().getName()}] Processing a tick in dummy." + snapshot) - println(s"From: ${snapshot.interval.from}, to: ${snapshot.interval.to}") - snapshot.entities.foreach { e => - println(e.counters.map(c => s"Counter [${c.name}] => " + c.value).mkString(", ")) - } - } -} - -class ReportersRegistry(metrics: RecorderRegistryImpl) extends Reporters { - private val scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory("kamon-scheduler")) - private val metricReporters = new ConcurrentLinkedQueue[ReporterEntry]() - private val mReporters = TrieMap.empty[String, MetricsReporter] - - - - metricReporters.add(ReporterEntry(new DummyReporter("statsd"), createExecutionContext("statsd-reporter"))) - startMetricsTicker() - - - - - override def loadFromConfig(): Unit = ??? - override def stop(): Unit = ??? - - - override def addReporter(subscriber: MetricsReporter): Cancellable = ??? - - override def addReporter(subscriber: MetricsReporter, name: String): Cancellable = { - ??? - } - - - - private def createExecutionContext(name: String): ExecutionContext = { - val threadFactory = new ThreadFactory { - val defaultFactory = Executors.defaultThreadFactory() - override def newThread(r: Runnable): Thread = { - val thread = defaultFactory.newThread(r) - thread.setName(name) - thread - } - } - - ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(threadFactory)) - } - - /** - * Creates a thread factory that assigns the specified name to all created Threads. - */ - private def threadFactory(name: String): ThreadFactory = - new ThreadFactory { - val defaultFactory = Executors.defaultThreadFactory() - - override def newThread(r: Runnable): Thread = { - val thread = defaultFactory.newThread(r) - thread.setName(name) - thread - } - } - - - def reconfigure(config: Config): Unit = {} - - - - private case class ReporterEntry(reporter: MetricsReporter, executionContext: ExecutionContext) - - - - def startMetricsTicker(): Unit = { - scheduler.scheduleAtFixedRate(new MetricTicker(metrics, metricReporters), 2, 2, TimeUnit.SECONDS) - } - - - private class MetricTicker(metricsImpl: RecorderRegistryImpl, reporterEntries: java.util.Queue[ReporterEntry]) extends Runnable { - val logger = LoggerFactory.getLogger(classOf[MetricTicker]) - var lastTick = Instant.now() - - def run(): Unit = try { - val currentTick = Instant.now() - val tickSnapshot = TickSnapshot( - interval = Interval(lastTick, currentTick), - entities = metricsImpl.snapshot() - ) - - reporterEntries.forEach { entry => - Future(entry.reporter.processTick(tickSnapshot))(executor = entry.executionContext) - } - - lastTick = currentTick - - } catch { - case NonFatal(t) => logger.error("Error while running a tick", t) - } - } -} - - - -trait Cancellable { - def cancel(): Unit -} - -trait MetricsReporter { - def reconfigure(config: Config): Unit - - def start(config: Config): Unit - def stop(): Unit - - def processTick(snapshot: TickSnapshot) -} - - - -object TestingAllExample extends App { - val recorder = Kamon.metrics.getRecorder(Entity("topo", "human-being", Map.empty)) - while(true) { - recorder.counter("test-other").increment() - Thread.sleep(100) - } - -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala b/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala index 99974032..8b84ab6a 100644 --- a/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala +++ b/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala @@ -1,6 +1,8 @@ package kamon package metric +import java.util.concurrent.atomic.AtomicReference + import com.typesafe.config.Config import kamon.metric.instrument.InstrumentFactory @@ -15,12 +17,14 @@ trait RecorderRegistry { def removeRecorder(name: String, category: String, tags: Map[String, String]): Boolean } -class RecorderRegistryImpl(config: Config) extends RecorderRegistry { - private val instrumentFactory = InstrumentFactory(config.getConfig("kamon.metric.instrument-factory")) +class RecorderRegistryImpl(initialConfig: Config) extends RecorderRegistry { + private val instrumentFactory = new AtomicReference[InstrumentFactory]() private val entities = TrieMap.empty[Entity, EntityRecorder with EntitySnapshotProducer] + reconfigure(initialConfig) + override def getRecorder(entity: Entity): EntityRecorder = { - entities.atomicGetOrElseUpdate(entity, new DefaultEntityRecorder(entity, instrumentFactory)) + entities.atomicGetOrElseUpdate(entity, new DefaultEntityRecorder(entity, instrumentFactory.get())) } override def getRecorder(name: String, category: String, tags: Map[String, String]): EntityRecorder = ??? @@ -29,6 +33,10 @@ class RecorderRegistryImpl(config: Config) extends RecorderRegistry { override def removeRecorder(name: String, category: String, tags: Map[String, String]): Boolean = ??? + private[kamon] def reconfigure(config: Config): Unit = { + instrumentFactory.set(InstrumentFactory(config.getConfig("kamon.metric.instrument-factory"))) + } + private[kamon] def snapshot(): Seq[EntitySnapshot] = { entities.values.map(_.snapshot()).toSeq } -- cgit v1.2.3