aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-05-16 18:18:48 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-05-16 18:18:48 -0300
commit99320ad0ad3d8b2bb3ee4e6813315a6d898970ec (patch)
tree1e24fc263b84205bb05db54d39fa29805534a5f1 /src
parentd99d5c20e1fb6b02818c6add20fc54e485fc9ff3 (diff)
downloadKamon-99320ad0ad3d8b2bb3ee4e6813315a6d898970ec.tar.gz
Kamon-99320ad0ad3d8b2bb3ee4e6813315a6d898970ec.tar.bz2
Kamon-99320ad0ad3d8b2bb3ee4e6813315a6d898970ec.zip
some work done regarding asking an actor
Diffstat (limited to 'src')
-rw-r--r--src/main/resources/META-INF/aop.xml1
-rw-r--r--src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala2
-rw-r--r--src/main/scala/akka/instrumentation/PromiseInstrumentation.scala51
-rw-r--r--src/main/scala/kamon/executor/eventbus.scala20
4 files changed, 68 insertions, 6 deletions
diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml
index aa4f7f4a..873aa9c5 100644
--- a/src/main/resources/META-INF/aop.xml
+++ b/src/main/resources/META-INF/aop.xml
@@ -10,6 +10,7 @@
<aspect name="akka.PoolMonitorAspect"/>
<aspect name="akka.instrumentation.ActorRefTellInstrumentation"/>
<aspect name="akka.instrumentation.ActorCellInvokeInstrumentation"/>
+ <aspect name="akka.instrumentation.PromiseInstrumentation"/>
<include within="*"/>
<exclude within="javax.*"/>
diff --git a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala
index 783a6c45..b3b3f65b 100644
--- a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala
+++ b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala
@@ -11,7 +11,7 @@ import akka.dispatch.Envelope
class ActorRefTellInstrumentation {
println("Created ActorAspect")
- @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && args(message, sender)")
+ @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.pattern.PromiseActorRef) && args(message, sender)")
def sendingMessageToActorRef(message: Any, sender: ActorRef) = {}
@Around("sendingMessageToActorRef(message, sender)")
diff --git a/src/main/scala/akka/instrumentation/PromiseInstrumentation.scala b/src/main/scala/akka/instrumentation/PromiseInstrumentation.scala
new file mode 100644
index 00000000..50fc87d1
--- /dev/null
+++ b/src/main/scala/akka/instrumentation/PromiseInstrumentation.scala
@@ -0,0 +1,51 @@
+package akka.instrumentation
+
+import org.aspectj.lang.annotation.{Around, Before, Pointcut, Aspect}
+import kamon.TraceContext
+import scala.util.Try
+import java.util.concurrent.ExecutorService
+import scala.concurrent.ExecutionContext
+import org.aspectj.lang.ProceedingJoinPoint
+
+@Aspect("perthis(promiseCreation())")
+class PromiseInstrumentation {
+
+ println("Created an instrumented promise")
+
+ private var traceContext: Option[TraceContext] = None
+
+ @Pointcut("execution(scala.concurrent.impl.Promise.DefaultPromise.new(..))")
+ def promiseCreation(): Unit = {}
+
+ @Before("promiseCreation()")
+ def catchTheTraceContext = {
+ TraceContext.current match {
+ case Some(ctx) => traceContext = Some(ctx.fork)
+ case None => traceContext = None
+ }
+ println(s"-----------------> Created a Promise, now the context is:$traceContext")
+ }
+
+ @Pointcut("execution(* scala.concurrent.Future.onComplete(..)) && args(func, executor)")
+ def registeringOnCompleteCallback(func: Try[Any] => Any, executor: ExecutionContext) = {}
+
+ @Around("registeringOnCompleteCallback(func, executor)")
+ def around(pjp: ProceedingJoinPoint, func: Try[Any] => Any, executor: ExecutionContext) = {
+ println("Someone registered a callback")
+
+ val wrappedFunction = traceContext match {
+ case Some(ctx) => (f: Try[Any]) => {
+ println("Wrapping some context")
+ TraceContext.set(ctx)
+ val result = func
+ TraceContext.clear
+
+ result
+ }
+ case None => println("No Context at all"); func
+ }
+ println("Proceeding to the JP")
+ pjp.proceed(pjp.getArgs.updated(0, wrappedFunction))
+ }
+
+}
diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala
index 84420373..8df210fd 100644
--- a/src/main/scala/kamon/executor/eventbus.scala
+++ b/src/main/scala/kamon/executor/eventbus.scala
@@ -15,6 +15,8 @@ import akka.util.Timeout
import kamon.executor.Ping
import kamon.executor.MessageEvent
import kamon.executor.Pong
+import scala.util.Success
+import scala.util.Failure
//import kamon.executor.MessageEvent
import java.util.UUID
@@ -52,7 +54,7 @@ class PingActor(val target: ActorRef) extends Actor with ActorLogging {
case Pong() => {
log.info(s"pong with context ${TraceContext.current}")
Thread.sleep(1000)
- target ! Ping()
+ sender ! Ping()
}
case a: Any => println(s"Got ${a} in PING"); Thread.sleep(1000)
}
@@ -93,12 +95,20 @@ object TryAkka extends App{
newRelicReporter.start(1, TimeUnit.SECONDS)
*/
-
- for(i <- 1 to 8) {
+ import akka.pattern.ask
+ implicit val timeout = Timeout(10, TimeUnit.SECONDS)
+ implicit def execContext = system.dispatcher
+ //for(i <- 1 to 8) {
+ val i = 1
TraceContext.start
val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], s"ping-${i}"))), s"pong-${i}")
- ping ! Pong()
- }
+ val f = ping ? Pong()
+
+ f.onComplete({
+ case Success(p) => println("On my main success")
+ case Failure(t) => println(s"Something went wrong in the main, with the context: ${TraceContext.current}")
+ })
+ //}
/* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL)