aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-05-20 15:15:59 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-05-20 15:15:59 -0300
commitfa11dbb9448e002de4863fd67924946b7157913d (patch)
tree69af5d929d801015b8f611a7f4bc6efd095b23ec /src
parent99320ad0ad3d8b2bb3ee4e6813315a6d898970ec (diff)
downloadKamon-fa11dbb9448e002de4863fd67924946b7157913d.tar.gz
Kamon-fa11dbb9448e002de4863fd67924946b7157913d.tar.bz2
Kamon-fa11dbb9448e002de4863fd67924946b7157913d.zip
simple instrumentation for keeping a trace context within Futures callbacks
Diffstat (limited to 'src')
-rw-r--r--src/main/resources/META-INF/aop.xml6
-rw-r--r--src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala3
-rw-r--r--src/main/scala/akka/instrumentation/PromiseInstrumentation.scala17
-rw-r--r--src/main/scala/kamon/TraceContext.scala18
-rw-r--r--src/main/scala/kamon/executor/eventbus.scala20
5 files changed, 19 insertions, 45 deletions
diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml
index 873aa9c5..28f1c578 100644
--- a/src/main/resources/META-INF/aop.xml
+++ b/src/main/resources/META-INF/aop.xml
@@ -5,9 +5,9 @@
<weaver options="-verbose -showWeaveInfo"/>
<aspects>
- <aspect name="akka.ActorSystemAspect"/>
- <!--<aspect name="akka.MailboxAspect"/>-->
- <aspect name="akka.PoolMonitorAspect"/>
+ <!--<aspect name="akka.ActorSystemAspect"/>
+ &lt;!&ndash;<aspect name="akka.MailboxAspect"/>&ndash;&gt;
+ <aspect name="akka.PoolMonitorAspect"/>-->
<aspect name="akka.instrumentation.ActorRefTellInstrumentation"/>
<aspect name="akka.instrumentation.ActorCellInvokeInstrumentation"/>
<aspect name="akka.instrumentation.PromiseInstrumentation"/>
diff --git a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala
index b3b3f65b..9e816d11 100644
--- a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala
+++ b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala
@@ -2,7 +2,7 @@ package akka.instrumentation
import org.aspectj.lang.annotation.{Around, Pointcut, Aspect}
import org.aspectj.lang.ProceedingJoinPoint
-import akka.actor.{ActorRef, ActorCell}
+import akka.actor.{ActorRef}
import kamon.TraceContext
import kamon.actor.TraceableMessage
import akka.dispatch.Envelope
@@ -28,6 +28,7 @@ class ActorRefTellInstrumentation {
}
}
+
@Aspect
class ActorCellInvokeInstrumentation {
diff --git a/src/main/scala/akka/instrumentation/PromiseInstrumentation.scala b/src/main/scala/akka/instrumentation/PromiseInstrumentation.scala
index 50fc87d1..918b4aac 100644
--- a/src/main/scala/akka/instrumentation/PromiseInstrumentation.scala
+++ b/src/main/scala/akka/instrumentation/PromiseInstrumentation.scala
@@ -3,15 +3,12 @@ package akka.instrumentation
import org.aspectj.lang.annotation.{Around, Before, Pointcut, Aspect}
import kamon.TraceContext
import scala.util.Try
-import java.util.concurrent.ExecutorService
import scala.concurrent.ExecutionContext
import org.aspectj.lang.ProceedingJoinPoint
@Aspect("perthis(promiseCreation())")
class PromiseInstrumentation {
- println("Created an instrumented promise")
-
private var traceContext: Option[TraceContext] = None
@Pointcut("execution(scala.concurrent.impl.Promise.DefaultPromise.new(..))")
@@ -21,9 +18,8 @@ class PromiseInstrumentation {
def catchTheTraceContext = {
TraceContext.current match {
case Some(ctx) => traceContext = Some(ctx.fork)
- case None => traceContext = None
+ case None => traceContext = None
}
- println(s"-----------------> Created a Promise, now the context is:$traceContext")
}
@Pointcut("execution(* scala.concurrent.Future.onComplete(..)) && args(func, executor)")
@@ -31,21 +27,18 @@ class PromiseInstrumentation {
@Around("registeringOnCompleteCallback(func, executor)")
def around(pjp: ProceedingJoinPoint, func: Try[Any] => Any, executor: ExecutionContext) = {
- println("Someone registered a callback")
val wrappedFunction = traceContext match {
- case Some(ctx) => (f: Try[Any]) => {
- println("Wrapping some context")
+ case Some(ctx) => (tryResult: Try[Any]) => {
TraceContext.set(ctx)
- val result = func
+ val result = func(tryResult)
TraceContext.clear
result
}
- case None => println("No Context at all"); func
+ case None => func
}
- println("Proceeding to the JP")
+
pjp.proceed(pjp.getArgs.updated(0, wrappedFunction))
}
-
}
diff --git a/src/main/scala/kamon/TraceContext.scala b/src/main/scala/kamon/TraceContext.scala
index 1fbedf86..18a91145 100644
--- a/src/main/scala/kamon/TraceContext.scala
+++ b/src/main/scala/kamon/TraceContext.scala
@@ -36,21 +36,11 @@ case class CodeBlockExecutionTime(blockName: String, begin: Long, end: Long) ext
trait TraceSupport {
- import TraceContext.current
-
-
- def trace[T](blockName: String)(f: => T): T = {
- val before = System.currentTimeMillis
-
- val result = f
-
- val after = System.currentTimeMillis
- //swapContext(current.get().withEntry(CodeBlockExecutionTime(blockName, before, after)))
+ def withContext[Out](func: => Any => Out, ctx: TraceContext) = {
+ TraceContext.set(ctx)
+ val result = func
+ TraceContext.clear
result
}
-
- def swapContext(newContext: TraceContext) {
- //current.set(newContext)
- }
}
diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala
index 8df210fd..9a54366f 100644
--- a/src/main/scala/kamon/executor/eventbus.scala
+++ b/src/main/scala/kamon/executor/eventbus.scala
@@ -4,23 +4,12 @@ import akka.event.ActorEventBus
import akka.event.LookupClassification
import akka.actor._
import java.util.concurrent.TimeUnit
-import kamon.metric.NewRelicReporter
-import com.yammer.metrics.core.{MetricName, MetricsRegistry}
-import com.yammer.metrics.reporting.ConsoleReporter
-import kamon.actor._
-import scala.concurrent.Future
-import kamon.{TraceSupport, TraceContext}
+import kamon.{TraceContext}
import akka.util.Timeout
-import kamon.executor.Ping
-import kamon.executor.MessageEvent
-import kamon.executor.Pong
import scala.util.Success
import scala.util.Failure
-
-//import kamon.executor.MessageEvent
-import java.util.UUID
-
+import scala.concurrent.Future
trait Message
@@ -66,8 +55,9 @@ class PingActor(val target: ActorRef) extends Actor with ActorLogging {
class PongActor extends Actor with ActorLogging {
def receive = {
case Ping() => {
- log.info(s"ping with context ${TraceContext.current}")
+ Thread.sleep(3000)
sender ! Pong()
+ log.info(s"ping with context ${TraceContext.current}")
}
case a: Any => println(s"Got ${a} in PONG")
}
@@ -105,7 +95,7 @@ object TryAkka extends App{
val f = ping ? Pong()
f.onComplete({
- case Success(p) => println("On my main success")
+ case Success(p) => println(s"On my main success, with the context: ${TraceContext.current}")
case Failure(t) => println(s"Something went wrong in the main, with the context: ${TraceContext.current}")
})
//}