aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-05-16 16:37:17 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-05-16 16:37:17 -0300
commit2845f65ba86dadea614083174e9307dc577f4583 (patch)
tree6e15b4cfb643b1e5bf6305a6e484b86c2bd20ee5
parent52750a3eaf077fd332324fa10e2735230fd38116 (diff)
downloadKamon-2845f65ba86dadea614083174e9307dc577f4583.tar.gz
Kamon-2845f65ba86dadea614083174e9307dc577f4583.tar.bz2
Kamon-2845f65ba86dadea614083174e9307dc577f4583.zip
wip in aspects for actor tracing
-rw-r--r--project/Settings.scala1
-rw-r--r--src/main/resources/META-INF/aop.xml10
-rw-r--r--src/main/scala/akka/ActorAspect.scala2
-rw-r--r--src/main/scala/akka/instrumentation/ActorInstrumentation.scala23
-rw-r--r--src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala54
-rw-r--r--src/main/scala/kamon/TraceContext.scala20
-rw-r--r--src/main/scala/kamon/actor/AskSupport.scala2
-rw-r--r--src/main/scala/kamon/actor/TraceableActor.scala6
-rw-r--r--src/main/scala/kamon/executor/eventbus.scala23
9 files changed, 97 insertions, 44 deletions
diff --git a/project/Settings.scala b/project/Settings.scala
index 7eddda5f..de8a3024 100644
--- a/project/Settings.scala
+++ b/project/Settings.scala
@@ -13,6 +13,7 @@ object Settings {
scalacOptions := Seq(
"-encoding",
"utf8",
+ "-g:vars",
"-feature",
"-unchecked",
"-deprecation",
diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml
index 20df0b49..aa4f7f4a 100644
--- a/src/main/resources/META-INF/aop.xml
+++ b/src/main/resources/META-INF/aop.xml
@@ -5,14 +5,14 @@
<weaver options="-verbose -showWeaveInfo"/>
<aspects>
- <!--<aspect name="akka.ActorSystemAspect"/>
- &lt;!&ndash;<aspect name="akka.MailboxAspect"/>&ndash;&gt;
- <aspect name="akka.PoolMonitorAspect"/>-->
- <aspect name="akka.instrumentation.ActorInstrumentation"/>
+ <aspect name="akka.ActorSystemAspect"/>
+ <!--<aspect name="akka.MailboxAspect"/>-->
+ <aspect name="akka.PoolMonitorAspect"/>
+ <aspect name="akka.instrumentation.ActorRefTellInstrumentation"/>
+ <aspect name="akka.instrumentation.ActorCellInvokeInstrumentation"/>
<include within="*"/>
<exclude within="javax.*"/>
-
<exclude within="org.aspectj.*"/>
<exclude within="scala.*"/>
<exclude within="scalaz.*"/>
diff --git a/src/main/scala/akka/ActorAspect.scala b/src/main/scala/akka/ActorAspect.scala
index 9d64f205..05a7bc0a 100644
--- a/src/main/scala/akka/ActorAspect.scala
+++ b/src/main/scala/akka/ActorAspect.scala
@@ -11,7 +11,7 @@ class ActorAspect extends Metrics {
@Pointcut("execution(* akka.actor.ActorCell+.receiveMessage(..))")
protected def actorReceive:Unit = {}
- @Around("actorReceive() && this(actor)")
+ @Around("sendingMessageToActorRef() && this(actor)")
def around(pjp: ProceedingJoinPoint, actor: akka.actor.ActorCell): AnyRef = {
//println("The path is: "+actor.self.path.)
diff --git a/src/main/scala/akka/instrumentation/ActorInstrumentation.scala b/src/main/scala/akka/instrumentation/ActorInstrumentation.scala
deleted file mode 100644
index ea599891..00000000
--- a/src/main/scala/akka/instrumentation/ActorInstrumentation.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-package akka.instrumentation
-
-import org.aspectj.lang.annotation.{Before, Around, Pointcut, Aspect}
-import org.aspectj.lang.ProceedingJoinPoint
-import kamon.metric.Metrics
-import akka.actor.ActorCell
-
-@Aspect
-class ActorInstrumentation {
- println("Created ActorAspect")
-
- @Pointcut("execution(* kamon.executor.PingActor.receive(..))")
- protected def actorReceive:Unit = {}
-
- @Before("actorReceive() && args(message)")
- def around(message: Any) = {
- println("Around the actor cell receive")
- //pjp.proceed(Array(Wrapper(message)))
- //pjp.proceed
- }
-}
-
-case class Wrapper(content: Any) \ No newline at end of file
diff --git a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala
new file mode 100644
index 00000000..783a6c45
--- /dev/null
+++ b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala
@@ -0,0 +1,54 @@
+package akka.instrumentation
+
+import org.aspectj.lang.annotation.{Around, Pointcut, Aspect}
+import org.aspectj.lang.ProceedingJoinPoint
+import akka.actor.{ActorRef, ActorCell}
+import kamon.TraceContext
+import kamon.actor.TraceableMessage
+import akka.dispatch.Envelope
+
+@Aspect
+class ActorRefTellInstrumentation {
+ println("Created ActorAspect")
+
+ @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && args(message, sender)")
+ def sendingMessageToActorRef(message: Any, sender: ActorRef) = {}
+
+ @Around("sendingMessageToActorRef(message, sender)")
+ def around(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = {
+ import pjp._
+
+ TraceContext.current match {
+ case Some(ctx) => {
+ val traceableMessage = TraceableMessage(ctx.fork, message)
+ proceed(getArgs.updated(0, traceableMessage))
+ }
+ case None => proceed
+ }
+ }
+}
+
+@Aspect
+class ActorCellInvokeInstrumentation {
+
+ @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)")
+ def invokingActorBehaviourAtActorCell(envelope: Envelope) = {}
+
+
+ @Around("invokingActorBehaviourAtActorCell(envelope)")
+ def around(pjp: ProceedingJoinPoint, envelope: Envelope) = {
+ import pjp._
+
+ envelope match {
+ case Envelope(TraceableMessage(ctx, msg), sender) => {
+ TraceContext.set(ctx)
+
+ val originalEnvelope = envelope.copy(message = msg)
+ proceed(getArgs.updated(0, originalEnvelope))
+
+ TraceContext.clear
+ }
+ case _ => proceed
+ }
+ }
+} \ No newline at end of file
diff --git a/src/main/scala/kamon/TraceContext.scala b/src/main/scala/kamon/TraceContext.scala
index b137168c..1fbedf86 100644
--- a/src/main/scala/kamon/TraceContext.scala
+++ b/src/main/scala/kamon/TraceContext.scala
@@ -10,7 +10,21 @@ case class TraceContext(id: UUID, entries: List[TraceEntry]) {
}
object TraceContext {
- val current = new ThreadLocal[TraceContext]
+ private val context = new ThreadLocal[TraceContext]
+
+ def current = {
+ val ctx = context.get()
+ if(ctx ne null)
+ Some(ctx)
+ else
+ None
+ }
+
+ def clear = context.remove()
+
+ def set(ctx: TraceContext) = context.set(ctx)
+
+ def start = set(TraceContext(UUID.randomUUID(), Nil))
}
trait TraceEntry
@@ -31,12 +45,12 @@ trait TraceSupport {
val result = f
val after = System.currentTimeMillis
- swapContext(current.get().withEntry(CodeBlockExecutionTime(blockName, before, after)))
+ //swapContext(current.get().withEntry(CodeBlockExecutionTime(blockName, before, after)))
result
}
def swapContext(newContext: TraceContext) {
- current.set(newContext)
+ //current.set(newContext)
}
}
diff --git a/src/main/scala/kamon/actor/AskSupport.scala b/src/main/scala/kamon/actor/AskSupport.scala
index 8a1ac2e8..0a8d27be 100644
--- a/src/main/scala/kamon/actor/AskSupport.scala
+++ b/src/main/scala/kamon/actor/AskSupport.scala
@@ -11,6 +11,6 @@ trait TraceableAskSupport {
// FIXME: This name sucks
class TraceableAskableActorRef(val actorRef: ActorRef) {
- def ??(message: Any)(implicit timeout: Timeout) = akka.pattern.ask(actorRef, TraceableMessage(TraceContext.current.get().fork, message))
+ def ??(message: Any)(implicit timeout: Timeout) = akka.pattern.ask(actorRef, TraceableMessage(TraceContext.current.get.fork, message))
}
diff --git a/src/main/scala/kamon/actor/TraceableActor.scala b/src/main/scala/kamon/actor/TraceableActor.scala
index a38b10c9..3acbd293 100644
--- a/src/main/scala/kamon/actor/TraceableActor.scala
+++ b/src/main/scala/kamon/actor/TraceableActor.scala
@@ -9,11 +9,11 @@ trait TraceableActor extends Actor with TracingImplicitConversions {
case a: Any => {
a match {
case TraceableMessage(ctx, message) => {
- TraceContext.current.set(ctx)
+ //TraceContext.current.set(ctx)
tracedReceive(message)
- TraceContext.current.remove()
+ //TraceContext.current.remove()
/** Publish the partial context information to the EventStream */
context.system.eventStream.publish(ctx)
@@ -29,7 +29,7 @@ trait TraceableActor extends Actor with TracingImplicitConversions {
class TraceableActorRef(val target: ActorRef) {
def !! (message: Any)(implicit sender: ActorRef) = {
- val traceableMessage = TraceableMessage(TraceContext.current.get().fork, message)
+ val traceableMessage = TraceableMessage(TraceContext.current.get.fork, message)
target.tell(traceableMessage, sender)
}
}
diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala
index faa076d9..84420373 100644
--- a/src/main/scala/kamon/executor/eventbus.scala
+++ b/src/main/scala/kamon/executor/eventbus.scala
@@ -2,7 +2,7 @@ package kamon.executor
import akka.event.ActorEventBus
import akka.event.LookupClassification
-import akka.actor.{ActorRef, ActorSystem, Props, Actor}
+import akka.actor._
import java.util.concurrent.TimeUnit
import kamon.metric.NewRelicReporter
@@ -12,6 +12,9 @@ import kamon.actor._
import scala.concurrent.Future
import kamon.{TraceSupport, TraceContext}
import akka.util.Timeout
+import kamon.executor.Ping
+import kamon.executor.MessageEvent
+import kamon.executor.Pong
//import kamon.executor.MessageEvent
import java.util.UUID
@@ -41,24 +44,27 @@ class AppActorEventBus extends ActorEventBus with LookupClassification{
case class Ping()
case class Pong()
-class PingActor(val target: ActorRef) extends Actor {
+class PingActor(val target: ActorRef) extends Actor with ActorLogging {
implicit def executionContext = context.dispatcher
implicit val timeout = Timeout(30, TimeUnit.SECONDS)
def receive = {
case Pong() => {
- println("pong")
+ log.info(s"pong with context ${TraceContext.current}")
Thread.sleep(1000)
target ! Ping()
}
case a: Any => println(s"Got ${a} in PING"); Thread.sleep(1000)
}
+
+ def withAny(): Any = {1}
+ def withAnyRef(): AnyRef = {new Object}
}
-class PongActor extends Actor {
+class PongActor extends Actor with ActorLogging {
def receive = {
case Ping() => {
- println("ping")
+ log.info(s"ping with context ${TraceContext.current}")
sender ! Pong()
}
case a: Any => println(s"Got ${a} in PONG")
@@ -88,10 +94,11 @@ object TryAkka extends App{
*/
- /*for(i <- 1 to 8) {*/
- val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], "ping"))), "pong")
+ for(i <- 1 to 8) {
+ TraceContext.start
+ val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], s"ping-${i}"))), s"pong-${i}")
ping ! Pong()
- //}
+ }
/* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL)