aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2017-05-03 12:11:47 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2017-05-03 12:11:47 +0200
commit5dee54a0794b282e9b5729a3d4b85478c12a68d1 (patch)
tree078513a1cddcb66990a11427b2ebdcb2da46f87b /kamon-core/src
parent4247aa319ac6e17b7ef7a76d61bac32c872575e3 (diff)
downloadKamon-5dee54a0794b282e9b5729a3d4b85478c12a68d1.tar.gz
Kamon-5dee54a0794b282e9b5729a3d4b85478c12a68d1.tar.bz2
Kamon-5dee54a0794b282e9b5729a3d4b85478c12a68d1.zip
handle reporters shutdown and reconfigures
Diffstat (limited to 'kamon-core/src')
-rw-r--r--kamon-core/src/main/resources/reference.conf2
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala27
-rw-r--r--kamon-core/src/main/scala/kamon/ReporterRegistry.scala209
-rw-r--r--kamon-core/src/main/scala/kamon/Reporters.scala160
-rw-r--r--kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala14
5 files changed, 234 insertions, 178 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index 9de2247b..61072507 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -1,6 +1,6 @@
kamon {
metric {
-
+ tick-interval = 1 second
instrument-factory {
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index 72573a8a..316a9a24 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -12,26 +12,25 @@ import kamon.trace.Tracer
*
*
*/
-trait Kamon {
- def metrics: RecorderRegistry
- def tracer: Tracer
-
- def subscriptions: Reporters
- def util: Util
+object Kamon {
+ private val recorderRegistry = new RecorderRegistryImpl(ConfigFactory.load())
+ private val reporterRegistry = new ReporterRegistryImpl(recorderRegistry, ConfigFactory.load())
- def environment: Environment
- def diagnose: Diagnostic
+ def metrics: RecorderRegistry = recorderRegistry
+ def reporters: ReporterRegistry = reporterRegistry
- def reconfigure(config: Config): Unit
+ def reconfigure(config: Config): Unit = synchronized {
+ recorderRegistry.reconfigure(config)
+ reporterRegistry.reconfigure(config)
+ }
-}
+ def tracer: Tracer = ???
+ def environment: Environment = ???
+ def diagnose: Diagnostic = ???
+ def util: Util = ???
-object Kamon {
- val metricsModule = new RecorderRegistryImpl(ConfigFactory.load())
- val reports = new ReportersRegistry(metricsModule)
- def metrics: RecorderRegistry = metricsModule
}
diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
new file mode 100644
index 00000000..09c980f6
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
@@ -0,0 +1,209 @@
+package kamon
+
+import java.time.Instant
+import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
+import java.util.concurrent._
+
+import com.typesafe.config.Config
+import kamon.metric._
+import org.slf4j.LoggerFactory
+
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
+import scala.util.Try
+import scala.util.control.NonFatal
+
+trait ReporterRegistry {
+ def loadFromConfig(): Unit
+ def add(reporter: MetricsReporter): Registration
+ def add(reporter: MetricsReporter, name: String): Registration
+ def stopAll(): Future[Unit]
+}
+
+class ReporterRegistryImpl(metrics: RecorderRegistryImpl, initialConfig: Config) extends ReporterRegistry {
+ private val registryExecutionContext = Executors.newSingleThreadScheduledExecutor(threadFactory("kamon-reporter-registry"))
+ private val metricsTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
+ private val metricReporters = new ConcurrentLinkedQueue[ReporterEntry]()
+ private val reporterCounter = new AtomicLong(0L)
+
+ reconfigure(initialConfig)
+
+ override def loadFromConfig(): Unit = ???
+
+ override def add(reporter: MetricsReporter): Registration =
+ add(reporter, reporter.getClass.getName())
+
+ override def add(reporter: MetricsReporter, name: String): Registration = {
+ val executor = Executors.newSingleThreadExecutor(threadFactory(name))
+ val reporterEntry = ReporterEntry(
+ id = reporterCounter.getAndIncrement(),
+ reporter = reporter,
+ executionContext = ExecutionContext.fromExecutorService(executor)
+ )
+
+ metricReporters.add(reporterEntry)
+
+ new Registration {
+ val reporterID = reporterEntry.id
+ override def cancel(): Boolean = {
+ metricReporters.removeIf(entry => {
+ if(entry.id == reporterID) {
+ stopReporter(entry)
+ true
+ } else false
+ })
+ }
+ }
+ }
+
+ override def stopAll(): Future[Unit] = {
+ implicit val stopReporterExeContext = ExecutionContext.fromExecutor(registryExecutionContext)
+ val reporterStopFutures = Vector.newBuilder[Future[Unit]]
+ while(!metricReporters.isEmpty) {
+ val entry = metricReporters.poll()
+ if(entry != null) {
+ reporterStopFutures += stopReporter(entry)
+ }
+ }
+
+ Future.sequence(reporterStopFutures.result()).transform(_ => Try((): Unit))
+ }
+
+ private[kamon] def reconfigure(config: Config): Unit = synchronized {
+ val tickInterval = config.getDuration("kamon.metric.tick-interval")
+ val currentTicker = metricsTickerSchedule.get()
+ if(currentTicker != null) {
+ currentTicker.cancel(true)
+ }
+
+ // Reconfigure all registered reporters
+ metricReporters.forEach(entry =>
+ Future(entry.reporter.reconfigure(config))(entry.executionContext)
+ )
+
+ metricsTickerSchedule.set {
+ registryExecutionContext.scheduleAtFixedRate(
+ new MetricTicker(metrics, metricReporters), tickInterval.toMillis, tickInterval.toMillis, TimeUnit.MILLISECONDS
+ )
+ }
+ }
+
+ private def stopReporter(entry: ReporterEntry): Future[Unit] = {
+ entry.isActive = false
+
+ Future(entry.reporter.stop())(entry.executionContext).andThen {
+ case _ => entry.executionContext.shutdown()
+ }(ExecutionContext.fromExecutor(registryExecutionContext))
+ }
+
+ /**
+ * Creates a thread factory that assigns the specified name to all created Threads.
+ */
+ private def threadFactory(name: String): ThreadFactory =
+ new ThreadFactory {
+ val defaultFactory = Executors.defaultThreadFactory()
+
+ override def newThread(r: Runnable): Thread = {
+ val thread = defaultFactory.newThread(r)
+ thread.setName(name)
+ thread
+ }
+ }
+
+
+ private case class ReporterEntry(
+ @volatile var isActive: Boolean = true,
+ id: Long,
+ reporter: MetricsReporter,
+ executionContext: ExecutionContextExecutorService
+ )
+
+ private class MetricTicker(metricsImpl: RecorderRegistryImpl, reporterEntries: java.util.Queue[ReporterEntry]) extends Runnable {
+ val logger = LoggerFactory.getLogger(classOf[MetricTicker])
+ var lastTick = Instant.now()
+
+ def run(): Unit = try {
+ val currentTick = Instant.now()
+ val tickSnapshot = TickSnapshot(
+ interval = Interval(lastTick, currentTick),
+ entities = metricsImpl.snapshot()
+ )
+
+ reporterEntries.forEach { entry =>
+ Future {
+ if(entry.isActive)
+ entry.reporter.processTick(tickSnapshot)
+
+ }(executor = entry.executionContext)
+ }
+
+ lastTick = currentTick
+
+ } catch {
+ case NonFatal(t) => logger.error("Error while running a tick", t)
+ }
+ }
+}
+
+
+
+trait Registration {
+ def cancel(): Boolean
+}
+
+trait MetricsReporter {
+ def reconfigure(config: Config): Unit
+
+ def start(config: Config): Unit
+ def stop(): Unit
+
+ def processTick(snapshot: TickSnapshot)
+}
+
+
+
+object TestingAllExample extends App {
+ val recorder = Kamon.metrics.getRecorder(Entity("topo", "human-being", Map.empty))
+
+ val registration = Kamon.reporters.add(new DummyReporter("test"))
+
+ var x = 0
+ while(true) {
+ recorder.counter("test-other").increment()
+ Thread.sleep(100)
+ x += 1
+
+ if(x == 50) {
+ registration.cancel()
+ }
+
+ if(x == 100) {
+ println("Stopping all reporters")
+ Kamon.reporters.stopAll()
+ }
+ }
+
+}
+
+
+class DummyReporter(name: String) extends MetricsReporter {
+ override def reconfigure(config: Config): Unit = {
+ println("NAME: " + name + "===> Reconfiguring Dummy")
+ }
+
+ override def start(config: Config): Unit = {
+
+ println("NAME: " + name + "===> Starting DUMMY")
+ }
+
+ override def stop(): Unit = {
+ println("NAME: " + name + "===> Stopping Dummy")
+ }
+
+ override def processTick(snapshot: TickSnapshot): Unit = {
+ println("NAME: " + name + s"===> [${Thread.currentThread().getName()}] Processing a tick in dummy." + snapshot)
+ println(s"From: ${snapshot.interval.from}, to: ${snapshot.interval.to}")
+ snapshot.entities.foreach { e =>
+ println(e.counters.map(c => s"Counter [${c.name}] => " + c.value).mkString(", "))
+ }
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/Reporters.scala b/kamon-core/src/main/scala/kamon/Reporters.scala
deleted file mode 100644
index 686ababf..00000000
--- a/kamon-core/src/main/scala/kamon/Reporters.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-package kamon
-
-import java.time.Instant
-import java.util.concurrent.{ConcurrentLinkedQueue, Executors, ThreadFactory, TimeUnit}
-
-import com.typesafe.config.Config
-import kamon.metric._
-import org.slf4j.LoggerFactory
-
-import scala.collection.concurrent.TrieMap
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.control.NonFatal
-
-trait Reporters {
- def loadFromConfig(): Unit
- def stop(): Unit
-
- def addReporter(subscriber: MetricsReporter): Cancellable
- def addReporter(subscriber: MetricsReporter, name: String): Cancellable
-
-}
-
-
-class DummyReporter(name: String) extends MetricsReporter {
- override def reconfigure(config: Config): Unit = {
- println("NAME: " + name + "===> Reconfiguring Dummy")
- }
-
- override def start(config: Config): Unit = {
-
- println("NAME: " + name + "===> Starting DUMMY")
- }
-
- override def stop(): Unit = {
- println("NAME: " + name + "===> Stopping Dummy")
- }
-
- override def processTick(snapshot: TickSnapshot): Unit = {
- println("NAME: " + name + s"===> [${Thread.currentThread().getName()}] Processing a tick in dummy." + snapshot)
- println(s"From: ${snapshot.interval.from}, to: ${snapshot.interval.to}")
- snapshot.entities.foreach { e =>
- println(e.counters.map(c => s"Counter [${c.name}] => " + c.value).mkString(", "))
- }
- }
-}
-
-class ReportersRegistry(metrics: RecorderRegistryImpl) extends Reporters {
- private val scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory("kamon-scheduler"))
- private val metricReporters = new ConcurrentLinkedQueue[ReporterEntry]()
- private val mReporters = TrieMap.empty[String, MetricsReporter]
-
-
-
- metricReporters.add(ReporterEntry(new DummyReporter("statsd"), createExecutionContext("statsd-reporter")))
- startMetricsTicker()
-
-
-
-
- override def loadFromConfig(): Unit = ???
- override def stop(): Unit = ???
-
-
- override def addReporter(subscriber: MetricsReporter): Cancellable = ???
-
- override def addReporter(subscriber: MetricsReporter, name: String): Cancellable = {
- ???
- }
-
-
-
- private def createExecutionContext(name: String): ExecutionContext = {
- val threadFactory = new ThreadFactory {
- val defaultFactory = Executors.defaultThreadFactory()
- override def newThread(r: Runnable): Thread = {
- val thread = defaultFactory.newThread(r)
- thread.setName(name)
- thread
- }
- }
-
- ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(threadFactory))
- }
-
- /**
- * Creates a thread factory that assigns the specified name to all created Threads.
- */
- private def threadFactory(name: String): ThreadFactory =
- new ThreadFactory {
- val defaultFactory = Executors.defaultThreadFactory()
-
- override def newThread(r: Runnable): Thread = {
- val thread = defaultFactory.newThread(r)
- thread.setName(name)
- thread
- }
- }
-
-
- def reconfigure(config: Config): Unit = {}
-
-
-
- private case class ReporterEntry(reporter: MetricsReporter, executionContext: ExecutionContext)
-
-
-
- def startMetricsTicker(): Unit = {
- scheduler.scheduleAtFixedRate(new MetricTicker(metrics, metricReporters), 2, 2, TimeUnit.SECONDS)
- }
-
-
- private class MetricTicker(metricsImpl: RecorderRegistryImpl, reporterEntries: java.util.Queue[ReporterEntry]) extends Runnable {
- val logger = LoggerFactory.getLogger(classOf[MetricTicker])
- var lastTick = Instant.now()
-
- def run(): Unit = try {
- val currentTick = Instant.now()
- val tickSnapshot = TickSnapshot(
- interval = Interval(lastTick, currentTick),
- entities = metricsImpl.snapshot()
- )
-
- reporterEntries.forEach { entry =>
- Future(entry.reporter.processTick(tickSnapshot))(executor = entry.executionContext)
- }
-
- lastTick = currentTick
-
- } catch {
- case NonFatal(t) => logger.error("Error while running a tick", t)
- }
- }
-}
-
-
-
-trait Cancellable {
- def cancel(): Unit
-}
-
-trait MetricsReporter {
- def reconfigure(config: Config): Unit
-
- def start(config: Config): Unit
- def stop(): Unit
-
- def processTick(snapshot: TickSnapshot)
-}
-
-
-
-object TestingAllExample extends App {
- val recorder = Kamon.metrics.getRecorder(Entity("topo", "human-being", Map.empty))
- while(true) {
- recorder.counter("test-other").increment()
- Thread.sleep(100)
- }
-
-} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala b/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala
index 99974032..8b84ab6a 100644
--- a/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala
+++ b/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala
@@ -1,6 +1,8 @@
package kamon
package metric
+import java.util.concurrent.atomic.AtomicReference
+
import com.typesafe.config.Config
import kamon.metric.instrument.InstrumentFactory
@@ -15,12 +17,14 @@ trait RecorderRegistry {
def removeRecorder(name: String, category: String, tags: Map[String, String]): Boolean
}
-class RecorderRegistryImpl(config: Config) extends RecorderRegistry {
- private val instrumentFactory = InstrumentFactory(config.getConfig("kamon.metric.instrument-factory"))
+class RecorderRegistryImpl(initialConfig: Config) extends RecorderRegistry {
+ private val instrumentFactory = new AtomicReference[InstrumentFactory]()
private val entities = TrieMap.empty[Entity, EntityRecorder with EntitySnapshotProducer]
+ reconfigure(initialConfig)
+
override def getRecorder(entity: Entity): EntityRecorder = {
- entities.atomicGetOrElseUpdate(entity, new DefaultEntityRecorder(entity, instrumentFactory))
+ entities.atomicGetOrElseUpdate(entity, new DefaultEntityRecorder(entity, instrumentFactory.get()))
}
override def getRecorder(name: String, category: String, tags: Map[String, String]): EntityRecorder = ???
@@ -29,6 +33,10 @@ class RecorderRegistryImpl(config: Config) extends RecorderRegistry {
override def removeRecorder(name: String, category: String, tags: Map[String, String]): Boolean = ???
+ private[kamon] def reconfigure(config: Config): Unit = {
+ instrumentFactory.set(InstrumentFactory(config.getConfig("kamon.metric.instrument-factory")))
+ }
+
private[kamon] def snapshot(): Seq[EntitySnapshot] = {
entities.values.map(_.snapshot()).toSeq
}