From f9b596754feb657f1130eebd0cc4ac2a5e741518 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Sat, 23 Nov 2013 21:57:27 -0300 Subject: Little clean up to actor message passing tracing --- kamon-trace/src/main/resources/META-INF/aop.xml | 4 + .../ActorMessagePassingTracing.scala | 45 +++++++++++ .../ActorRefTellInstrumentation.scala | 47 ----------- .../scala/kamon/ActorInstrumentationSpec.scala | 94 ---------------------- .../kamon/ActorMessagePassingTracingSpec.scala | 73 +++++++++++++++++ 5 files changed, 122 insertions(+), 141 deletions(-) create mode 100644 kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorMessagePassingTracing.scala delete mode 100644 kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorRefTellInstrumentation.scala delete mode 100644 kamon-trace/src/test/scala/kamon/ActorInstrumentationSpec.scala create mode 100644 kamon-trace/src/test/scala/kamon/ActorMessagePassingTracingSpec.scala (limited to 'kamon-trace') diff --git a/kamon-trace/src/main/resources/META-INF/aop.xml b/kamon-trace/src/main/resources/META-INF/aop.xml index fdc1c496..090cac42 100644 --- a/kamon-trace/src/main/resources/META-INF/aop.xml +++ b/kamon-trace/src/main/resources/META-INF/aop.xml @@ -2,8 +2,12 @@ + + + + diff --git a/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorMessagePassingTracing.scala b/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorMessagePassingTracing.scala new file mode 100644 index 00000000..9eb0a7f9 --- /dev/null +++ b/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorMessagePassingTracing.scala @@ -0,0 +1,45 @@ +package kamon.trace.instrumentation + +import org.aspectj.lang.annotation._ +import org.aspectj.lang.ProceedingJoinPoint +import akka.actor.{Props, ActorSystem, ActorRef} +import akka.dispatch.{Envelope, MessageDispatcher} +import com.codahale.metrics.Timer +import kamon.trace.{ContextAware, TraceContext, Trace} + + +@Aspect +class BehaviourInvokeTracing { + + @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)") + def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} + + @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)") + def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} + + @Around("invokingActorBehaviourAtActorCell(envelope)") + def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { + //safe cast + val ctxInMessage = envelope.asInstanceOf[ContextAware].traceContext + + Trace.withValue(ctxInMessage) { + pjp.proceed() + } + } +} + +@Aspect +class EnvelopeTraceContextMixin { + + @DeclareMixin("akka.dispatch.Envelope") + def mixin: ContextAware = ContextAware.default + + @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") + def envelopeCreation(ctx: ContextAware): Unit = {} + + @After("envelopeCreation(ctx)") + def afterEnvelopeCreation(ctx: ContextAware): Unit = { + // Necessary to force the initialization of ContextAware at the moment of creation. + ctx.traceContext + } +} diff --git a/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorRefTellInstrumentation.scala b/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorRefTellInstrumentation.scala deleted file mode 100644 index 057e339d..00000000 --- a/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorRefTellInstrumentation.scala +++ /dev/null @@ -1,47 +0,0 @@ -package kamon.trace.instrumentation - -import org.aspectj.lang.annotation._ -import org.aspectj.lang.ProceedingJoinPoint -import akka.actor.{Props, ActorSystem, ActorRef} -import akka.dispatch.{Envelope, MessageDispatcher} -import com.codahale.metrics.Timer -import kamon.trace.{ContextAware, TraceContext, Trace} - -case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context) -case class DefaultTracingAwareEnvelopeContext(traceContext: Option[TraceContext] = Trace.traceContext.value, timestamp: Long = System.nanoTime) extends ContextAware - -@Aspect -class ActorCellInvokeInstrumentation { - - @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)") - def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} - - @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)") - def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} - - @Around("invokingActorBehaviourAtActorCell(envelope)") - def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { - //safe cast - val msgContext = envelope.asInstanceOf[ContextAware].traceContext - - Trace.traceContext.withValue(msgContext) { - pjp.proceed() - } - } -} - -@Aspect -class EnvelopeTracingContext { - - @DeclareMixin("akka.dispatch.Envelope") - def mixin: ContextAware = ContextAware.default - - @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") - def requestRecordInit(ctx: ContextAware): Unit = {} - - @After("requestRecordInit(ctx)") - def whenCreatedRequestRecord(ctx: ContextAware): Unit = { - // Necessary to force the initialization of TracingAwareRequestContext at the moment of creation. - ctx.traceContext - } -} diff --git a/kamon-trace/src/test/scala/kamon/ActorInstrumentationSpec.scala b/kamon-trace/src/test/scala/kamon/ActorInstrumentationSpec.scala deleted file mode 100644 index d675c4f4..00000000 --- a/kamon-trace/src/test/scala/kamon/ActorInstrumentationSpec.scala +++ /dev/null @@ -1,94 +0,0 @@ -package kamon - -import org.scalatest.{WordSpecLike, Matchers} -import akka.actor.{ActorRef, Actor, Props, ActorSystem} - -import akka.testkit.{ImplicitSender, TestKit} -import kamon.trace.Trace -import akka.pattern.{pipe, ask} -import akka.util.Timeout -import scala.concurrent.duration._ -import scala.concurrent.{Await, Future} -import akka.routing.RoundRobinRouter -import kamon.trace.TraceContext - - -class ActorInstrumentationSpec extends TestKit(ActorSystem("ActorInstrumentationSpec")) with WordSpecLike with Matchers with ImplicitSender { - implicit val executionContext = system.dispatcher - - "an instrumented actor ref" when { - "used inside the context of a transaction" should { - "propagate the trace context using bang" in new TraceContextEchoFixture { - echo ! "test" - - expectMsg(Some(testTraceContext)) - } - - "propagate the trace context using tell" in new TraceContextEchoFixture { - echo.tell("test", testActor) - - expectMsg(Some(testTraceContext)) - } - - "propagate the trace context using ask" in new TraceContextEchoFixture { - implicit val timeout = Timeout(1 seconds) - (echo ? "test") pipeTo(testActor) - - expectMsg(Some(testTraceContext)) - } - - "propagate the trace context to actors behind a router" in new RoutedTraceContextEchoFixture { - val contexts: Seq[Option[TraceContext]] = for(_ <- 1 to 10) yield Some(tellWithNewContext(echo, "test")) - - expectMsgAllOf(contexts: _*) - } - - /*"propagate with many asks" in { - val echo = system.actorOf(Props[TraceContextEcho]) - val iterations = 50000 - implicit val timeout = Timeout(10 seconds) - - val futures = for(_ <- 1 to iterations) yield { - Tracer.start - val result = (echo ? "test") - Tracer.clear - - result - } - - val allResults = Await.result(Future.sequence(futures), 10 seconds) - assert(iterations == allResults.collect { - case Some(_) => 1 - }.sum) - }*/ - } - } - - trait TraceContextEchoFixture { - val testTraceContext = Trace.newTraceContext("") - val echo = system.actorOf(Props[TraceContextEcho]) - - Trace.set(testTraceContext) - } - - trait RoutedTraceContextEchoFixture extends TraceContextEchoFixture { - override val echo = system.actorOf(Props[TraceContextEcho].withRouter(RoundRobinRouter(nrOfInstances = 10))) - - def tellWithNewContext(target: ActorRef, message: Any): TraceContext = { - val context = Trace.newTraceContext("") - Trace.set(context) - - target ! message - context - } - } - -} - -class TraceContextEcho extends Actor { - def receive = { - case msg: String ⇒ sender ! Trace.context() - } -} - - diff --git a/kamon-trace/src/test/scala/kamon/ActorMessagePassingTracingSpec.scala b/kamon-trace/src/test/scala/kamon/ActorMessagePassingTracingSpec.scala new file mode 100644 index 00000000..096ac986 --- /dev/null +++ b/kamon-trace/src/test/scala/kamon/ActorMessagePassingTracingSpec.scala @@ -0,0 +1,73 @@ +package kamon + +import org.scalatest.{WordSpecLike, Matchers} +import akka.actor.{ActorRef, Actor, Props, ActorSystem} + +import akka.testkit.{ImplicitSender, TestKit} +import kamon.trace.Trace +import akka.pattern.{pipe, ask} +import akka.util.Timeout +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} +import akka.routing.RoundRobinRouter +import kamon.trace.TraceContext + + +class ActorMessagePassingTracingSpec extends TestKit(ActorSystem("actor-message-passing-tracing-spec")) with WordSpecLike with ImplicitSender { + implicit val executionContext = system.dispatcher + + "the message passing instrumentation" should { + "propagate the TraceContext using bang" in new TraceContextEchoFixture { + Trace.withValue(testTraceContext) { + ctxEchoActor ! "test" + } + + expectMsg(testTraceContext) + } + + "propagate the TraceContext using tell" in new TraceContextEchoFixture { + Trace.withValue(testTraceContext) { + ctxEchoActor.tell("test", testActor) + } + + expectMsg(testTraceContext) + } + + "propagate the TraceContext using ask" in new TraceContextEchoFixture { + implicit val timeout = Timeout(1 seconds) + Trace.withValue(testTraceContext) { + // The pipe pattern use Futures internally, so FutureTracing test should cover the underpinnings of it. + (ctxEchoActor ? "test") pipeTo(testActor) + } + + expectMsg(testTraceContext) + } + + "propagate the TraceContext to actors behind a router" in new RoutedTraceContextEchoFixture { + Trace.withValue(testTraceContext) { + ctxEchoActor ! "test" + } + + expectMsg(testTraceContext) + } + } + + trait TraceContextEchoFixture { + val testTraceContext = Some(Trace.newTraceContext("")) + val ctxEchoActor = system.actorOf(Props[TraceContextEcho]) + } + + trait RoutedTraceContextEchoFixture extends TraceContextEchoFixture { + override val ctxEchoActor = system.actorOf(Props[TraceContextEcho].withRouter(RoundRobinRouter(nrOfInstances = 1))) + } +} + +class TraceContextEcho extends Actor { + def receive = { + case msg: String ⇒ sender ! Trace.context() + } +} + + + + -- cgit v1.2.3