aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/metric
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2015-02-12 11:30:06 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2015-02-13 05:15:30 +0100
commitc6bb65535bcc3cc1ff3834a91473ee8dfa6145e8 (patch)
treed7dbe6a1007b168998f167ac74a98744542c6fa8 /kamon-core/src/main/scala/kamon/metric
parent6729c9632245328a007332cdcce7d362584d735a (diff)
downloadKamon-c6bb65535bcc3cc1ff3834a91473ee8dfa6145e8.tar.gz
Kamon-c6bb65535bcc3cc1ff3834a91473ee8dfa6145e8.tar.bz2
Kamon-c6bb65535bcc3cc1ff3834a91473ee8dfa6145e8.zip
! all: Kamon now works as a single instance in a companion object.
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metric')
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala61
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala35
-rw-r--r--kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala9
-rw-r--r--kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala2
-rw-r--r--kamon-core/src/main/scala/kamon/metric/UserMetrics.scala33
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala19
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala99
7 files changed, 170 insertions, 88 deletions
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
index 88352e21..87911352 100644
--- a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
+++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
@@ -16,25 +16,17 @@
package kamon.metric
-import akka.actor
+import com.typesafe.config.Config
import kamon.metric.SubscriptionsDispatcher.{ Unsubscribe, Subscribe }
-import kamon.Kamon
-import kamon.metric.instrument.{ InstrumentFactory, CollectionContext }
-import kamon.supervisor.ModuleSupervisor
+import kamon.metric.instrument.{ DefaultRefreshScheduler, InstrumentFactory, CollectionContext }
import scala.collection.concurrent.TrieMap
import akka.actor._
-import kamon.util.{ FastDispatch, TriemapAtomicGetOrElseUpdate }
-
-object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider {
- override def get(system: ActorSystem): MetricsExtension = super.get(system)
- def lookup(): ExtensionId[_ <: actor.Extension] = Metrics
- def createExtension(system: ExtendedActorSystem): MetricsExtension = new MetricsExtensionImpl(system)
-}
+import kamon.util.{ LazyActorRef, TriemapAtomicGetOrElseUpdate }
case class EntityRegistration[T <: EntityRecorder](entity: Entity, recorder: T)
-trait MetricsExtension extends Kamon.Extension {
+trait MetricsExtension {
def settings: MetricsExtensionSettings
def shouldTrack(entity: Entity): Boolean
def shouldTrack(entityName: String, category: String): Boolean =
@@ -63,16 +55,11 @@ trait MetricsExtension extends Kamon.Extension {
def instrumentFactory(category: String): InstrumentFactory
}
-class MetricsExtensionImpl(system: ExtendedActorSystem) extends MetricsExtension {
- import FastDispatch.Syntax
-
- val settings = MetricsExtensionSettings(system)
-
+private[kamon] class MetricsExtensionImpl(config: Config) extends MetricsExtension {
private val _trackedEntities = TrieMap.empty[Entity, EntityRecorder]
- private val _collectionContext = buildDefaultCollectionContext
- private val _metricsCollectionDispatcher = system.dispatchers.lookup(settings.metricCollectionDispatcher)
- private lazy val _subscriptions = ModuleSupervisor.get(system).createModule("subscriptions-dispatcher",
- SubscriptionsDispatcher.props(settings.tickInterval, collectSnapshots).withDispatcher(settings.metricCollectionDispatcher))
+ private val _subscriptions = new LazyActorRef
+
+ val settings = MetricsExtensionSettings(config)
def shouldTrack(entity: Entity): Boolean =
settings.entityFilters.get(entity.category).map {
@@ -110,10 +97,10 @@ class MetricsExtensionImpl(system: ExtendedActorSystem) extends MetricsExtension
find(Entity(name, category))
def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanent: Boolean): Unit =
- _subscriptions.fastDispatch(Subscribe(filter, subscriber, permanent))(_metricsCollectionDispatcher)
+ _subscriptions.tell(Subscribe(filter, subscriber, permanent))
def unsubscribe(subscriber: ActorRef): Unit =
- _subscriptions.fastDispatch(Unsubscribe(subscriber))(_metricsCollectionDispatcher)
+ _subscriptions.tell(Unsubscribe(subscriber))
def buildDefaultCollectionContext: CollectionContext =
CollectionContext(settings.defaultCollectionContextBufferSize)
@@ -121,16 +108,34 @@ class MetricsExtensionImpl(system: ExtendedActorSystem) extends MetricsExtension
def instrumentFactory(category: String): InstrumentFactory =
settings.instrumentFactories.getOrElse(category, settings.defaultInstrumentFactory)
- /**
- * Collect and dispatch.
- */
- private def collectSnapshots(): Map[Entity, EntitySnapshot] = {
+ private[kamon] def collectSnapshots(collectionContext: CollectionContext): Map[Entity, EntitySnapshot] = {
val builder = Map.newBuilder[Entity, EntitySnapshot]
_trackedEntities.foreach {
- case (identity, recorder) ⇒ builder += ((identity, recorder.collect(_collectionContext)))
+ case (identity, recorder) ⇒ builder += ((identity, recorder.collect(collectionContext)))
}
builder.result()
}
+
+ /**
+ * Metrics Extension initialization.
+ */
+ private var _system: ActorSystem = null
+ private lazy val _start = {
+ _subscriptions.point(_system.actorOf(SubscriptionsDispatcher.props(settings.tickInterval, this), "metrics"))
+ settings.pointScheduler(DefaultRefreshScheduler(_system.scheduler, _system.dispatcher))
+ }
+
+ def start(system: ActorSystem): Unit = synchronized {
+ _system = system
+ _start
+ _system = null
+ }
+}
+
+private[kamon] object MetricsExtensionImpl {
+
+ def apply(config: Config) =
+ new MetricsExtensionImpl(config)
}
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala
index 84624336..9881ed00 100644
--- a/kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala
+++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala
@@ -16,9 +16,8 @@
package kamon.metric
-import akka.actor.ExtendedActorSystem
import com.typesafe.config.Config
-import kamon.metric.instrument.{ RefreshScheduler, InstrumentFactory, DefaultInstrumentSettings, InstrumentCustomSettings }
+import kamon.metric.instrument._
import kamon.util.GlobPathFilter
import scala.concurrent.duration.FiniteDuration
@@ -27,15 +26,19 @@ import scala.concurrent.duration.FiniteDuration
* Configuration settings for the Metrics extension, as read from the `kamon.metric` configuration key.
*/
case class MetricsExtensionSettings(
- tickInterval: FiniteDuration,
- defaultCollectionContextBufferSize: Int,
- trackUnmatchedEntities: Boolean,
- entityFilters: Map[String, EntityFilter],
- instrumentFactories: Map[String, InstrumentFactory],
- defaultInstrumentFactory: InstrumentFactory,
- metricCollectionDispatcher: String,
- refreshSchedulerDispatcher: String,
- refreshScheduler: RefreshScheduler)
+ tickInterval: FiniteDuration,
+ defaultCollectionContextBufferSize: Int,
+ trackUnmatchedEntities: Boolean,
+ entityFilters: Map[String, EntityFilter],
+ instrumentFactories: Map[String, InstrumentFactory],
+ defaultInstrumentFactory: InstrumentFactory,
+ refreshScheduler: RefreshScheduler) {
+
+ private[kamon] def pointScheduler(targetScheduler: RefreshScheduler): Unit = refreshScheduler match {
+ case lrs: LazyRefreshScheduler ⇒ lrs.point(targetScheduler)
+ case others ⇒
+ }
+}
/**
*
@@ -49,23 +52,21 @@ object MetricsExtensionSettings {
import kamon.util.ConfigTools.Syntax
import scala.concurrent.duration._
- def apply(system: ExtendedActorSystem): MetricsExtensionSettings = {
- val metricConfig = system.settings.config.getConfig("kamon.metric")
+ def apply(config: Config): MetricsExtensionSettings = {
+ val metricConfig = config.getConfig("kamon.metric")
val tickInterval = metricConfig.getFiniteDuration("tick-interval")
val collectBufferSize = metricConfig.getInt("default-collection-context-buffer-size")
val trackUnmatchedEntities = metricConfig.getBoolean("track-unmatched-entities")
val entityFilters = loadFilters(metricConfig.getConfig("filters"))
val defaultInstrumentSettings = DefaultInstrumentSettings.fromConfig(metricConfig.getConfig("default-instrument-settings"))
- val metricCollectionDispatcher = metricConfig.getString("dispatchers.metric-collection")
- val refreshSchedulerDispatcher = metricConfig.getString("dispatchers.refresh-scheduler")
- val refreshScheduler = RefreshScheduler(system.scheduler, system.dispatchers.lookup(refreshSchedulerDispatcher))
+ val refreshScheduler = new LazyRefreshScheduler
val instrumentFactories = loadInstrumentFactories(metricConfig.getConfig("instrument-settings"), defaultInstrumentSettings, refreshScheduler)
val defaultInstrumentFactory = new InstrumentFactory(Map.empty, defaultInstrumentSettings, refreshScheduler)
MetricsExtensionSettings(tickInterval, collectBufferSize, trackUnmatchedEntities, entityFilters, instrumentFactories,
- defaultInstrumentFactory, metricCollectionDispatcher, refreshSchedulerDispatcher, refreshScheduler)
+ defaultInstrumentFactory, refreshScheduler)
}
/**
diff --git a/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala b/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala
index f616be35..68b545a5 100644
--- a/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala
+++ b/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala
@@ -24,11 +24,12 @@ import scala.concurrent.duration.FiniteDuration
/**
* Manages subscriptions to metrics and dispatch snapshots on every tick to all subscribers.
*/
-private[kamon] class SubscriptionsDispatcher(interval: FiniteDuration, collector: () ⇒ Map[Entity, EntitySnapshot]) extends Actor {
+private[kamon] class SubscriptionsDispatcher(interval: FiniteDuration, metricsExtension: MetricsExtensionImpl) extends Actor {
var lastTick = MilliTimestamp.now
var oneShotSubscriptions = Map.empty[ActorRef, SubscriptionFilter]
var permanentSubscriptions = Map.empty[ActorRef, SubscriptionFilter]
val tickSchedule = context.system.scheduler.schedule(interval, interval, self, Tick)(context.dispatcher)
+ val collectionContext = metricsExtension.buildDefaultCollectionContext
def receive = {
case Tick ⇒ processTick()
@@ -38,7 +39,7 @@ private[kamon] class SubscriptionsDispatcher(interval: FiniteDuration, collector
}
def processTick(): Unit =
- dispatch(collector())
+ dispatch(metricsExtension.collectSnapshots(collectionContext))
def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanent: Boolean): Unit = {
def addSubscription(storage: Map[ActorRef, SubscriptionFilter]): Map[ActorRef, SubscriptionFilter] =
@@ -80,8 +81,8 @@ private[kamon] class SubscriptionsDispatcher(interval: FiniteDuration, collector
}
object SubscriptionsDispatcher {
- def props(interval: FiniteDuration, collector: () ⇒ Map[Entity, EntitySnapshot]): Props =
- Props(new SubscriptionsDispatcher(interval, collector))
+ def props(interval: FiniteDuration, metricsExtension: MetricsExtensionImpl): Props =
+ Props(new SubscriptionsDispatcher(interval, metricsExtension))
case object Tick
case class Unsubscribe(subscriber: ActorRef)
diff --git a/kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala b/kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala
index 358dade8..dfc5d5f0 100644
--- a/kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala
+++ b/kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala
@@ -29,7 +29,7 @@ class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef
import MapMerge.Syntax
val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher)
- val collectionContext: CollectionContext = Kamon(Metrics)(context.system).buildDefaultCollectionContext
+ val collectionContext: CollectionContext = Kamon.metrics.buildDefaultCollectionContext
def receive = empty
diff --git a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala
index e6ec5e99..e0818292 100644
--- a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala
@@ -16,30 +16,13 @@
package kamon.metric
-import akka.actor
-import akka.actor.{ ActorSystem, ExtendedActorSystem, ExtensionIdProvider, ExtensionId }
-import kamon.Kamon
import kamon.metric.instrument.Gauge.CurrentValueCollector
import kamon.metric.instrument.Histogram.DynamicRange
import kamon.metric.instrument._
import scala.concurrent.duration.FiniteDuration
-object UserMetrics extends ExtensionId[UserMetricsExtension] with ExtensionIdProvider {
- override def get(system: ActorSystem): UserMetricsExtension = super.get(system)
- def lookup(): ExtensionId[_ <: actor.Extension] = UserMetrics
- def createExtension(system: ExtendedActorSystem): UserMetricsExtension = {
- val metricsExtension = Metrics.get(system)
- val instrumentFactory = metricsExtension.instrumentFactory(entity.category)
- val userMetricsExtension = new UserMetricsExtensionImpl(instrumentFactory)
-
- metricsExtension.register(entity, userMetricsExtension).recorder
- }
-
- val entity = Entity("user-metric", "user-metric")
-}
-
-trait UserMetricsExtension extends Kamon.Extension {
+trait UserMetricsExtension {
def histogram(name: String): Histogram
def histogram(name: String, dynamicRange: DynamicRange): Histogram
def histogram(name: String, unitOfMeasurement: UnitOfMeasurement): Histogram
@@ -86,7 +69,7 @@ trait UserMetricsExtension extends Kamon.Extension {
}
-class UserMetricsExtensionImpl(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) with UserMetricsExtension {
+private[kamon] class UserMetricsExtensionImpl(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) with UserMetricsExtension {
override def histogram(name: String): Histogram =
super.histogram(name)
@@ -206,4 +189,16 @@ class UserMetricsExtensionImpl(instrumentFactory: InstrumentFactory) extends Gen
override def removeCounter(key: CounterKey): Unit =
super.removeCounter(key)
+}
+
+private[kamon] object UserMetricsExtensionImpl {
+ val UserMetricEntity = Entity("user-metric", "user-metric")
+
+ def apply(metricsExtension: MetricsExtension): UserMetricsExtensionImpl = {
+ val instrumentFactory = metricsExtension.instrumentFactory(UserMetricEntity.category)
+ val userMetricsExtension = new UserMetricsExtensionImpl(instrumentFactory)
+
+ metricsExtension.register(UserMetricEntity, userMetricsExtension).recorder
+ }
+
} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala
index 68d5c876..59b4b443 100644
--- a/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala
@@ -51,22 +51,3 @@ object CollectionContext {
}
}
-trait RefreshScheduler {
- def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable
-}
-
-object RefreshScheduler {
- val NoopScheduler = new RefreshScheduler {
- def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable = new Cancellable {
- override def isCancelled: Boolean = true
- override def cancel(): Boolean = true
- }
- }
-
- def apply(scheduler: Scheduler, dispatcher: MessageDispatcher): RefreshScheduler = new RefreshScheduler {
- def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable =
- scheduler.schedule(interval, interval)(refresh.apply())(dispatcher)
- }
-
- def create(scheduler: Scheduler, dispatcher: MessageDispatcher): RefreshScheduler = apply(scheduler, dispatcher)
-} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala b/kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala
new file mode 100644
index 00000000..adb08713
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala
@@ -0,0 +1,99 @@
+package kamon.metric.instrument
+
+import akka.actor.{ Scheduler, Cancellable }
+import org.HdrHistogram.WriterReaderPhaser
+
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration.FiniteDuration
+
+trait RefreshScheduler {
+ def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable
+}
+
+/**
+ * Default implementation of RefreshScheduler that simply uses an [[akka.actor.Scheduler]] to schedule tasks to be run
+ * in the provided ExecutionContext.
+ */
+class DefaultRefreshScheduler(scheduler: Scheduler, dispatcher: ExecutionContext) extends RefreshScheduler {
+ def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable =
+ scheduler.schedule(interval, interval)(refresh.apply())(dispatcher)
+}
+
+object DefaultRefreshScheduler {
+ def apply(scheduler: Scheduler, dispatcher: ExecutionContext): RefreshScheduler =
+ new DefaultRefreshScheduler(scheduler, dispatcher)
+
+ def create(scheduler: Scheduler, dispatcher: ExecutionContext): RefreshScheduler =
+ apply(scheduler, dispatcher)
+}
+
+/**
+ * RefreshScheduler implementation that accumulates all the scheduled actions until it is pointed to another refresh
+ * scheduler. Once it is pointed, all subsequent calls to `schedule` will immediately be scheduled in the pointed
+ * scheduler.
+ */
+class LazyRefreshScheduler extends RefreshScheduler {
+ private val _schedulerPhaser = new WriterReaderPhaser
+ private val _backlog = new TrieMap[(FiniteDuration, () ⇒ Unit), RepointableCancellable]()
+ @volatile private var _target: Option[RefreshScheduler] = None
+
+ def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable = {
+ val criticalEnter = _schedulerPhaser.writerCriticalSectionEnter()
+ try {
+ _target.map { scheduler ⇒
+ scheduler.schedule(interval, refresh)
+
+ } getOrElse {
+ val entry = (interval, refresh)
+ val cancellable = new RepointableCancellable(entry)
+
+ _backlog.put(entry, cancellable)
+ cancellable
+ }
+
+ } finally {
+ _schedulerPhaser.writerCriticalSectionExit(criticalEnter)
+ }
+ }
+
+ def point(target: RefreshScheduler): Unit = try {
+ _schedulerPhaser.readerLock()
+
+ if (_target.isEmpty) {
+ _target = Some(target)
+ _schedulerPhaser.flipPhase(10000L)
+ _backlog.dropWhile {
+ case ((interval, refresh), repointableCancellable) ⇒
+ repointableCancellable.point(target.schedule(interval, refresh))
+ true
+ }
+ } else sys.error("A LazyRefreshScheduler cannot be pointed more than once.")
+ } finally { _schedulerPhaser.readerUnlock() }
+
+ class RepointableCancellable(entry: (FiniteDuration, () ⇒ Unit)) extends Cancellable {
+ private var _isCancelled = false
+ private var _cancellable: Option[Cancellable] = None
+
+ def isCancelled: Boolean = synchronized {
+ _cancellable.map(_.isCancelled).getOrElse(_isCancelled)
+ }
+
+ def cancel(): Boolean = synchronized {
+ _isCancelled = true
+ _cancellable.map(_.cancel()).getOrElse(_backlog.remove(entry).nonEmpty)
+ }
+
+ def point(cancellable: Cancellable): Unit = synchronized {
+ if (_cancellable.isEmpty) {
+ _cancellable = Some(cancellable)
+
+ if (_isCancelled)
+ cancellable.cancel()
+
+ } else sys.error("A RepointableCancellable cannot be pointed more than once.")
+
+ }
+ }
+}
+