From 923b88e8adef2f66b43e551fa4a0a1bbae5af7ff Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Wed, 7 Aug 2013 19:06:33 -0300 Subject: upgrading to akka 2.2 --- .../ActorRefTellInstrumentation.scala | 89 ++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala (limited to 'kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala') diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala new file mode 100644 index 00000000..82915ce9 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -0,0 +1,89 @@ +package kamon.instrumentation + +import org.aspectj.lang.annotation._ +import org.aspectj.lang.ProceedingJoinPoint +import akka.actor.{Props, ActorSystem, ActorRef} +import kamon.{Kamon, TraceContext} +import akka.dispatch.{MessageDispatcher, Envelope} +import com.codahale.metrics.{Timer, ExponentiallyDecayingReservoir, Histogram} +import kamon.metric.{MetricDirectory, Metrics} +import com.codahale.metrics +import kamon.instrumentation.TraceableMessage +import scala.Some + +case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context) + + +@Aspect +class ActorRefTellInstrumentation { + import ProceedingJoinPointPimp._ + + @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && target(actor) && args(message, sender)") + def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {} + + @Around("sendingMessageToActorRef(actor, message, sender)") + def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit = { + + val actorName = MetricDirectory.nameForActor(actor) + val t = Metrics.registry.timer(actorName + "LATENCY") + //println(s"About to proceed with: $actor $message $sender ${Kamon.context}") + pjp.proceedWithTarget(actor, TraceableMessage(Kamon.context, message, t.time()), sender) + } +} + + +@Aspect("perthis(actorCellCreation(..))") +class ActorCellInvokeInstrumentation { + + var processingTimeTimer: Timer = _ + var shouldTrack = false + + // AKKA 2.2 introduces the dispatcher parameter. Maybe we could provide a dual pointcut. + + @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)") + def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} + + @After("actorCellCreation(system, ref, props, dispatcher, parent)") + def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { + val actorName = MetricDirectory.nameForActor(ref) + val histogramName = MetricDirectory.nameForMailbox(system.name, actorName) + + /** TODO: Find a better way to filter the things we don't want to measure. */ + //if(system.name != "kamon" && actorName.startsWith("/user")) { + processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME") + shouldTrack = true + //} + } + + + @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)") + def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} + + + @Around("invokingActorBehaviourAtActorCell(envelope)") + def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { + import ProceedingJoinPointPimp._ + println("ENVELOPE --------------------->"+envelope) + envelope match { + case Envelope(TraceableMessage(ctx, msg, timer), sender) => { + timer.stop() + + val originalEnvelope = envelope.copy(message = msg) + + //println("PROCESSING TIME TIMER: "+processingTimeTimer) + val pt = processingTimeTimer.time() + ctx match { + case Some(c) => { + Kamon.set(c) + println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope) + pjp.proceedWith(originalEnvelope) + Kamon.clear + } + case None => pjp.proceedWith(originalEnvelope) + } + pt.stop() + } + case _ => pjp.proceed + } + } +} -- cgit v1.2.3