aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-05-18 21:38:55 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-05-18 21:38:55 -0300
commitb18d05149881e9eb141f9f4878916f37c5b52247 (patch)
tree65928ffb8648330bcc351c175cab3711dfe2ad2e /kamon-core/src/main/scala
parent9edd4676b3f5979d9ecd5a3942480306ffd27616 (diff)
downloadKamon-b18d05149881e9eb141f9f4878916f37c5b52247.tar.gz
Kamon-b18d05149881e9eb141f9f4878916f37c5b52247.tar.bz2
Kamon-b18d05149881e9eb141f9f4878916f37c5b52247.zip
= core: move the scheduling of gauge recordings to MetricsExtension and load interval for recordings from config
Diffstat (limited to 'kamon-core/src/main/scala')
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala18
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala2
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala25
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala2
4 files changed, 34 insertions, 13 deletions
diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
index d002c574..20bfe564 100644
--- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
+++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
@@ -25,7 +25,6 @@ import kamon.metrics.{ ActorMetrics, Metrics }
import kamon.Kamon
import kamon.metrics.ActorMetrics.ActorMetricRecorder
import kamon.metrics.instruments.counter.MinMaxCounter
-import kamon.util.Contexts
@Aspect
class BehaviourInvokeTracing {
@@ -35,7 +34,6 @@ class BehaviourInvokeTracing {
@After("actorCellCreation(cell, system, ref, props, dispatcher, parent)")
def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {
- import scala.concurrent.duration._
val metricsExtension = Kamon(Metrics)(system)
val metricIdentity = ActorMetrics(ref.path.elements.mkString("/"))
@@ -44,17 +42,18 @@ class BehaviourInvokeTracing {
cellWithMetrics.metricIdentity = metricIdentity
cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory)
- system.scheduler.schedule(0 milliseconds, 100 milliseconds) {
- cellWithMetrics.actorMetricsRecorder.map {
- am ⇒
+ if (cellWithMetrics.actorMetricsRecorder.isDefined) {
+ cellWithMetrics.mailboxSizeCollectorCancellable = metricsExtension.scheduleGaugeRecorder {
+ cellWithMetrics.actorMetricsRecorder.map { am ⇒
import am.mailboxSize._
val (min, max, sum) = cellWithMetrics.queueSize.collect()
record(min)
record(max)
record(sum)
+ }
}
- }(metricsExtension.defaultDispatcher)
+ }
}
@Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)")
@@ -95,7 +94,11 @@ class BehaviourInvokeTracing {
@After("actorStop(cell)")
def afterStop(cell: ActorCell): Unit = {
val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
- cellWithMetrics.actorMetricsRecorder.map(p ⇒ Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity))
+
+ cellWithMetrics.actorMetricsRecorder.map { p ⇒
+ cellWithMetrics.mailboxSizeCollectorCancellable.cancel()
+ Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity)
+ }
}
@Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell)")
@@ -114,6 +117,7 @@ class BehaviourInvokeTracing {
trait ActorCellMetrics {
var metricIdentity: ActorMetrics = _
var actorMetricsRecorder: Option[ActorMetricRecorder] = _
+ var mailboxSizeCollectorCancellable: Cancellable = _
val queueSize = MinMaxCounter()
}
diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala
index c703d589..44dd84b0 100644
--- a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala
@@ -54,7 +54,7 @@ object ActorMetrics extends MetricGroupCategory {
type GroupRecorder = ActorMetricRecorder
def create(config: Config): ActorMetricRecorder = {
- val settings = config.getConfig("kamon.metrics.precision.actor")
+ val settings = config.getConfig("precision.actor")
val processingTimeConfig = extractPrecisionConfig(settings.getConfig("processing-time"))
val mailboxSizeConfig = extractPrecisionConfig(settings.getConfig("mailbox-size"))
diff --git a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala
index 9a08da71..78a82d96 100644
--- a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala
+++ b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala
@@ -24,16 +24,25 @@ import kamon.Kamon
import akka.actor
import kamon.metrics.Metrics.MetricGroupFilter
import kamon.metrics.Subscriptions.Subscribe
+import java.util.concurrent.TimeUnit
class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
- val config = system.settings.config
+ val metricsExtConfig = system.settings.config.getConfig("kamon.metrics")
+
+ /** Configured Dispatchers */
+ val metricSubscriptionsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.metric-subscriptions"))
+ val gaugeRecordingsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.gauge-recordings"))
+
+ /** Configuration Settings */
+ val gaugeRecordingInterval = metricsExtConfig.getDuration("gauge-recording-interval", TimeUnit.MILLISECONDS)
+
val storage = TrieMap[MetricGroupIdentity, MetricGroupRecorder]()
- val filters = loadFilters(config)
+ val filters = loadFilters(metricsExtConfig)
lazy val subscriptions = system.actorOf(Props[Subscriptions], "kamon-metrics-subscriptions")
def register(identity: MetricGroupIdentity, factory: MetricGroupFactory): Option[factory.GroupRecorder] = {
if (shouldTrack(identity))
- Some(storage.getOrElseUpdate(identity, factory.create(config)).asInstanceOf[factory.GroupRecorder])
+ Some(storage.getOrElseUpdate(identity, factory.create(metricsExtConfig)).asInstanceOf[factory.GroupRecorder])
else
None
}
@@ -50,6 +59,14 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
(for ((identity, recorder) ← storage) yield (identity, recorder.collect)).toMap
}
+ def scheduleGaugeRecorder(body: ⇒ Unit): Cancellable = {
+ import scala.concurrent.duration._
+
+ system.scheduler.schedule(gaugeRecordingInterval milliseconds, gaugeRecordingInterval milliseconds) {
+ body
+ }(gaugeRecordingsDispatcher)
+ }
+
private def shouldTrack(identity: MetricGroupIdentity): Boolean = {
filters.get(identity.category.name).map(filter ⇒ filter.accept(identity.name)).getOrElse(false)
}
@@ -57,7 +74,7 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
def loadFilters(config: Config): Map[String, MetricGroupFilter] = {
import scala.collection.JavaConverters._
- val filters = config.getObjectList("kamon.metrics.filters").asScala
+ val filters = config.getObjectList("filters").asScala
val allFilters =
for (
diff --git a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala
index 7c197166..5454edf5 100644
--- a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala
@@ -53,7 +53,7 @@ object TraceMetrics extends MetricGroupCategory {
def create(config: Config): TraceMetricRecorder = {
- val settings = config.getConfig("kamon.metrics.precision.trace")
+ val settings = config.getConfig("precision.trace")
val elapsedTimeConfig = extractPrecisionConfig(settings.getConfig("elapsed-time"))
val segmentConfig = extractPrecisionConfig(settings.getConfig("segment"))