aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/kamon/instrumentation
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/kamon/instrumentation')
-rw-r--r--src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala13
-rw-r--r--src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala4
-rw-r--r--src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala4
3 files changed, 13 insertions, 8 deletions
diff --git a/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
index 6677f0f7..7398a2bd 100644
--- a/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
+++ b/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
@@ -4,7 +4,7 @@ import org.aspectj.lang.annotation._
import org.aspectj.lang.ProceedingJoinPoint
import akka.actor.{Props, ActorSystem, ActorRef}
import kamon.{Kamon, TraceContext}
-import akka.dispatch.Envelope
+import akka.dispatch.{MessageDispatcher, Envelope}
import com.codahale.metrics.{Timer, ExponentiallyDecayingReservoir, Histogram}
import kamon.metric.{MetricDirectory, Metrics}
import com.codahale.metrics
@@ -38,11 +38,13 @@ class ActorCellInvokeInstrumentation {
var processingTimeTimer: Timer = _
var shouldTrack = false
- @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, parent)")
- def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef): Unit = {}
+ // AKKA 2.2 introduces the dispatcher parameter. Maybe we could provide a dual pointcut.
- @After("actorCellCreation(system, ref, props, parent)")
- def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef): Unit = {
+ @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)")
+ def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {}
+
+ @After("actorCellCreation(system, ref, props, dispatcher, parent)")
+ def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {
val actorName = MetricDirectory.nameForActor(ref)
val histogramName = MetricDirectory.nameForMailbox(system.name, actorName)
@@ -73,6 +75,7 @@ class ActorCellInvokeInstrumentation {
ctx match {
case Some(c) => {
Kamon.set(c)
+ //println("ENVELOPE ORIGINAL:---------------->"+originalEnvelope)
pjp.proceedWith(originalEnvelope)
Kamon.clear
}
diff --git a/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
index 1f3564d3..b4f8a475 100644
--- a/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
+++ b/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
@@ -28,6 +28,7 @@ class ActorSystemInstrumentation {
@Aspect("perthis(forkJoinPoolInstantiation(int, scala.concurrent.forkjoin.ForkJoinPool.ForkJoinWorkerThreadFactory, java.lang.Thread.UncaughtExceptionHandler))")
class ForkJoinPoolInstrumentation {
var activeThreadsHistogram: Histogram = _
+ var poolSizeHistogram: Histogram = _
@Pointcut("execution(akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool.new(..)) && args(parallelism, threadFactory, exceptionHandler)")
def forkJoinPoolInstantiation(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler) = {}
@@ -42,6 +43,7 @@ class ForkJoinPoolInstrumentation {
val metrics = Kamon.Metric.actorSystem(actorSystemName).get.registerDispatcher(dispatcherName)
for(m <- metrics) {
activeThreadsHistogram = m.activeThreadCount
+ poolSizeHistogram = m.poolSize
println(s"Registered $dispatcherName for actor system $actorSystemName")
}
}
@@ -59,7 +61,7 @@ class ForkJoinPoolInstrumentation {
@After("forkJoinScan(fjp)")
def updateMetrics(fjp: AkkaForkJoinPool): Unit = {
activeThreadsHistogram.update(fjp.getActiveThreadCount)
- println("UPDATED THE COUNT TWOOOO!!!")
+ poolSizeHistogram.update(fjp.getPoolSize)
}
diff --git a/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
index 75d6189c..c21502ac 100644
--- a/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
+++ b/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
@@ -1,7 +1,7 @@
package kamon.instrumentation
import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram}
-import akka.dispatch.{Envelope, MessageQueue}
+import akka.dispatch.{UnboundedMessageQueueSemantics, Envelope, MessageQueue}
import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect}
import akka.actor.{ActorSystem, ActorRef}
import kamon.metric.{Metrics, MetricDirectory}
@@ -44,7 +44,7 @@ class MessageQueueInstrumentation {
}
-class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: Histogram) extends MessageQueue {
+class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: Histogram) extends MessageQueue with UnboundedMessageQueueSemantics{
def enqueue(receiver: ActorRef, handle: Envelope) = {
delegate.enqueue(receiver, handle)