From becf6cac7142011cc478ab7ab15d51799b190951 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Sat, 24 Dec 2016 03:14:29 +0100 Subject: allow restarting Kamon in the same process. taking some ideas from #395, this commit removes ConfigProviders and allows Kamon to be used in "permissive" mode until it gets started. --- kamon-core/src/main/resources/reference.conf | 3 - kamon-core/src/main/scala/kamon/Kamon.scala | 151 ++++++++++----------- .../main/scala/kamon/metric/MetricsModule.scala | 5 +- .../src/main/scala/kamon/trace/TraceSettings.scala | 6 +- .../src/main/scala/kamon/trace/TracerModule.scala | 9 +- .../src/test/scala/kamon/KamonLifecycleSpec.scala | 93 +++++++++++++ .../test/scala/kamon/testkit/BaseKamonSpec.scala | 4 +- 7 files changed, 175 insertions(+), 96 deletions(-) create mode 100644 kamon-core/src/test/scala/kamon/KamonLifecycleSpec.scala diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index fb12b213..69e8c54f 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -141,9 +141,6 @@ kamon { } } - # FQCN for a implementation of - config-provider = default - # All settings included under the internal-config key will be used to repleace the akka.* and spray.* settings. By # doing this we avoid applying custom settings that might make sense for the user application to the internal actor # system and Spray facilities used by Kamon. diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index a3a59b05..7012ed3f 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -24,105 +24,92 @@ import kamon.util.logger.LazyLogger import _root_.scala.util.control.NonFatal import _root_.scala.util.{Failure, Success, Try} -trait ConfigProvider { - def config: Config - - final def patchedConfig: Config = { - val internalConfig = config.getConfig("kamon.internal-config") - config - .withoutPath("akka") - .withoutPath("spray") - .withFallback(internalConfig) - } -} - object Kamon { - - private val log = LazyLogger("Kamon") - trait Extension extends actor.Extension - def defaultConfig = ConfigFactory.load(this.getClass.getClassLoader, ConfigParseOptions.defaults(), ConfigResolveOptions.defaults().setAllowUnresolved(true)) + @volatile private var kamonInstance = new Instance() - class KamonDefaultConfigProvider extends ConfigProvider { - def config = resolveConfiguration + def config = kamonInstance.config + def metrics = kamonInstance.metrics + def tracer = kamonInstance.tracer - private def resolveConfiguration: Config = { - val defaultConf = defaultConfig + def start(): Unit = synchronized { + kamonInstance.start() + } - defaultConf.getString("kamon.config-provider") match { - case "default" ⇒ defaultConf - case fqcn ⇒ - val dynamic = new ReflectiveDynamicAccess(getClass.getClassLoader) - dynamic.createInstanceFor[ConfigProvider](fqcn, Nil).get.config - } - } + def start(conf: Config): Unit = synchronized { + kamonInstance.start(conf) } - class KamonConfigProvider(_config: Config) extends ConfigProvider { - def config = _config + def shutdown(): Unit = synchronized { + kamonInstance.shutdown() + kamonInstance = new Instance() } - private[kamon] var configProvider: Option[ConfigProvider] = None - def config: Config = - configProvider match { - case Some(provider) ⇒ provider.config - case None ⇒ throw new Exception("Kamon.start() not called yet") + private class Instance() { + private val log = LazyLogger(classOf[Instance]) + private var actorSystem: ActorSystem = _ + + var started = false + var config: Config = defaultConfig + val metrics = MetricsModuleImpl(config) + val tracer = TracerModuleImpl(metrics, config) + + private lazy val _start = { + log.info("Initializing Kamon...") + tryLoadAutoweaveModule() + actorSystem = ActorSystem("kamon", config) + metrics.start(actorSystem, config) + tracer.start(actorSystem, config) + actorSystem.registerExtension(ModuleLoader) + started = true } - lazy val metrics = MetricsModuleImpl(config) - lazy val tracer = TracerModuleImpl(metrics, config) - - private lazy val _system = { - val patchedConfig = - configProvider match { - case Some(provider) ⇒ provider.patchedConfig - case None ⇒ - throw new Exception("Kamon.start() not called yet") - } - - log.info("Initializing Kamon...") - - tryLoadAutoweaveModule() - - ActorSystem("kamon", patchedConfig) - } - private lazy val _start = { - metrics.start(_system) - tracer.start(_system) - _system.registerExtension(ModuleLoader) - } + def start(): Unit = { + _start + } - def start(): Unit = { - configProvider = Some(new KamonDefaultConfigProvider()) - _start - } + def start(conf: Config): Unit = { + config = patchConfiguration(conf) + _start + } - def start(conf: Config): Unit = { - configProvider = Some(new KamonConfigProvider(conf)) - _start - } + def shutdown(): Unit = { + if (started) { + actorSystem.terminate() + } + } - def start(provider: ConfigProvider): Unit = { - configProvider = Some(provider) - _start - } + private def defaultConfig = { + patchConfiguration( + ConfigFactory.load( + this.getClass.getClassLoader, + ConfigParseOptions.defaults(), + ConfigResolveOptions.defaults().setAllowUnresolved(true) + ) + ) + } - def shutdown(): Unit = { - _system.shutdown() - } + private def patchConfiguration(config: Config): Config = { + val internalConfig = config.getConfig("kamon.internal-config") + config + .withoutPath("akka") + .withoutPath("spray") + .withFallback(internalConfig) + } - private def tryLoadAutoweaveModule(): Unit = { - Try { - val autoweave = Class.forName("kamon.autoweave.Autoweave") - autoweave.getDeclaredMethod("attach").invoke(autoweave.newInstance()) - } match { - case Success(_) ⇒ - val color = (msg: String) ⇒ s"""\u001B[32m${msg}\u001B[0m""" - log.info(color("Kamon-autoweave has been successfully loaded.")) - log.info(color("The AspectJ load time weaving agent is now attached to the JVM (you don't need to use -javaagent).")) - log.info(color("This offers extra flexibility but obviously any classes loaded before attachment will not be woven.")) - case Failure(NonFatal(reason)) ⇒ log.debug(s"Kamon-autoweave failed to load. Reason: ${reason.getMessage}.") + private def tryLoadAutoweaveModule(): Unit = { + Try { + val autoweave = Class.forName("kamon.autoweave.Autoweave") + autoweave.getDeclaredMethod("attach").invoke(autoweave.newInstance()) + } match { + case Success(_) ⇒ + val color = (msg: String) ⇒ s"""\u001B[32m${msg}\u001B[0m""" + log.info(color("Kamon-autoweave has been successfully loaded.")) + log.info(color("The AspectJ load time weaving agent is now attached to the JVM (you don't need to use -javaagent).")) + log.info(color("This offers extra flexibility but obviously any classes loaded before attachment will not be woven.")) + case Failure(NonFatal(reason)) ⇒ log.debug(s"Kamon-autoweave failed to load. Reason: ${reason.getMessage}.") + } } } } diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsModule.scala b/kamon-core/src/main/scala/kamon/metric/MetricsModule.scala index 75ef0851..864b7a0b 100755 --- a/kamon-core/src/main/scala/kamon/metric/MetricsModule.scala +++ b/kamon-core/src/main/scala/kamon/metric/MetricsModule.scala @@ -234,7 +234,7 @@ private[kamon] class MetricsModuleImpl(config: Config) extends MetricsModule { private val _trackedEntities = TrieMap.empty[Entity, EntityRecorder] private val _subscriptions = new LazyActorRef - val settings = MetricsSettings(config) + @volatile var settings = MetricsSettings(config) def shouldTrack(entity: Entity): Boolean = settings.entityFilters.get(entity.category).map { @@ -360,7 +360,8 @@ private[kamon] class MetricsModuleImpl(config: Config) extends MetricsModule { settings.pointScheduler(DefaultRefreshScheduler(_system.scheduler, _system.dispatcher)) } - def start(system: ActorSystem): Unit = synchronized { + def start(system: ActorSystem, newConfig: Config): Unit = synchronized { + settings = MetricsSettings(newConfig) _system = system _start _system = null diff --git a/kamon-core/src/main/scala/kamon/trace/TraceSettings.scala b/kamon-core/src/main/scala/kamon/trace/TraceSettings.scala index 06677314..c3a83e93 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceSettings.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceSettings.scala @@ -20,7 +20,7 @@ import kamon.util.ConfigTools.Syntax import com.typesafe.config.Config import kamon.util.NanoInterval -case class TraceSettings(levelOfDetail: LevelOfDetail, sampler: Sampler, tokenGeneratorFQN: String) +case class TraceSettings(levelOfDetail: LevelOfDetail, sampler: Sampler, tokenGenerator: () ⇒ String) object TraceSettings { def apply(config: Config): TraceSettings = { @@ -42,8 +42,10 @@ object TraceSettings { case "clock" ⇒ new ClockSampler(new NanoInterval(tracerConfig.getFiniteDuration("clock-sampler.pause").toNanos)) } + val dynamic = new akka.actor.ReflectiveDynamicAccess(getClass.getClassLoader) val tokenGeneratorFQN = tracerConfig.getString("token-generator") + val tokenGenerator = dynamic.createInstanceFor[() ⇒ String](tokenGeneratorFQN, Nil).get // let's bubble up any problems. - TraceSettings(detailLevel, sampler, tokenGeneratorFQN) + TraceSettings(detailLevel, sampler, tokenGenerator) } } \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/TracerModule.scala b/kamon-core/src/main/scala/kamon/trace/TracerModule.scala index b7b6c17a..552962eb 100644 --- a/kamon-core/src/main/scala/kamon/trace/TracerModule.scala +++ b/kamon-core/src/main/scala/kamon/trace/TracerModule.scala @@ -117,14 +117,12 @@ object Tracer { } private[kamon] class TracerModuleImpl(metricsExtension: MetricsModule, config: Config) extends TracerModule { - private val _settings = TraceSettings(config) + @volatile private var _settings = TraceSettings(config) private val _subscriptions = new LazyActorRef private val _incubator = new LazyActorRef - private val _dynamic = new akka.actor.ReflectiveDynamicAccess(getClass.getClassLoader) - private val _tokenGenerator = _dynamic.createInstanceFor[() ⇒ String](_settings.tokenGeneratorFQN, Nil).get // let's bubble up any problems. - private def newToken: String = _tokenGenerator() + private def newToken: String = _settings.tokenGenerator() def newContext(name: String): TraceContext = createTraceContext(name, None) @@ -180,7 +178,8 @@ private[kamon] class TracerModuleImpl(metricsExtension: MetricsModule, config: C _incubator.point(_system.actorOf(Incubator.props(subscriptions))) } - def start(system: ActorSystem): Unit = synchronized { + def start(system: ActorSystem, newConfig: Config): Unit = synchronized { + _settings = TraceSettings(newConfig) _system = system _logger = Logging(_system, "TracerModule") _start diff --git a/kamon-core/src/test/scala/kamon/KamonLifecycleSpec.scala b/kamon-core/src/test/scala/kamon/KamonLifecycleSpec.scala new file mode 100644 index 00000000..2fbd1b71 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/KamonLifecycleSpec.scala @@ -0,0 +1,93 @@ +package kamon + +import akka.actor.ActorSystem +import akka.testkit.{TestKitBase, TestProbe} +import com.typesafe.config.ConfigFactory +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot +import kamon.metric.{EntitySnapshot, SubscriptionsDispatcher} +import kamon.util.LazyActorRef +import org.scalatest.{Matchers, WordSpecLike} +import org.scalactic.TimesOnInt._ + +import scala.concurrent.duration._ + +class KamonLifecycleSpec extends TestKitBase with WordSpecLike with Matchers { + override implicit lazy val system: ActorSystem = ActorSystem("kamon-lifecycle-spec") + + "The Kamon lifecycle" should { + "allow Kamon to be used before it gets started" in { + val someMetric = Kamon.metrics.histogram("allow-me-before-start") + } + + "allow Kamon to be started/shutdown several times" in { + 10 times { + Kamon.shutdown() + Kamon.start() + Kamon.start() + Kamon.shutdown() + Kamon.shutdown() + } + } + + "not dispatch subscriptions before Kamon startup" in { + val subscriber = TestProbe() + Kamon.metrics.histogram("only-after-startup").record(100) + Kamon.metrics.subscribe("**", "**", subscriber.ref, permanently = true) + + flushSubscriptions() + subscriber.expectNoMsg(300 millis) + + Kamon.metrics.histogram("only-after-startup").record(100) + Kamon.start() + flushSubscriptions() + subscriber.expectMsgType[TickMetricSnapshot] + Kamon.shutdown() + } + + "not dispatch subscriptions after Kamon shutdown" in { + val subscriber = TestProbe() + Kamon.start() + Kamon.metrics.histogram("only-before-shutdown").record(100) + Kamon.metrics.subscribe("**", "**", subscriber.ref, permanently = true) + + flushSubscriptions() + subscriber.expectMsgType[TickMetricSnapshot] + + Kamon.metrics.histogram("only-before-shutdown").record(100) + Kamon.shutdown() + Thread.sleep(500) + flushSubscriptions() + subscriber.expectNoMsg(300 millis) + } + + "reconfigure filters after being started" in { + val customConfig = ConfigFactory.parseString( + """ + |kamon.metric.filters.histogram { + | includes = [ "**" ] + | excludes = ["untracked-histogram"] + |} + """.stripMargin + ) + + Kamon.metrics.shouldTrack("untracked-histogram", "histogram") shouldBe true + Kamon.start(customConfig.withFallback(ConfigFactory.load())) + Kamon.metrics.shouldTrack("untracked-histogram", "histogram") shouldBe false + + } + } + + def takeSnapshotOf(name: String, category: String): EntitySnapshot = { + val collectionContext = Kamon.metrics.buildDefaultCollectionContext + val recorder = Kamon.metrics.find(name, category).get + recorder.collect(collectionContext) + } + + def flushSubscriptions(): Unit = { + val subscriptionsField = Kamon.metrics.getClass.getDeclaredField("_subscriptions") + subscriptionsField.setAccessible(true) + val subscriptions = subscriptionsField.get(Kamon.metrics).asInstanceOf[LazyActorRef] + + subscriptions.tell(SubscriptionsDispatcher.Tick) + } +} diff --git a/kamon-core/src/test/scala/kamon/testkit/BaseKamonSpec.scala b/kamon-core/src/test/scala/kamon/testkit/BaseKamonSpec.scala index a869149b..49bcee68 100644 --- a/kamon-core/src/test/scala/kamon/testkit/BaseKamonSpec.scala +++ b/kamon-core/src/test/scala/kamon/testkit/BaseKamonSpec.scala @@ -32,9 +32,9 @@ abstract class BaseKamonSpec(actorSystemName: String) extends TestKitBase with W ActorSystem(actorSystemName, mergedConfig) } - def config: Config = Kamon.defaultConfig + def config: Config = Kamon.config - def mergedConfig: Config = config.withFallback(Kamon.defaultConfig) + def mergedConfig: Config = config.withFallback(Kamon.config) def newContext(name: String): TraceContext = Kamon.tracer.newContext(name) -- cgit v1.2.3