aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2015-02-12 11:30:06 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2015-02-13 05:15:30 +0100
commit07fc83bb01c5873b47aff50d6d3abbb9f11842bd (patch)
tree814c6067a25066978b5cb8bac6541f39d0d4454d /kamon-core/src
parent82a110b23ca57286e4e3dd0315c20ed99b5e8f88 (diff)
downloadKamon-07fc83bb01c5873b47aff50d6d3abbb9f11842bd.tar.gz
Kamon-07fc83bb01c5873b47aff50d6d3abbb9f11842bd.tar.bz2
Kamon-07fc83bb01c5873b47aff50d6d3abbb9f11842bd.zip
! all: Kamon now works as a single instance in a companion object.
Diffstat (limited to 'kamon-core/src')
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala76
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala61
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala35
-rw-r--r--kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala9
-rw-r--r--kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala2
-rw-r--r--kamon-core/src/main/scala/kamon/metric/UserMetrics.scala33
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala19
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala99
-rw-r--r--kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala6
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceContext.scala11
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TracerExtension.scala62
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TracerExtensionSettings.scala6
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TracingContext.scala5
-rw-r--r--kamon-core/src/main/scala/kamon/util/LazyActorRef.scala53
-rw-r--r--kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala3
-rw-r--r--kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala5
-rw-r--r--kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala63
-rw-r--r--kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala7
-rw-r--r--kamon-core/src/test/scala/kamon/metric/instrument/MinMaxCounterSpec.scala3
-rw-r--r--kamon-core/src/test/scala/kamon/testkit/BaseKamonSpec.scala28
-rw-r--r--kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala8
21 files changed, 379 insertions, 215 deletions
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index 6fedc065..2bed4737 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -16,49 +16,61 @@ package kamon
import _root_.akka.actor
import _root_.akka.actor._
-import com.typesafe.config.Config
+import com.typesafe.config.{ ConfigFactory, Config }
import kamon.metric._
-import kamon.supervisor.ModuleSupervisor
-import kamon.trace.{ Tracer, TracerExtension }
+import kamon.trace.{ TracerExtensionImpl, TracerExtension }
-class Kamon(val actorSystem: ActorSystem) {
- val metrics: MetricsExtension = Metrics.get(actorSystem)
- val tracer: TracerExtension = Tracer.get(actorSystem)
- val userMetrics: UserMetricsExtension = UserMetrics.get(actorSystem)
+object Kamon {
+ trait Extension extends actor.Extension
- // This will cause all auto-start modules to initiate.
- ModuleSupervisor.get(actorSystem)
+ private case class KamonCoreComponents(
+ metrics: MetricsExtension,
+ tracer: TracerExtension,
+ userMetrics: UserMetricsExtension)
- def shutdown(): Unit =
- actorSystem.shutdown()
-}
+ @volatile private var _system: ActorSystem = _
+ @volatile private var _coreComponents: Option[KamonCoreComponents] = None
-object Kamon {
- trait Extension extends actor.Extension
- def apply[T <: Extension](key: ExtensionId[T])(implicit system: ActorSystem): T = key(system)
+ def start(config: Config): Unit = synchronized {
+ if (_coreComponents.isEmpty) {
+ val metrics = MetricsExtensionImpl(config)
+ val simpleMetrics = UserMetricsExtensionImpl(metrics)
+ val tracer = TracerExtensionImpl(metrics, config)
- def apply(): Kamon =
- apply("kamon")
+ _coreComponents = Some(KamonCoreComponents(metrics, tracer, simpleMetrics))
+ _system = ActorSystem("kamon", config)
- def apply(actorSystemName: String): Kamon =
- apply(ActorSystem(actorSystemName))
+ metrics.start(_system)
+ tracer.start(_system)
- def apply(actorSystemName: String, config: Config): Kamon =
- apply(ActorSystem(actorSystemName, config))
+ } else sys.error("Kamon has already been started.")
+ }
- def apply(system: ActorSystem): Kamon =
- new Kamon(system)
+ def start(): Unit =
+ start(ConfigFactory.load)
- def create(): Kamon =
- apply()
+ def metrics: MetricsExtension =
+ ifStarted(_.metrics)
- def create(actorSystemName: String): Kamon =
- apply(ActorSystem(actorSystemName))
+ def tracer: TracerExtension =
+ ifStarted(_.tracer)
- def create(actorSystemName: String, config: Config): Kamon =
- apply(ActorSystem(actorSystemName, config))
+ def userMetrics: UserMetricsExtension =
+ ifStarted(_.userMetrics)
- def create(system: ActorSystem): Kamon =
- new Kamon(system)
+ def apply[T <: Kamon.Extension](key: ExtensionId[T]): T =
+ ifStarted { _ ⇒
+ if (_system ne null)
+ key(_system)
+ else
+ sys.error("Cannot retrieve extensions while Kamon is being initialized.")
+ }
+
+ def extension[T <: Kamon.Extension](key: ExtensionId[T]): T =
+ apply(key)
+
+ private def ifStarted[T](thunk: KamonCoreComponents ⇒ T): T =
+ _coreComponents.map(thunk(_)) getOrElse (sys.error("Kamon has not been started yet."))
+
+}
-} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
index 88352e21..87911352 100644
--- a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
+++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
@@ -16,25 +16,17 @@
package kamon.metric
-import akka.actor
+import com.typesafe.config.Config
import kamon.metric.SubscriptionsDispatcher.{ Unsubscribe, Subscribe }
-import kamon.Kamon
-import kamon.metric.instrument.{ InstrumentFactory, CollectionContext }
-import kamon.supervisor.ModuleSupervisor
+import kamon.metric.instrument.{ DefaultRefreshScheduler, InstrumentFactory, CollectionContext }
import scala.collection.concurrent.TrieMap
import akka.actor._
-import kamon.util.{ FastDispatch, TriemapAtomicGetOrElseUpdate }
-
-object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider {
- override def get(system: ActorSystem): MetricsExtension = super.get(system)
- def lookup(): ExtensionId[_ <: actor.Extension] = Metrics
- def createExtension(system: ExtendedActorSystem): MetricsExtension = new MetricsExtensionImpl(system)
-}
+import kamon.util.{ LazyActorRef, TriemapAtomicGetOrElseUpdate }
case class EntityRegistration[T <: EntityRecorder](entity: Entity, recorder: T)
-trait MetricsExtension extends Kamon.Extension {
+trait MetricsExtension {
def settings: MetricsExtensionSettings
def shouldTrack(entity: Entity): Boolean
def shouldTrack(entityName: String, category: String): Boolean =
@@ -63,16 +55,11 @@ trait MetricsExtension extends Kamon.Extension {
def instrumentFactory(category: String): InstrumentFactory
}
-class MetricsExtensionImpl(system: ExtendedActorSystem) extends MetricsExtension {
- import FastDispatch.Syntax
-
- val settings = MetricsExtensionSettings(system)
-
+private[kamon] class MetricsExtensionImpl(config: Config) extends MetricsExtension {
private val _trackedEntities = TrieMap.empty[Entity, EntityRecorder]
- private val _collectionContext = buildDefaultCollectionContext
- private val _metricsCollectionDispatcher = system.dispatchers.lookup(settings.metricCollectionDispatcher)
- private lazy val _subscriptions = ModuleSupervisor.get(system).createModule("subscriptions-dispatcher",
- SubscriptionsDispatcher.props(settings.tickInterval, collectSnapshots).withDispatcher(settings.metricCollectionDispatcher))
+ private val _subscriptions = new LazyActorRef
+
+ val settings = MetricsExtensionSettings(config)
def shouldTrack(entity: Entity): Boolean =
settings.entityFilters.get(entity.category).map {
@@ -110,10 +97,10 @@ class MetricsExtensionImpl(system: ExtendedActorSystem) extends MetricsExtension
find(Entity(name, category))
def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanent: Boolean): Unit =
- _subscriptions.fastDispatch(Subscribe(filter, subscriber, permanent))(_metricsCollectionDispatcher)
+ _subscriptions.tell(Subscribe(filter, subscriber, permanent))
def unsubscribe(subscriber: ActorRef): Unit =
- _subscriptions.fastDispatch(Unsubscribe(subscriber))(_metricsCollectionDispatcher)
+ _subscriptions.tell(Unsubscribe(subscriber))
def buildDefaultCollectionContext: CollectionContext =
CollectionContext(settings.defaultCollectionContextBufferSize)
@@ -121,16 +108,34 @@ class MetricsExtensionImpl(system: ExtendedActorSystem) extends MetricsExtension
def instrumentFactory(category: String): InstrumentFactory =
settings.instrumentFactories.getOrElse(category, settings.defaultInstrumentFactory)
- /**
- * Collect and dispatch.
- */
- private def collectSnapshots(): Map[Entity, EntitySnapshot] = {
+ private[kamon] def collectSnapshots(collectionContext: CollectionContext): Map[Entity, EntitySnapshot] = {
val builder = Map.newBuilder[Entity, EntitySnapshot]
_trackedEntities.foreach {
- case (identity, recorder) ⇒ builder += ((identity, recorder.collect(_collectionContext)))
+ case (identity, recorder) ⇒ builder += ((identity, recorder.collect(collectionContext)))
}
builder.result()
}
+
+ /**
+ * Metrics Extension initialization.
+ */
+ private var _system: ActorSystem = null
+ private lazy val _start = {
+ _subscriptions.point(_system.actorOf(SubscriptionsDispatcher.props(settings.tickInterval, this), "metrics"))
+ settings.pointScheduler(DefaultRefreshScheduler(_system.scheduler, _system.dispatcher))
+ }
+
+ def start(system: ActorSystem): Unit = synchronized {
+ _system = system
+ _start
+ _system = null
+ }
+}
+
+private[kamon] object MetricsExtensionImpl {
+
+ def apply(config: Config) =
+ new MetricsExtensionImpl(config)
}
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala
index 84624336..9881ed00 100644
--- a/kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala
+++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala
@@ -16,9 +16,8 @@
package kamon.metric
-import akka.actor.ExtendedActorSystem
import com.typesafe.config.Config
-import kamon.metric.instrument.{ RefreshScheduler, InstrumentFactory, DefaultInstrumentSettings, InstrumentCustomSettings }
+import kamon.metric.instrument._
import kamon.util.GlobPathFilter
import scala.concurrent.duration.FiniteDuration
@@ -27,15 +26,19 @@ import scala.concurrent.duration.FiniteDuration
* Configuration settings for the Metrics extension, as read from the `kamon.metric` configuration key.
*/
case class MetricsExtensionSettings(
- tickInterval: FiniteDuration,
- defaultCollectionContextBufferSize: Int,
- trackUnmatchedEntities: Boolean,
- entityFilters: Map[String, EntityFilter],
- instrumentFactories: Map[String, InstrumentFactory],
- defaultInstrumentFactory: InstrumentFactory,
- metricCollectionDispatcher: String,
- refreshSchedulerDispatcher: String,
- refreshScheduler: RefreshScheduler)
+ tickInterval: FiniteDuration,
+ defaultCollectionContextBufferSize: Int,
+ trackUnmatchedEntities: Boolean,
+ entityFilters: Map[String, EntityFilter],
+ instrumentFactories: Map[String, InstrumentFactory],
+ defaultInstrumentFactory: InstrumentFactory,
+ refreshScheduler: RefreshScheduler) {
+
+ private[kamon] def pointScheduler(targetScheduler: RefreshScheduler): Unit = refreshScheduler match {
+ case lrs: LazyRefreshScheduler ⇒ lrs.point(targetScheduler)
+ case others ⇒
+ }
+}
/**
*
@@ -49,23 +52,21 @@ object MetricsExtensionSettings {
import kamon.util.ConfigTools.Syntax
import scala.concurrent.duration._
- def apply(system: ExtendedActorSystem): MetricsExtensionSettings = {
- val metricConfig = system.settings.config.getConfig("kamon.metric")
+ def apply(config: Config): MetricsExtensionSettings = {
+ val metricConfig = config.getConfig("kamon.metric")
val tickInterval = metricConfig.getFiniteDuration("tick-interval")
val collectBufferSize = metricConfig.getInt("default-collection-context-buffer-size")
val trackUnmatchedEntities = metricConfig.getBoolean("track-unmatched-entities")
val entityFilters = loadFilters(metricConfig.getConfig("filters"))
val defaultInstrumentSettings = DefaultInstrumentSettings.fromConfig(metricConfig.getConfig("default-instrument-settings"))
- val metricCollectionDispatcher = metricConfig.getString("dispatchers.metric-collection")
- val refreshSchedulerDispatcher = metricConfig.getString("dispatchers.refresh-scheduler")
- val refreshScheduler = RefreshScheduler(system.scheduler, system.dispatchers.lookup(refreshSchedulerDispatcher))
+ val refreshScheduler = new LazyRefreshScheduler
val instrumentFactories = loadInstrumentFactories(metricConfig.getConfig("instrument-settings"), defaultInstrumentSettings, refreshScheduler)
val defaultInstrumentFactory = new InstrumentFactory(Map.empty, defaultInstrumentSettings, refreshScheduler)
MetricsExtensionSettings(tickInterval, collectBufferSize, trackUnmatchedEntities, entityFilters, instrumentFactories,
- defaultInstrumentFactory, metricCollectionDispatcher, refreshSchedulerDispatcher, refreshScheduler)
+ defaultInstrumentFactory, refreshScheduler)
}
/**
diff --git a/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala b/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala
index f616be35..68b545a5 100644
--- a/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala
+++ b/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala
@@ -24,11 +24,12 @@ import scala.concurrent.duration.FiniteDuration
/**
* Manages subscriptions to metrics and dispatch snapshots on every tick to all subscribers.
*/
-private[kamon] class SubscriptionsDispatcher(interval: FiniteDuration, collector: () ⇒ Map[Entity, EntitySnapshot]) extends Actor {
+private[kamon] class SubscriptionsDispatcher(interval: FiniteDuration, metricsExtension: MetricsExtensionImpl) extends Actor {
var lastTick = MilliTimestamp.now
var oneShotSubscriptions = Map.empty[ActorRef, SubscriptionFilter]
var permanentSubscriptions = Map.empty[ActorRef, SubscriptionFilter]
val tickSchedule = context.system.scheduler.schedule(interval, interval, self, Tick)(context.dispatcher)
+ val collectionContext = metricsExtension.buildDefaultCollectionContext
def receive = {
case Tick ⇒ processTick()
@@ -38,7 +39,7 @@ private[kamon] class SubscriptionsDispatcher(interval: FiniteDuration, collector
}
def processTick(): Unit =
- dispatch(collector())
+ dispatch(metricsExtension.collectSnapshots(collectionContext))
def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanent: Boolean): Unit = {
def addSubscription(storage: Map[ActorRef, SubscriptionFilter]): Map[ActorRef, SubscriptionFilter] =
@@ -80,8 +81,8 @@ private[kamon] class SubscriptionsDispatcher(interval: FiniteDuration, collector
}
object SubscriptionsDispatcher {
- def props(interval: FiniteDuration, collector: () ⇒ Map[Entity, EntitySnapshot]): Props =
- Props(new SubscriptionsDispatcher(interval, collector))
+ def props(interval: FiniteDuration, metricsExtension: MetricsExtensionImpl): Props =
+ Props(new SubscriptionsDispatcher(interval, metricsExtension))
case object Tick
case class Unsubscribe(subscriber: ActorRef)
diff --git a/kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala b/kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala
index 358dade8..dfc5d5f0 100644
--- a/kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala
+++ b/kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala
@@ -29,7 +29,7 @@ class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef
import MapMerge.Syntax
val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher)
- val collectionContext: CollectionContext = Kamon(Metrics)(context.system).buildDefaultCollectionContext
+ val collectionContext: CollectionContext = Kamon.metrics.buildDefaultCollectionContext
def receive = empty
diff --git a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala
index e6ec5e99..e0818292 100644
--- a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala
@@ -16,30 +16,13 @@
package kamon.metric
-import akka.actor
-import akka.actor.{ ActorSystem, ExtendedActorSystem, ExtensionIdProvider, ExtensionId }
-import kamon.Kamon
import kamon.metric.instrument.Gauge.CurrentValueCollector
import kamon.metric.instrument.Histogram.DynamicRange
import kamon.metric.instrument._
import scala.concurrent.duration.FiniteDuration
-object UserMetrics extends ExtensionId[UserMetricsExtension] with ExtensionIdProvider {
- override def get(system: ActorSystem): UserMetricsExtension = super.get(system)
- def lookup(): ExtensionId[_ <: actor.Extension] = UserMetrics
- def createExtension(system: ExtendedActorSystem): UserMetricsExtension = {
- val metricsExtension = Metrics.get(system)
- val instrumentFactory = metricsExtension.instrumentFactory(entity.category)
- val userMetricsExtension = new UserMetricsExtensionImpl(instrumentFactory)
-
- metricsExtension.register(entity, userMetricsExtension).recorder
- }
-
- val entity = Entity("user-metric", "user-metric")
-}
-
-trait UserMetricsExtension extends Kamon.Extension {
+trait UserMetricsExtension {
def histogram(name: String): Histogram
def histogram(name: String, dynamicRange: DynamicRange): Histogram
def histogram(name: String, unitOfMeasurement: UnitOfMeasurement): Histogram
@@ -86,7 +69,7 @@ trait UserMetricsExtension extends Kamon.Extension {
}
-class UserMetricsExtensionImpl(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) with UserMetricsExtension {
+private[kamon] class UserMetricsExtensionImpl(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) with UserMetricsExtension {
override def histogram(name: String): Histogram =
super.histogram(name)
@@ -206,4 +189,16 @@ class UserMetricsExtensionImpl(instrumentFactory: InstrumentFactory) extends Gen
override def removeCounter(key: CounterKey): Unit =
super.removeCounter(key)
+}
+
+private[kamon] object UserMetricsExtensionImpl {
+ val UserMetricEntity = Entity("user-metric", "user-metric")
+
+ def apply(metricsExtension: MetricsExtension): UserMetricsExtensionImpl = {
+ val instrumentFactory = metricsExtension.instrumentFactory(UserMetricEntity.category)
+ val userMetricsExtension = new UserMetricsExtensionImpl(instrumentFactory)
+
+ metricsExtension.register(UserMetricEntity, userMetricsExtension).recorder
+ }
+
} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala
index 68d5c876..59b4b443 100644
--- a/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala
@@ -51,22 +51,3 @@ object CollectionContext {
}
}
-trait RefreshScheduler {
- def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable
-}
-
-object RefreshScheduler {
- val NoopScheduler = new RefreshScheduler {
- def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable = new Cancellable {
- override def isCancelled: Boolean = true
- override def cancel(): Boolean = true
- }
- }
-
- def apply(scheduler: Scheduler, dispatcher: MessageDispatcher): RefreshScheduler = new RefreshScheduler {
- def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable =
- scheduler.schedule(interval, interval)(refresh.apply())(dispatcher)
- }
-
- def create(scheduler: Scheduler, dispatcher: MessageDispatcher): RefreshScheduler = apply(scheduler, dispatcher)
-} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala b/kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala
new file mode 100644
index 00000000..adb08713
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala
@@ -0,0 +1,99 @@
+package kamon.metric.instrument
+
+import akka.actor.{ Scheduler, Cancellable }
+import org.HdrHistogram.WriterReaderPhaser
+
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration.FiniteDuration
+
+trait RefreshScheduler {
+ def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable
+}
+
+/**
+ * Default implementation of RefreshScheduler that simply uses an [[akka.actor.Scheduler]] to schedule tasks to be run
+ * in the provided ExecutionContext.
+ */
+class DefaultRefreshScheduler(scheduler: Scheduler, dispatcher: ExecutionContext) extends RefreshScheduler {
+ def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable =
+ scheduler.schedule(interval, interval)(refresh.apply())(dispatcher)
+}
+
+object DefaultRefreshScheduler {
+ def apply(scheduler: Scheduler, dispatcher: ExecutionContext): RefreshScheduler =
+ new DefaultRefreshScheduler(scheduler, dispatcher)
+
+ def create(scheduler: Scheduler, dispatcher: ExecutionContext): RefreshScheduler =
+ apply(scheduler, dispatcher)
+}
+
+/**
+ * RefreshScheduler implementation that accumulates all the scheduled actions until it is pointed to another refresh
+ * scheduler. Once it is pointed, all subsequent calls to `schedule` will immediately be scheduled in the pointed
+ * scheduler.
+ */
+class LazyRefreshScheduler extends RefreshScheduler {
+ private val _schedulerPhaser = new WriterReaderPhaser
+ private val _backlog = new TrieMap[(FiniteDuration, () ⇒ Unit), RepointableCancellable]()
+ @volatile private var _target: Option[RefreshScheduler] = None
+
+ def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable = {
+ val criticalEnter = _schedulerPhaser.writerCriticalSectionEnter()
+ try {
+ _target.map { scheduler ⇒
+ scheduler.schedule(interval, refresh)
+
+ } getOrElse {
+ val entry = (interval, refresh)
+ val cancellable = new RepointableCancellable(entry)
+
+ _backlog.put(entry, cancellable)
+ cancellable
+ }
+
+ } finally {
+ _schedulerPhaser.writerCriticalSectionExit(criticalEnter)
+ }
+ }
+
+ def point(target: RefreshScheduler): Unit = try {
+ _schedulerPhaser.readerLock()
+
+ if (_target.isEmpty) {
+ _target = Some(target)
+ _schedulerPhaser.flipPhase(10000L)
+ _backlog.dropWhile {
+ case ((interval, refresh), repointableCancellable) ⇒
+ repointableCancellable.point(target.schedule(interval, refresh))
+ true
+ }
+ } else sys.error("A LazyRefreshScheduler cannot be pointed more than once.")
+ } finally { _schedulerPhaser.readerUnlock() }
+
+ class RepointableCancellable(entry: (FiniteDuration, () ⇒ Unit)) extends Cancellable {
+ private var _isCancelled = false
+ private var _cancellable: Option[Cancellable] = None
+
+ def isCancelled: Boolean = synchronized {
+ _cancellable.map(_.isCancelled).getOrElse(_isCancelled)
+ }
+
+ def cancel(): Boolean = synchronized {
+ _isCancelled = true
+ _cancellable.map(_.cancel()).getOrElse(_backlog.remove(entry).nonEmpty)
+ }
+
+ def point(cancellable: Cancellable): Unit = synchronized {
+ if (_cancellable.isEmpty) {
+ _cancellable = Some(cancellable)
+
+ if (_isCancelled)
+ cancellable.cancel()
+
+ } else sys.error("A RepointableCancellable cannot be pointed more than once.")
+
+ }
+ }
+}
+
diff --git a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala
index e62178dd..5f7fdff5 100644
--- a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala
+++ b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala
@@ -18,16 +18,14 @@ package kamon.trace
import java.util.concurrent.ConcurrentLinkedQueue
-import akka.actor.{ ExtensionId, ActorSystem }
import akka.event.LoggingAdapter
-import kamon.Kamon.Extension
import kamon.metric.{ MetricsExtension, TraceMetrics }
import kamon.util.{ NanoInterval, RelativeNanoTimestamp }
import scala.annotation.tailrec
private[kamon] class MetricsOnlyContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail,
- val startTimestamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension, val actorSystem: ActorSystem)
+ val startTimestamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension)
extends TraceContext {
@volatile private var _name = traceName
@@ -48,8 +46,6 @@ private[kamon] class MetricsOnlyContext(traceName: String, val token: String, iz
def isOpen: Boolean = _isOpen
def addMetadata(key: String, value: String): Unit = {}
- def lookupExtension[T <: Extension](id: ExtensionId[T]): T = id(actorSystem)
-
def finish(): Unit = {
_isOpen = false
val traceElapsedTime = NanoInterval.since(startTimestamp)
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
index ed8170a9..48e56153 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
@@ -17,12 +17,8 @@
package kamon.trace
import java.io.ObjectStreamException
-import akka.actor.{ ExtensionId, ActorSystem }
-import kamon.Kamon.Extension
-import kamon._
-import kamon.metric._
import kamon.trace.TraceContextAware.DefaultTraceContextAware
-import kamon.util.{ NanoInterval, RelativeNanoTimestamp }
+import kamon.util.RelativeNanoTimestamp
trait TraceContext {
def name: String
@@ -39,8 +35,6 @@ trait TraceContext {
def addMetadata(key: String, value: String)
def startTimestamp: RelativeNanoTimestamp
-
- def lookupExtension[T <: Kamon.Extension](id: ExtensionId[T]): T
}
object TraceContext {
@@ -99,9 +93,6 @@ case object EmptyTraceContext extends TraceContext {
def addMetadata(key: String, value: String): Unit = {}
def startTimestamp = new RelativeNanoTimestamp(0L)
- override def lookupExtension[T <: Extension](id: ExtensionId[T]): T =
- sys.error("Can't lookup extensions on a EmptyTraceContext.")
-
case object EmptySegment extends Segment {
val name: String = "empty-segment"
val category: String = "empty-category"
diff --git a/kamon-core/src/main/scala/kamon/trace/TracerExtension.scala b/kamon-core/src/main/scala/kamon/trace/TracerExtension.scala
index 41dcd6bc..be565154 100644
--- a/kamon-core/src/main/scala/kamon/trace/TracerExtension.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TracerExtension.scala
@@ -20,20 +20,13 @@ import java.net.InetAddress
import java.util.concurrent.atomic.AtomicLong
import akka.actor._
-import akka.actor
-import kamon.Kamon
-import kamon.metric.{ Metrics, MetricsExtension }
-import kamon.util.{ NanoInterval, RelativeNanoTimestamp, NanoTimestamp, GlobPathFilter }
+import com.typesafe.config.Config
+import kamon.metric.MetricsExtension
+import kamon.util._
import scala.util.Try
-object Tracer extends ExtensionId[TracerExtension] with ExtensionIdProvider {
- override def get(system: ActorSystem): TracerExtension = super.get(system)
- def lookup(): ExtensionId[_ <: actor.Extension] = Tracer
- def createExtension(system: ExtendedActorSystem): TracerExtension = new TracerExtensionImpl(system)
-}
-
-trait TracerExtension extends Kamon.Extension {
+trait TracerExtension {
def newContext(name: String): TraceContext
def newContext(name: String, token: String): TraceContext
def newContext(name: String, token: String, timestamp: RelativeNanoTimestamp, isOpen: Boolean, isLocal: Boolean): TraceContext
@@ -42,14 +35,13 @@ trait TracerExtension extends Kamon.Extension {
def unsubscribe(subscriber: ActorRef): Unit
}
-class TracerExtensionImpl(system: ExtendedActorSystem) extends TracerExtension {
- private val _settings = TraceSettings(system)
- private val _metricsExtension = Metrics.get(system)
-
+private[kamon] class TracerExtensionImpl(metricsExtension: MetricsExtension, config: Config) extends TracerExtension {
+ private val _settings = TraceSettings(config)
private val _hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost")
private val _tokenCounter = new AtomicLong
- private val _subscriptions = system.actorOf(Props[TraceSubscriptions], "trace-subscriptions")
- private val _incubator = system.actorOf(Incubator.props(_subscriptions))
+
+ private val _subscriptions = new LazyActorRef
+ private val _incubator = new LazyActorRef
private def newToken: String =
_hostnamePrefix + "-" + String.valueOf(_tokenCounter.incrementAndGet())
@@ -66,7 +58,7 @@ class TracerExtensionImpl(system: ExtendedActorSystem) extends TracerExtension {
private def createTraceContext(traceName: String, token: String = newToken, startTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.now,
isOpen: Boolean = true, isLocal: Boolean = true): TraceContext = {
- def newMetricsOnlyContext = new MetricsOnlyContext(traceName, token, isOpen, _settings.levelOfDetail, startTimestamp, null, _metricsExtension, system)
+ def newMetricsOnlyContext = new MetricsOnlyContext(traceName, token, isOpen, _settings.levelOfDetail, startTimestamp, null, metricsExtension)
if (_settings.levelOfDetail == LevelOfDetail.MetricsOnly || !isLocal)
newMetricsOnlyContext
@@ -74,20 +66,44 @@ class TracerExtensionImpl(system: ExtendedActorSystem) extends TracerExtension {
if (!_settings.sampler.shouldTrace)
newMetricsOnlyContext
else
- new TracingContext(traceName, token, true, _settings.levelOfDetail, isLocal, startTimestamp, null, _metricsExtension, this, system, dispatchTracingContext)
+ new TracingContext(traceName, token, true, _settings.levelOfDetail, isLocal, startTimestamp, null, metricsExtension, this, dispatchTracingContext)
}
}
- def subscribe(subscriber: ActorRef): Unit = _subscriptions ! TraceSubscriptions.Subscribe(subscriber)
- def unsubscribe(subscriber: ActorRef): Unit = _subscriptions ! TraceSubscriptions.Unsubscribe(subscriber)
+ def subscribe(subscriber: ActorRef): Unit =
+ _subscriptions.tell(TraceSubscriptions.Subscribe(subscriber))
+
+ def unsubscribe(subscriber: ActorRef): Unit =
+ _subscriptions.tell(TraceSubscriptions.Unsubscribe(subscriber))
private[kamon] def dispatchTracingContext(trace: TracingContext): Unit =
if (_settings.sampler.shouldReport(trace.elapsedTime))
if (trace.shouldIncubate)
- _incubator ! trace
+ _incubator.tell(trace)
else
- _subscriptions ! trace.generateTraceInfo
+ _subscriptions.tell(trace.generateTraceInfo)
+
+ /**
+ * Tracer Extension initialization.
+ */
+ private var _system: ActorSystem = null
+ private lazy val _start = {
+ val subscriptions = _system.actorOf(Props[TraceSubscriptions], "trace-subscriptions")
+ _subscriptions.point(subscriptions)
+ _incubator.point(_system.actorOf(Incubator.props(subscriptions)))
+ }
+
+ def start(system: ActorSystem): Unit = synchronized {
+ _system = system
+ _start
+ _system = null
+ }
+}
+
+private[kamon] object TracerExtensionImpl {
+ def apply(metricsExtension: MetricsExtension, config: Config) =
+ new TracerExtensionImpl(metricsExtension, config)
}
case class TraceInfo(name: String, token: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String], segments: List[SegmentInfo])
diff --git a/kamon-core/src/main/scala/kamon/trace/TracerExtensionSettings.scala b/kamon-core/src/main/scala/kamon/trace/TracerExtensionSettings.scala
index d360330d..7510ab7f 100644
--- a/kamon-core/src/main/scala/kamon/trace/TracerExtensionSettings.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TracerExtensionSettings.scala
@@ -18,13 +18,13 @@ package kamon.trace
import java.util.concurrent.TimeUnit
-import akka.actor.ActorSystem
+import com.typesafe.config.Config
case class TraceSettings(levelOfDetail: LevelOfDetail, sampler: Sampler)
object TraceSettings {
- def apply(system: ActorSystem): TraceSettings = {
- val tracerConfig = system.settings.config.getConfig("kamon.trace")
+ def apply(config: Config): TraceSettings = {
+ val tracerConfig = config.getConfig("kamon.trace")
val detailLevel: LevelOfDetail = tracerConfig.getString("level-of-detail") match {
case "metrics-only" ⇒ LevelOfDetail.MetricsOnly
diff --git a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala
index dd4c3c1a..3d324886 100644
--- a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala
@@ -19,7 +19,6 @@ package kamon.trace
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
-import akka.actor.ActorSystem
import akka.event.LoggingAdapter
import kamon.util.{ NanoInterval, RelativeNanoTimestamp, NanoTimestamp }
import kamon.metric.MetricsExtension
@@ -28,8 +27,8 @@ import scala.collection.concurrent.TrieMap
private[trace] class TracingContext(traceName: String, token: String, izOpen: Boolean, levelOfDetail: LevelOfDetail,
isLocal: Boolean, startTimeztamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension,
- traceExtension: TracerExtensionImpl, system: ActorSystem, traceInfoSink: TracingContext ⇒ Unit)
- extends MetricsOnlyContext(traceName, token, izOpen, levelOfDetail, startTimeztamp, log, metricsExtension, system) {
+ traceExtension: TracerExtensionImpl, traceInfoSink: TracingContext ⇒ Unit)
+ extends MetricsOnlyContext(traceName, token, izOpen, levelOfDetail, startTimeztamp, log, metricsExtension) {
private val _openSegments = new AtomicInteger(0)
private val _startTimestamp = NanoTimestamp.now
diff --git a/kamon-core/src/main/scala/kamon/util/LazyActorRef.scala b/kamon-core/src/main/scala/kamon/util/LazyActorRef.scala
new file mode 100644
index 00000000..855bf1fc
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/util/LazyActorRef.scala
@@ -0,0 +1,53 @@
+package kamon.util
+
+import java.util
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import akka.actor.{ Actor, ActorRef }
+import org.HdrHistogram.WriterReaderPhaser
+
+import scala.annotation.tailrec
+
+/**
+ * A LazyActorRef accumulates messages sent to an actor that doesn't exist yet. Once the actor is created and
+ * the LazyActorRef is pointed to it, all the accumulated messages are flushed and any new message sent to the
+ * LazyActorRef will immediately be sent to the pointed ActorRef.
+ *
+ * This is intended to be used during Kamon's initialization where some components need to use ActorRefs to work
+ * (like subscriptions and the trace incubator) but our internal ActorSystem is not yet ready to create the
+ * required actors.
+ */
+class LazyActorRef {
+ private val _refPhaser = new WriterReaderPhaser
+ private val _backlog = new ConcurrentLinkedQueue[(Any, ActorRef)]()
+ @volatile private var _target: Option[ActorRef] = None
+
+ def tell(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = {
+ val criticalEnter = _refPhaser.writerCriticalSectionEnter()
+ try {
+ _target.map(_.tell(message, sender)) getOrElse {
+ _backlog.add((message, sender))
+ }
+
+ } finally { _refPhaser.writerCriticalSectionExit(criticalEnter) }
+ }
+
+ def point(target: ActorRef): Unit = {
+ @tailrec def drain(q: util.Queue[(Any, ActorRef)]): Unit = if (!q.isEmpty) {
+ val (msg, sender) = q.poll()
+ target.tell(msg, sender)
+ drain(q)
+ }
+
+ try {
+ _refPhaser.readerLock()
+
+ if (_target.isEmpty) {
+ _target = Some(target)
+ _refPhaser.flipPhase(1000L)
+ drain(_backlog)
+
+ } else sys.error("A LazyActorRef cannot be pointed more than once.")
+ } finally { _refPhaser.readerUnlock() }
+ }
+}
diff --git a/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala b/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala
index 7dbcafd7..53ae5273 100644
--- a/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala
@@ -19,6 +19,7 @@ package kamon.metric
import akka.actor._
import akka.testkit.{ TestProbe, ImplicitSender }
import com.typesafe.config.ConfigFactory
+import kamon.Kamon
import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
import kamon.testkit.BaseKamonSpec
import scala.concurrent.duration._
@@ -32,7 +33,7 @@ class SubscriptionsProtocolSpec extends BaseKamonSpec("subscriptions-protocol-sp
|}
""".stripMargin)
- val metricsModule = kamon.metrics
+ lazy val metricsModule = Kamon.metrics
import metricsModule.{ register, subscribe, unsubscribe }
"the Subscriptions messaging protocol" should {
diff --git a/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala b/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala
index 2e1f246d..0c9ced32 100644
--- a/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala
@@ -17,6 +17,7 @@
package kamon.metric
import com.typesafe.config.ConfigFactory
+import kamon.Kamon
import kamon.metric.instrument.Histogram.MutableRecord
import kamon.testkit.BaseKamonSpec
import kamon.util.MilliTimestamp
@@ -85,9 +86,9 @@ class TickMetricSnapshotBufferSpec extends BaseKamonSpec("trace-metrics-spec") w
}
trait SnapshotFixtures {
- val collectionContext = kamon.metrics.buildDefaultCollectionContext
+ val collectionContext = Kamon.metrics.buildDefaultCollectionContext
val testTraceIdentity = Entity("buffer-spec-test-trace", "trace")
- val traceRecorder = kamon.metrics.register(TraceMetrics, "buffer-spec-test-trace").get.recorder
+ val traceRecorder = Kamon.metrics.register(TraceMetrics, "buffer-spec-test-trace").get.recorder
val firstEmpty = TickMetricSnapshot(new MilliTimestamp(1000), new MilliTimestamp(2000), Map.empty)
val secondEmpty = TickMetricSnapshot(new MilliTimestamp(2000), new MilliTimestamp(3000), Map.empty)
diff --git a/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala
index 7b6cbebc..455518f8 100644
--- a/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala
@@ -17,6 +17,7 @@
package kamon.metric
import com.typesafe.config.ConfigFactory
+import kamon.Kamon
import kamon.metric.instrument.Histogram.DynamicRange
import kamon.testkit.BaseKamonSpec
import scala.concurrent.duration._
@@ -34,54 +35,54 @@ class UserMetricsSpec extends BaseKamonSpec("user-metrics-spec") {
"the UserMetrics extension" should {
"allow registering a fully configured Histogram and get the same Histogram if registering again" in {
- val histogramA = kamon.userMetrics.histogram("histogram-with-settings", DynamicRange(1, 10000, 2))
- val histogramB = kamon.userMetrics.histogram("histogram-with-settings", DynamicRange(1, 10000, 2))
+ val histogramA = Kamon.userMetrics.histogram("histogram-with-settings", DynamicRange(1, 10000, 2))
+ val histogramB = Kamon.userMetrics.histogram("histogram-with-settings", DynamicRange(1, 10000, 2))
histogramA shouldBe theSameInstanceAs(histogramB)
}
"return the original Histogram when registering a fully configured Histogram for second time but with different settings" in {
- val histogramA = kamon.userMetrics.histogram("histogram-with-settings", DynamicRange(1, 10000, 2))
- val histogramB = kamon.userMetrics.histogram("histogram-with-settings", DynamicRange(1, 50000, 2))
+ val histogramA = Kamon.userMetrics.histogram("histogram-with-settings", DynamicRange(1, 10000, 2))
+ val histogramB = Kamon.userMetrics.histogram("histogram-with-settings", DynamicRange(1, 50000, 2))
histogramA shouldBe theSameInstanceAs(histogramB)
}
"allow registering a Histogram that takes the default configuration from the kamon.metrics.precision settings" in {
- kamon.userMetrics.histogram("histogram-with-default-configuration")
+ Kamon.userMetrics.histogram("histogram-with-default-configuration")
}
"allow registering a Counter and get the same Counter if registering again" in {
- val counterA = kamon.userMetrics.counter("counter")
- val counterB = kamon.userMetrics.counter("counter")
+ val counterA = Kamon.userMetrics.counter("counter")
+ val counterB = Kamon.userMetrics.counter("counter")
counterA shouldBe theSameInstanceAs(counterB)
}
"allow registering a fully configured MinMaxCounter and get the same MinMaxCounter if registering again" in {
- val minMaxCounterA = kamon.userMetrics.minMaxCounter("min-max-counter-with-settings", DynamicRange(1, 10000, 2), 1 second)
- val minMaxCounterB = kamon.userMetrics.minMaxCounter("min-max-counter-with-settings", DynamicRange(1, 10000, 2), 1 second)
+ val minMaxCounterA = Kamon.userMetrics.minMaxCounter("min-max-counter-with-settings", DynamicRange(1, 10000, 2), 1 second)
+ val minMaxCounterB = Kamon.userMetrics.minMaxCounter("min-max-counter-with-settings", DynamicRange(1, 10000, 2), 1 second)
minMaxCounterA shouldBe theSameInstanceAs(minMaxCounterB)
}
"return the original MinMaxCounter when registering a fully configured MinMaxCounter for second time but with different settings" in {
- val minMaxCounterA = kamon.userMetrics.minMaxCounter("min-max-counter-with-settings", DynamicRange(1, 10000, 2), 1 second)
- val minMaxCounterB = kamon.userMetrics.minMaxCounter("min-max-counter-with-settings", DynamicRange(1, 50000, 2), 1 second)
+ val minMaxCounterA = Kamon.userMetrics.minMaxCounter("min-max-counter-with-settings", DynamicRange(1, 10000, 2), 1 second)
+ val minMaxCounterB = Kamon.userMetrics.minMaxCounter("min-max-counter-with-settings", DynamicRange(1, 50000, 2), 1 second)
minMaxCounterA shouldBe theSameInstanceAs(minMaxCounterB)
}
"allow registering a MinMaxCounter that takes the default configuration from the kamon.metrics.precision settings" in {
- kamon.userMetrics.minMaxCounter("min-max-counter-with-default-configuration")
+ Kamon.userMetrics.minMaxCounter("min-max-counter-with-default-configuration")
}
"allow registering a fully configured Gauge and get the same Gauge if registering again" in {
- val gaugeA = kamon.userMetrics.gauge("gauge-with-settings", DynamicRange(1, 10000, 2), 1 second, {
+ val gaugeA = Kamon.userMetrics.gauge("gauge-with-settings", DynamicRange(1, 10000, 2), 1 second, {
() ⇒ 1L
})
- val gaugeB = kamon.userMetrics.gauge("gauge-with-settings", DynamicRange(1, 10000, 2), 1 second, {
+ val gaugeB = Kamon.userMetrics.gauge("gauge-with-settings", DynamicRange(1, 10000, 2), 1 second, {
() ⇒ 1L
})
@@ -89,11 +90,11 @@ class UserMetricsSpec extends BaseKamonSpec("user-metrics-spec") {
}
"return the original Gauge when registering a fully configured Gauge for second time but with different settings" in {
- val gaugeA = kamon.userMetrics.gauge("gauge-with-settings", DynamicRange(1, 10000, 2), 1 second, {
+ val gaugeA = Kamon.userMetrics.gauge("gauge-with-settings", DynamicRange(1, 10000, 2), 1 second, {
() ⇒ 1L
})
- val gaugeB = kamon.userMetrics.gauge("gauge-with-settings", DynamicRange(1, 10000, 2), 1 second, {
+ val gaugeB = Kamon.userMetrics.gauge("gauge-with-settings", DynamicRange(1, 10000, 2), 1 second, {
() ⇒ 1L
})
@@ -101,26 +102,26 @@ class UserMetricsSpec extends BaseKamonSpec("user-metrics-spec") {
}
"allow registering a Gauge that takes the default configuration from the kamon.metrics.precision settings" in {
- kamon.userMetrics.gauge("gauge-with-default-configuration", {
+ Kamon.userMetrics.gauge("gauge-with-default-configuration", {
() ⇒ 2L
})
}
"allow un-registering user metrics" in {
- val counter = kamon.userMetrics.counter("counter-for-remove")
- val histogram = kamon.userMetrics.histogram("histogram-for-remove")
- val minMaxCounter = kamon.userMetrics.minMaxCounter("min-max-counter-for-remove")
- val gauge = kamon.userMetrics.gauge("gauge-for-remove", { () ⇒ 2L })
-
- kamon.userMetrics.removeCounter("counter-for-remove")
- kamon.userMetrics.removeHistogram("histogram-for-remove")
- kamon.userMetrics.removeMinMaxCounter("min-max-counter-for-remove")
- kamon.userMetrics.removeGauge("gauge-for-remove")
-
- counter should not be (theSameInstanceAs(kamon.userMetrics.counter("counter-for-remove")))
- histogram should not be (theSameInstanceAs(kamon.userMetrics.histogram("histogram-for-remove")))
- minMaxCounter should not be (theSameInstanceAs(kamon.userMetrics.minMaxCounter("min-max-counter-for-remove")))
- gauge should not be (theSameInstanceAs(kamon.userMetrics.gauge("gauge-for-remove", { () ⇒ 2L })))
+ val counter = Kamon.userMetrics.counter("counter-for-remove")
+ val histogram = Kamon.userMetrics.histogram("histogram-for-remove")
+ val minMaxCounter = Kamon.userMetrics.minMaxCounter("min-max-counter-for-remove")
+ val gauge = Kamon.userMetrics.gauge("gauge-for-remove", { () ⇒ 2L })
+
+ Kamon.userMetrics.removeCounter("counter-for-remove")
+ Kamon.userMetrics.removeHistogram("histogram-for-remove")
+ Kamon.userMetrics.removeMinMaxCounter("min-max-counter-for-remove")
+ Kamon.userMetrics.removeGauge("gauge-for-remove")
+
+ counter should not be (theSameInstanceAs(Kamon.userMetrics.counter("counter-for-remove")))
+ histogram should not be (theSameInstanceAs(Kamon.userMetrics.histogram("histogram-for-remove")))
+ minMaxCounter should not be (theSameInstanceAs(Kamon.userMetrics.minMaxCounter("min-max-counter-for-remove")))
+ gauge should not be (theSameInstanceAs(Kamon.userMetrics.gauge("gauge-for-remove", { () ⇒ 2L })))
}
}
}
diff --git a/kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala b/kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala
index bb494d0b..ec07d66c 100644
--- a/kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metric/instrument/GaugeSpec.scala
@@ -17,6 +17,7 @@
package kamon.metric.instrument
import java.util.concurrent.atomic.AtomicLong
+import kamon.Kamon
import kamon.metric.instrument.Histogram.DynamicRange
import kamon.testkit.BaseKamonSpec
import scala.concurrent.duration._
@@ -48,7 +49,7 @@ class GaugeSpec extends BaseKamonSpec("gauge-spec") {
Thread.sleep(1.second.toMillis)
gauge.cleanup
- val snapshot = gauge.collect(kamon.metrics.buildDefaultCollectionContext)
+ val snapshot = gauge.collect(Kamon.metrics.buildDefaultCollectionContext)
snapshot.numberOfMeasurements should be(10L +- 1L)
snapshot.min should be(1)
@@ -58,7 +59,7 @@ class GaugeSpec extends BaseKamonSpec("gauge-spec") {
"not record the current value when doing a collection" in new GaugeFixture {
val (numberOfValuesRecorded, gauge) = createGauge(10 seconds)
- val snapshot = gauge.collect(kamon.metrics.buildDefaultCollectionContext)
+ val snapshot = gauge.collect(Kamon.metrics.buildDefaultCollectionContext)
snapshot.numberOfMeasurements should be(0)
numberOfValuesRecorded.get() should be(0)
}
@@ -67,7 +68,7 @@ class GaugeSpec extends BaseKamonSpec("gauge-spec") {
trait GaugeFixture {
def createGauge(refreshInterval: FiniteDuration = 100 millis): (AtomicLong, Gauge) = {
val recordedValuesCounter = new AtomicLong(0)
- val gauge = Gauge(DynamicRange(1, 100, 2), refreshInterval, kamon.metrics.settings.refreshScheduler, {
+ val gauge = Gauge(DynamicRange(1, 100, 2), refreshInterval, Kamon.metrics.settings.refreshScheduler, {
() ⇒ recordedValuesCounter.addAndGet(1)
})
diff --git a/kamon-core/src/test/scala/kamon/metric/instrument/MinMaxCounterSpec.scala b/kamon-core/src/test/scala/kamon/metric/instrument/MinMaxCounterSpec.scala
index 7a3d7aa3..7acfc229 100644
--- a/kamon-core/src/test/scala/kamon/metric/instrument/MinMaxCounterSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metric/instrument/MinMaxCounterSpec.scala
@@ -19,6 +19,7 @@ import java.nio.LongBuffer
import akka.actor._
import akka.testkit.TestProbe
+import kamon.Kamon
import kamon.metric.instrument.Histogram.{ DynamicRange, MutableRecord }
import kamon.testkit.BaseKamonSpec
import scala.concurrent.duration._
@@ -109,7 +110,7 @@ class MinMaxCounterSpec extends BaseKamonSpec("min-max-counter-spec") {
val buffer: LongBuffer = LongBuffer.allocate(64)
}
- val mmCounter = MinMaxCounter(DynamicRange(1, 1000, 2), 1 hour, kamon.metrics.settings.refreshScheduler)
+ val mmCounter = MinMaxCounter(DynamicRange(1, 1000, 2), 1 hour, Kamon.metrics.settings.refreshScheduler)
mmCounter.cleanup // cancel the refresh schedule
def collectCounterSnapshot(): Histogram.Snapshot = mmCounter.collect(collectionContext)
diff --git a/kamon-core/src/test/scala/kamon/testkit/BaseKamonSpec.scala b/kamon-core/src/test/scala/kamon/testkit/BaseKamonSpec.scala
index 9142ff16..eab6b754 100644
--- a/kamon-core/src/test/scala/kamon/testkit/BaseKamonSpec.scala
+++ b/kamon-core/src/test/scala/kamon/testkit/BaseKamonSpec.scala
@@ -22,29 +22,39 @@ import com.typesafe.config.{ Config, ConfigFactory }
import kamon.Kamon
import kamon.metric.{ SubscriptionsDispatcher, EntitySnapshot, MetricsExtensionImpl }
import kamon.trace.TraceContext
+import kamon.util.LazyActorRef
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }
+import scala.reflect.ClassTag
+
abstract class BaseKamonSpec(actorSystemName: String) extends TestKitBase with WordSpecLike with Matchers with ImplicitSender with BeforeAndAfterAll {
- lazy val kamon = Kamon(actorSystemName, config)
- lazy val collectionContext = kamon.metrics.buildDefaultCollectionContext
- implicit lazy val system: ActorSystem = kamon.actorSystem
+ lazy val collectionContext = Kamon.metrics.buildDefaultCollectionContext
+ implicit lazy val system: ActorSystem = {
+ Kamon.start(config.withFallback(ConfigFactory.load()))
+ ActorSystem(actorSystemName, config)
+ }
def config: Config =
- ConfigFactory.load()
+ ConfigFactory.empty()
def newContext(name: String): TraceContext =
- kamon.tracer.newContext(name)
+ Kamon.tracer.newContext(name)
def newContext(name: String, token: String): TraceContext =
- kamon.tracer.newContext(name, token)
+ Kamon.tracer.newContext(name, token)
def takeSnapshotOf(name: String, category: String): EntitySnapshot = {
- val recorder = kamon.metrics.find(name, category).get
+ val recorder = Kamon.metrics.find(name, category).get
recorder.collect(collectionContext)
}
- def flushSubscriptions(): Unit =
- system.actorSelection("/user/kamon/subscriptions-dispatcher") ! SubscriptionsDispatcher.Tick
+ def flushSubscriptions(): Unit = {
+ val subscriptionsField = Kamon.metrics.getClass.getDeclaredField("_subscriptions")
+ subscriptionsField.setAccessible(true)
+ val subscriptions = subscriptionsField.get(Kamon.metrics).asInstanceOf[LazyActorRef]
+
+ subscriptions.tell(SubscriptionsDispatcher.Tick)
+ }
override protected def afterAll(): Unit = system.shutdown()
}
diff --git a/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala b/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala
index 0cb4ce34..1d270106 100644
--- a/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala
+++ b/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala
@@ -40,7 +40,7 @@ class SimpleTraceSpec extends BaseKamonSpec("simple-trace-spec") {
"the simple tracing" should {
"send a TraceInfo when the trace has finished and all segments are finished" in {
- Kamon(Tracer)(system).subscribe(testActor)
+ Kamon.tracer.subscribe(testActor)
TraceContext.withContext(newContext("simple-trace-without-segments")) {
TraceContext.currentContext.startSegment("segment-one", "test-segment", "test").finish()
@@ -49,7 +49,7 @@ class SimpleTraceSpec extends BaseKamonSpec("simple-trace-spec") {
}
val traceInfo = expectMsgType[TraceInfo]
- Kamon(Tracer)(system).unsubscribe(testActor)
+ Kamon.tracer.unsubscribe(testActor)
traceInfo.name should be("simple-trace-without-segments")
traceInfo.segments.size should be(2)
@@ -58,7 +58,7 @@ class SimpleTraceSpec extends BaseKamonSpec("simple-trace-spec") {
}
"incubate the tracing context if there are open segments after finishing" in {
- Kamon(Tracer)(system).subscribe(testActor)
+ Kamon.tracer.subscribe(testActor)
val secondSegment = TraceContext.withContext(newContext("simple-trace-without-segments")) {
TraceContext.currentContext.startSegment("segment-one", "test-segment", "test").finish()
@@ -72,7 +72,7 @@ class SimpleTraceSpec extends BaseKamonSpec("simple-trace-spec") {
within(10 seconds) {
val traceInfo = expectMsgType[TraceInfo]
- Kamon(Tracer)(system).unsubscribe(testActor)
+ Kamon.tracer.unsubscribe(testActor)
traceInfo.name should be("simple-trace-without-segments")
traceInfo.segments.size should be(2)