aboutsummaryrefslogtreecommitdiff
path: root/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorInstrumentation.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorInstrumentation.scala')
-rw-r--r--kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorInstrumentation.scala157
1 files changed, 157 insertions, 0 deletions
diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorInstrumentation.scala
new file mode 100644
index 00000000..7b4664f8
--- /dev/null
+++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorInstrumentation.scala
@@ -0,0 +1,157 @@
+package akka.kamon.instrumentation
+
+import java.util.concurrent.locks.ReentrantLock
+
+import akka.actor._
+import akka.dispatch.Envelope
+import akka.dispatch.sysmsg.SystemMessage
+import akka.routing.RoutedActorCell
+import kamon.trace.Tracer
+import org.aspectj.lang.ProceedingJoinPoint
+import org.aspectj.lang.annotation._
+
+import scala.collection.immutable
+
+@Aspect
+class ActorCellInstrumentation {
+
+ def actorInstrumentation(cell: Cell): ActorMonitor =
+ cell.asInstanceOf[ActorInstrumentationAware].actorInstrumentation
+
+ @Pointcut("execution(akka.actor.ActorCell.new(..)) && this(cell) && args(system, ref, *, *, parent)")
+ def actorCellCreation(cell: Cell, system: ActorSystem, ref: ActorRef, parent: InternalActorRef): Unit = {}
+
+ @Pointcut("execution(akka.actor.UnstartedCell.new(..)) && this(cell) && args(system, ref, *, parent)")
+ def repointableActorRefCreation(cell: Cell, system: ActorSystem, ref: ActorRef, parent: InternalActorRef): Unit = {}
+
+ @After("actorCellCreation(cell, system, ref, parent)")
+ def afterCreation(cell: Cell, system: ActorSystem, ref: ActorRef, parent: ActorRef): Unit = {
+ cell.asInstanceOf[ActorInstrumentationAware].setActorInstrumentation(
+ ActorMonitor.createActorMonitor(cell, system, ref, parent))
+ }
+
+ @After("repointableActorRefCreation(cell, system, ref, parent)")
+ def afterRepointableActorRefCreation(cell: Cell, system: ActorSystem, ref: ActorRef, parent: ActorRef): Unit = {
+ cell.asInstanceOf[ActorInstrumentationAware].setActorInstrumentation(
+ ActorMonitor.createActorMonitor(cell, system, ref, parent))
+ }
+
+ @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && this(cell) && args(envelope)")
+ def invokingActorBehaviourAtActorCell(cell: ActorCell, envelope: Envelope) = {}
+
+ @Around("invokingActorBehaviourAtActorCell(cell, envelope)")
+ def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = {
+ actorInstrumentation(cell).processMessage(pjp, envelope.asInstanceOf[InstrumentedEnvelope].envelopeContext())
+ }
+
+ /**
+ *
+ *
+ */
+
+ @Pointcut("execution(* akka.actor.ActorCell.sendMessage(*)) && this(cell) && args(envelope)")
+ def sendMessageInActorCell(cell: Cell, envelope: Envelope): Unit = {}
+
+ @Pointcut("execution(* akka.actor.UnstartedCell.sendMessage(*)) && this(cell) && args(envelope)")
+ def sendMessageInUnstartedActorCell(cell: Cell, envelope: Envelope): Unit = {}
+
+ @Before("sendMessageInActorCell(cell, envelope)")
+ def afterSendMessageInActorCell(cell: Cell, envelope: Envelope): Unit = {
+ envelope.asInstanceOf[InstrumentedEnvelope].setEnvelopeContext(
+ actorInstrumentation(cell).captureEnvelopeContext())
+ }
+
+ @Before("sendMessageInUnstartedActorCell(cell, envelope)")
+ def afterSendMessageInUnstartedActorCell(cell: Cell, envelope: Envelope): Unit = {
+ envelope.asInstanceOf[InstrumentedEnvelope].setEnvelopeContext(
+ actorInstrumentation(cell).captureEnvelopeContext())
+ }
+
+ @Pointcut("execution(* akka.actor.UnstartedCell.replaceWith(*)) && this(unStartedCell) && args(cell)")
+ def replaceWithInRepointableActorRef(unStartedCell: UnstartedCell, cell: Cell): Unit = {}
+
+ @Around("replaceWithInRepointableActorRef(unStartedCell, cell)")
+ def aroundReplaceWithInRepointableActorRef(pjp: ProceedingJoinPoint, unStartedCell: UnstartedCell, cell: Cell): Unit = {
+ // TODO: Find a way to do this without resorting to reflection and, even better, without copy/pasting the Akka Code!
+ val unstartedCellClass = classOf[UnstartedCell]
+ val queueField = unstartedCellClass.getDeclaredField("akka$actor$UnstartedCell$$queue")
+ queueField.setAccessible(true)
+
+ val lockField = unstartedCellClass.getDeclaredField("lock")
+ lockField.setAccessible(true)
+
+ val queue = queueField.get(unStartedCell).asInstanceOf[java.util.LinkedList[_]]
+ val lock = lockField.get(unStartedCell).asInstanceOf[ReentrantLock]
+
+ def locked[T](body: ⇒ T): T = {
+ lock.lock()
+ try body finally lock.unlock()
+ }
+
+ locked {
+ try {
+ while (!queue.isEmpty) {
+ queue.poll() match {
+ case s: SystemMessage ⇒ cell.sendSystemMessage(s) // TODO: ============= CHECK SYSTEM MESSAGESSSSS =========
+ case e: Envelope with InstrumentedEnvelope ⇒
+ Tracer.withContext(e.envelopeContext().context) {
+ cell.sendMessage(e)
+ }
+ }
+ }
+ } finally {
+ unStartedCell.self.swapCell(cell)
+ }
+ }
+ }
+
+ /**
+ *
+ */
+
+ @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)")
+ def actorStop(cell: ActorCell): Unit = {}
+
+ @After("actorStop(cell)")
+ def afterStop(cell: ActorCell): Unit = {
+ actorInstrumentation(cell).cleanup()
+
+ // The Stop can't be captured from the RoutedActorCell so we need to put this piece of cleanup here.
+ if (cell.isInstanceOf[RoutedActorCell]) {
+ cell.asInstanceOf[RouterInstrumentationAware].routerInstrumentation.cleanup()
+ }
+ }
+
+ @Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell) && args(childrenNotToSuspend, failure)")
+ def actorInvokeFailure(cell: ActorCell, childrenNotToSuspend: immutable.Iterable[ActorRef], failure: Throwable): Unit = {}
+
+ @Before("actorInvokeFailure(cell, childrenNotToSuspend, failure)")
+ def beforeInvokeFailure(cell: ActorCell, childrenNotToSuspend: immutable.Iterable[ActorRef], failure: Throwable): Unit = {
+ actorInstrumentation(cell).processFailure(failure)
+ }
+}
+
+trait ActorInstrumentationAware {
+ def actorInstrumentation: ActorMonitor
+ def setActorInstrumentation(ai: ActorMonitor): Unit
+}
+
+object ActorInstrumentationAware {
+ def apply(): ActorInstrumentationAware = new ActorInstrumentationAware {
+ private var _ai: ActorMonitor = _
+
+ def setActorInstrumentation(ai: ActorMonitor): Unit = _ai = ai
+ def actorInstrumentation: ActorMonitor = _ai
+ }
+}
+
+@Aspect
+class MetricsIntoActorCellsMixin {
+
+ @DeclareMixin("akka.actor.ActorCell")
+ def mixinActorCellMetricsToActorCell: ActorInstrumentationAware = ActorInstrumentationAware()
+
+ @DeclareMixin("akka.actor.UnstartedCell")
+ def mixinActorCellMetricsToUnstartedActorCell: ActorInstrumentationAware = ActorInstrumentationAware()
+
+} \ No newline at end of file