diff options
author | Ivan Topolnak <ivantopo@gmail.com> | 2013-05-20 15:15:59 -0300 |
---|---|---|
committer | Ivan Topolnak <ivantopo@gmail.com> | 2013-05-20 15:15:59 -0300 |
commit | fa11dbb9448e002de4863fd67924946b7157913d (patch) | |
tree | 69af5d929d801015b8f611a7f4bc6efd095b23ec | |
parent | 99320ad0ad3d8b2bb3ee4e6813315a6d898970ec (diff) | |
download | Kamon-fa11dbb9448e002de4863fd67924946b7157913d.tar.gz Kamon-fa11dbb9448e002de4863fd67924946b7157913d.tar.bz2 Kamon-fa11dbb9448e002de4863fd67924946b7157913d.zip |
simple instrumentation for keeping a trace context within Futures callbacks
-rw-r--r-- | project/Dependencies.scala | 2 | ||||
-rw-r--r-- | src/main/resources/META-INF/aop.xml | 6 | ||||
-rw-r--r-- | src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala | 3 | ||||
-rw-r--r-- | src/main/scala/akka/instrumentation/PromiseInstrumentation.scala | 17 | ||||
-rw-r--r-- | src/main/scala/kamon/TraceContext.scala | 18 | ||||
-rw-r--r-- | src/main/scala/kamon/executor/eventbus.scala | 20 |
6 files changed, 19 insertions, 47 deletions
diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 5474228f..491a9c24 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -23,8 +23,6 @@ object Dependencies { val metrics = "com.yammer.metrics" % "metrics-core" % "2.2.0" val metricsScala = "com.yammer.metrics" % "metrics-scala_2.9.1" % "2.2.0" val newrelic = "com.newrelic.agent.java" % "newrelic-api" % "2.17.2" - val playJson = "play" % "play-json" % "2.2-SNAPSHOT" - def compile (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "compile") def provided (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "provided") diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml index 873aa9c5..28f1c578 100644 --- a/src/main/resources/META-INF/aop.xml +++ b/src/main/resources/META-INF/aop.xml @@ -5,9 +5,9 @@ <weaver options="-verbose -showWeaveInfo"/> <aspects> - <aspect name="akka.ActorSystemAspect"/> - <!--<aspect name="akka.MailboxAspect"/>--> - <aspect name="akka.PoolMonitorAspect"/> + <!--<aspect name="akka.ActorSystemAspect"/> + <!–<aspect name="akka.MailboxAspect"/>–> + <aspect name="akka.PoolMonitorAspect"/>--> <aspect name="akka.instrumentation.ActorRefTellInstrumentation"/> <aspect name="akka.instrumentation.ActorCellInvokeInstrumentation"/> <aspect name="akka.instrumentation.PromiseInstrumentation"/> diff --git a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala index b3b3f65b..9e816d11 100644 --- a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala +++ b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala @@ -2,7 +2,7 @@ package akka.instrumentation import org.aspectj.lang.annotation.{Around, Pointcut, Aspect} import org.aspectj.lang.ProceedingJoinPoint -import akka.actor.{ActorRef, ActorCell} +import akka.actor.{ActorRef} import kamon.TraceContext import kamon.actor.TraceableMessage import akka.dispatch.Envelope @@ -28,6 +28,7 @@ class ActorRefTellInstrumentation { } } + @Aspect class ActorCellInvokeInstrumentation { diff --git a/src/main/scala/akka/instrumentation/PromiseInstrumentation.scala b/src/main/scala/akka/instrumentation/PromiseInstrumentation.scala index 50fc87d1..918b4aac 100644 --- a/src/main/scala/akka/instrumentation/PromiseInstrumentation.scala +++ b/src/main/scala/akka/instrumentation/PromiseInstrumentation.scala @@ -3,15 +3,12 @@ package akka.instrumentation import org.aspectj.lang.annotation.{Around, Before, Pointcut, Aspect} import kamon.TraceContext import scala.util.Try -import java.util.concurrent.ExecutorService import scala.concurrent.ExecutionContext import org.aspectj.lang.ProceedingJoinPoint @Aspect("perthis(promiseCreation())") class PromiseInstrumentation { - println("Created an instrumented promise") - private var traceContext: Option[TraceContext] = None @Pointcut("execution(scala.concurrent.impl.Promise.DefaultPromise.new(..))") @@ -21,9 +18,8 @@ class PromiseInstrumentation { def catchTheTraceContext = { TraceContext.current match { case Some(ctx) => traceContext = Some(ctx.fork) - case None => traceContext = None + case None => traceContext = None } - println(s"-----------------> Created a Promise, now the context is:$traceContext") } @Pointcut("execution(* scala.concurrent.Future.onComplete(..)) && args(func, executor)") @@ -31,21 +27,18 @@ class PromiseInstrumentation { @Around("registeringOnCompleteCallback(func, executor)") def around(pjp: ProceedingJoinPoint, func: Try[Any] => Any, executor: ExecutionContext) = { - println("Someone registered a callback") val wrappedFunction = traceContext match { - case Some(ctx) => (f: Try[Any]) => { - println("Wrapping some context") + case Some(ctx) => (tryResult: Try[Any]) => { TraceContext.set(ctx) - val result = func + val result = func(tryResult) TraceContext.clear result } - case None => println("No Context at all"); func + case None => func } - println("Proceeding to the JP") + pjp.proceed(pjp.getArgs.updated(0, wrappedFunction)) } - } diff --git a/src/main/scala/kamon/TraceContext.scala b/src/main/scala/kamon/TraceContext.scala index 1fbedf86..18a91145 100644 --- a/src/main/scala/kamon/TraceContext.scala +++ b/src/main/scala/kamon/TraceContext.scala @@ -36,21 +36,11 @@ case class CodeBlockExecutionTime(blockName: String, begin: Long, end: Long) ext trait TraceSupport { - import TraceContext.current - - - def trace[T](blockName: String)(f: => T): T = { - val before = System.currentTimeMillis - - val result = f - - val after = System.currentTimeMillis - //swapContext(current.get().withEntry(CodeBlockExecutionTime(blockName, before, after))) + def withContext[Out](func: => Any => Out, ctx: TraceContext) = { + TraceContext.set(ctx) + val result = func + TraceContext.clear result } - - def swapContext(newContext: TraceContext) { - //current.set(newContext) - } } diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala index 8df210fd..9a54366f 100644 --- a/src/main/scala/kamon/executor/eventbus.scala +++ b/src/main/scala/kamon/executor/eventbus.scala @@ -4,23 +4,12 @@ import akka.event.ActorEventBus import akka.event.LookupClassification import akka.actor._ import java.util.concurrent.TimeUnit -import kamon.metric.NewRelicReporter -import com.yammer.metrics.core.{MetricName, MetricsRegistry} -import com.yammer.metrics.reporting.ConsoleReporter -import kamon.actor._ -import scala.concurrent.Future -import kamon.{TraceSupport, TraceContext} +import kamon.{TraceContext} import akka.util.Timeout -import kamon.executor.Ping -import kamon.executor.MessageEvent -import kamon.executor.Pong import scala.util.Success import scala.util.Failure - -//import kamon.executor.MessageEvent -import java.util.UUID - +import scala.concurrent.Future trait Message @@ -66,8 +55,9 @@ class PingActor(val target: ActorRef) extends Actor with ActorLogging { class PongActor extends Actor with ActorLogging { def receive = { case Ping() => { - log.info(s"ping with context ${TraceContext.current}") + Thread.sleep(3000) sender ! Pong() + log.info(s"ping with context ${TraceContext.current}") } case a: Any => println(s"Got ${a} in PONG") } @@ -105,7 +95,7 @@ object TryAkka extends App{ val f = ping ? Pong() f.onComplete({ - case Success(p) => println("On my main success") + case Success(p) => println(s"On my main success, with the context: ${TraceContext.current}") case Failure(t) => println(s"Something went wrong in the main, with the context: ${TraceContext.current}") }) //} |