diff options
Diffstat (limited to 'src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala')
-rw-r--r-- | src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala | 46 |
1 files changed, 29 insertions, 17 deletions
diff --git a/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala index b345eaae..6677f0f7 100644 --- a/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ b/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -1,51 +1,59 @@ package kamon.instrumentation -import org.aspectj.lang.annotation.{Before, Around, Pointcut, Aspect} +import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint import akka.actor.{Props, ActorSystem, ActorRef} import kamon.{Kamon, TraceContext} import akka.dispatch.Envelope -import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} +import com.codahale.metrics.{Timer, ExponentiallyDecayingReservoir, Histogram} import kamon.metric.{MetricDirectory, Metrics} +import com.codahale.metrics +import kamon.instrumentation.TraceableMessage +import scala.Some -case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timeStamp: Long = System.nanoTime()) +case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context) @Aspect class ActorRefTellInstrumentation { import ProceedingJoinPointPimp._ - @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.pattern.PromiseActorRef) && args(message, sender)") - def sendingMessageToActorRef(message: Any, sender: ActorRef) = {} + @Pointcut("execution(* akka.actor.LocalActorRef+.$bang(..)) && target(actor) && args(message, sender)") + def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {} - @Around("sendingMessageToActorRef(message, sender)") - def around(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = pjp.proceedWith(TraceableMessage(Kamon.context, message)) + @Around("sendingMessageToActorRef(actor, message, sender)") + def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit = { + + val actorName = MetricDirectory.nameForActor(actor) + val t = Metrics.registry.timer(actorName + "LATENCY") + //println(s"About to proceed with: $actor $message $sender") + pjp.proceedWithTarget(actor, TraceableMessage(Kamon.context, message, t.time()), sender) + } } @Aspect("perthis(actorCellCreation(..))") class ActorCellInvokeInstrumentation { - val latencyHistogram: Histogram = new Histogram(new ExponentiallyDecayingReservoir) + var processingTimeTimer: Timer = _ var shouldTrack = false @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, parent)") def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef): Unit = {} - @Before("actorCellCreation(system, ref, props, parent)") + @After("actorCellCreation(system, ref, props, parent)") def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef): Unit = { val actorName = MetricDirectory.nameForActor(ref) val histogramName = MetricDirectory.nameForMailbox(system.name, actorName) - // TODO: Find a better way to filter the thins we don't want to measure. - if(system.name != "kamon" && actorName.startsWith("/user")) { - Metrics.registry.register(histogramName + "/cell", latencyHistogram) + /** TODO: Find a better way to filter the things we don't want to measure. */ + //if(system.name != "kamon" && actorName.startsWith("/user")) { + processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME") shouldTrack = true - } + //} } - @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)") def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} @@ -53,12 +61,15 @@ class ActorCellInvokeInstrumentation { @Around("invokingActorBehaviourAtActorCell(envelope)") def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { import ProceedingJoinPointPimp._ - + //println("ENVELOPE --------------------->"+envelope) envelope match { - case Envelope(TraceableMessage(ctx, msg, timeStamp), sender) => { - latencyHistogram.update(System.nanoTime() - timeStamp) + case Envelope(TraceableMessage(ctx, msg, timer), sender) => { + timer.stop() val originalEnvelope = envelope.copy(message = msg) + + //println("PROCESSING TIME TIMER: "+processingTimeTimer) + val pt = processingTimeTimer.time() ctx match { case Some(c) => { Kamon.set(c) @@ -67,6 +78,7 @@ class ActorCellInvokeInstrumentation { } case None => pjp.proceedWith(originalEnvelope) } + pt.stop() } case _ => pjp.proceed } |