diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala | 60 |
1 files changed, 45 insertions, 15 deletions
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala index 82915ce9..7d3e36ca 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -3,12 +3,10 @@ package kamon.instrumentation import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint import akka.actor.{Props, ActorSystem, ActorRef} -import kamon.{Kamon, TraceContext} +import kamon.{Tracer, TraceContext} import akka.dispatch.{MessageDispatcher, Envelope} -import com.codahale.metrics.{Timer, ExponentiallyDecayingReservoir, Histogram} +import com.codahale.metrics.Timer import kamon.metric.{MetricDirectory, Metrics} -import com.codahale.metrics -import kamon.instrumentation.TraceableMessage import scala.Some case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context) @@ -18,16 +16,19 @@ case class TraceableMessage(traceContext: Option[TraceContext], message: Any, ti class ActorRefTellInstrumentation { import ProceedingJoinPointPimp._ - @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && target(actor) && args(message, sender)") + val t2 = Metrics.registry.timer("some" + "LATENCY") + + @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.event.Logging.StandardOutLogger) && !within(akka.pattern.PromiseActorRef) && !within(akka.actor.DeadLetterActorRef) && target(actor) && args(message, sender)") def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {} @Around("sendingMessageToActorRef(actor, message, sender)") def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit = { - val actorName = MetricDirectory.nameForActor(actor) - val t = Metrics.registry.timer(actorName + "LATENCY") - //println(s"About to proceed with: $actor $message $sender ${Kamon.context}") - pjp.proceedWithTarget(actor, TraceableMessage(Kamon.context, message, t.time()), sender) + //val actorName = MetricDirectory.nameForActor(actor) + //val t = Metrics.registry.timer(actorName + "LATENCY") + //println(s"Wrapped message from [$sender] to [$actor] with content: [$message]") + pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t2.time()), sender) + } } @@ -48,6 +49,7 @@ class ActorCellInvokeInstrumentation { val actorName = MetricDirectory.nameForActor(ref) val histogramName = MetricDirectory.nameForMailbox(system.name, actorName) + //println("=====> Created ActorCell for: "+ref.toString()) /** TODO: Find a better way to filter the things we don't want to measure. */ //if(system.name != "kamon" && actorName.startsWith("/user")) { processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME") @@ -56,17 +58,17 @@ class ActorCellInvokeInstrumentation { } - @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)") + @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)") def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} @Around("invokingActorBehaviourAtActorCell(envelope)") def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { import ProceedingJoinPointPimp._ - println("ENVELOPE --------------------->"+envelope) + //println("ENVELOPE --------------------->"+envelope) envelope match { case Envelope(TraceableMessage(ctx, msg, timer), sender) => { - timer.stop() + //timer.stop() val originalEnvelope = envelope.copy(message = msg) @@ -74,10 +76,10 @@ class ActorCellInvokeInstrumentation { val pt = processingTimeTimer.time() ctx match { case Some(c) => { - Kamon.set(c) - println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope) + Tracer.set(c) + //println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope) pjp.proceedWith(originalEnvelope) - Kamon.clear + Tracer.clear } case None => pjp.proceedWith(originalEnvelope) } @@ -87,3 +89,31 @@ class ActorCellInvokeInstrumentation { } } } + + +@Aspect +class UnregisteredActorRefInstrumentation { + @Pointcut("execution(* akka.spray.UnregisteredActorRefBase+.handle(..)) && args(message, sender)") + def sprayResponderHandle(message: Any, sender: ActorRef) = {} + + @Around("sprayResponderHandle(message, sender)") + def sprayInvokeAround(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = { + import ProceedingJoinPointPimp._ + println("Handling unregistered actor ref message: "+message) + message match { + case TraceableMessage(ctx, msg, timer) => { + timer.stop() + + ctx match { + case Some(c) => { + Tracer.set(c) + pjp.proceedWith(msg.asInstanceOf[AnyRef]) // TODO: define if we should use Any or AnyRef and unify with the rest of the instrumentation. + Tracer.clear + } + case None => pjp.proceedWith(msg.asInstanceOf[AnyRef]) + } + } + case _ => pjp.proceed + } + } +} |