From 9382ce9d66b5d6bfef515cee56f40aa178920335 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Tue, 10 Sep 2013 18:35:25 -0300 Subject: Simple instrumentation just for keeping the uow. --- .../ActorRefTellInstrumentation.scala | 59 +++++++--------------- 1 file changed, 17 insertions(+), 42 deletions(-) (limited to 'kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala') diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala index 7d3e36ca..84498cb8 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -3,11 +3,12 @@ package kamon.instrumentation import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint import akka.actor.{Props, ActorSystem, ActorRef} -import kamon.{Tracer, TraceContext} +import kamon.{Kamon, Tracer, TraceContext} import akka.dispatch.{MessageDispatcher, Envelope} import com.codahale.metrics.Timer import kamon.metric.{MetricDirectory, Metrics} import scala.Some +import kamon.instrumentation.SimpleContextPassingInstrumentation.SimpleTraceMessage case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context) @@ -16,76 +17,48 @@ case class TraceableMessage(traceContext: Option[TraceContext], message: Any, ti class ActorRefTellInstrumentation { import ProceedingJoinPointPimp._ - val t2 = Metrics.registry.timer("some" + "LATENCY") - @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.event.Logging.StandardOutLogger) && !within(akka.pattern.PromiseActorRef) && !within(akka.actor.DeadLetterActorRef) && target(actor) && args(message, sender)") def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {} @Around("sendingMessageToActorRef(actor, message, sender)") def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit = { + import kamon.Instrument.instrumentation.sendMessageTransformation - //val actorName = MetricDirectory.nameForActor(actor) - //val t = Metrics.registry.timer(actorName + "LATENCY") - //println(s"Wrapped message from [$sender] to [$actor] with content: [$message]") - pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t2.time()), sender) - + pjp.proceedWithTarget(actor, sendMessageTransformation(sender, actor, message).asInstanceOf[AnyRef], sender) } } -@Aspect("perthis(actorCellCreation(..))") -class ActorCellInvokeInstrumentation { - var processingTimeTimer: Timer = _ - var shouldTrack = false +@Aspect("""perthis(actorCellCreation(akka.actor.ActorSystem, akka.actor.ActorRef, akka.actor.Props, akka.dispatch.MessageDispatcher, akka.actor.ActorRef))""") +class ActorCellInvokeInstrumentation { + var instrumentation = ActorReceiveInvokeInstrumentation.noopPreReceive // AKKA 2.2 introduces the dispatcher parameter. Maybe we could provide a dual pointcut. - @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)") def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} @After("actorCellCreation(system, ref, props, dispatcher, parent)") def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { - val actorName = MetricDirectory.nameForActor(ref) - val histogramName = MetricDirectory.nameForMailbox(system.name, actorName) - - //println("=====> Created ActorCell for: "+ref.toString()) - /** TODO: Find a better way to filter the things we don't want to measure. */ - //if(system.name != "kamon" && actorName.startsWith("/user")) { - processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME") - shouldTrack = true - //} + instrumentation = kamon.Instrument.instrumentation.receiveInvokeInstrumentation(system, ref, props, dispatcher, parent) } @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)") def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} - @Around("invokingActorBehaviourAtActorCell(envelope)") def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { import ProceedingJoinPointPimp._ - //println("ENVELOPE --------------------->"+envelope) - envelope match { - case Envelope(TraceableMessage(ctx, msg, timer), sender) => { - //timer.stop() - val originalEnvelope = envelope.copy(message = msg) - - //println("PROCESSING TIME TIMER: "+processingTimeTimer) - val pt = processingTimeTimer.time() - ctx match { - case Some(c) => { - Tracer.set(c) - //println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope) - pjp.proceedWith(originalEnvelope) - Tracer.clear - } - case None => pjp.proceedWith(originalEnvelope) - } - pt.stop() + val (originalEnvelope, ctx) = instrumentation.preReceive(envelope) + ctx match { + case Some(c) => { + Tracer.set(c) + pjp.proceedWith(originalEnvelope) + Tracer.clear } - case _ => pjp.proceed + case None => pjp.proceedWith(originalEnvelope) } } } @@ -117,3 +90,5 @@ class UnregisteredActorRefInstrumentation { } } } + + -- cgit v1.2.3