aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-05-18 21:38:55 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-05-18 21:38:55 -0300
commitb18d05149881e9eb141f9f4878916f37c5b52247 (patch)
tree65928ffb8648330bcc351c175cab3711dfe2ad2e
parent9edd4676b3f5979d9ecd5a3942480306ffd27616 (diff)
downloadKamon-b18d05149881e9eb141f9f4878916f37c5b52247.tar.gz
Kamon-b18d05149881e9eb141f9f4878916f37c5b52247.tar.bz2
Kamon-b18d05149881e9eb141f9f4878916f37c5b52247.zip
= core: move the scheduling of gauge recordings to MetricsExtension and load interval for recordings from config
-rw-r--r--kamon-core/src/main/resources/reference.conf45
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala18
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala2
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala25
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala2
5 files changed, 52 insertions, 40 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index e5168929..b1e5309d 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -4,8 +4,24 @@
kamon {
metrics {
+
+ # Time interval at which Kamon will collect all metrics and send them to all subscribed actors.
tick-interval = 1 second
+ # Time interval at which Kamon will record values for all registered gauges.
+ gauge-recording-interval = 100 milliseconds
+
+ dispatchers {
+
+ # All Gauges record values periodically according to the `kamon.metrics.gauge-recording-interval` setting.
+ # This dispatcher is the one to be used to execute the recording code.
+ gauge-recordings = "akka.actor.default-dispatcher"
+
+ # Dispatcher for the actor managing all subscriptions and metrics collection.
+ metric-subscriptions = "akka.actor.default-dispatcher"
+ }
+
+
filters = [
{
actor {
@@ -56,34 +72,8 @@ kamon {
}
}
- default-dispatcher {
- # Dispatcher is the name of the event-based dispatcher
- type = Dispatcher
-
- # What kind of ExecutionService to use
- executor = "fork-join-executor"
-
- # Configuration for the fork join pool
- fork-join-executor {
-
- # Min number of threads to cap factor-based parallelism number to
- parallelism-min = 2
-
- # Parallelism (threads) ... ceil(available processors * factor)
- parallelism-factor = 2.0
-
- # Max number of threads to cap factor-based parallelism number to
- parallelism-max = 10
- }
-
- # Throughput defines the maximum number of messages to be
- # processed per actor before the thread jumps to the next actor.
- # Set to 1 for as fair as possible.
- throughput = 100
- }
-
-
trace {
+
# If ask-pattern-tracing is enabled, a WARN level log message will be generated if a future generated by the `ask`
# pattern fails with a `AskTimeoutException` and the log message will contain a stack trace captured at the moment
# the future was created.
@@ -91,6 +81,7 @@ kamon {
}
weaver {
+
# AspectJ options supported by LTW
# showWeaveInfo: show informational messages whenever the weaver touches a class file.
# verbose: show informational messages about the weaving process.
diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
index d002c574..20bfe564 100644
--- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
+++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
@@ -25,7 +25,6 @@ import kamon.metrics.{ ActorMetrics, Metrics }
import kamon.Kamon
import kamon.metrics.ActorMetrics.ActorMetricRecorder
import kamon.metrics.instruments.counter.MinMaxCounter
-import kamon.util.Contexts
@Aspect
class BehaviourInvokeTracing {
@@ -35,7 +34,6 @@ class BehaviourInvokeTracing {
@After("actorCellCreation(cell, system, ref, props, dispatcher, parent)")
def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {
- import scala.concurrent.duration._
val metricsExtension = Kamon(Metrics)(system)
val metricIdentity = ActorMetrics(ref.path.elements.mkString("/"))
@@ -44,17 +42,18 @@ class BehaviourInvokeTracing {
cellWithMetrics.metricIdentity = metricIdentity
cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory)
- system.scheduler.schedule(0 milliseconds, 100 milliseconds) {
- cellWithMetrics.actorMetricsRecorder.map {
- am ⇒
+ if (cellWithMetrics.actorMetricsRecorder.isDefined) {
+ cellWithMetrics.mailboxSizeCollectorCancellable = metricsExtension.scheduleGaugeRecorder {
+ cellWithMetrics.actorMetricsRecorder.map { am ⇒
import am.mailboxSize._
val (min, max, sum) = cellWithMetrics.queueSize.collect()
record(min)
record(max)
record(sum)
+ }
}
- }(metricsExtension.defaultDispatcher)
+ }
}
@Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)")
@@ -95,7 +94,11 @@ class BehaviourInvokeTracing {
@After("actorStop(cell)")
def afterStop(cell: ActorCell): Unit = {
val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
- cellWithMetrics.actorMetricsRecorder.map(p ⇒ Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity))
+
+ cellWithMetrics.actorMetricsRecorder.map { p ⇒
+ cellWithMetrics.mailboxSizeCollectorCancellable.cancel()
+ Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity)
+ }
}
@Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell)")
@@ -114,6 +117,7 @@ class BehaviourInvokeTracing {
trait ActorCellMetrics {
var metricIdentity: ActorMetrics = _
var actorMetricsRecorder: Option[ActorMetricRecorder] = _
+ var mailboxSizeCollectorCancellable: Cancellable = _
val queueSize = MinMaxCounter()
}
diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala
index c703d589..44dd84b0 100644
--- a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala
@@ -54,7 +54,7 @@ object ActorMetrics extends MetricGroupCategory {
type GroupRecorder = ActorMetricRecorder
def create(config: Config): ActorMetricRecorder = {
- val settings = config.getConfig("kamon.metrics.precision.actor")
+ val settings = config.getConfig("precision.actor")
val processingTimeConfig = extractPrecisionConfig(settings.getConfig("processing-time"))
val mailboxSizeConfig = extractPrecisionConfig(settings.getConfig("mailbox-size"))
diff --git a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala
index 9a08da71..78a82d96 100644
--- a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala
+++ b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala
@@ -24,16 +24,25 @@ import kamon.Kamon
import akka.actor
import kamon.metrics.Metrics.MetricGroupFilter
import kamon.metrics.Subscriptions.Subscribe
+import java.util.concurrent.TimeUnit
class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
- val config = system.settings.config
+ val metricsExtConfig = system.settings.config.getConfig("kamon.metrics")
+
+ /** Configured Dispatchers */
+ val metricSubscriptionsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.metric-subscriptions"))
+ val gaugeRecordingsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.gauge-recordings"))
+
+ /** Configuration Settings */
+ val gaugeRecordingInterval = metricsExtConfig.getDuration("gauge-recording-interval", TimeUnit.MILLISECONDS)
+
val storage = TrieMap[MetricGroupIdentity, MetricGroupRecorder]()
- val filters = loadFilters(config)
+ val filters = loadFilters(metricsExtConfig)
lazy val subscriptions = system.actorOf(Props[Subscriptions], "kamon-metrics-subscriptions")
def register(identity: MetricGroupIdentity, factory: MetricGroupFactory): Option[factory.GroupRecorder] = {
if (shouldTrack(identity))
- Some(storage.getOrElseUpdate(identity, factory.create(config)).asInstanceOf[factory.GroupRecorder])
+ Some(storage.getOrElseUpdate(identity, factory.create(metricsExtConfig)).asInstanceOf[factory.GroupRecorder])
else
None
}
@@ -50,6 +59,14 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
(for ((identity, recorder) ← storage) yield (identity, recorder.collect)).toMap
}
+ def scheduleGaugeRecorder(body: ⇒ Unit): Cancellable = {
+ import scala.concurrent.duration._
+
+ system.scheduler.schedule(gaugeRecordingInterval milliseconds, gaugeRecordingInterval milliseconds) {
+ body
+ }(gaugeRecordingsDispatcher)
+ }
+
private def shouldTrack(identity: MetricGroupIdentity): Boolean = {
filters.get(identity.category.name).map(filter ⇒ filter.accept(identity.name)).getOrElse(false)
}
@@ -57,7 +74,7 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
def loadFilters(config: Config): Map[String, MetricGroupFilter] = {
import scala.collection.JavaConverters._
- val filters = config.getObjectList("kamon.metrics.filters").asScala
+ val filters = config.getObjectList("filters").asScala
val allFilters =
for (
diff --git a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala
index 7c197166..5454edf5 100644
--- a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala
@@ -53,7 +53,7 @@ object TraceMetrics extends MetricGroupCategory {
def create(config: Config): TraceMetricRecorder = {
- val settings = config.getConfig("kamon.metrics.precision.trace")
+ val settings = config.getConfig("precision.trace")
val elapsedTimeConfig = extractPrecisionConfig(settings.getConfig("elapsed-time"))
val segmentConfig = extractPrecisionConfig(settings.getConfig("segment"))