diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala | 61 |
1 files changed, 33 insertions, 28 deletions
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala index 88352e21..87911352 100644 --- a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala +++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala @@ -16,25 +16,17 @@ package kamon.metric -import akka.actor +import com.typesafe.config.Config import kamon.metric.SubscriptionsDispatcher.{ Unsubscribe, Subscribe } -import kamon.Kamon -import kamon.metric.instrument.{ InstrumentFactory, CollectionContext } -import kamon.supervisor.ModuleSupervisor +import kamon.metric.instrument.{ DefaultRefreshScheduler, InstrumentFactory, CollectionContext } import scala.collection.concurrent.TrieMap import akka.actor._ -import kamon.util.{ FastDispatch, TriemapAtomicGetOrElseUpdate } - -object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider { - override def get(system: ActorSystem): MetricsExtension = super.get(system) - def lookup(): ExtensionId[_ <: actor.Extension] = Metrics - def createExtension(system: ExtendedActorSystem): MetricsExtension = new MetricsExtensionImpl(system) -} +import kamon.util.{ LazyActorRef, TriemapAtomicGetOrElseUpdate } case class EntityRegistration[T <: EntityRecorder](entity: Entity, recorder: T) -trait MetricsExtension extends Kamon.Extension { +trait MetricsExtension { def settings: MetricsExtensionSettings def shouldTrack(entity: Entity): Boolean def shouldTrack(entityName: String, category: String): Boolean = @@ -63,16 +55,11 @@ trait MetricsExtension extends Kamon.Extension { def instrumentFactory(category: String): InstrumentFactory } -class MetricsExtensionImpl(system: ExtendedActorSystem) extends MetricsExtension { - import FastDispatch.Syntax - - val settings = MetricsExtensionSettings(system) - +private[kamon] class MetricsExtensionImpl(config: Config) extends MetricsExtension { private val _trackedEntities = TrieMap.empty[Entity, EntityRecorder] - private val _collectionContext = buildDefaultCollectionContext - private val _metricsCollectionDispatcher = system.dispatchers.lookup(settings.metricCollectionDispatcher) - private lazy val _subscriptions = ModuleSupervisor.get(system).createModule("subscriptions-dispatcher", - SubscriptionsDispatcher.props(settings.tickInterval, collectSnapshots).withDispatcher(settings.metricCollectionDispatcher)) + private val _subscriptions = new LazyActorRef + + val settings = MetricsExtensionSettings(config) def shouldTrack(entity: Entity): Boolean = settings.entityFilters.get(entity.category).map { @@ -110,10 +97,10 @@ class MetricsExtensionImpl(system: ExtendedActorSystem) extends MetricsExtension find(Entity(name, category)) def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanent: Boolean): Unit = - _subscriptions.fastDispatch(Subscribe(filter, subscriber, permanent))(_metricsCollectionDispatcher) + _subscriptions.tell(Subscribe(filter, subscriber, permanent)) def unsubscribe(subscriber: ActorRef): Unit = - _subscriptions.fastDispatch(Unsubscribe(subscriber))(_metricsCollectionDispatcher) + _subscriptions.tell(Unsubscribe(subscriber)) def buildDefaultCollectionContext: CollectionContext = CollectionContext(settings.defaultCollectionContextBufferSize) @@ -121,16 +108,34 @@ class MetricsExtensionImpl(system: ExtendedActorSystem) extends MetricsExtension def instrumentFactory(category: String): InstrumentFactory = settings.instrumentFactories.getOrElse(category, settings.defaultInstrumentFactory) - /** - * Collect and dispatch. - */ - private def collectSnapshots(): Map[Entity, EntitySnapshot] = { + private[kamon] def collectSnapshots(collectionContext: CollectionContext): Map[Entity, EntitySnapshot] = { val builder = Map.newBuilder[Entity, EntitySnapshot] _trackedEntities.foreach { - case (identity, recorder) ⇒ builder += ((identity, recorder.collect(_collectionContext))) + case (identity, recorder) ⇒ builder += ((identity, recorder.collect(collectionContext))) } builder.result() } + + /** + * Metrics Extension initialization. + */ + private var _system: ActorSystem = null + private lazy val _start = { + _subscriptions.point(_system.actorOf(SubscriptionsDispatcher.props(settings.tickInterval, this), "metrics")) + settings.pointScheduler(DefaultRefreshScheduler(_system.scheduler, _system.dispatcher)) + } + + def start(system: ActorSystem): Unit = synchronized { + _system = system + _start + _system = null + } +} + +private[kamon] object MetricsExtensionImpl { + + def apply(config: Config) = + new MetricsExtensionImpl(config) } |