diff options
author | Ivan Topolnak <ivantopo@gmail.com> | 2013-05-16 16:37:17 -0300 |
---|---|---|
committer | Ivan Topolnak <ivantopo@gmail.com> | 2013-05-16 16:37:17 -0300 |
commit | 2845f65ba86dadea614083174e9307dc577f4583 (patch) | |
tree | 6e15b4cfb643b1e5bf6305a6e484b86c2bd20ee5 /src/main/scala | |
parent | 52750a3eaf077fd332324fa10e2735230fd38116 (diff) | |
download | Kamon-2845f65ba86dadea614083174e9307dc577f4583.tar.gz Kamon-2845f65ba86dadea614083174e9307dc577f4583.tar.bz2 Kamon-2845f65ba86dadea614083174e9307dc577f4583.zip |
wip in aspects for actor tracing
Diffstat (limited to 'src/main/scala')
-rw-r--r-- | src/main/scala/akka/ActorAspect.scala | 2 | ||||
-rw-r--r-- | src/main/scala/akka/instrumentation/ActorInstrumentation.scala | 23 | ||||
-rw-r--r-- | src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala | 54 | ||||
-rw-r--r-- | src/main/scala/kamon/TraceContext.scala | 20 | ||||
-rw-r--r-- | src/main/scala/kamon/actor/AskSupport.scala | 2 | ||||
-rw-r--r-- | src/main/scala/kamon/actor/TraceableActor.scala | 6 | ||||
-rw-r--r-- | src/main/scala/kamon/executor/eventbus.scala | 23 |
7 files changed, 91 insertions, 39 deletions
diff --git a/src/main/scala/akka/ActorAspect.scala b/src/main/scala/akka/ActorAspect.scala index 9d64f205..05a7bc0a 100644 --- a/src/main/scala/akka/ActorAspect.scala +++ b/src/main/scala/akka/ActorAspect.scala @@ -11,7 +11,7 @@ class ActorAspect extends Metrics { @Pointcut("execution(* akka.actor.ActorCell+.receiveMessage(..))") protected def actorReceive:Unit = {} - @Around("actorReceive() && this(actor)") + @Around("sendingMessageToActorRef() && this(actor)") def around(pjp: ProceedingJoinPoint, actor: akka.actor.ActorCell): AnyRef = { //println("The path is: "+actor.self.path.) diff --git a/src/main/scala/akka/instrumentation/ActorInstrumentation.scala b/src/main/scala/akka/instrumentation/ActorInstrumentation.scala deleted file mode 100644 index ea599891..00000000 --- a/src/main/scala/akka/instrumentation/ActorInstrumentation.scala +++ /dev/null @@ -1,23 +0,0 @@ -package akka.instrumentation - -import org.aspectj.lang.annotation.{Before, Around, Pointcut, Aspect} -import org.aspectj.lang.ProceedingJoinPoint -import kamon.metric.Metrics -import akka.actor.ActorCell - -@Aspect -class ActorInstrumentation { - println("Created ActorAspect") - - @Pointcut("execution(* kamon.executor.PingActor.receive(..))") - protected def actorReceive:Unit = {} - - @Before("actorReceive() && args(message)") - def around(message: Any) = { - println("Around the actor cell receive") - //pjp.proceed(Array(Wrapper(message))) - //pjp.proceed - } -} - -case class Wrapper(content: Any)
\ No newline at end of file diff --git a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala new file mode 100644 index 00000000..783a6c45 --- /dev/null +++ b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala @@ -0,0 +1,54 @@ +package akka.instrumentation + +import org.aspectj.lang.annotation.{Around, Pointcut, Aspect} +import org.aspectj.lang.ProceedingJoinPoint +import akka.actor.{ActorRef, ActorCell} +import kamon.TraceContext +import kamon.actor.TraceableMessage +import akka.dispatch.Envelope + +@Aspect +class ActorRefTellInstrumentation { + println("Created ActorAspect") + + @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && args(message, sender)") + def sendingMessageToActorRef(message: Any, sender: ActorRef) = {} + + @Around("sendingMessageToActorRef(message, sender)") + def around(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = { + import pjp._ + + TraceContext.current match { + case Some(ctx) => { + val traceableMessage = TraceableMessage(ctx.fork, message) + proceed(getArgs.updated(0, traceableMessage)) + } + case None => proceed + } + } +} + +@Aspect +class ActorCellInvokeInstrumentation { + + @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)") + def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} + + + @Around("invokingActorBehaviourAtActorCell(envelope)") + def around(pjp: ProceedingJoinPoint, envelope: Envelope) = { + import pjp._ + + envelope match { + case Envelope(TraceableMessage(ctx, msg), sender) => { + TraceContext.set(ctx) + + val originalEnvelope = envelope.copy(message = msg) + proceed(getArgs.updated(0, originalEnvelope)) + + TraceContext.clear + } + case _ => proceed + } + } +}
\ No newline at end of file diff --git a/src/main/scala/kamon/TraceContext.scala b/src/main/scala/kamon/TraceContext.scala index b137168c..1fbedf86 100644 --- a/src/main/scala/kamon/TraceContext.scala +++ b/src/main/scala/kamon/TraceContext.scala @@ -10,7 +10,21 @@ case class TraceContext(id: UUID, entries: List[TraceEntry]) { } object TraceContext { - val current = new ThreadLocal[TraceContext] + private val context = new ThreadLocal[TraceContext] + + def current = { + val ctx = context.get() + if(ctx ne null) + Some(ctx) + else + None + } + + def clear = context.remove() + + def set(ctx: TraceContext) = context.set(ctx) + + def start = set(TraceContext(UUID.randomUUID(), Nil)) } trait TraceEntry @@ -31,12 +45,12 @@ trait TraceSupport { val result = f val after = System.currentTimeMillis - swapContext(current.get().withEntry(CodeBlockExecutionTime(blockName, before, after))) + //swapContext(current.get().withEntry(CodeBlockExecutionTime(blockName, before, after))) result } def swapContext(newContext: TraceContext) { - current.set(newContext) + //current.set(newContext) } } diff --git a/src/main/scala/kamon/actor/AskSupport.scala b/src/main/scala/kamon/actor/AskSupport.scala index 8a1ac2e8..0a8d27be 100644 --- a/src/main/scala/kamon/actor/AskSupport.scala +++ b/src/main/scala/kamon/actor/AskSupport.scala @@ -11,6 +11,6 @@ trait TraceableAskSupport { // FIXME: This name sucks class TraceableAskableActorRef(val actorRef: ActorRef) { - def ??(message: Any)(implicit timeout: Timeout) = akka.pattern.ask(actorRef, TraceableMessage(TraceContext.current.get().fork, message)) + def ??(message: Any)(implicit timeout: Timeout) = akka.pattern.ask(actorRef, TraceableMessage(TraceContext.current.get.fork, message)) } diff --git a/src/main/scala/kamon/actor/TraceableActor.scala b/src/main/scala/kamon/actor/TraceableActor.scala index a38b10c9..3acbd293 100644 --- a/src/main/scala/kamon/actor/TraceableActor.scala +++ b/src/main/scala/kamon/actor/TraceableActor.scala @@ -9,11 +9,11 @@ trait TraceableActor extends Actor with TracingImplicitConversions { case a: Any => { a match { case TraceableMessage(ctx, message) => { - TraceContext.current.set(ctx) + //TraceContext.current.set(ctx) tracedReceive(message) - TraceContext.current.remove() + //TraceContext.current.remove() /** Publish the partial context information to the EventStream */ context.system.eventStream.publish(ctx) @@ -29,7 +29,7 @@ trait TraceableActor extends Actor with TracingImplicitConversions { class TraceableActorRef(val target: ActorRef) { def !! (message: Any)(implicit sender: ActorRef) = { - val traceableMessage = TraceableMessage(TraceContext.current.get().fork, message) + val traceableMessage = TraceableMessage(TraceContext.current.get.fork, message) target.tell(traceableMessage, sender) } } diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala index faa076d9..84420373 100644 --- a/src/main/scala/kamon/executor/eventbus.scala +++ b/src/main/scala/kamon/executor/eventbus.scala @@ -2,7 +2,7 @@ package kamon.executor import akka.event.ActorEventBus import akka.event.LookupClassification -import akka.actor.{ActorRef, ActorSystem, Props, Actor} +import akka.actor._ import java.util.concurrent.TimeUnit import kamon.metric.NewRelicReporter @@ -12,6 +12,9 @@ import kamon.actor._ import scala.concurrent.Future import kamon.{TraceSupport, TraceContext} import akka.util.Timeout +import kamon.executor.Ping +import kamon.executor.MessageEvent +import kamon.executor.Pong //import kamon.executor.MessageEvent import java.util.UUID @@ -41,24 +44,27 @@ class AppActorEventBus extends ActorEventBus with LookupClassification{ case class Ping() case class Pong() -class PingActor(val target: ActorRef) extends Actor { +class PingActor(val target: ActorRef) extends Actor with ActorLogging { implicit def executionContext = context.dispatcher implicit val timeout = Timeout(30, TimeUnit.SECONDS) def receive = { case Pong() => { - println("pong") + log.info(s"pong with context ${TraceContext.current}") Thread.sleep(1000) target ! Ping() } case a: Any => println(s"Got ${a} in PING"); Thread.sleep(1000) } + + def withAny(): Any = {1} + def withAnyRef(): AnyRef = {new Object} } -class PongActor extends Actor { +class PongActor extends Actor with ActorLogging { def receive = { case Ping() => { - println("ping") + log.info(s"ping with context ${TraceContext.current}") sender ! Pong() } case a: Any => println(s"Got ${a} in PONG") @@ -88,10 +94,11 @@ object TryAkka extends App{ */ - /*for(i <- 1 to 8) {*/ - val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], "ping"))), "pong") + for(i <- 1 to 8) { + TraceContext.start + val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], s"ping-${i}"))), s"pong-${i}") ping ! Pong() - //} + } /* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL) |