From 99320ad0ad3d8b2bb3ee4e6813315a6d898970ec Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Thu, 16 May 2013 18:18:48 -0300 Subject: some work done regarding asking an actor --- src/main/resources/META-INF/aop.xml | 1 + .../ActorRefTellInstrumentation.scala | 2 +- .../instrumentation/PromiseInstrumentation.scala | 51 ++++++++++++++++++++++ src/main/scala/kamon/executor/eventbus.scala | 20 ++++++--- 4 files changed, 68 insertions(+), 6 deletions(-) create mode 100644 src/main/scala/akka/instrumentation/PromiseInstrumentation.scala (limited to 'src') diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml index aa4f7f4a..873aa9c5 100644 --- a/src/main/resources/META-INF/aop.xml +++ b/src/main/resources/META-INF/aop.xml @@ -10,6 +10,7 @@ + diff --git a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala index 783a6c45..b3b3f65b 100644 --- a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala +++ b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala @@ -11,7 +11,7 @@ import akka.dispatch.Envelope class ActorRefTellInstrumentation { println("Created ActorAspect") - @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && args(message, sender)") + @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.pattern.PromiseActorRef) && args(message, sender)") def sendingMessageToActorRef(message: Any, sender: ActorRef) = {} @Around("sendingMessageToActorRef(message, sender)") diff --git a/src/main/scala/akka/instrumentation/PromiseInstrumentation.scala b/src/main/scala/akka/instrumentation/PromiseInstrumentation.scala new file mode 100644 index 00000000..50fc87d1 --- /dev/null +++ b/src/main/scala/akka/instrumentation/PromiseInstrumentation.scala @@ -0,0 +1,51 @@ +package akka.instrumentation + +import org.aspectj.lang.annotation.{Around, Before, Pointcut, Aspect} +import kamon.TraceContext +import scala.util.Try +import java.util.concurrent.ExecutorService +import scala.concurrent.ExecutionContext +import org.aspectj.lang.ProceedingJoinPoint + +@Aspect("perthis(promiseCreation())") +class PromiseInstrumentation { + + println("Created an instrumented promise") + + private var traceContext: Option[TraceContext] = None + + @Pointcut("execution(scala.concurrent.impl.Promise.DefaultPromise.new(..))") + def promiseCreation(): Unit = {} + + @Before("promiseCreation()") + def catchTheTraceContext = { + TraceContext.current match { + case Some(ctx) => traceContext = Some(ctx.fork) + case None => traceContext = None + } + println(s"-----------------> Created a Promise, now the context is:$traceContext") + } + + @Pointcut("execution(* scala.concurrent.Future.onComplete(..)) && args(func, executor)") + def registeringOnCompleteCallback(func: Try[Any] => Any, executor: ExecutionContext) = {} + + @Around("registeringOnCompleteCallback(func, executor)") + def around(pjp: ProceedingJoinPoint, func: Try[Any] => Any, executor: ExecutionContext) = { + println("Someone registered a callback") + + val wrappedFunction = traceContext match { + case Some(ctx) => (f: Try[Any]) => { + println("Wrapping some context") + TraceContext.set(ctx) + val result = func + TraceContext.clear + + result + } + case None => println("No Context at all"); func + } + println("Proceeding to the JP") + pjp.proceed(pjp.getArgs.updated(0, wrappedFunction)) + } + +} diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala index 84420373..8df210fd 100644 --- a/src/main/scala/kamon/executor/eventbus.scala +++ b/src/main/scala/kamon/executor/eventbus.scala @@ -15,6 +15,8 @@ import akka.util.Timeout import kamon.executor.Ping import kamon.executor.MessageEvent import kamon.executor.Pong +import scala.util.Success +import scala.util.Failure //import kamon.executor.MessageEvent import java.util.UUID @@ -52,7 +54,7 @@ class PingActor(val target: ActorRef) extends Actor with ActorLogging { case Pong() => { log.info(s"pong with context ${TraceContext.current}") Thread.sleep(1000) - target ! Ping() + sender ! Ping() } case a: Any => println(s"Got ${a} in PING"); Thread.sleep(1000) } @@ -93,12 +95,20 @@ object TryAkka extends App{ newRelicReporter.start(1, TimeUnit.SECONDS) */ - - for(i <- 1 to 8) { + import akka.pattern.ask + implicit val timeout = Timeout(10, TimeUnit.SECONDS) + implicit def execContext = system.dispatcher + //for(i <- 1 to 8) { + val i = 1 TraceContext.start val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], s"ping-${i}"))), s"pong-${i}") - ping ! Pong() - } + val f = ping ? Pong() + + f.onComplete({ + case Success(p) => println("On my main success") + case Failure(t) => println(s"Something went wrong in the main, with the context: ${TraceContext.current}") + }) + //} /* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL) -- cgit v1.2.3