diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala | 104 |
1 files changed, 17 insertions, 87 deletions
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala index 7d3e36ca..7b5d5339 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -4,116 +4,46 @@ import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint import akka.actor.{Props, ActorSystem, ActorRef} import kamon.{Tracer, TraceContext} -import akka.dispatch.{MessageDispatcher, Envelope} +import akka.dispatch.{Envelope, MessageDispatcher} import com.codahale.metrics.Timer -import kamon.metric.{MetricDirectory, Metrics} import scala.Some +import kamon.trace.context.TracingAwareContext case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context) - +case class DefaultTracingAwareEnvelopeContext(traceContext: Option[TraceContext] = Tracer.traceContext.value, timestamp: Long = System.nanoTime) extends TracingAwareContext @Aspect -class ActorRefTellInstrumentation { - import ProceedingJoinPointPimp._ - - val t2 = Metrics.registry.timer("some" + "LATENCY") - - @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.event.Logging.StandardOutLogger) && !within(akka.pattern.PromiseActorRef) && !within(akka.actor.DeadLetterActorRef) && target(actor) && args(message, sender)") - def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {} - - @Around("sendingMessageToActorRef(actor, message, sender)") - def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit = { - - //val actorName = MetricDirectory.nameForActor(actor) - //val t = Metrics.registry.timer(actorName + "LATENCY") - //println(s"Wrapped message from [$sender] to [$actor] with content: [$message]") - pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t2.time()), sender) - - } -} - - -@Aspect("perthis(actorCellCreation(..))") class ActorCellInvokeInstrumentation { - var processingTimeTimer: Timer = _ - var shouldTrack = false - - // AKKA 2.2 introduces the dispatcher parameter. Maybe we could provide a dual pointcut. - @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)") def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} - @After("actorCellCreation(system, ref, props, dispatcher, parent)") - def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { - val actorName = MetricDirectory.nameForActor(ref) - val histogramName = MetricDirectory.nameForMailbox(system.name, actorName) - - //println("=====> Created ActorCell for: "+ref.toString()) - /** TODO: Find a better way to filter the things we don't want to measure. */ - //if(system.name != "kamon" && actorName.startsWith("/user")) { - processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME") - shouldTrack = true - //} - } - - @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)") def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} - @Around("invokingActorBehaviourAtActorCell(envelope)") def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { - import ProceedingJoinPointPimp._ - //println("ENVELOPE --------------------->"+envelope) - envelope match { - case Envelope(TraceableMessage(ctx, msg, timer), sender) => { - //timer.stop() - - val originalEnvelope = envelope.copy(message = msg) + //safe cast + val msgContext = envelope.asInstanceOf[TracingAwareContext].traceContext - //println("PROCESSING TIME TIMER: "+processingTimeTimer) - val pt = processingTimeTimer.time() - ctx match { - case Some(c) => { - Tracer.set(c) - //println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope) - pjp.proceedWith(originalEnvelope) - Tracer.clear - } - case None => pjp.proceedWith(originalEnvelope) - } - pt.stop() - } - case _ => pjp.proceed + Tracer.traceContext.withValue(msgContext) { + pjp.proceed() } } } - @Aspect -class UnregisteredActorRefInstrumentation { - @Pointcut("execution(* akka.spray.UnregisteredActorRefBase+.handle(..)) && args(message, sender)") - def sprayResponderHandle(message: Any, sender: ActorRef) = {} +class EnvelopeTracingContext { - @Around("sprayResponderHandle(message, sender)") - def sprayInvokeAround(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = { - import ProceedingJoinPointPimp._ - println("Handling unregistered actor ref message: "+message) - message match { - case TraceableMessage(ctx, msg, timer) => { - timer.stop() + @DeclareMixin("akka.dispatch.Envelope") + def mixin: TracingAwareContext = DefaultTracingAwareEnvelopeContext() - ctx match { - case Some(c) => { - Tracer.set(c) - pjp.proceedWith(msg.asInstanceOf[AnyRef]) // TODO: define if we should use Any or AnyRef and unify with the rest of the instrumentation. - Tracer.clear - } - case None => pjp.proceedWith(msg.asInstanceOf[AnyRef]) - } - } - case _ => pjp.proceed - } + @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") + def requestRecordInit(ctx: TracingAwareContext): Unit = {} + + @After("requestRecordInit(ctx)") + def whenCreatedRequestRecord(ctx: TracingAwareContext): Unit = { + // Necessary to force the initialization of TracingAwareRequestContext at the moment of creation. + ctx.traceContext } } |