diff options
author | Ivan Topolnak <ivantopo@gmail.com> | 2013-05-16 18:18:48 -0300 |
---|---|---|
committer | Ivan Topolnak <ivantopo@gmail.com> | 2013-05-16 18:18:48 -0300 |
commit | 99320ad0ad3d8b2bb3ee4e6813315a6d898970ec (patch) | |
tree | 1e24fc263b84205bb05db54d39fa29805534a5f1 /src | |
parent | d99d5c20e1fb6b02818c6add20fc54e485fc9ff3 (diff) | |
download | Kamon-99320ad0ad3d8b2bb3ee4e6813315a6d898970ec.tar.gz Kamon-99320ad0ad3d8b2bb3ee4e6813315a6d898970ec.tar.bz2 Kamon-99320ad0ad3d8b2bb3ee4e6813315a6d898970ec.zip |
some work done regarding asking an actor
Diffstat (limited to 'src')
4 files changed, 68 insertions, 6 deletions
diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml index aa4f7f4a..873aa9c5 100644 --- a/src/main/resources/META-INF/aop.xml +++ b/src/main/resources/META-INF/aop.xml @@ -10,6 +10,7 @@ <aspect name="akka.PoolMonitorAspect"/> <aspect name="akka.instrumentation.ActorRefTellInstrumentation"/> <aspect name="akka.instrumentation.ActorCellInvokeInstrumentation"/> + <aspect name="akka.instrumentation.PromiseInstrumentation"/> <include within="*"/> <exclude within="javax.*"/> diff --git a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala index 783a6c45..b3b3f65b 100644 --- a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala +++ b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala @@ -11,7 +11,7 @@ import akka.dispatch.Envelope class ActorRefTellInstrumentation { println("Created ActorAspect") - @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && args(message, sender)") + @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.pattern.PromiseActorRef) && args(message, sender)") def sendingMessageToActorRef(message: Any, sender: ActorRef) = {} @Around("sendingMessageToActorRef(message, sender)") diff --git a/src/main/scala/akka/instrumentation/PromiseInstrumentation.scala b/src/main/scala/akka/instrumentation/PromiseInstrumentation.scala new file mode 100644 index 00000000..50fc87d1 --- /dev/null +++ b/src/main/scala/akka/instrumentation/PromiseInstrumentation.scala @@ -0,0 +1,51 @@ +package akka.instrumentation + +import org.aspectj.lang.annotation.{Around, Before, Pointcut, Aspect} +import kamon.TraceContext +import scala.util.Try +import java.util.concurrent.ExecutorService +import scala.concurrent.ExecutionContext +import org.aspectj.lang.ProceedingJoinPoint + +@Aspect("perthis(promiseCreation())") +class PromiseInstrumentation { + + println("Created an instrumented promise") + + private var traceContext: Option[TraceContext] = None + + @Pointcut("execution(scala.concurrent.impl.Promise.DefaultPromise.new(..))") + def promiseCreation(): Unit = {} + + @Before("promiseCreation()") + def catchTheTraceContext = { + TraceContext.current match { + case Some(ctx) => traceContext = Some(ctx.fork) + case None => traceContext = None + } + println(s"-----------------> Created a Promise, now the context is:$traceContext") + } + + @Pointcut("execution(* scala.concurrent.Future.onComplete(..)) && args(func, executor)") + def registeringOnCompleteCallback(func: Try[Any] => Any, executor: ExecutionContext) = {} + + @Around("registeringOnCompleteCallback(func, executor)") + def around(pjp: ProceedingJoinPoint, func: Try[Any] => Any, executor: ExecutionContext) = { + println("Someone registered a callback") + + val wrappedFunction = traceContext match { + case Some(ctx) => (f: Try[Any]) => { + println("Wrapping some context") + TraceContext.set(ctx) + val result = func + TraceContext.clear + + result + } + case None => println("No Context at all"); func + } + println("Proceeding to the JP") + pjp.proceed(pjp.getArgs.updated(0, wrappedFunction)) + } + +} diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala index 84420373..8df210fd 100644 --- a/src/main/scala/kamon/executor/eventbus.scala +++ b/src/main/scala/kamon/executor/eventbus.scala @@ -15,6 +15,8 @@ import akka.util.Timeout import kamon.executor.Ping import kamon.executor.MessageEvent import kamon.executor.Pong +import scala.util.Success +import scala.util.Failure //import kamon.executor.MessageEvent import java.util.UUID @@ -52,7 +54,7 @@ class PingActor(val target: ActorRef) extends Actor with ActorLogging { case Pong() => { log.info(s"pong with context ${TraceContext.current}") Thread.sleep(1000) - target ! Ping() + sender ! Ping() } case a: Any => println(s"Got ${a} in PING"); Thread.sleep(1000) } @@ -93,12 +95,20 @@ object TryAkka extends App{ newRelicReporter.start(1, TimeUnit.SECONDS) */ - - for(i <- 1 to 8) { + import akka.pattern.ask + implicit val timeout = Timeout(10, TimeUnit.SECONDS) + implicit def execContext = system.dispatcher + //for(i <- 1 to 8) { + val i = 1 TraceContext.start val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], s"ping-${i}"))), s"pong-${i}") - ping ! Pong() - } + val f = ping ? Pong() + + f.onComplete({ + case Success(p) => println("On my main success") + case Failure(t) => println(s"Something went wrong in the main, with the context: ${TraceContext.current}") + }) + //} /* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL) |